You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/05/30 02:25:00 UTC

[jira] [Commented] (KAFKA-6783) consumer poll(timeout) blocked infinitely when no available bootstrap server

    [ https://issues.apache.org/jira/browse/KAFKA-6783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16494593#comment-16494593 ] 

ASF GitHub Bot commented on KAFKA-6783:
---------------------------------------

koqizhao closed pull request #4861: KAFKA-6783: consumer poll(timeout) blocked infinitely
URL: https://github.com/apache/kafka/pull/4861
 
 
   

This is a PR merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff of such a pull request once it
is merged, the diff is reproduced below for the sake of provenance:

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index dd4bb7038f3..82b6f69868f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -308,12 +308,15 @@ protected synchronized long timeToNextHeartbeat(long now) {
     /**
      * Ensure that the group is active (i.e. joined and synced)
      */
-    public void ensureActiveGroup() {
+    public void ensureActiveGroup(long now, long remainingMs) {
         // always ensure that the coordinator is ready because we may have been disconnected
         // when sending heartbeats and does not necessarily require us to rejoin the group.
-        ensureCoordinatorReady();
+        ensureCoordinatorReady(now, remainingMs);
         startHeartbeatThreadIfNeeded();
-        joinGroupIfNeeded();
+
+        remainingMs = Math.max(0, remainingMs - (time.milliseconds() - now));
+        now = time.milliseconds();
+        joinGroupIfNeeded(now, remainingMs);
     }
 
     private synchronized void startHeartbeatThreadIfNeeded() {
@@ -346,9 +349,9 @@ private void closeHeartbeatThread() {
     }
 
     // visible for testing. Joins the group without starting the heartbeat thread.
-    void joinGroupIfNeeded() {
+    void joinGroupIfNeeded(long now, long remainingMs) {
         while (needRejoin() || rejoinIncomplete()) {
-            ensureCoordinatorReady();
+            ensureCoordinatorReady(now, remainingMs);
 
             // call onJoinPrepare if needed. We set a flag to make sure that we do not call it a second
             // time if the client is woken up before a pending rebalance completes. This must be called
@@ -375,12 +378,18 @@ void joinGroupIfNeeded() {
                 RuntimeException exception = future.exception();
                 if (exception instanceof UnknownMemberIdException ||
                         exception instanceof RebalanceInProgressException ||
-                        exception instanceof IllegalGenerationException)
-                    continue;
-                else if (!future.isRetriable())
+                        exception instanceof IllegalGenerationException) {
+
+                } else if (!future.isRetriable())
                     throw exception;
-                time.sleep(retryBackoffMs);
+                else
+                    time.sleep(retryBackoffMs);
             }
+
+            remainingMs = Math.max(0, remainingMs - (time.milliseconds() - now));
+            if (remainingMs == 0)
+                break;
+            now = time.milliseconds();
         }
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 3c99c966d54..fcd26010061 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -276,7 +276,9 @@ public void poll(long now, long remainingMs) {
 
         if (subscriptions.partitionsAutoAssigned()) {
             if (coordinatorUnknown()) {
-                ensureCoordinatorReady();
+                ensureCoordinatorReady(now, remainingMs);
+
+                remainingMs = Math.max(0, remainingMs - (time.milliseconds() - now));
                 now = time.milliseconds();
             }
 
@@ -287,7 +289,7 @@ public void poll(long now, long remainingMs) {
                 if (subscriptions.hasPatternSubscription())
                     client.ensureFreshMetadata();
 
-                ensureActiveGroup();
+                ensureActiveGroup(now, remainingMs);
                 now = time.milliseconds();
             }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 8c147a58f77..52d1579a51f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1191,6 +1191,37 @@ public void testPollWithEmptyUserAssignment() {
         }
     }
 
+    @Test
+    public void testPollWithAllBootstrapServersDown() throws Exception {
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        try {
+            final long pollTimeout = 1000;
+            final AtomicBoolean pollComplete = new AtomicBoolean();
+            executor.submit(new Runnable() {
+                @Override
+                public void run() {
+                    Properties props = new Properties();
+                    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
+                    try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(props)) {
+                        consumer.subscribe(Arrays.asList(topic));
+                        try {
+                            consumer.poll(pollTimeout);
+                        } catch (Exception ex) {
+                            ex.printStackTrace();
+                        } finally {
+                            pollComplete.set(true);
+                        }
+                    }
+                }
+            });
+
+            Thread.sleep(pollTimeout * 2);
+            Assert.assertTrue("poll timeout not work when all servers down", pollComplete.get());
+        } finally {
+            executor.shutdown();
+        }
+    }
+
     @Test
     public void testGracefulClose() throws Exception {
         Map<TopicPartition, Errors> response = new HashMap<>();
@@ -1306,7 +1337,7 @@ public boolean matches(final AbstractRequest body) {
         }, fetchResponse(tp0, 1, 1), node);
         time.sleep(heartbeatIntervalMs);
         Thread.sleep(heartbeatIntervalMs);
-        final ConsumerRecords<String, String> records = consumer.poll(0);
+        final ConsumerRecords<String, String> records = consumer.poll(200);
         assertFalse(records.isEmpty());
         consumer.close(0, TimeUnit.MILLISECONDS);
     }
@@ -1336,7 +1367,7 @@ private void consumerCloseTest(final long closeTimeoutMs,
         // Poll with responses
         client.prepareResponseFrom(fetchResponse(tp0, 0, 1), node);
         client.prepareResponseFrom(fetchResponse(tp0, 1, 0), node);
-        consumer.poll(0);
+        consumer.poll(200);
 
         // Initiate close() after a commit request on another thread.
         // Kafka consumer is single-threaded, but the implementation allows calls on a
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 1c88803e26c..2fecfe5ea6f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -129,7 +129,7 @@ public boolean matches(AbstractRequest body) {
         }, heartbeatResponse(Errors.UNKNOWN_SERVER_ERROR));
 
         try {
-            coordinator.ensureActiveGroup();
+            coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
             mockTime.sleep(HEARTBEAT_INTERVAL_MS);
             long startMs = System.currentTimeMillis();
             while (System.currentTimeMillis() - startMs < 1000) {
@@ -150,7 +150,7 @@ public void testPollHeartbeatAwakesHeartbeatThread() throws Exception {
         mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
         mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
 
-        coordinator.ensureActiveGroup();
+        coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
 
         final CountDownLatch heartbeatDone = new CountDownLatch(1);
         mockClient.prepareResponse(new MockClient.RequestMatcher() {
@@ -208,7 +208,7 @@ public boolean matches(AbstractRequest body) {
         AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
 
         try {
-            coordinator.ensureActiveGroup();
+            coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
             fail("Should have woken up from ensureActiveGroup()");
         } catch (WakeupException e) {
         }
@@ -217,7 +217,7 @@ public boolean matches(AbstractRequest body) {
         assertEquals(0, coordinator.onJoinCompleteInvokes);
         assertFalse(heartbeatReceived.get());
 
-        coordinator.ensureActiveGroup();
+        coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
 
         assertEquals(1, coordinator.onJoinPrepareInvokes);
         assertEquals(1, coordinator.onJoinCompleteInvokes);
@@ -246,7 +246,7 @@ public boolean matches(AbstractRequest body) {
         AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
 
         try {
-            coordinator.ensureActiveGroup();
+            coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
             fail("Should have woken up from ensureActiveGroup()");
         } catch (WakeupException e) {
         }
@@ -257,7 +257,7 @@ public boolean matches(AbstractRequest body) {
 
         // the join group completes in this poll()
         consumerClient.poll(0);
-        coordinator.ensureActiveGroup();
+        coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
 
         assertEquals(1, coordinator.onJoinPrepareInvokes);
         assertEquals(1, coordinator.onJoinCompleteInvokes);
@@ -284,7 +284,7 @@ public boolean matches(AbstractRequest body) {
         AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
 
         try {
-            coordinator.ensureActiveGroup();
+            coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
             fail("Should have woken up from ensureActiveGroup()");
         } catch (WakeupException e) {
         }
@@ -293,7 +293,7 @@ public boolean matches(AbstractRequest body) {
         assertEquals(0, coordinator.onJoinCompleteInvokes);
         assertFalse(heartbeatReceived.get());
 
-        coordinator.ensureActiveGroup();
+        coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
 
         assertEquals(1, coordinator.onJoinPrepareInvokes);
         assertEquals(1, coordinator.onJoinCompleteInvokes);
@@ -320,7 +320,7 @@ public boolean matches(AbstractRequest body) {
         AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
 
         try {
-            coordinator.ensureActiveGroup();
+            coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
             fail("Should have woken up from ensureActiveGroup()");
         } catch (WakeupException e) {
         }
@@ -331,7 +331,7 @@ public boolean matches(AbstractRequest body) {
 
         // the join group completes in this poll()
         consumerClient.poll(0);
-        coordinator.ensureActiveGroup();
+        coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
 
         assertEquals(1, coordinator.onJoinPrepareInvokes);
         assertEquals(1, coordinator.onJoinCompleteInvokes);
@@ -360,7 +360,7 @@ public boolean matches(AbstractRequest body) {
         AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
 
         try {
-            coordinator.ensureActiveGroup();
+            coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
             fail("Should have woken up from ensureActiveGroup()");
         } catch (WakeupException e) {
         }
@@ -369,7 +369,7 @@ public boolean matches(AbstractRequest body) {
         assertEquals(0, coordinator.onJoinCompleteInvokes);
         assertFalse(heartbeatReceived.get());
 
-        coordinator.ensureActiveGroup();
+        coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
 
         assertEquals(1, coordinator.onJoinPrepareInvokes);
         assertEquals(1, coordinator.onJoinCompleteInvokes);
@@ -398,7 +398,7 @@ public boolean matches(AbstractRequest body) {
         AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
 
         try {
-            coordinator.ensureActiveGroup();
+            coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
             fail("Should have woken up from ensureActiveGroup()");
         } catch (WakeupException e) {
         }
@@ -409,7 +409,7 @@ public boolean matches(AbstractRequest body) {
 
         // the join group completes in this poll()
         consumerClient.poll(0);
-        coordinator.ensureActiveGroup();
+        coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
 
         assertEquals(1, coordinator.onJoinPrepareInvokes);
         assertEquals(1, coordinator.onJoinCompleteInvokes);
@@ -436,7 +436,7 @@ public boolean matches(AbstractRequest body) {
         AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
 
         try {
-            coordinator.ensureActiveGroup();
+            coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
             fail("Should have woken up from ensureActiveGroup()");
         } catch (WakeupException e) {
         }
@@ -445,7 +445,7 @@ public boolean matches(AbstractRequest body) {
         assertEquals(0, coordinator.onJoinCompleteInvokes);
         assertFalse(heartbeatReceived.get());
 
-        coordinator.ensureActiveGroup();
+        coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
 
         assertEquals(1, coordinator.onJoinPrepareInvokes);
         assertEquals(1, coordinator.onJoinCompleteInvokes);
@@ -472,7 +472,7 @@ public boolean matches(AbstractRequest body) {
         AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
 
         try {
-            coordinator.ensureActiveGroup();
+            coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
             fail("Should have woken up from ensureActiveGroup()");
         } catch (WakeupException e) {
         }
@@ -481,7 +481,7 @@ public boolean matches(AbstractRequest body) {
         assertEquals(0, coordinator.onJoinCompleteInvokes);
         assertFalse(heartbeatReceived.get());
 
-        coordinator.ensureActiveGroup();
+        coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
 
         assertEquals(1, coordinator.onJoinPrepareInvokes);
         assertEquals(1, coordinator.onJoinCompleteInvokes);
@@ -500,7 +500,7 @@ public void testWakeupInOnJoinComplete() throws Exception {
         AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
 
         try {
-            coordinator.ensureActiveGroup();
+            coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
             fail("Should have woken up from ensureActiveGroup()");
         } catch (WakeupException e) {
         }
@@ -512,7 +512,7 @@ public void testWakeupInOnJoinComplete() throws Exception {
         // the join group completes in this poll()
         coordinator.wakeupOnJoinComplete = false;
         consumerClient.poll(0);
-        coordinator.ensureActiveGroup();
+        coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
 
         assertEquals(1, coordinator.onJoinPrepareInvokes);
         assertEquals(1, coordinator.onJoinCompleteInvokes);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 3e3c423a428..83912bdfe4b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -566,7 +566,7 @@ public boolean matches(AbstractRequest body) {
             }
         }, syncGroupResponse(singletonList(t1p), Errors.NONE));
 
-        coordinator.joinGroupIfNeeded();
+        coordinator.joinGroupIfNeeded(0, Long.MAX_VALUE);
 
         assertFalse(coordinator.needRejoin());
         assertEquals(singleton(t1p), subscriptions.assignedPartitions());
@@ -605,7 +605,7 @@ public boolean matches(AbstractRequest body) {
         // expect client to force updating the metadata, if yes gives it both topics
         client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
 
-        coordinator.joinGroupIfNeeded();
+        coordinator.joinGroupIfNeeded(0, Long.MAX_VALUE);
 
         assertFalse(coordinator.needRejoin());
         assertEquals(2, subscriptions.assignedPartitions().size());
@@ -672,7 +672,7 @@ public void testUnexpectedErrorOnSyncGroup() {
         // join initially, but let coordinator rebalance on sync
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.UNKNOWN_SERVER_ERROR));
-        coordinator.joinGroupIfNeeded();
+        coordinator.joinGroupIfNeeded(0, Long.MAX_VALUE);
     }
 
     @Test
@@ -698,7 +698,7 @@ public boolean matches(AbstractRequest body) {
         }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
 
-        coordinator.joinGroupIfNeeded();
+        coordinator.joinGroupIfNeeded(0, Long.MAX_VALUE);
 
         assertFalse(coordinator.needRejoin());
         assertEquals(singleton(t1p), subscriptions.assignedPartitions());
@@ -721,7 +721,7 @@ public void testRebalanceInProgressOnSyncGroup() {
         client.prepareResponse(joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
 
-        coordinator.joinGroupIfNeeded();
+        coordinator.joinGroupIfNeeded(0, Long.MAX_VALUE);
 
         assertFalse(coordinator.needRejoin());
         assertEquals(singleton(t1p), subscriptions.assignedPartitions());
@@ -750,7 +750,7 @@ public boolean matches(AbstractRequest body) {
         }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
 
-        coordinator.joinGroupIfNeeded();
+        coordinator.joinGroupIfNeeded(0, Long.MAX_VALUE);
 
         assertFalse(coordinator.needRejoin());
         assertEquals(singleton(t1p), subscriptions.assignedPartitions());
@@ -937,7 +937,7 @@ public void testRejoinGroup() {
         subscriptions.subscribe(new HashSet<>(Arrays.asList(topic1, otherTopic)), rebalanceListener);
         client.prepareResponse(joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
-        coordinator.joinGroupIfNeeded();
+        coordinator.joinGroupIfNeeded(0, Long.MAX_VALUE);
 
         assertEquals(2, rebalanceListener.revokedCount);
         assertEquals(singleton(t1p), rebalanceListener.revoked);
@@ -957,7 +957,7 @@ public void testDisconnectInJoin() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
-        coordinator.joinGroupIfNeeded();
+        coordinator.joinGroupIfNeeded(0, Long.MAX_VALUE);
 
         assertFalse(coordinator.needRejoin());
         assertEquals(singleton(t1p), subscriptions.assignedPartitions());
@@ -975,7 +975,7 @@ public void testInvalidSessionTimeout() {
 
         // coordinator doesn't like the session timeout
         client.prepareResponse(joinGroupFollowerResponse(0, "consumer", "", Errors.INVALID_SESSION_TIMEOUT));
-        coordinator.joinGroupIfNeeded();
+        coordinator.joinGroupIfNeeded(0, Long.MAX_VALUE);
     }
 
     @Test
@@ -1132,7 +1132,7 @@ public void testAutoCommitDynamicAssignmentRebalance() {
 
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
-        coordinator.joinGroupIfNeeded();
+        coordinator.joinGroupIfNeeded(0, Long.MAX_VALUE);
 
         subscriptions.seek(t1p, 100);
 
@@ -1574,7 +1574,7 @@ public void testEnsureActiveGroupWithinBlackoutPeriodAfterAuthenticationFailure(
         client.authenticationFailed(node, 300);
 
         try {
-            coordinator.ensureActiveGroup();
+            coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
             fail("Expected an authentication error.");
         } catch (AuthenticationException e) {
             // OK
@@ -1584,7 +1584,7 @@ public void testEnsureActiveGroupWithinBlackoutPeriodAfterAuthenticationFailure(
         assertTrue(client.connectionFailed(node));
 
         try {
-            coordinator.ensureActiveGroup();
+            coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
             fail("Expected an authentication error.");
         } catch (AuthenticationException e) {
             // OK
@@ -1703,7 +1703,7 @@ public void testCloseNoWait() throws Exception {
     public void testHeartbeatThreadClose() throws Exception {
         groupId = "testCloseTimeoutWithHeartbeatThread";
         ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
-        coordinator.ensureActiveGroup();
+        coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
         time.sleep(heartbeatIntervalMs + 100);
         Thread.yield(); // Give heartbeat thread a chance to attempt heartbeat
         closeVerifyTimeout(coordinator, Long.MAX_VALUE, 60000, 60000, 60000);
@@ -1744,7 +1744,7 @@ private ConsumerCoordinator prepareCoordinatorForCloseTest(final boolean useGrou
             subscriptions.subscribe(singleton(topic1), rebalanceListener);
             client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
             client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
-            coordinator.joinGroupIfNeeded();
+            coordinator.joinGroupIfNeeded(0, Long.MAX_VALUE);
         } else
             subscriptions.assignFromUser(singleton(t1p));
 
@@ -1911,7 +1911,7 @@ private void joinAsFollowerAndReceiveAssignment(String consumerId,
         coordinator.ensureCoordinatorReady();
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(assignment, Errors.NONE));
-        coordinator.joinGroupIfNeeded();
+        coordinator.joinGroupIfNeeded(0, Long.MAX_VALUE);
     }
 
     private void prepareOffsetCommitRequest(Map<TopicPartition, Long> expectedOffsets, Errors error) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 60407c1d0dc..9de9d6d2170 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -107,7 +107,7 @@ public void poll(long timeout) {
         // poll for io until the timeout expires
         final long start = time.milliseconds();
         long now = start;
-        long remaining;
+        long remaining = timeout;
 
         do {
             if (coordinatorUnknown()) {
@@ -116,7 +116,7 @@ public void poll(long timeout) {
             }
 
             if (needRejoin()) {
-                ensureActiveGroup();
+                ensureActiveGroup(now, remaining);
                 now = time.milliseconds();
             }
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index 154b1df5e73..b65bcafbd2d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -225,7 +225,7 @@ public boolean matches(AbstractRequest body) {
             }
         }, syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.singletonList(connectorId1),
                 Collections.<ConnectorTaskId>emptyList(), Errors.NONE));
-        coordinator.ensureActiveGroup();
+        coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
 
         assertFalse(coordinator.needRejoin());
         assertEquals(0, rebalanceListener.revokedCount);
@@ -262,7 +262,7 @@ public boolean matches(AbstractRequest body) {
             }
         }, syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.<String>emptyList(),
                 Collections.singletonList(taskId1x0), Errors.NONE));
-        coordinator.ensureActiveGroup();
+        coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
 
         assertFalse(coordinator.needRejoin());
         assertEquals(0, rebalanceListener.revokedCount);
@@ -307,7 +307,7 @@ public boolean matches(AbstractRequest body) {
         client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE));
         client.prepareResponse(matcher, syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L,
                 Collections.<String>emptyList(), Collections.singletonList(taskId1x0), Errors.NONE));
-        coordinator.ensureActiveGroup();
+        coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
 
         PowerMock.verifyAll();
     }
@@ -326,7 +326,7 @@ public void testRejoinGroup() {
         client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.<String>emptyList(),
                 Collections.singletonList(taskId1x0), Errors.NONE));
-        coordinator.ensureActiveGroup();
+        coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
 
         assertEquals(0, rebalanceListener.revokedCount);
         assertEquals(1, rebalanceListener.assignedCount);
@@ -340,7 +340,7 @@ public void testRejoinGroup() {
         client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.singletonList(connectorId1),
                 Collections.<ConnectorTaskId>emptyList(), Errors.NONE));
-        coordinator.ensureActiveGroup();
+        coordinator.ensureActiveGroup(0, Long.MAX_VALUE);
 
         assertEquals(1, rebalanceListener.revokedCount);
         assertEquals(Collections.emptyList(), rebalanceListener.revokedConnectors);
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index b265182af23..b484f04b79a 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -144,7 +144,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
     val endTimeMs = System.currentTimeMillis + 10000
     var throttled = false
     while ((!throttled || quotaTestClients.exemptRequestMetric == null) && System.currentTimeMillis < endTimeMs) {
-      consumer.poll(100)
+      consumer.poll(2000)
       val throttleMetric = quotaTestClients.throttleMetric(QuotaType.Request, consumerClientId)
       throttled = throttleMetric != null && metricValue(throttleMetric) > 0
     }
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index a06e9e36528..83e6863dc10 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -158,7 +158,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     consumer0.subscribe(List(topic).asJava, listener)
 
     // poll once to get the initial assignment
-    consumer0.poll(0)
+    consumer0.poll(100)
     assertEquals(1, listener.callsToAssigned)
     assertEquals(1, listener.callsToRevoked)
 
@@ -201,11 +201,11 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     consumer0.subscribe(List(topic).asJava, listener)
 
     // poll once to join the group and get the initial assignment
-    consumer0.poll(0)
+    consumer0.poll(10)
 
     // force a rebalance to trigger an invocation of the revocation callback while in the group
     consumer0.subscribe(List("otherTopic").asJava, listener)
-    consumer0.poll(0)
+    consumer0.poll(10)
 
     assertEquals(0, committedPosition)
     assertTrue(commitCompleted)
@@ -231,7 +231,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     consumer0.subscribe(List(topic).asJava, listener)
 
     // poll once to join the group and get the initial assignment
-    consumer0.poll(0)
+    consumer0.poll(10)
 
     // we should still be in the group after this invocation
     consumer0.poll(0)
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index bb62fb7fc67..886289d2d00 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -60,6 +60,8 @@ import scala.collection._
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.JavaConverters._
 import scala.collection.Seq
+import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator
+import java.lang.reflect.Field
 
 object DynamicBrokerReconfigurationTest {
   val SecureInternal = "INTERNAL"
@@ -804,6 +806,9 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     TestUtils.waitUntilTrue(() => servers.forall(server => server.config.listeners.size == existingListenerCount - 1),
       "Listeners not updated")
 
+    // The message has been got and cached sometime before the config changed, so we can still get it
+    assertEquals(1, consumer1.poll(0).count)
+
     // Test that connections using deleted listener don't work
     val producerFuture = verifyConnectionFailure(producer1)
     val consumerFuture = verifyConnectionFailure(consumer1)
@@ -1185,7 +1190,13 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     executors += executor
     val future = executor.submit(new Runnable() {
       def run() {
-        assertEquals(0, consumer.poll(100).count)
+        // since KAFKA-6783 fixed, poll(timeout) will always honor the timeout
+        // and no longer hang up when all bootstrap servers down
+        // so we should decide the connection failure in another way
+        val field = consumer.getClass().getDeclaredField("coordinator")
+        field.setAccessible(true);
+        val coordinator = field.get(consumer).asInstanceOf[ConsumerCoordinator]
+        coordinator.ensureCoordinatorReady() // hang up when all servers down
       }
     })
     verifyTimeout(future)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 1469d180d24..c90d735990b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -91,6 +91,7 @@ public void before() {
         streamsProp.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
         streamsProp.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
         streamsProp.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsProp.put(StreamsConfig.POLL_MS_CONFIG, "5000");
     }
 
     @After


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> consumer poll(timeout) blocked infinitely when no available bootstrap server
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-6783
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6783
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 1.1.0
>            Reporter: Qiang Zhao
>            Priority: Major
>              Labels: features
>             Fix For: 2.0.0
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> {code:java}
>     @Test
>     public void testPollWithAllBootstrapServersDown() throws Exception {
>         ExecutorService executor = Executors.newSingleThreadExecutor();
>         try {
>             final long pollTimeout = 1000;
>             final AtomicBoolean pollComplete = new AtomicBoolean();
>             executor.submit(new Runnable() {
>                 @Override
>                 public void run() {
>                     Properties props = new Properties();
>                     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
>                     try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(props)) {
>                         consumer.subscribe(Arrays.asList(topic));
>                         try {
>                             consumer.poll(pollTimeout);
>                         } catch (Exception ex) {
>                             ex.printStackTrace();
>                         } finally {
>                             pollComplete.set(true);
>                         }
>                     }
>                 }
>             });
>             Thread.sleep(pollTimeout * 2);
>             Assert.assertTrue("poll timeout not work when all servers down", pollComplete.get());
>         } finally {
>             executor.shutdown();
>         }
>     }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)