Posted to commits@kafka.apache.org by mj...@apache.org on 2018/06/12 23:04:06 UTC

[kafka] branch 1.0 updated: KAFKA-6634: Delay starting new transaction in task.initializeTopology (#4684)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new b106a45  KAFKA-6634: Delay starting new transaction in task.initializeTopology (#4684)
b106a45 is described below

commit b106a4521968cfe4a444b0e8171ab296500e8eb5
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Tue Mar 13 08:43:58 2018 -0700

    KAFKA-6634: Delay starting new transaction in task.initializeTopology (#4684)
    
    As titled: do not start a new transaction when the task is created, since during restoration the producer has no activity and the transaction could expire. Also delay starting a new transaction on resume until the topology is initialized.
    
    Reviewers: Matthias J. Sax <mj...@apache.org>, Bill Bejeck <bi...@confluent.io>
---
 .../streams/processor/internals/AssignedTasks.java |  9 +++--
 .../streams/processor/internals/StreamTask.java    | 41 ++++++++++++----------
 .../streams/processor/internals/StreamThread.java  |  4 +--
 .../streams/processor/internals/TaskManager.java   | 12 +------
 .../processor/internals/AssignedTasksTest.java     |  3 +-
 .../processor/internals/StreamTaskTest.java        |  7 ++++
 6 files changed, 40 insertions(+), 36 deletions(-)
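In short, the patch keeps producer.initTransactions() in the StreamTask constructor but moves producer.beginTransaction() out of the constructor and out of resume(), into initializeTopology(). A minimal sketch of the resulting lifecycle follows; the class and field names are illustrative stand-ins, not the real StreamTask internals:

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.errors.ProducerFencedException;

// Hypothetical, simplified stand-in for StreamTask.
public class EosTaskSketch {

    private final Producer<byte[], byte[]> producer;
    private final boolean eosEnabled;
    private boolean transactionInFlight = false;

    public EosTaskSketch(final Producer<byte[], byte[]> producer, final boolean eosEnabled) {
        this.producer = producer;
        this.eosEnabled = eosEnabled;
        if (eosEnabled) {
            // May block until a previous transaction completes, but does not open a new one,
            // so nothing can expire while the task is still restoring its state stores.
            producer.initTransactions();
        }
    }

    // Called only once restoration is done and the topology is (re-)initialized.
    public void initializeTopology() {
        if (eosEnabled) {
            try {
                producer.beginTransaction();
            } catch (final ProducerFencedException fatal) {
                // StreamTask rethrows this as TaskMigratedException; a plain RuntimeException stands in here.
                throw new RuntimeException("task producer got fenced (EOS)", fatal);
            }
            transactionInFlight = true;
        }
    }

    // resume() no longer begins a transaction; that is deferred to initializeTopology().
    public void resume() {
        // nothing to do for transactions here anymore
    }

    public boolean transactionInFlight() {
        return transactionInFlight;
    }
}

With this split, a task that sits in restoration or suspension holds no open transaction until initializeTopology() is called, so the producer's transaction timeout cannot elapse before the task actually starts processing.
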

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 6aa9c03..0d9d04d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -110,6 +110,7 @@ class AssignedTasks implements RestoringTasks {
      * @return partitions that are ready to be resumed
      * @throws IllegalStateException If store gets registered after initialized is already finished
      * @throws StreamsException if the store's change log does not contain the partition
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     Set<TopicPartition> initializeNewTasks() {
         final Set<TopicPartition> readyPartitions = new HashSet<>();
@@ -259,14 +260,15 @@ class AssignedTasks implements RestoringTasks {
             log.trace("found suspended {} {}", taskTypeName, taskId);
             if (task.partitions().equals(partitions)) {
                 suspended.remove(taskId);
+                task.resume();
                 try {
-                    task.resume();
+                    transitionToRunning(task, new HashSet<TopicPartition>());
                 } catch (final TaskMigratedException e) {
                     closeZombieTask(task);
                     suspended.remove(taskId);
+                    running.remove(task.id());
                     throw e;
                 }
-                transitionToRunning(task, new HashSet<TopicPartition>());
                 log.trace("resuming suspended {} {}", taskTypeName, task.id());
                 return true;
             } else {
@@ -286,6 +288,9 @@ class AssignedTasks implements RestoringTasks {
         }
     }
 
+    /**
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
+     */
     private void transitionToRunning(final Task task, final Set<TopicPartition> readyPartitions) {
         log.debug("transitioning {} {} to running", taskTypeName, task.id());
         running.put(task.id(), task);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index c3f015d..df17f29 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -100,7 +100,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
      * @param metrics               the {@link StreamsMetrics} created by the thread
      * @param stateDirectory        the {@link StateDirectory} created by the thread
      * @param producer              the instance of {@link Producer} used to produce records
-     * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     public StreamTask(final TaskId id,
                       final String applicationId,
@@ -146,14 +145,11 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         this.time = time;
 
         stateMgr.registerGlobalStateStores(topology.globalStateStores());
+
+        // initialize transactions if eos is turned on, which will block if the previous transaction has not
+        // completed yet; do not start the first transaction until the topology has been initialized later
         if (eosEnabled) {
-            try {
-                this.producer.initTransactions();
-                this.producer.beginTransaction();
-            } catch (final ProducerFencedException fatal) {
-                throw new TaskMigratedException(this, fatal);
-            }
-            transactionInFlight = true;
+            this.producer.initTransactions();
         }
     }
 
@@ -164,31 +160,38 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         return changelogPartitions().isEmpty();
     }
 
+    /**
+     * <pre>
+     * - (re-)initialize the topology of the task
+     * </pre>
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
+     */
     @Override
     public void initializeTopology() {
         initTopology();
+
+        if (eosEnabled) {
+            try {
+                this.producer.beginTransaction();
+            } catch (final ProducerFencedException fatal) {
+                throw new TaskMigratedException(this, fatal);
+            }
+            transactionInFlight = true;
+        }
+
         processorContext.initialized();
         taskInitialized = true;
     }
 
     /**
      * <pre>
-     * - re-initialize the task
-     * - if (eos) begin new transaction
+     * - resume the task
      * </pre>
-     * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     @Override
     public void resume() {
+        // nothing to do; new transaction will be started only after topology is initialized
         log.debug("Resuming");
-        if (eosEnabled) {
-            try {
-                producer.beginTransaction();
-            } catch (final ProducerFencedException fatal) {
-                throw new TaskMigratedException(this, fatal);
-            }
-            transactionInFlight = true;
-        }
         initTopology();
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index eecf80d..c3f9312 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -344,6 +344,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
          */
         Collection<Task> createTasks(final Consumer<byte[], byte[]> consumer, final Map<TaskId, Set<TopicPartition>> tasksToBeCreated) {
             final List<Task> createdTasks = new ArrayList<>();
+
             for (final Map.Entry<TaskId, Set<TopicPartition>> newTaskAndPartitions : tasksToBeCreated.entrySet()) {
                 final TaskId taskId = newTaskAndPartitions.getKey();
                 final Set<TopicPartition> partitions = newTaskAndPartitions.getValue();
@@ -394,9 +395,6 @@ public class StreamThread extends Thread implements ThreadDataProvider {
             this.threadClientId = threadClientId;
         }
 
-        /**
-         * @throws TaskMigratedException if the task producer got fenced (EOS only)
-         */
         @Override
         StreamTask createTask(final Consumer<byte[], byte[]> consumer, final TaskId taskId, final Set<TopicPartition> partitions) {
             taskCreatedSensor.record();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 385729f..70f57c9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -68,9 +68,6 @@ class TaskManager {
         this.log = logContext.logger(getClass());
     }
 
-    /**
-     * @throws TaskMigratedException if the task producer got fenced (EOS only)
-     */
     void createTasks(final Collection<TopicPartition> assignment) {
         if (threadMetadataProvider == null) {
             throw new IllegalStateException(logPrefix + "taskIdProvider has not been initialized while adding stream tasks. This should not happen.");
@@ -96,9 +93,6 @@ class TaskManager {
         this.threadMetadataProvider = threadMetadataProvider;
     }
 
-    /**
-     * @throws TaskMigratedException if the task producer got fenced (EOS only)
-     */
     private void addStreamTasks(final Collection<TopicPartition> assignment) {
         Map<TaskId, Set<TopicPartition>> assignedTasks = threadMetadataProvider.activeTasks();
         if (assignedTasks.isEmpty()) {
@@ -139,9 +133,6 @@ class TaskManager {
         }
     }
 
-    /**
-     * @throws TaskMigratedException if the task producer got fenced (EOS only)
-     */
     private void addStandbyTasks() {
         final Map<TaskId, Set<TopicPartition>> assignedStandbyTasks = threadMetadataProvider.standbyTasks();
         if (assignedStandbyTasks.isEmpty()) {
@@ -156,7 +147,6 @@ class TaskManager {
             if (!standby.maybeResumeSuspendedTask(taskId, partitions)) {
                 newStandbyTasks.put(taskId, partitions);
             }
-
         }
 
         if (newStandbyTasks.isEmpty()) {
@@ -254,7 +244,7 @@ class TaskManager {
     /**
      * @throws IllegalStateException If store gets registered after initialized is already finished
      * @throws StreamsException if the store's change log does not contain the partition
-     * @throws TaskMigratedException if another thread wrote to the changelog topic that is currently restored
+     * @throws TaskMigratedException if the task producer got fenced or consumer discovered changelog offset changes (EOS only)
      */
     boolean updateNewAndRestoringTasks() {
         final Set<TopicPartition> resumed = active.initializeNewTasks();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
index 81a2ca3..5c8b7c4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
@@ -256,9 +256,10 @@ public class AssignedTasksTest {
     }
 
     @Test
-    public void shouldCloseTaskOnResumeIfTaskMigratedException() {
+    public void shouldCloseTaskOnResumeSuspendedIfTaskMigratedException() {
         mockRunningTaskSuspension();
         t1.resume();
+        t1.initializeTopology();
         EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
         t1.close(false, true);
         EasyMock.expectLastCall();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 3321da5..26b4e9a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -898,6 +898,7 @@ public class StreamTaskTest {
         final MockProducer producer = new MockProducer();
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
             eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+        task.initializeTopology();
 
         assertTrue(producer.transactionInitialized());
         assertTrue(producer.transactionInFlight());
@@ -928,6 +929,7 @@ public class StreamTaskTest {
         final MockProducer producer = new MockProducer();
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
             eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+        task.initializeTopology();
 
         task.addRecords(partition1, Collections.singletonList(
             new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@@ -944,6 +946,7 @@ public class StreamTaskTest {
         final MockProducer producer = new MockProducer();
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
             eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+        task.initializeTopology();
 
         task.suspend();
         assertTrue(producer.transactionCommitted());
@@ -971,6 +974,7 @@ public class StreamTaskTest {
         final MockProducer producer = new MockProducer();
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
             eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+        task.initializeTopology();
 
         task.addRecords(partition1, Collections.singletonList(
             new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@@ -978,6 +982,7 @@ public class StreamTaskTest {
         task.suspend();
 
         task.resume();
+        task.initializeTopology();
         assertTrue(producer.transactionInFlight());
     }
 
@@ -1001,6 +1006,7 @@ public class StreamTaskTest {
         final MockProducer producer = new MockProducer();
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
             eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+        task.initializeTopology();
 
         task.addRecords(partition1, Collections.singletonList(
             new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@@ -1029,6 +1035,7 @@ public class StreamTaskTest {
         final MockProducer producer = new MockProducer();
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
             eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+        task.initializeTopology();
 
         task.close(false, false);
         task = null;
