You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/05/12 18:29:00 UTC

[jira] [Commented] (KAFKA-6608) Add TimeoutException to KafkaConsumer#position()

    [ https://issues.apache.org/jira/browse/KAFKA-6608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16473239#comment-16473239 ] 

ASF GitHub Bot commented on KAFKA-6608:
---------------------------------------

ConcurrencyPractitioner closed pull request #4643: [KAFKA-6608] Add timeout parameter to methods which fetches and reset…
URL: https://github.com/apache/kafka/pull/4643
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/build.gradle b/build.gradle
index e090f527414..87ffc97bf36 100644
--- a/build.gradle
+++ b/build.gradle
@@ -144,8 +144,8 @@ subprojects {
   if (!JavaVersion.current().isJava9Compatible())
     apply plugin: 'findbugs'
 
-  sourceCompatibility = 1.7
-  targetCompatibility = 1.7
+  sourceCompatibility = 1.8
+  targetCompatibility = 1.8
 
   compileJava {
     options.encoding = 'UTF-8'
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index 0e27e1f0383..a26af5f5247 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -22,6 +22,7 @@
 import org.apache.kafka.common.TopicPartition;
 
 import java.io.Closeable;
+import java.time.Duration;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -90,6 +91,11 @@
      */
     public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
 
+    /**
+     * @see KafkaConsumer#commitSync(Map, Duration)
+     */
+    public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, 
+                           final Duration duration);
     /**
      * @see KafkaConsumer#commitAsync()
      */
@@ -124,12 +130,22 @@
      * @see KafkaConsumer#position(TopicPartition)
      */
     public long position(TopicPartition partition);
+    
+    /**
+     * @see KafkaConsumer#position(TopicPartition, Duration)
+     */
+    public long position(TopicPartition partition, final Duration duration);
 
     /**
      * @see KafkaConsumer#committed(TopicPartition)
      */
     public OffsetAndMetadata committed(TopicPartition partition);
 
+    /**
+     * @see KafkaConsumer#committed(TopicPartition, Duration)
+     */
+    public OffsetAndMetadata committed(TopicPartition partition, final Duration duration);
+
     /**
      * @see KafkaConsumer#metrics()
      */
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 81137f3c8dd..aa57b6afc1c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -37,6 +37,7 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
@@ -55,6 +56,7 @@
 import org.slf4j.Logger;
 
 import java.net.InetSocketAddress;
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.ConcurrentModificationException;
@@ -827,6 +829,10 @@ private KafkaConsumer(ConsumerConfig config,
         this.assignors = assignors;
     }
 
+    public long requestTimeoutMs() {
+        return requestTimeoutMs;
+    }
+
     /**
      * Get the set of partitions currently assigned to this consumer. If subscription happened by directly assigning
      * partitions using {@link #assign(Collection)} then this will simply return the same partitions that
@@ -1259,6 +1265,50 @@ public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
         }
     }
 
+    /**
+    * Commit the specified offsets for the specified list of topics and partitions.
+    * <p>
+    * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
+    * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
+    * should not be used. The committed offset should be the next message your application will consume,
+    * i.e. lastProcessedMessageOffset + 1.
+    * <p>
+    * This is a synchronous commit and will block until either the commit succeeds or an unrecoverable error is
+    * encountered (in which case it is thrown to the caller).
+    * <p>
+    * Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)}
+    * (or similar) are guaranteed to have their callbacks invoked prior to completion of this method.
+    *
+    * @param offsets A map of offsets by partition with associated metadata
+    * @param duration The amount of time the user would like to block
+    * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried.
+    *             This can only occur if you are using automatic group management with {@link #subscribe(Collection)},
+    *             or if there is an active group with the same groupId which is using group management.
+    * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
+    *             function is called
+    * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
+    *             this function is called
+    * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+    * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
+    *             configured groupId. See the exception for more details
+    * @throws java.lang.IllegalArgumentException if the committed offset is negative
+    * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata
+    *             is too large or if the topic does not exist).
+    * @throws TimeoutException if method duration exceeds maximum given time
+    */
+    @Override
+    public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration duration) {
+        acquireAndEnsureOpen();
+        final long totalWaitTime = duration.toMillis();
+        try {
+            if (!coordinator.commitOffsetsSync(new HashMap<>(offsets), totalWaitTime)) {
+                throw new TimeoutException("Commiting offsets synchronously took too long.");
+            }
+        } finally {
+            release();
+        }
+    }
+
     /**
      * Commit offsets returned on the last {@link #poll(long) poll()} for all the subscribed list of topics and partition.
      * Same as {@link #commitAsync(OffsetCommitCallback) commitAsync(null)}
@@ -1427,7 +1477,6 @@ public long position(TopicPartition partition) {
             while (offset == null) {
                 // batch update fetch positions for any partitions without a valid position
                 updateFetchPositions();
-                client.poll(retryBackoffMs);
                 offset = this.subscriptions.position(partition);
             }
             return offset;
@@ -1436,6 +1485,60 @@ public long position(TopicPartition partition) {
         }
     }
 
+    /**
+     * Get the offset of the <i>next record</i> that will be fetched (if a record with that offset exists).
+     * This method may issue a remote call to the server if there is no current position for the given partition.
+     * <p>
+     * This call will block until either the position could be determined or an unrecoverable error is
+     * encountered (in which case it is thrown to the caller). However, if the position cannot be determined
+     * within the given duration, a TimeoutException is thrown instead of blocking any longer.
+     *
+     * @param partition The partition to get the position for
+     * @param duration  The maximum duration in which the method can block
+     * @return The current position of the consumer (that is, the offset of the next record to be fetched)
+     * @throws IllegalArgumentException if the provided TopicPartition is not assigned to this consumer
+     * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if no offset is currently defined for
+     *             the partition
+     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
+     *             function is called
+     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
+     *             this function is called
+     * @throws org.apache.kafka.common.errors.TimeoutException if the position cannot be determined within the given duration
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
+     *             configured groupId. See the exception for more details
+     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
+     */
+    public long position(TopicPartition partition, final Duration duration) {
+        final long timeout = duration.toMillis();
+        acquireAndEnsureOpen();
+        try {
+            if (!this.subscriptions.isAssigned(partition))
+                throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
+            Long offset = this.subscriptions.position(partition);
+            final long startMs = time.milliseconds();
+            long finishMs = time.milliseconds();
+            while (offset == null && finishMs - startMs < timeout) {
+                // batch update fetch positions for any partitions without a valid position
+                updateFetchPositions(time.milliseconds(), timeout - (time.milliseconds() - startMs));
+                finishMs = time.milliseconds();
+                final long remainingTime = Math.max(0, timeout - (finishMs - startMs));
+                
+                if (remainingTime > 0) {
+                    client.poll(remainingTime);
+                    offset = this.subscriptions.position(partition);
+                    finishMs = time.milliseconds();
+                } else {
+                    break;
+                }
+            }
+            if (offset == null) throw new TimeoutException("request timed out, position is unable to be acquired.");
+            return offset;
+        } finally {
+            release();
+        }
+    }
+
     /**
      * Get the last committed offset for the given partition (whether the commit happened by this process or
      * another). This offset will be used as the position for the consumer in the event of a failure.
@@ -1464,6 +1567,39 @@ public OffsetAndMetadata committed(TopicPartition partition) {
         }
     }
 
+    /**
+     * Get the last committed offset for the given partition (whether the commit happened by this process or
+     * another). This offset will be used as the position for the consumer in the event of a failure.
+     * <p>
+     * This call will block to do a remote call to get the latest committed offsets from the server.
+     *
+     * @param partition The partition to check
+     * @param duration  The duration we would want to block
+     * @return The last committed offset and metadata or null if there was no prior commit
+     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
+     *             function is called
+     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
+     *             this function is called
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
+     *             configured groupId. See the exception for more details
+     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
+     */
+    @Override
+    public OffsetAndMetadata committed(TopicPartition partition, final Duration duration) {
+        acquireAndEnsureOpen();
+        final long totalWaitTime = duration.toMillis();
+        try {
+            Map<TopicPartition, OffsetAndMetadata> offsets = coordinator.fetchCommittedOffsets(Collections.singleton(partition), 
+                                                                                               time.milliseconds(), 
+                                                                                               totalWaitTime, 
+                                                                                               time);
+            return offsets.get(partition);
+        } finally {
+            release();
+        }
+    }
+
     /**
      * Get the metrics kept by the consumer
      */
@@ -1799,6 +1935,29 @@ private boolean updateFetchPositions() {
         return false;
     }
 
+    /**
+     * Set the fetch position to the committed position (if there is one) 
+     * or reset it using the offset reset policy the user has configured
+     * within a given time limit.
+     * 
+     * @param start        the time at which the operation begins
+     * @param timeoutMs    the maximum duration of the method
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+     * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
+     *             defined
+     * @return true if all assigned partitions have a position
+     */
+    private boolean updateFetchPositions(long start, long timeoutMs) {
+        if (subscriptions.hasAllFetchPositions())
+            return true;
+
+        coordinator.refreshCommittedOffsetsIfNeeded(start, timeoutMs);
+        subscriptions.resetMissingPositions();
+        fetcher.resetOffsetsIfNeeded();
+
+        return false;
+    }
+
     /**
      * Acquire the light lock and ensure that the consumer hasn't been closed.
      * @throws IllegalStateException If the consumer has been closed
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index ceb7024b97b..f1edd89a042 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -25,6 +25,7 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -244,6 +245,11 @@ public synchronized void commitSync() {
         commitSync(this.subscriptions.allConsumed());
     }
 
+    @Override
+    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, final Duration duration) {
+        commitSync(offsets);
+    }
+
     @Override
     public synchronized void seek(TopicPartition partition, long offset) {
         ensureNotClosed();
@@ -259,6 +265,11 @@ public synchronized OffsetAndMetadata committed(TopicPartition partition) {
         return new OffsetAndMetadata(0);
     }
 
+    @Override
+    public OffsetAndMetadata committed(TopicPartition partition, final Duration duration) {
+        return committed(partition);
+    }
+
     @Override
     public synchronized long position(TopicPartition partition) {
         ensureNotClosed();
@@ -272,6 +283,11 @@ public synchronized long position(TopicPartition partition) {
         return offset;
     }
 
+    @Override
+    public synchronized long position(TopicPartition partition, final Duration duration) {
+        return position(partition);
+    }
+
     @Override
     public synchronized void seekToBeginning(Collection<TopicPartition> partitions) {
         ensureNotClosed();
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index dd4bb7038f3..a1c1f694ee7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -211,7 +211,7 @@ public synchronized void ensureCoordinatorReady() {
      * @param timeoutMs Maximum time to wait to discover the coordinator
      * @return true If coordinator discovery and initial connection succeeded, false otherwise
      */
-    protected synchronized boolean ensureCoordinatorReady(long startTimeMs, long timeoutMs) {
+    public synchronized boolean ensureCoordinatorReady(long startTimeMs, long timeoutMs) {
         long remainingMs = timeoutMs;
 
         while (coordinatorUnknown()) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 3c99c966d54..215775dc7b0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -31,6 +31,7 @@
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Measurable;
@@ -452,6 +453,27 @@ public void refreshCommittedOffsetsIfNeeded() {
         }
     }
 
+    /**
+     * Refresh the committed offsets for provided partitions.
+     * 
+     * @param startMs   The time at which the operation starts
+     * @param timeout   The maximum allowable duration of the method
+     * @throws TimeoutException if committed offsets cannot be retrieved within set amount of time
+     */
+    public void refreshCommittedOffsetsIfNeeded(long startMs, long timeout) {
+        Set<TopicPartition> missingFetchPositions = subscriptions.missingFetchPositions();
+        Map<TopicPartition, OffsetAndMetadata> offsets = fetchCommittedOffsets(missingFetchPositions, startMs, timeout, time);
+        if (offsets == null) {
+            throw new TimeoutException("Offsets cannot be retrieved within set duration");
+        }
+        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
+            TopicPartition tp = entry.getKey();
+            long offset = entry.getValue().offset();
+            log.debug("Setting offset for partition {} to the committed offset {}", tp, offset);
+            this.subscriptions.seek(tp, offset);
+        }
+    }
+
     /**
      * Fetch the current committed offsets from the coordinator for a set of partitions.
      * @param partitions The partitions to fetch offsets for
@@ -477,6 +499,39 @@ public void refreshCommittedOffsetsIfNeeded() {
             time.sleep(retryBackoffMs);
         }
     }
+    
+    /**
+     * Fetch the current committed offsets from the coordinator for a set of partitions 
+     * within a given amount of time.
+     * @param partitions The partitions to fetch offsets for
+     * @param startMs    The start time of the operation
+     * @param timeoutMs    The maximum duration of the method
+     * @param time       Java utility which keeps track of time in form of long (milliseconds)
+     * @throws TimeoutException if the coordinator is not ready within the allotted time
+     * @return A map from partition to the committed offset, or null if the timeout elapses before the fetch succeeds
+     */
+    public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition> partitions, 
+                                                                        long startMs, long timeoutMs, Time time) {
+        if (partitions.isEmpty())
+            return Collections.emptyMap();
+        while (time.milliseconds() < startMs + timeoutMs) {
+            ensureCoordinatorReady(startMs, timeoutMs);
+            if (time.milliseconds() > startMs + timeoutMs) throw new TimeoutException("Error, coordinator is not ready in allocated time!");
+
+            // contact coordinator to fetch committed offsets
+            RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future = sendOffsetFetchRequest(partitions);
+            client.poll(timeoutMs, time.milliseconds(), future);
+
+            if (future.succeeded())
+                return future.value();
+
+            if (future.isDone())
+                throw future.exception();
+
+            time.sleep(retryBackoffMs);
+        }
+        return null;
+    }
 
     public void close(long timeoutMs) {
         // we do not need to re-enable wakeups since we are closing already
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 8c147a58f77..0a9874cd651 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -78,6 +78,7 @@
 import org.junit.rules.ExpectedException;
 
 import java.nio.ByteBuffer;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -526,8 +527,7 @@ public void testResetToCommittedOffset() {
 
         client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, 539L), Errors.NONE), coordinator);
         consumer.poll(0);
-
-        assertEquals(539L, consumer.position(tp0));
+        assertEquals(539L, consumer.position(tp0, Duration.ofSeconds(2)));
     }
 
     @Test
@@ -1039,7 +1039,7 @@ public void testManualAssignmentChangeWithAutoCommitEnabled() {
 
         ConsumerRecords<String, String> records = consumer.poll(5);
         assertEquals(1, records.count());
-        assertEquals(11L, consumer.position(tp0));
+        assertEquals(11L, consumer.position(tp0, Duration.ofSeconds(2)));
 
         // mock the offset commit response for to be revoked partitions
         AtomicBoolean commitReceived = prepareOffsetCommitResponse(client, coordinator, tp0, 11);
@@ -1160,8 +1160,8 @@ public void testOffsetOfPausedPartitions() {
         offsetResponse.put(tp0, 3L);
         offsetResponse.put(tp1, 3L);
         client.prepareResponse(listOffsetsResponse(offsetResponse));
-        assertEquals(3L, consumer.position(tp0));
-        assertEquals(3L, consumer.position(tp1));
+        assertEquals(3L, consumer.position(tp0, Duration.ofSeconds(2)));
+        assertEquals(3L, consumer.position(tp1, Duration.ofSeconds(2)));
 
         client.requests().clear();
         consumer.unsubscribe();
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index ab7ca64f11c..52127601e95 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -14,6 +14,7 @@ package kafka.api
 
 import java.nio.ByteBuffer
 import java.util
+import java.util.concurrent.TimeUnit
 import java.util.concurrent.ExecutionException
 import java.util.regex.Pattern
 import java.util.{ArrayList, Collections, Properties}
@@ -911,20 +912,20 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     assertEquals(offset, offsetFetchResponse.responseData.get(tp).offset)
   }
 
-  @Test
+  @Test(expected = classOf[org.apache.kafka.common.errors.TimeoutException])
   def testOffsetFetchTopicDescribe() {
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), groupResource)
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
     this.consumers.head.assign(List(tp).asJava)
-    this.consumers.head.position(tp)
+    this.consumers.head.position(tp, 2000L, TimeUnit.MILLISECONDS)
   }
 
-  @Test
+  @Test(expected = classOf[org.apache.kafka.common.errors.TimeoutException])
   def testOffsetFetchWithTopicAndGroupRead() {
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
     this.consumers.head.assign(List(tp).asJava)
-    this.consumers.head.position(tp)
+    this.consumers.head.position(tp, 2000L, TimeUnit.MILLISECONDS)
   }
 
   @Test(expected = classOf[TopicAuthorizationException])
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 53b3ed679bb..cfc03696440 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -151,7 +151,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
       if (coin == 0) {
         info("Seeking to end of log")
         consumer.seekToEnd(Collections.emptyList())
-        assertEquals(numRecords.toLong, consumer.position(tp))
+        assertEquals(numRecords.toLong, consumer.position(tp, 2000L, TimeUnit.MILLISECONDS))
       } else if (coin == 1) {
         val pos = TestUtils.random.nextInt(numRecords).toLong
         info("Seeking to " + pos)
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index a06e9e36528..5cd210d9b82 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -15,6 +15,7 @@ package kafka.api
 import java.util
 import java.util.regex.Pattern
 import java.util.{Collections, Locale, Properties}
+import java.util.concurrent.TimeUnit
 
 import kafka.log.LogConfig
 import kafka.server.KafkaConfig
@@ -583,15 +584,15 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     consumer.assign(List(tp).asJava)
 
     consumer.seekToEnd(List(tp).asJava)
-    assertEquals(totalRecords, consumer.position(tp))
+    assertEquals(totalRecords, consumer.position(tp, 2000L, TimeUnit.MILLISECONDS))
     assertFalse(consumer.poll(totalRecords).iterator().hasNext)
 
     consumer.seekToBeginning(List(tp).asJava)
-    assertEquals(0, consumer.position(tp), 0)
+    assertEquals(0, consumer.position(tp, 2000L, TimeUnit.MILLISECONDS), 0)
     consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = 0)
 
     consumer.seek(tp, mid)
-    assertEquals(mid, consumer.position(tp))
+    assertEquals(mid, consumer.position(tp, 2000L, TimeUnit.MILLISECONDS))
 
     consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = mid.toInt, startingKeyAndValueIndex = mid.toInt,
       startingTimestamp = mid.toLong)
@@ -601,15 +602,15 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     consumer.assign(List(tp2).asJava)
 
     consumer.seekToEnd(List(tp2).asJava)
-    assertEquals(totalRecords, consumer.position(tp2))
+    assertEquals(totalRecords, consumer.position(tp2, 2000L, TimeUnit.MILLISECONDS))
     assertFalse(consumer.poll(totalRecords).iterator().hasNext)
 
     consumer.seekToBeginning(List(tp2).asJava)
-    assertEquals(0, consumer.position(tp2), 0)
+    assertEquals(0, consumer.position(tp2, 2000L, TimeUnit.MILLISECONDS), 0)
     consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = 0, tp = tp2)
 
     consumer.seek(tp2, mid)
-    assertEquals(mid, consumer.position(tp2))
+    assertEquals(mid, consumer.position(tp2, 2000L, TimeUnit.MILLISECONDS))
     consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = mid.toInt, startingKeyAndValueIndex = mid.toInt,
       startingTimestamp = mid.toLong, tp = tp2)
   }
@@ -626,7 +627,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     producer.close()
   }
 
-  @Test
+  @Test(expected = classOf[org.apache.kafka.common.errors.TimeoutException])
   def testPositionAndCommit() {
     sendRecords(5)
 
@@ -639,12 +640,12 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     this.consumers.head.assign(List(tp).asJava)
 
-    assertEquals("position() on a partition that we are subscribed to should reset the offset", 0L, this.consumers.head.position(tp))
+    assertEquals("position() on a partition that we are subscribed to should reset the offset", 0L, this.consumers.head.position(tp, 2000L, TimeUnit.MILLISECONDS))
     this.consumers.head.commitSync()
     assertEquals(0L, this.consumers.head.committed(tp).offset)
 
     consumeAndVerifyRecords(consumer = this.consumers.head, numRecords = 5, startingOffset = 0)
-    assertEquals("After consuming 5 records, position should be 5", 5L, this.consumers.head.position(tp))
+    assertEquals("After consuming 5 records, position should be 5", 5L, this.consumers.head.position(tp, 2000L, TimeUnit.MILLISECONDS))
     this.consumers.head.commitSync()
     assertEquals("Committed offset should be returned", 5L, this.consumers.head.committed(tp).offset)
 
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 8435e5a3a6c..6e3fad3b7f8 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -1,4 +1,5 @@
 /**
+
   * Licensed to the Apache Software Foundation (ASF) under one or more
   * contributor license agreements.  See the NOTICE file distributed with
   * this work for additional information regarding copyright ownership.
@@ -168,7 +169,7 @@ class TransactionsTest extends KafkaServerTestHarness {
     assertEquals(2, readCommittedConsumer.assignment.size)
     readCommittedConsumer.seekToEnd(readCommittedConsumer.assignment)
     readCommittedConsumer.assignment.asScala.foreach { tp =>
-      assertEquals(1L, readCommittedConsumer.position(tp))
+      assertEquals(1L, readCommittedConsumer.position(tp, 2, TimeUnit.SECONDS))
     }
 
     // undecided timestamps should not be searchable either
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 186276c22d8..e8620ac1088 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -125,6 +125,13 @@
 
     private static final String JMX_PREFIX = "kafka.streams";
     private static final int DEFAULT_CLOSE_TIMEOUT = 0;
+    /**
+     * @TODO This only exists to allow us to pass tests in Kafka Streams.
+     * We will need to fix this in a future pull request, particularly
+     * since how long Kafka Streams would need to block for is
+     * still unknown.
+     */
+    private static final int DEFAULT_BLOCKING_TIME = 20000;
 
     // processId is expected to be unique across JVMs and to be used
     // in userData of the subscription request to allow assignor be aware
@@ -526,7 +533,7 @@ public void onRestoreEnd(final TopicPartition topicPartition, final String store
      */
     public KafkaStreams(final Topology topology,
                         final Properties props) {
-        this(topology.internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier());
+        this(topology.internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier(), DEFAULT_BLOCKING_TIME);
     }
 
     /**
@@ -544,7 +551,7 @@ public KafkaStreams(final Topology topology,
     public KafkaStreams(final Topology topology,
                         final Properties props,
                         final KafkaClientSupplier clientSupplier) {
-        this(topology.internalTopologyBuilder, new StreamsConfig(props), clientSupplier, Time.SYSTEM);
+        this(topology.internalTopologyBuilder, new StreamsConfig(props), clientSupplier, Time.SYSTEM, 20000L);
     }
 
     /**
@@ -559,9 +566,10 @@ public KafkaStreams(final Topology topology,
      * @throws StreamsException if any fatal error occurs
      */
     public KafkaStreams(final Topology topology,
+                        final StreamsConfig config,
                         final Properties props,
                         final Time time) {
-        this(topology.internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier(), time);
+        this(topology.internalTopologyBuilder, config, new DefaultKafkaClientSupplier(), DEFAULT_BLOCKING_TIME);
     }
 
     /**
@@ -581,7 +589,7 @@ public KafkaStreams(final Topology topology,
                         final Properties props,
                         final KafkaClientSupplier clientSupplier,
                         final Time time) {
-        this(topology.internalTopologyBuilder, new StreamsConfig(props), clientSupplier, time);
+        this(topology.internalTopologyBuilder, new StreamsConfig(props), clientSupplier, time, DEFAULT_BLOCKING_TIME);
     }
 
     /**
@@ -590,7 +598,7 @@ public KafkaStreams(final Topology topology,
     @Deprecated
     public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder builder,
                         final Properties props) {
-        this(builder.internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier());
+        this(builder.internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier(), DEFAULT_BLOCKING_TIME);
     }
 
     /**
@@ -599,7 +607,7 @@ public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder bui
     @Deprecated
     public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder builder,
                         final StreamsConfig config) {
-        this(builder.internalTopologyBuilder, config, new DefaultKafkaClientSupplier());
+        this(builder.internalTopologyBuilder, config, new DefaultKafkaClientSupplier(), DEFAULT_BLOCKING_TIME);
     }
 
     /**
@@ -609,7 +617,7 @@ public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder bui
     public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder builder,
                         final StreamsConfig config,
                         final KafkaClientSupplier clientSupplier) {
-        this(builder.internalTopologyBuilder, config, clientSupplier);
+        this(builder.internalTopologyBuilder, config, clientSupplier, DEFAULT_BLOCKING_TIME);
     }
 
     /**
@@ -618,7 +626,7 @@ public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder bui
     @Deprecated
     public KafkaStreams(final Topology topology,
                         final StreamsConfig config) {
-        this(topology.internalTopologyBuilder, config, new DefaultKafkaClientSupplier());
+        this(topology.internalTopologyBuilder, config, new DefaultKafkaClientSupplier(), DEFAULT_BLOCKING_TIME);
     }
 
     /**
@@ -628,7 +636,7 @@ public KafkaStreams(final Topology topology,
     public KafkaStreams(final Topology topology,
                         final StreamsConfig config,
                         final KafkaClientSupplier clientSupplier) {
-        this(topology.internalTopologyBuilder, config, clientSupplier);
+        this(topology.internalTopologyBuilder, config, clientSupplier, DEFAULT_BLOCKING_TIME);
     }
 
     /**
@@ -637,20 +645,22 @@ public KafkaStreams(final Topology topology,
     @Deprecated
     public KafkaStreams(final Topology topology,
                         final StreamsConfig config,
-                        final Time time) {
-        this(topology.internalTopologyBuilder, config, new DefaultKafkaClientSupplier(), time);
+                        final long maxCommitMs) {
+        this(topology.internalTopologyBuilder, config, new DefaultKafkaClientSupplier(), maxCommitMs);
     }
 
     private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                          final StreamsConfig config,
-                         final KafkaClientSupplier clientSupplier) throws StreamsException {
-        this(internalTopologyBuilder, config, clientSupplier, Time.SYSTEM);
+                         final KafkaClientSupplier clientSupplier,
+                         final long maxCommitMs) throws StreamsException {
+        this(internalTopologyBuilder, config, clientSupplier, Time.SYSTEM, maxCommitMs);
     }
 
     private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                          final StreamsConfig config,
                          final KafkaClientSupplier clientSupplier,
-                         final Time time) throws StreamsException {
+                         final Time time,
+                         final long maxCommitMs) throws StreamsException {
         this.config = config;
         this.time = time;
 
@@ -731,6 +741,7 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                                              clientId,
                                              metrics,
                                              time,
+                                             DEFAULT_BLOCKING_TIME,
                                              streamsMetadataState,
                                              cacheSizePerThread,
                                              stateDirectory,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index e8ec5e9fe5f..fc73ba39512 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -45,12 +45,21 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 /**
  * This class is responsible for the initialization, restoration, closing, flushing etc
  * of Global State Stores. There is only ever 1 instance of this class per Application Instance.
  */
 public class GlobalStateManagerImpl extends AbstractStateManager implements GlobalStateManager {
+    /**
+     * @TODO Currently, this is a temporary placeholder that lets
+     * the tests pass. GlobalStateManagerImpl will have to
+     * be updated so that the amount of time for which we wait
+     * can be controlled through user input or a set configuration.
+     */
+    private static final long DEFAULT_WAIT_TIME = 20000L;
+
     private final Logger log;
     private final ProcessorTopology topology;
     private final Consumer<byte[], byte[]> globalConsumer;
@@ -248,7 +257,7 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
                 globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
             }
 
-            long offset = globalConsumer.position(topicPartition);
+            long offset = globalConsumer.position(topicPartition, DEFAULT_WAIT_TIME, TimeUnit.MILLISECONDS);
             final Long highWatermark = highWatermarks.get(topicPartition);
             BatchingStateRestoreCallback
                 stateRestoreAdapter =
@@ -268,7 +277,7 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
                         if (record.key() != null) {
                             restoreRecords.add(KeyValue.pair(record.key(), record.value()));
                         }
-                        offset = globalConsumer.position(topicPartition);
+                        offset = globalConsumer.position(topicPartition, DEFAULT_WAIT_TIME, TimeUnit.MILLISECONDS);
                     }
                     stateRestoreAdapter.restoreAll(restoreRecords);
                     stateRestoreListener.onBatchRestored(topicPartition, storeName, offset, restoreRecords.size());
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 5fcba76570e..5644957b478 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -39,9 +39,15 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 public class StoreChangelogReader implements ChangelogReader {
 
+    /**
+     * @TODO Currently, for position() in StoreChangelogReader,
+     * position is called with 20000L as blocking time. This is 
+     * not acceptable to most users, and needs to be changed.
+     */
     private final Logger log;
     private final Consumer<byte[], byte[]> restoreConsumer;
     private final StateRestoreListener userStateRestoreListener;
@@ -50,13 +56,16 @@
     private final Map<TopicPartition, StateRestorer> stateRestorers = new HashMap<>();
     private final Map<TopicPartition, StateRestorer> needsRestoring = new HashMap<>();
     private final Map<TopicPartition, StateRestorer> needsInitializing = new HashMap<>();
+    private final long maxBlockMs;
 
     public StoreChangelogReader(final Consumer<byte[], byte[]> restoreConsumer,
                                 final StateRestoreListener userStateRestoreListener,
-                                final LogContext logContext) {
+                                final LogContext logContext,
+                                final long maxBlockMs) {
         this.restoreConsumer = restoreConsumer;
         this.log = logContext.logger(getClass());
         this.userStateRestoreListener = userStateRestoreListener;
+        this.maxBlockMs = maxBlockMs;
     }
 
     @Override
@@ -175,7 +184,7 @@ private void startRestoration(final Map<TopicPartition, StateRestorer> initializ
                 logRestoreOffsets(restorer.partition(),
                                   restorer.checkpoint(),
                                   endOffsets.get(restorer.partition()));
-                restorer.setStartingOffset(restoreConsumer.position(restorer.partition()));
+                restorer.setStartingOffset(restoreConsumer.position(restorer.partition(), maxBlockMs, TimeUnit.MILLISECONDS));
                 restorer.restoreStarted();
             } else {
                 restoreConsumer.seekToBeginning(Collections.singletonList(restorer.partition()));
@@ -184,7 +193,7 @@ private void startRestoration(final Map<TopicPartition, StateRestorer> initializ
         }
 
         for (final StateRestorer restorer : needsPositionUpdate) {
-            final long position = restoreConsumer.position(restorer.partition());
+            final long position = restoreConsumer.position(restorer.partition(), maxBlockMs, TimeUnit.MILLISECONDS);
             logRestoreOffsets(restorer.partition(),
                               position,
                               endOffsets.get(restorer.partition()));
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index ab96cce4c2f..b8a72f42abd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -576,6 +576,12 @@ void removeAllSensors() {
     final Consumer<byte[], byte[]> consumer;
     final InternalTopologyBuilder builder;
 
+    /**
+     * @TODO Currently, the parameter commitTime is used as a mechanism by
+     * which the user can pass StoreChangelogReader a set time to block
+     * for in position(). We might need to change the way the timeout is passed
+     * to StoreChangelogReader.
+     */
     public static StreamThread create(final InternalTopologyBuilder builder,
                                       final StreamsConfig config,
                                       final KafkaClientSupplier clientSupplier,
@@ -584,6 +590,7 @@ public static StreamThread create(final InternalTopologyBuilder builder,
                                       final String clientId,
                                       final Metrics metrics,
                                       final Time time,
+                                      final long commitTime,
                                       final StreamsMetadataState streamsMetadataState,
                                       final long cacheSizeBytes,
                                       final StateDirectory stateDirectory,
@@ -597,7 +604,7 @@ public static StreamThread create(final InternalTopologyBuilder builder,
         log.info("Creating restore consumer client");
         final Map<String, Object> restoreConsumerConfigs = config.getRestoreConsumerConfigs(threadClientId);
         final Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(restoreConsumerConfigs);
-        final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreConsumer, userStateRestoreListener, logContext);
+        final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreConsumer, userStateRestoreListener, logContext, commitTime);
 
         Producer<byte[], byte[]> threadProducer = null;
         final boolean eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
index 9dfb6dda37e..22d3d6d27c2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
@@ -166,7 +166,7 @@ public void setup() {
                .groupBy(MockMapper.selectKeyKeyValueMapper())
                .count();
 
-        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration, time);
+        kafkaStreams = new KafkaStreams(builder.build(), new StreamsConfig(streamsConfiguration), 20000);
     }
 
     @After
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index 19ddedfdc94..3645a85a681 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -314,7 +314,7 @@ private void createStateForRestoration()
 
         final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
         for (TopicPartition partition : partitions) {
-            final long position = consumer.position(partition);
+            final long position = consumer.position(partition, 2000L, TimeUnit.MILLISECONDS);
             offsets.put(partition, new OffsetAndMetadata(position + 1));
         }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index 347e9c4fd75..1ff35e07a22 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -233,7 +233,7 @@ private AbstractTask createTask(final Consumer consumer,
                                 storeTopicPartitions,
                                 ProcessorTopology.withLocalStores(new ArrayList<>(stateStoresToChangelogTopics.keySet()), storeNamesToChangelogTopics),
                                 consumer,
-                                new StoreChangelogReader(consumer, new MockStateRestoreListener(), new LogContext("stream-task-test ")),
+                                new StoreChangelogReader(consumer, new MockStateRestoreListener(), new LogContext("stream-task-test "), 20000L),
                                 false,
                                 stateDirectory,
                                 config) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 605ab337983..77791f0f098 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -123,7 +123,7 @@ private StreamsConfig createConfig(final File baseDir) throws IOException {
 
     private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
     private final MockRestoreConsumer restoreStateConsumer = new MockRestoreConsumer();
-    private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, stateRestoreListener, new LogContext("standby-task-test "));
+    private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, stateRestoreListener, new LogContext("standby-task-test "), 20000L);
 
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index c65d4efadb1..4bc24ebdeb8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -72,7 +72,7 @@
     private final StateRestoreListener stateRestoreListener = new MockStateRestoreListener();
     private final TopicPartition topicPartition = new TopicPartition("topic", 0);
     private final LogContext logContext = new LogContext("test-reader ");
-    private final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, stateRestoreListener, logContext);
+    private final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, stateRestoreListener, logContext, 20000L);
 
     @Before
     public void setUp() {
@@ -90,7 +90,7 @@ public void shouldRequestTopicsAndHandleTimeoutException() {
             }
         };
 
-        final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, stateRestoreListener, logContext);
+        final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, stateRestoreListener, logContext, 20000);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
         changelogReader.restore(active);
         assertTrue(functionCalled.get());
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index a30582905de..55ad8971f04 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -119,7 +119,7 @@ public void close() {
     private final MockProducer<byte[], byte[]> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer);
     private final MockConsumer<byte[], byte[]> restoreStateConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
     private final StateRestoreListener stateRestoreListener = new MockStateRestoreListener();
-    private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, stateRestoreListener, new LogContext("stream-task-test ")) {
+    private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, stateRestoreListener, new LogContext("stream-task-test "), 20000L) {
         @Override
         public Map<TopicPartition, Long> restoredOffsets() {
             return Collections.singletonMap(changelogPartition, offset);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index b22d98ee41c..34f04596b57 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -224,6 +224,7 @@ private StreamThread createStreamThread(final String clientId, final StreamsConf
                                    clientId,
                                    metrics,
                                    mockTime,
+                                   5000,
                                    streamsMetadataState,
                                    0,
                                    stateDirectory,
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 85c282ca461..85be612fd08 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -184,7 +184,7 @@ private StreamTask createStreamsTask(final String applicationId,
             Collections.singletonList(new TopicPartition(topicName, taskId.partition)),
             topology,
             clientSupplier.consumer,
-            new StoreChangelogReader(clientSupplier.restoreConsumer, new MockStateRestoreListener(), new LogContext("test-stream-task ")),
+            new StoreChangelogReader(clientSupplier.restoreConsumer, new MockStateRestoreListener(), new LogContext("test-stream-task "), 20000L),
             streamsConfig,
             new MockStreamsMetrics(new Metrics()),
             stateDirectory,
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 82c39eeb1d3..a9e75c2dd7e 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -242,7 +242,8 @@ public ProcessorTopologyTestDriver(final StreamsConfig config,
                                   new StoreChangelogReader(
                                       createRestoreConsumer(topology.storeToChangelogTopic()),
                                       new MockStateRestoreListener(),
-                                      new LogContext("topology-test-driver ")),
+                                      new LogContext("topology-test-driver "),
+                                      20000L),
                                   config,
                                   streamsMetrics, stateDirectory,
                                   cache,
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index abcc99d362f..a025e9062e0 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -294,7 +294,8 @@ public void onRestoreEnd(TopicPartition topicPartition, String storeName, long t
                 new StoreChangelogReader(
                     createRestoreConsumer(processorTopology.storeToChangelogTopic()),
                     stateRestoreListener,
-                    new LogContext("topology-test-driver ")),
+                    new LogContext("topology-test-driver "),
+                    20000L),
                 streamsConfig,
                 streamsMetrics,
                 stateDirectory,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Add TimeoutException to KafkaConsumer#position()
> ------------------------------------------------
>
>                 Key: KAFKA-6608
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6608
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer
>            Reporter: Richard Yu
>            Assignee: Richard Yu
>            Priority: Blocker
>              Labels: kip
>
> In KAFKA-4879, Kafka Consumer hangs indefinitely due to Fetcher's {{timeout}} being set to {{Long.MAX_VALUE}}. While fixing this issue, it was pointed out that if a timeout was added to methods which commit offsets synchronously, a stricter control on time could be achieved.
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75974886



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)