Posted to commits@kafka.apache.org by jg...@apache.org on 2018/01/20 21:48:07 UTC

[kafka] branch 1.0 updated: KAFKA-6366: Fix stack overflow in consumer due to many offset commits during coordinator disconnect (#4349)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new 53964b0  KAFKA-6366: Fix stack overflow in consumer due to many offset commits during coordinator disconnect (#4349)
53964b0 is described below

commit 53964b05c18d918955cf1751d84dc92ba1d44f27
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Sat Jan 20 13:28:24 2018 -0800

    KAFKA-6366: Fix stack overflow in consumer due to many offset commits during coordinator disconnect (#4349)
    
    When the coordinator is marked unknown, we explicitly disconnect its connection and cancel pending requests. Currently the disconnect happens before the coordinator state is set to null, which means that callbacks which inspect the coordinator state will still see it as active. If there are offset commit requests which need to be cancelled, each request callback will inspect the coordinator state and attempt to mark the coordinator dead again. In the worst case, if there are many pend [...]
    
    I have added a test case which reproduced the stack overflow with many pending offset commits. I have also added a basic test case to verify that callbacks for in-flight or unsent requests see the coordinator as unknown, which prevents them from attempting to resend.
    
    This patch also includes some minor cleanups which were noticed along the way.
    
    Reviewers: Rajini Sivaram <ra...@googlemail.com>
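The ordering problem described in the commit message can be illustrated with a small, self-contained Java sketch. It is a simplified model, not Kafka code: the class CoordinatorDeadSketch, its fields, and the assumption that every pending callback directly re-enters coordinatorDead() are hypothetical, chosen to mirror the description above. The sketch records how deeply coordinatorDead() nests under the old ordering (disconnect, then clear the coordinator) and under the new one (clear, then disconnect).

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of AbstractCoordinator.coordinatorDead(); not the Kafka implementation.
class CoordinatorDeadSketch {
    private String coordinator = "broker-1";
    private final Deque<Runnable> pendingCallbacks = new ArrayDeque<>();
    private final boolean clearBeforeDisconnect;
    private int depth = 0;
    private int maxDepth = 0;

    CoordinatorDeadSketch(boolean clearBeforeDisconnect, int pendingCommits) {
        this.clearBeforeDisconnect = clearBeforeDisconnect;
        for (int i = 0; i < pendingCommits; i++) {
            // Assumption: each cancelled commit's callback reacts to the disconnect
            // by trying to mark the coordinator dead again.
            pendingCallbacks.add(this::coordinatorDead);
        }
    }

    // Stand-in for the network client's disconnect: fail each pending request in turn.
    private void disconnect() {
        Runnable callback;
        while ((callback = pendingCallbacks.poll()) != null) {
            callback.run();
        }
    }

    void coordinatorDead() {
        depth++;
        maxDepth = Math.max(maxDepth, depth);
        try {
            if (coordinator != null) {
                if (clearBeforeDisconnect) {
                    coordinator = null;  // new order: callbacks see the coordinator as unknown
                    disconnect();
                } else {
                    disconnect();        // old order: callbacks still see the coordinator as active
                    coordinator = null;
                }
            }
        } finally {
            depth--;
        }
    }

    int maxDepth() {
        return maxDepth;
    }

    public static void main(String[] args) {
        CoordinatorDeadSketch before = new CoordinatorDeadSketch(false, 1000);
        before.coordinatorDead();
        CoordinatorDeadSketch after = new CoordinatorDeadSketch(true, 1000);
        after.coordinatorDead();
        System.out.println("max depth, disconnect first: " + before.maxDepth()); // 1001
        System.out.println("max depth, clear first: " + after.maxDepth());       // 2
    }
}
```

Under the old ordering the nesting depth grows with the number of pending commits, so a large enough backlog would exhaust the thread's stack; clearing the field first keeps the depth constant regardless of how many requests are cancelled.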
---
 .../consumer/RetriableCommitFailedException.java   |  6 ++-
 .../consumer/internals/AbstractCoordinator.java    | 49 ++++++++++--------
 .../consumer/internals/ConsumerCoordinator.java    |  9 ++--
 .../consumer/internals/ConsumerNetworkClient.java  |  3 +-
 .../internals/ConsumerCoordinatorTest.java         | 58 ++++++++++++++++++++++
 5 files changed, 98 insertions(+), 27 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/RetriableCommitFailedException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/RetriableCommitFailedException.java
index 69f21a4..e719e7c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/RetriableCommitFailedException.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RetriableCommitFailedException.java
@@ -24,11 +24,13 @@ public class RetriableCommitFailedException extends RetriableException {
 
     public static RetriableCommitFailedException withUnderlyingMessage(String additionalMessage) {
         return new RetriableCommitFailedException("Offset commit failed with a retriable exception. " +
-                "You should retry committing offsets. The underlying error was: " + additionalMessage);
+                "You should retry committing the latest consumed offsets. " +
+                "The underlying error was: " + additionalMessage);
     }
 
     public RetriableCommitFailedException(Throwable t) {
-        super("Offset commit failed with a retriable exception. You should retry committing offsets.", t);
+        super("Offset commit failed with a retriable exception. You should retry committing " +
+                "the latest consumed offsets.", t);
     }
 
     public RetriableCommitFailedException(String message) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 86e4f2b..cd5c382 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -248,10 +248,10 @@ public abstract class AbstractCoordinator implements Closeable {
             // find a node to ask about the coordinator
             Node node = this.client.leastLoadedNode();
             if (node == null) {
-                log.debug("No broker available to send GroupCoordinator request");
+                log.debug("No broker available to send FindCoordinator request");
                 return RequestFuture.noBrokersAvailable();
             } else
-                findCoordinatorFuture = sendGroupCoordinatorRequest(node);
+                findCoordinatorFuture = sendFindCoordinatorRequest(node);
         }
         return findCoordinatorFuture;
     }
@@ -574,34 +574,35 @@ public abstract class AbstractCoordinator implements Closeable {
      * one of the brokers. The returned future should be polled to get the result of the request.
      * @return A request future which indicates the completion of the metadata request
      */
-    private RequestFuture<Void> sendGroupCoordinatorRequest(Node node) {
+    private RequestFuture<Void> sendFindCoordinatorRequest(Node node) {
         // initiate the group metadata request
-        log.debug("Sending GroupCoordinator request to broker {}", node);
+        log.debug("Sending FindCoordinator request to broker {}", node);
         FindCoordinatorRequest.Builder requestBuilder =
                 new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, this.groupId);
         return client.send(node, requestBuilder)
-                     .compose(new GroupCoordinatorResponseHandler());
+                     .compose(new FindCoordinatorResponseHandler());
     }
 
-    private class GroupCoordinatorResponseHandler extends RequestFutureAdapter<ClientResponse, Void> {
+    private class FindCoordinatorResponseHandler extends RequestFutureAdapter<ClientResponse, Void> {
 
         @Override
         public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
-            log.debug("Received GroupCoordinator response {}", resp);
+            log.debug("Received FindCoordinator response {}", resp);
+            clearFindCoordinatorFuture();
 
             FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) resp.responseBody();
-            // use MAX_VALUE - node.id as the coordinator id to mimic separate connections
-            // for the coordinator in the underlying network client layer
-            // TODO: this needs to be better handled in KAFKA-1935
             Errors error = findCoordinatorResponse.error();
-            clearFindCoordinatorFuture();
             if (error == Errors.NONE) {
                 synchronized (AbstractCoordinator.this) {
+                    // use MAX_VALUE - node.id as the coordinator id to allow separate connections
+                    // for the coordinator in the underlying network client layer
+                    int coordinatorConnectionId = Integer.MAX_VALUE - findCoordinatorResponse.node().id();
+
                     AbstractCoordinator.this.coordinator = new Node(
-                            Integer.MAX_VALUE - findCoordinatorResponse.node().id(),
+                            coordinatorConnectionId,
                             findCoordinatorResponse.node().host(),
                             findCoordinatorResponse.node().port());
-                    log.info("Discovered coordinator {}", coordinator);
+                    log.info("Discovered group coordinator {}", coordinator);
                     client.tryConnect(coordinator);
                     heartbeat.resetTimeouts(time.milliseconds());
                 }
@@ -647,11 +648,16 @@ public abstract class AbstractCoordinator implements Closeable {
     protected synchronized void coordinatorDead() {
         if (this.coordinator != null) {
             log.info("Marking the coordinator {} dead", this.coordinator);
+            Node oldCoordinator = this.coordinator;
+
+            // Mark the coordinator dead before disconnecting requests since the callbacks for any pending
+            // requests may attempt to do likewise. This also prevents new requests from being sent to the
+            // coordinator while the disconnect is in progress.
+            this.coordinator = null;
 
             // Disconnect from the coordinator to ensure that there are no in-flight requests remaining.
             // Pending callbacks will be invoked with a DisconnectException.
-            client.disconnect(this.coordinator);
-            this.coordinator = null;
+            client.disconnect(oldCoordinator);
         }
     }
 
@@ -987,15 +993,18 @@ public abstract class AbstractCoordinator implements Closeable {
                 log.error("An authentication error occurred in the heartbeat thread", e);
                 this.failed.set(e);
             } catch (GroupAuthorizationException e) {
-                log.error("A group authorization error occurred in the heartbeat thread for group {}", groupId, e);
+                log.error("A group authorization error occurred in the heartbeat thread", e);
                 this.failed.set(e);
             } catch (InterruptedException | InterruptException e) {
                 Thread.interrupted();
-                log.error("Unexpected interrupt received in heartbeat thread for group {}", groupId, e);
+                log.error("Unexpected interrupt received in heartbeat thread", e);
                 this.failed.set(new RuntimeException(e));
-            } catch (RuntimeException e) {
-                log.error("Heartbeat thread for group {} failed due to unexpected error", groupId, e);
-                this.failed.set(e);
+            } catch (Throwable e) {
+                log.error("Heartbeat thread failed due to unexpected error", e);
+                if (e instanceof RuntimeException)
+                    this.failed.set((RuntimeException) e);
+                else
+                    this.failed.set(new RuntimeException(e));
             } finally {
                 log.debug("Heartbeat thread has closed");
             }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 5482db7..eebc5d5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -295,6 +295,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 ensureActiveGroup();
                 now = time.milliseconds();
             }
+
+            pollHeartbeat(now);
         } else {
             // For manually assigned partitions, if there are no ready nodes, await metadata.
             // If connections to all nodes fail, wakeups triggered while attempting to send fetch
@@ -311,12 +313,11 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             }
         }
 
-        pollHeartbeat(now);
         maybeAutoCommitOffsetsAsync(now);
     }
 
     /**
-     * Return the time to the next needed invocation of {@link #poll(long)}.
+     * Return the time to the next needed invocation of {@link #poll(long, long)}.
      * @param now current time in milliseconds
      * @return the maximum time in milliseconds the caller should wait before the next invocation of poll()
      */
@@ -533,7 +534,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 public void onFailure(RuntimeException e) {
                     pendingAsyncCommits.decrementAndGet();
                     completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets,
-                            RetriableCommitFailedException.withUnderlyingMessage(e.getMessage())));
+                            new RetriableCommitFailedException(e)));
                 }
             });
         }
@@ -562,7 +563,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 Exception commitException = e;
 
                 if (e instanceof RetriableException)
-                    commitException = RetriableCommitFailedException.withUnderlyingMessage(e.getMessage());
+                    commitException = new RetriableCommitFailedException(e);
 
                 completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException));
             }
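Switching from RetriableCommitFailedException.withUnderlyingMessage(e.getMessage()) to new RetriableCommitFailedException(e) keeps the original exception as the cause instead of copying only its message; the new test relies on this when it checks that exception.getCause() is a DisconnectException. The sketch below uses hypothetical stand-in classes (FakeDisconnectException and FakeRetriableCommitFailedException, not the Kafka types) to show the difference:

```java
// Hypothetical stand-ins for the Kafka exception types; not the real classes.
class CauseSketch {
    static class FakeDisconnectException extends RuntimeException {
        FakeDisconnectException() {
            super("disconnected");
        }
    }

    static class FakeRetriableCommitFailedException extends RuntimeException {
        FakeRetriableCommitFailedException(String message) {
            super(message);
        }

        // New path: the underlying exception is preserved as the cause.
        FakeRetriableCommitFailedException(Throwable t) {
            super("Offset commit failed with a retriable exception.", t);
        }

        // Old path: only the underlying exception's message text survives.
        static FakeRetriableCommitFailedException withUnderlyingMessage(String additionalMessage) {
            return new FakeRetriableCommitFailedException(
                    "Offset commit failed with a retriable exception. The underlying error was: " + additionalMessage);
        }
    }

    public static void main(String[] args) {
        RuntimeException e = new FakeDisconnectException();
        Exception before = FakeRetriableCommitFailedException.withUnderlyingMessage(e.getMessage());
        Exception after = new FakeRetriableCommitFailedException(e);
        System.out.println("old cause: " + before.getCause());
        System.out.println("new cause is disconnect: " + (after.getCause() instanceof FakeDisconnectException));
    }
}
```

Preserving the cause lets an OffsetCommitCallback distinguish a disconnect from other retriable failures by type rather than by parsing the message text.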
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 2f9cb1b..9d621b9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -384,8 +384,9 @@ public class ConsumerNetworkClient implements Closeable {
     }
 
     public void disconnect(Node node) {
+        failUnsentRequests(node, DisconnectException.INSTANCE);
+
         synchronized (this) {
-            failUnsentRequests(node, DisconnectException.INSTANCE);
             client.disconnect(node.idString());
         }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 65de8c0..2439e82 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.consumer.CommitFailedException;
@@ -74,6 +75,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
 import static java.util.Collections.singleton;
@@ -197,6 +199,62 @@ public class ConsumerCoordinatorTest {
     }
 
     @Test
+    public void testManyInFlightAsyncCommitsWithCoordinatorDisconnect() throws Exception {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady();
+
+        int numRequests = 1000;
+        TopicPartition tp = new TopicPartition("foo", 0);
+        final AtomicInteger responses = new AtomicInteger(0);
+
+        for (int i = 0; i < numRequests; i++) {
+            Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(tp, new OffsetAndMetadata(i));
+            coordinator.commitOffsetsAsync(offsets, new OffsetCommitCallback() {
+                @Override
+                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+                    responses.incrementAndGet();
+                    Throwable cause = exception.getCause();
+                    assertTrue("Unexpected exception cause type: " + (cause == null ? null : cause.getClass()),
+                            cause instanceof DisconnectException);
+                }
+            });
+        }
+
+        coordinator.coordinatorDead();
+        coordinator.invokeCompletedOffsetCommitCallbacks();
+        assertEquals(numRequests, responses.get());
+    }
+
+    @Test
+    public void testCoordinatorUnknownInUnsentCallbacksAfterCoordinatorDead() throws Exception {
+        // When the coordinator is marked dead, all unsent or in-flight requests are cancelled
+        // with a disconnect error. This test case ensures that the corresponding callbacks see
+        // the coordinator as unknown which prevents additional retries to the same coordinator.
+
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady();
+
+        final AtomicBoolean asyncCallbackInvoked = new AtomicBoolean(false);
+        Map<TopicPartition, OffsetCommitRequest.PartitionData> offsets = Collections.singletonMap(
+                new TopicPartition("foo", 0), new OffsetCommitRequest.PartitionData(13L, ""));
+        consumerClient.send(coordinator.coordinator(), new OffsetCommitRequest.Builder(groupId, offsets))
+                .compose(new RequestFutureAdapter<ClientResponse, Object>() {
+                    @Override
+                    public void onSuccess(ClientResponse value, RequestFuture<Object> future) {}
+
+                    @Override
+                    public void onFailure(RuntimeException e, RequestFuture<Object> future) {
+                        assertTrue("Unexpected exception type: " + e.getClass(), e instanceof DisconnectException);
+                        assertTrue(coordinator.coordinatorUnknown());
+                        asyncCallbackInvoked.set(true);
+                    }
+                });
+
+        coordinator.coordinatorDead();
+        assertTrue(asyncCallbackInvoked.get());
+    }
+
+    @Test
     public void testNotCoordinator() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
