Posted to commits@kafka.apache.org by gu...@apache.org on 2018/02/27 08:29:37 UTC

[kafka] branch trunk updated: KAFKA-6534: Enforce a rebalance in the next poll call when encountering task migration (#4544)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 97ad549  KAFKA-6534: Enforce a rebalance in the next poll call when encountering task migration (#4544)
97ad549 is described below

commit 97ad549d56d3432b0c70e72d402ebec1d07f8273
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Tue Feb 27 00:29:25 2018 -0800

    KAFKA-6534: Enforce a rebalance in the next poll call when encountering task migration (#4544)
    
    The fix is two-fold:
    
    For tasks that are closed in closeZombieTask, their corresponding partitions are still in runningByPartition, so those closed tasks may still be returned from activeTasks and standbyTasks. Add guards on the returned tasks: if a task is already closed, notify the thread to trigger a rebalance immediately.
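    
    A minimal sketch of the added guard (the actual change lives in StreamThread's record-handling path, see the diff below); isClosed() and the TaskMigratedException thrown here are the ones introduced by this patch:
    
        final StreamTask task = taskManager.activeTask(partition);
        if (task.isClosed()) {
            // the task was closed as a zombie but is still tracked via runningByPartition;
            // throwing here lets the stream thread enforce a rebalance in its next poll
            log.warn("Stream task {} is already closed, triggering a new rebalance.", task.id());
            throw new TaskMigratedException(task);
        }
        task.addRecords(partition, records.records(partition));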
    
    When triggering a rebalance, unsubscribe and re-subscribe immediately so that we do not depend on the background heartbeat thread's timing.
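    
    The enforced rebalance boils down to the following pattern in the stream thread's TaskMigratedException handler (see the StreamThread hunk in the diff below):
    
        // re-subscribe to enforce a rebalance in the next poll call,
        // rather than waiting for the background heartbeat thread to request a rejoin
        consumer.unsubscribe();
        consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener);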
    
    Some minor log4j changes: more specifically, the log entry in closeZombieTask is moved to its callers, with more context information and the action about to be taken.
    
    I can reproduce the issue with EosIntegrationTest by hand-coding the heartbeat thread to simulate a GC pause, and confirmed that this patch fixes the issue. Unfortunately this test cannot be added to AK, since we currently have no way to manipulate the heartbeat thread in unit tests.
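    
    For reference, the reproduction injects an artificial pause into the test's processing path (a sketch; flag names follow the EosIntegrationTest diff below, and the exception handling around the sleep is abbreviated):
    
        if (gcInjected.compareAndSet(true, false)) {
            // simulate a long GC pause: block until the test flips doGC back to false
            while (doGC) {
                try {
                    Thread.sleep(100);
                } catch (final InterruptedException ignored) { }
            }
        }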
    
    Reviewers: Jason Gustafson <ja...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
 .../consumer/internals/AbstractCoordinator.java    |  8 ++---
 .../streams/processor/internals/AbstractTask.java  |  4 +++
 .../processor/internals/AssignedStreamsTasks.java  |  4 +++
 .../streams/processor/internals/AssignedTasks.java | 14 +++++----
 .../streams/processor/internals/StandbyTask.java   |  2 ++
 .../streams/processor/internals/StreamTask.java    |  2 ++
 .../streams/processor/internals/StreamThread.java  | 34 +++++++++++++++++++++-
 .../streams/processor/internals/TaskManager.java   |  1 -
 .../streams/integration/EosIntegrationTest.java    | 16 +++++-----
 9 files changed, 67 insertions(+), 18 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 6884ff0..b39f52c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -767,20 +767,20 @@ public abstract class AbstractCoordinator implements Closeable {
                 future.complete(null);
             } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
                     || error == Errors.NOT_COORDINATOR) {
-                log.debug("Attempt to heartbeat since coordinator {} is either not started or not valid.",
+                log.info("Attempt to heartbeat failed since coordinator {} is either not started or not valid.",
                         coordinator());
                 coordinatorDead();
                 future.raise(error);
             } else if (error == Errors.REBALANCE_IN_PROGRESS) {
-                log.debug("Attempt to heartbeat failed since group is rebalancing");
+                log.info("Attempt to heartbeat failed since group is rebalancing");
                 requestRejoin();
                 future.raise(Errors.REBALANCE_IN_PROGRESS);
             } else if (error == Errors.ILLEGAL_GENERATION) {
-                log.debug("Attempt to heartbeat failed since generation {} is not current", generation.generationId);
+                log.info("Attempt to heartbeat failed since generation {} is not current", generation.generationId);
                 resetGeneration();
                 future.raise(Errors.ILLEGAL_GENERATION);
             } else if (error == Errors.UNKNOWN_MEMBER_ID) {
-                log.debug("Attempt to heartbeat failed for since member id {} is not valid.", generation.memberId);
+                log.info("Attempt to heartbeat failed since member id {} is not valid.", generation.memberId);
                 resetGeneration();
                 future.raise(Errors.UNKNOWN_MEMBER_ID);
             } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index d9c827f..a8f7e65 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -52,6 +52,7 @@ public abstract class AbstractTask implements Task {
     final Logger log;
     final LogContext logContext;
     boolean taskInitialized;
+    boolean taskClosed;
     final StateDirectory stateDirectory;
 
     InternalProcessorContext processorContext;
@@ -256,6 +257,9 @@ public abstract class AbstractTask implements Task {
         }
     }
 
+    public boolean isClosed() {
+        return taskClosed;
+    }
 
     public boolean hasStateStores() {
         return !topology.stateStores().isEmpty();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
index 7b05f64..f98e635 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
@@ -95,6 +95,8 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
                     processed++;
                 }
             } catch (final TaskMigratedException e) {
+                log.info("Failed to process stream task {} since it got migrated to another thread already. " +
+                        "Closing it as zombie before triggering a new rebalance.", task.id());
                 final RuntimeException fatalException = closeZombieTask(task);
                 if (fatalException != null) {
                     throw fatalException;
@@ -125,6 +127,8 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
                     punctuated++;
                 }
             } catch (final TaskMigratedException e) {
+                log.info("Failed to punctuate stream task {} since it got migrated to another thread already. " +
+                        "Closing it as zombie before triggering a new rebalance.", task.id());
                 final RuntimeException fatalException = closeZombieTask(task);
                 if (fatalException != null) {
                     throw fatalException;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 2cd82f4..8529c9e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -200,6 +200,8 @@ abstract class AssignedTasks<T extends Task> {
                 suspended.put(task.id(), task);
             } catch (final TaskMigratedException closeAsZombieAndSwallow) {
                 // as we suspend a task, we are either shutting down or rebalancing, thus, we swallow and move on
+                log.info("Failed to suspend {} {} since it got migrated to another thread already. " +
+                        "Closing it as zombie and moving on.", taskTypeName, task.id());
                 firstException.compareAndSet(null, closeZombieTask(task));
                 it.remove();
             } catch (final RuntimeException e) {
@@ -216,7 +218,6 @@ abstract class AssignedTasks<T extends Task> {
     }
 
     RuntimeException closeZombieTask(final T task) {
-        log.warn("{} {} got migrated to another thread already. Closing it as zombie.", taskTypeName, task.id());
         try {
             task.close(false, true);
         } catch (final RuntimeException e) {
@@ -242,11 +243,12 @@ abstract class AssignedTasks<T extends Task> {
                 try {
                     task.resume();
                 } catch (final TaskMigratedException e) {
+                    log.info("Failed to resume {} {} since it got migrated to another thread already. " +
+                            "Closing it as zombie before triggering a new rebalance.", taskTypeName, task.id());
                     final RuntimeException fatalException = closeZombieTask(task);
                     if (fatalException != null) {
                         throw fatalException;
                     }
-                    suspended.remove(taskId);
                     throw e;
                 }
                 transitionToRunning(task, new HashSet<TopicPartition>());
@@ -368,14 +370,14 @@ abstract class AssignedTasks<T extends Task> {
             try {
                 action.apply(task);
             } catch (final TaskMigratedException e) {
+                log.info("Failed to commit {} {} since it got migrated to another thread already. " +
+                        "Closing it as zombie before triggering a new rebalance.", taskTypeName, task.id());
                 final RuntimeException fatalException = closeZombieTask(task);
                 if (fatalException != null) {
                     throw fatalException;
                 }
                 it.remove();
-                if (firstException == null) {
-                    firstException = e;
-                }
+                throw e;
             } catch (final RuntimeException t) {
                 log.error("Failed to {} {} {} due to the following error:",
                           action.name(),
@@ -416,6 +418,8 @@ abstract class AssignedTasks<T extends Task> {
             try {
                 task.close(clean, false);
             } catch (final TaskMigratedException e) {
+                log.info("Failed to close {} {} since it got migrated to another thread already. " +
+                        "Closing it as zombie and moving on.", taskTypeName, task.id());
                 firstException.compareAndSet(null, closeZombieTask(task));
             } catch (final RuntimeException t) {
                 log.error("Failed while closing {} {} due to the following error:",
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 39d34d7..861556c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -144,6 +144,8 @@ public class StandbyTask extends AbstractTask {
         } finally {
             closeStateManager(committedSuccessfully);
         }
+
+        taskClosed = true;
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 6bca02a..b8777ad 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -541,6 +541,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         }
 
         closeSuspended(clean, isZombie, firstException);
+
+        taskClosed = true;
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 61a22be..cda04e9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -757,7 +757,12 @@ public class StreamThread extends Thread {
             } catch (final TaskMigratedException ignoreAndRejoinGroup) {
                 log.warn("Detected task {} that got migrated to another thread. " +
                     "This implies that this thread missed a rebalance and dropped out of the consumer group. " +
-                    "Trying to rejoin the consumer group now. Below is the detailed description of the task:\n{}", ignoreAndRejoinGroup.migratedTask().id(), ignoreAndRejoinGroup.migratedTask().toString(">"));
+                    "Will try to rejoin the consumer group. Below is the detailed description of the task:\n{}",
+                        ignoreAndRejoinGroup.migratedTask().id(), ignoreAndRejoinGroup.migratedTask().toString(">"));
+
+                // re-subscribe to enforce a rebalance in the next poll call
+                consumer.unsubscribe();
+                consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener);
             }
         }
     }
@@ -898,6 +903,13 @@ public class StreamThread extends Thread {
 
         for (final TopicPartition partition : records.partitions()) {
             final StreamTask task = taskManager.activeTask(partition);
+
+            if (task.isClosed()) {
+                log.warn("Stream task {} is already closed, probably because it got unexpectedly migrated to another thread already. " +
+                        "Notifying the thread to trigger a new rebalance immediately.", task.id());
+                throw new TaskMigratedException(task);
+            }
+
             numAddedRecords += task.addRecords(partition, records.records(partition));
         }
         streamsMetrics.skippedRecordsSensor.record(records.count() - numAddedRecords, timerStartedMs);
@@ -1024,6 +1036,13 @@ public class StreamThread extends Thread {
                         List<ConsumerRecord<byte[], byte[]>> remaining = entry.getValue();
                         if (remaining != null) {
                             final StandbyTask task = taskManager.standbyTask(partition);
+
+                            if (task.isClosed()) {
+                                log.warn("Standby task {} is already closed, probably because it got unexpectedly migrated to another thread already. " +
+                                        "Notifying the thread to trigger a new rebalance immediately.", task.id());
+                                throw new TaskMigratedException(task);
+                            }
+
                             remaining = task.update(partition, remaining);
                             if (remaining != null) {
                                 remainingStandbyRecords.put(partition, remaining);
@@ -1051,6 +1070,12 @@ public class StreamThread extends Thread {
                             throw new StreamsException(logPrefix + "Missing standby task for partition " + partition);
                         }
 
+                        if (task.isClosed()) {
+                            log.warn("Standby task {} is already closed, probably because it got unexpectedly migrated to another thread already. " +
+                                    "Notifying the thread to trigger a new rebalance immediately.", task.id());
+                            throw new TaskMigratedException(task);
+                        }
+
                         final List<ConsumerRecord<byte[], byte[]>> remaining = task.update(partition, records.records(partition));
                         if (remaining != null) {
                             restoreConsumer.pause(singleton(partition));
@@ -1063,6 +1088,13 @@ public class StreamThread extends Thread {
                 final Set<TopicPartition> partitions = recoverableException.partitions();
                 for (final TopicPartition partition : partitions) {
                     final StandbyTask task = taskManager.standbyTask(partition);
+
+                    if (task.isClosed()) {
+                        log.warn("Standby task {} is already closed, probably because it got unexpectedly migrated to another thread already. " +
+                                "Notifying the thread to trigger a new rebalance immediately.", task.id());
+                        throw new TaskMigratedException(task);
+                    }
+
                     log.info("Reinitializing StandbyTask {}", task);
                     task.reinitializeStateStoresForPartitions(recoverableException.partitions());
                 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 62ddacf..9f02834 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -300,7 +300,6 @@ class TaskManager {
         return active.runningTaskFor(partition);
     }
 
-
     StandbyTask standbyTask(final TopicPartition partition) {
         return standby.runningTaskFor(partition);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 6c7b2b4..c4ea964 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -61,6 +61,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -78,18 +79,18 @@ public class EosIntegrationTest {
     });
 
     private static String applicationId;
+    private final static int NUM_TOPIC_PARTITIONS = 2;
     private final static String CONSUMER_GROUP_ID = "readCommitted";
     private final static String SINGLE_PARTITION_INPUT_TOPIC = "singlePartitionInputTopic";
     private final static String SINGLE_PARTITION_THROUGH_TOPIC = "singlePartitionThroughTopic";
     private final static String SINGLE_PARTITION_OUTPUT_TOPIC = "singlePartitionOutputTopic";
-    private final static int NUM_TOPIC_PARTITIONS = 2;
     private final static String MULTI_PARTITION_INPUT_TOPIC = "multiPartitionInputTopic";
     private final static String MULTI_PARTITION_THROUGH_TOPIC = "multiPartitionThroughTopic";
     private final static String MULTI_PARTITION_OUTPUT_TOPIC = "multiPartitionOutputTopic";
     private final String storeName = "store";
 
     private AtomicBoolean errorInjected;
-    private AtomicBoolean injectGC;
+    private AtomicBoolean gcInjected;
     private volatile boolean doGC = true;
     private AtomicInteger commitRequested;
     private Throwable uncaughtException;
@@ -153,7 +154,6 @@ public class EosIntegrationTest {
         output.to(outputTopic);
 
         for (int i = 0; i < numberOfRestarts; ++i) {
-            final long factor = i;
             final KafkaStreams streams = new KafkaStreams(
                 builder.build(),
                 StreamsTestUtils.getStreamsConfig(
@@ -171,7 +171,7 @@ public class EosIntegrationTest {
             try {
                 streams.start();
 
-                final List<KeyValue<Long, Long>> inputData = prepareData(factor * 100, factor * 100 + 10L, 0L, 1L);
+                final List<KeyValue<Long, Long>> inputData = prepareData(i * 100, i * 100 + 10L, 0L, 1L);
 
                 IntegrationTestUtils.produceKeyValuesSynchronously(
                     inputTopic,
@@ -510,7 +510,7 @@ public class EosIntegrationTest {
             checkResultPerKey(committedRecords, committedDataBeforeGC);
             checkResultPerKey(uncommittedRecords, dataBeforeGC);
 
-            injectGC.set(true);
+            gcInjected.set(true);
             writeInputData(dataToTriggerFirstRebalance);
 
             TestUtils.waitForCondition(new TestCondition() {
@@ -577,7 +577,7 @@ public class EosIntegrationTest {
     private KafkaStreams getKafkaStreams(final boolean withState, final String appDir, final int numberOfStreamsThreads) {
         commitRequested = new AtomicInteger(0);
         errorInjected = new AtomicBoolean(false);
-        injectGC = new AtomicBoolean(false);
+        gcInjected = new AtomicBoolean(false);
         final StreamsBuilder builder = new StreamsBuilder();
 
         String[] storeNames = null;
@@ -614,7 +614,7 @@ public class EosIntegrationTest {
                             // only tries to fail once on one of the task
                             throw new RuntimeException("Injected test exception.");
                         }
-                        if (injectGC.compareAndSet(true, false)) {
+                        if (gcInjected.compareAndSet(true, false)) {
                             while (doGC) {
                                 try {
                                     Thread.sleep(100);
@@ -779,6 +779,8 @@ public class EosIntegrationTest {
             }
         }
 
+        assertNotNull(store);
+
         final KeyValueIterator<Long, Long> it = store.all();
         while (it.hasNext()) {
             assertTrue(expectedStoreContent.remove(it.next()));

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.