You are viewing a plain text version of this content. The link to the canonical (HTML) version was not preserved in this copy.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/11/06 19:27:29 UTC

kafka git commit: KAFKA-2713: Run task start and stop methods in worker threads so they execute in parallel and cannot block the herder thread.

Repository: kafka
Updated Branches:
  refs/heads/trunk b1eaa46a5 -> a76660ac8


KAFKA-2713: Run task start and stop methods in worker threads so they execute in parallel and cannot block the herder thread.

Author: Ewen Cheslack-Postava <me...@ewencp.org>

Reviewers: Gwen Shapira

Closes #443 from ewencp/kafka-2713-task-start-stop-threaded


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a76660ac
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a76660ac
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a76660ac

Branch: refs/heads/trunk
Commit: a76660ac811eb6e2cdf36a110ed4afb5bf3a617a
Parents: b1eaa46
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Fri Nov 6 10:27:18 2015 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Fri Nov 6 10:27:18 2015 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/copycat/sink/SinkTask.java | 16 ++++++
 .../apache/kafka/copycat/source/SourceTask.java | 20 ++++++++
 .../kafka/copycat/runtime/WorkerSinkTask.java   | 52 ++++++++++++++------
 .../copycat/runtime/WorkerSinkTaskThread.java   |  5 ++
 .../kafka/copycat/runtime/WorkerSourceTask.java | 47 ++++++++++++++----
 .../copycat/runtime/WorkerSinkTaskTest.java     | 26 +++++++---
 .../copycat/runtime/WorkerSourceTaskTest.java   | 29 +++++++++++
 7 files changed, 161 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a76660ac/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
index c6cd12f..7c03cda 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
@@ -23,6 +23,7 @@ import org.apache.kafka.copycat.connector.Task;
 
 import java.util.Collection;
 import java.util.Map;
+import java.util.Properties;
 
 /**
  * SinkTask is a Task takes records loaded from Kafka and sends them to another system. In
@@ -47,6 +48,13 @@ public abstract class SinkTask implements Task {
     }
 
     /**
+     * Start the Task. This should handle any configuration parsing and one-time setup of the task.
+     * @param props initial configuration
+     */
+    @Override
+    public abstract void start(Properties props);
+
+    /**
      * Put the records in the sink. Usually this should send the records to the sink asynchronously
      * and immediately return.
      *
@@ -84,4 +92,12 @@ public abstract class SinkTask implements Task {
      */
     public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
     }
+
+    /**
+     * Perform any cleanup to stop this task. In SinkTasks, this method is invoked only once outstanding calls to other
+     * methods have completed (e.g., {@link #put(Collection)} has returned) and a final {@link #flush(Map)} and offset
+     * commit has completed. Implementations of this method should only need to perform final cleanup operations, such
+     * as closing network connections to the sink system.
+     */
+    public abstract void stop();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a76660ac/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java
index 1e1da34..30cbf16 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.copycat.connector.Task;
 
 import java.util.List;
+import java.util.Properties;
 
 /**
  * SourceTask is a Task that pulls records from another system for storage in Kafka.
@@ -38,6 +39,13 @@ public abstract class SourceTask implements Task {
     }
 
     /**
+     * Start the Task. This should handle any configuration parsing and one-time setup of the task.
+     * @param props initial configuration
+     */
+    @Override
+    public abstract void start(Properties props);
+
+    /**
      * Poll this SourceTask for new records. This method should block if no data is currently
      * available.
      *
@@ -59,4 +67,16 @@ public abstract class SourceTask implements Task {
     public void commit() throws InterruptedException {
         // This space intentionally left blank.
     }
+
+    /**
+     * Signal this SourceTask to stop. In SourceTasks, this method only needs to signal to the task that it should stop
+     * trying to poll for new data and interrupt any outstanding poll() requests. It is not required that the task has
+     * fully stopped. Note that this method may be (and generally will be) invoked from a different thread than {@link #poll()} and
+     * {@link #commit()}.
+     *
+     * For example, if a task uses a {@link java.nio.channels.Selector} to receive data over the network, this method
+     * could set a flag that will force {@link #poll()} to exit immediately and invoke
+     * {@link java.nio.channels.Selector#wakeup()} to interrupt any ongoing requests.
+     */
+    public abstract void stop();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a76660ac/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
index 55a67c0..dc51730 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
@@ -60,8 +60,10 @@ class WorkerSinkTask implements WorkerTask {
     private final Converter keyConverter;
     private final Converter valueConverter;
     private WorkerSinkTaskThread workThread;
+    private Properties taskProps;
     private KafkaConsumer<byte[], byte[]> consumer;
     private WorkerSinkTaskContext context;
+    private boolean started;
     private Map<TopicPartition, OffsetAndMetadata> lastCommittedOffsets;
 
     public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConfig,
@@ -72,26 +74,15 @@ class WorkerSinkTask implements WorkerTask {
         this.keyConverter = keyConverter;
         this.valueConverter = valueConverter;
         this.time = time;
+        this.started = false;
     }
 
     @Override
     public void start(Properties props) {
+        taskProps = props;
         consumer = createConsumer();
         context = new WorkerSinkTaskContext(consumer);
 
-        // Ensure we're in the group so that if start() wants to rewind offsets, it will have an assignment of partitions
-        // to work with. Any rewinding will be handled immediately when polling starts.
-        String topicsStr = props.getProperty(SinkTask.TOPICS_CONFIG);
-        if (topicsStr == null || topicsStr.isEmpty())
-            throw new CopycatException("Sink tasks require a list of topics.");
-        String[] topics = topicsStr.split(",");
-        log.debug("Task {} subscribing to topics {}", id, topics);
-        consumer.subscribe(Arrays.asList(topics), new HandleRebalance());
-        consumer.poll(0);
-
-        task.initialize(context);
-        task.start(props);
-
         workThread = createWorkerThread();
         workThread.start();
     }
@@ -128,6 +119,35 @@ class WorkerSinkTask implements WorkerTask {
             consumer.close();
     }
 
+    /**
+     * Performs the initial join process for the consumer group, ensures we have an assignment, and initializes + starts the
+     * SinkTask.
+     *
+     * @return true if successful, false if joining the consumer group was interrupted
+     */
+    public boolean joinConsumerGroupAndStart() {
+        String topicsStr = taskProps.getProperty(SinkTask.TOPICS_CONFIG);
+        if (topicsStr == null || topicsStr.isEmpty())
+            throw new CopycatException("Sink tasks require a list of topics.");
+        String[] topics = topicsStr.split(",");
+        log.debug("Task {} subscribing to topics {}", id, topics);
+        consumer.subscribe(Arrays.asList(topics), new HandleRebalance());
+
+        // Ensure we're in the group so that if start() wants to rewind offsets, it will have an assignment of partitions
+        // to work with. Any rewinding will be handled immediately when polling starts.
+        try {
+            consumer.poll(0);
+        } catch (WakeupException e) {
+            log.error("Sink task {} was stopped before completing join group. Task initialization and start is being skipped", this);
+            return false;
+        }
+        task.initialize(context);
+        task.start(taskProps);
+        log.info("Sink task {} finished initialization and start", this);
+        started = true;
+        return true;
+    }
+
     /** Poll for new messages with the given timeout. Should only be invoked by the worker thread. */
     public void poll(long timeoutMs) {
         try {
@@ -156,7 +176,7 @@ class WorkerSinkTask implements WorkerTask {
         for (TopicPartition tp : consumer.assignment()) {
             long pos = consumer.position(tp);
             offsets.put(tp, new OffsetAndMetadata(pos));
-            log.trace("{} committing {} offset {}", id, tp, pos);
+            log.debug("{} committing {} offset {}", id, tp, pos);
         }
 
         try {
@@ -273,12 +293,12 @@ class WorkerSinkTask implements WorkerTask {
             for (TopicPartition tp : partitions) {
                 long pos = consumer.position(tp);
                 lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));
-                log.trace("{} assigned topic partition {} with offset {}", id, tp, pos);
+                log.debug("{} assigned topic partition {} with offset {}", id, tp, pos);
             }
             // Instead of invoking the assignment callback on initialization, we guarantee the consumer is ready upon
             // task start. Since this callback gets invoked during that initial setup before we've started the task, we
             // need to guard against invoking the user's callback method during that period.
-            if (workThread != null)
+            if (started)
                 task.onPartitionsAssigned(partitions);
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a76660ac/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java
index 486407d..ab3f1fe 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java
@@ -50,9 +50,14 @@ class WorkerSinkTaskThread extends ShutdownableThread {
 
     @Override
     public void execute() {
+        // Try to join and start. If we're interrupted before this completes, bail.
+        if (!task.joinConsumerGroupAndStart())
+            return;
+
         while (getRunning()) {
             iteration();
         }
+
         // Make sure any uncommitted data has committed
         task.commitOffsets(true, -1);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a76660ac/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
index 9740933..1f96c78 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
@@ -46,14 +46,14 @@ import java.util.concurrent.TimeoutException;
 class WorkerSourceTask implements WorkerTask {
     private static final Logger log = LoggerFactory.getLogger(WorkerSourceTask.class);
 
-    private ConnectorTaskId id;
-    private SourceTask task;
+    private final ConnectorTaskId id;
+    private final SourceTask task;
     private final Converter keyConverter;
     private final Converter valueConverter;
     private KafkaProducer<byte[], byte[]> producer;
     private WorkerSourceTaskThread workThread;
-    private OffsetStorageReader offsetReader;
-    private OffsetStorageWriter offsetWriter;
+    private final OffsetStorageReader offsetReader;
+    private final OffsetStorageWriter offsetWriter;
     private final WorkerConfig workerConfig;
     private final Time time;
 
@@ -86,15 +86,12 @@ class WorkerSourceTask implements WorkerTask {
 
     @Override
     public void start(Properties props) {
-        task.initialize(new WorkerSourceTaskContext(offsetReader));
-        task.start(props);
-        workThread = new WorkerSourceTaskThread("WorkerSourceTask-" + id);
+        workThread = new WorkerSourceTaskThread("WorkerSourceTask-" + id, props);
         workThread.start();
     }
 
     @Override
     public void stop() {
-        task.stop();
         if (workThread != null)
             workThread.startGracefulShutdown();
     }
@@ -111,7 +108,6 @@ class WorkerSourceTask implements WorkerTask {
                 success = false;
             }
         }
-        commitOffsets();
         return success;
     }
 
@@ -277,13 +273,31 @@ class WorkerSourceTask implements WorkerTask {
 
 
     private class WorkerSourceTaskThread extends ShutdownableThread {
-        public WorkerSourceTaskThread(String name) {
+        private Properties workerProps;
+        private boolean finishedStart;
+        private boolean startedShutdownBeforeStartCompleted;
+
+        public WorkerSourceTaskThread(String name, Properties workerProps) {
             super(name);
+            this.workerProps = workerProps;
+            this.finishedStart = false;
+            this.startedShutdownBeforeStartCompleted = false;
         }
 
         @Override
         public void execute() {
             try {
+                task.initialize(new WorkerSourceTaskContext(offsetReader));
+                task.start(workerProps);
+                log.info("Source task {} finished initialization and start", this);
+                synchronized (this) {
+                    if (startedShutdownBeforeStartCompleted) {
+                        task.stop();
+                        return;
+                    }
+                    finishedStart = true;
+                }
+
                 while (getRunning()) {
                     List<SourceRecord> records = task.poll();
                     if (records == null)
@@ -293,6 +307,19 @@ class WorkerSourceTask implements WorkerTask {
             } catch (InterruptedException e) {
                 // Ignore and allow to exit.
             }
+
+            commitOffsets();
+        }
+
+        @Override
+        public void startGracefulShutdown() {
+            super.startGracefulShutdown();
+            synchronized (this) {
+                if (finishedStart)
+                    task.stop();
+                else
+                    startedShutdownBeforeStartCompleted = true;
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a76660ac/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
index 28e9e2e..177f7a6 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
@@ -136,6 +136,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
         PowerMock.replayAll();
 
         workerTask.start(TASK_PROPS);
+        workerTask.joinConsumerGroupAndStart();
         for (int i = 0; i < 10; i++) {
             workerThread.iteration();
         }
@@ -202,6 +203,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
         PowerMock.replayAll();
 
         workerTask.start(TASK_PROPS);
+        workerTask.joinConsumerGroupAndStart();
         // First iteration gets one record
         workerThread.iteration();
         // Second triggers commit, gets a second offset
@@ -236,6 +238,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
         PowerMock.replayAll();
 
         workerTask.start(TASK_PROPS);
+        workerTask.joinConsumerGroupAndStart();
         // Second iteration triggers commit
         workerThread.iteration();
         workerThread.iteration();
@@ -267,6 +270,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
         PowerMock.replayAll();
 
         workerTask.start(TASK_PROPS);
+        workerTask.joinConsumerGroupAndStart();
         // Second iteration triggers first commit, third iteration triggers second (failing) commit
         workerThread.iteration();
         workerThread.iteration();
@@ -292,6 +296,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
         PowerMock.replayAll();
 
         workerTask.start(TASK_PROPS);
+        workerTask.joinConsumerGroupAndStart();
         // Second iteration triggers commit
         workerThread.iteration();
         workerThread.iteration();
@@ -318,6 +323,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
         PowerMock.replayAll();
 
         workerTask.start(TASK_PROPS);
+        workerTask.joinConsumerGroupAndStart();
         // Third iteration triggers commit, fourth gives a chance to trigger the timeout but doesn't
         // trigger another commit
         workerThread.iteration();
@@ -393,6 +399,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
         PowerMock.replayAll();
 
         workerTask.start(TASK_PROPS);
+        workerTask.joinConsumerGroupAndStart();
         workerThread.iteration();
         workerThread.iteration();
         workerThread.iteration();
@@ -436,6 +443,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
         PowerMock.replayAll();
 
         workerTask.start(TASK_PROPS);
+        workerTask.joinConsumerGroupAndStart();
         workerThread.iteration();
         workerThread.iteration();
         workerTask.stop();
@@ -448,7 +456,17 @@ public class WorkerSinkTaskTest extends ThreadedTest {
     private void expectInitializeTask() throws Exception {
         PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer);
 
+        workerThread = PowerMock.createPartialMock(WorkerSinkTaskThread.class, new String[]{"start", "awaitShutdown"},
+                workerTask, "mock-worker-thread", time,
+                workerConfig);
+        PowerMock.expectPrivate(workerTask, "createWorkerThread")
+                .andReturn(workerThread);
+        workerThread.start();
+        PowerMock.expectLastCall();
+
         consumer.subscribe(EasyMock.eq(Arrays.asList(TOPIC)), EasyMock.capture(rebalanceListener));
+        PowerMock.expectLastCall();
+
         EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(new IAnswer<ConsumerRecords<byte[], byte[]>>() {
             @Override
             public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
@@ -464,14 +482,6 @@ public class WorkerSinkTaskTest extends ThreadedTest {
         PowerMock.expectLastCall();
         sinkTask.start(TASK_PROPS);
         PowerMock.expectLastCall();
-
-        workerThread = PowerMock.createPartialMock(WorkerSinkTaskThread.class, new String[]{"start", "awaitShutdown"},
-                workerTask, "mock-worker-thread", time,
-                workerConfig);
-        PowerMock.expectPrivate(workerTask, "createWorkerThread")
-                .andReturn(workerThread);
-        workerThread.start();
-        PowerMock.expectLastCall();
     }
 
     private void expectStopTask(final long expectedMessages) throws Exception {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a76660ac/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
index 566391d..452c5cb 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.copycat.data.Schema;
 import org.apache.kafka.copycat.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.copycat.source.SourceRecord;
@@ -208,6 +209,34 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testSlowTaskStart() throws Exception {
+        createWorkerTask();
+
+        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+        EasyMock.expectLastCall();
+        sourceTask.start(EMPTY_TASK_PROPS);
+        EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                Utils.sleep(100);
+                return null;
+            }
+        });
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerTask.start(EMPTY_TASK_PROPS);
+        // Stopping immediately while the other thread has work to do should result in no polling, no offset commits,
+        // exiting the work thread immediately, and the stop() method will be invoked in the background thread since it
+        // cannot be invoked immediately in the thread trying to stop the task.
+        workerTask.stop();
+        assertEquals(true, workerTask.awaitStop(1000));
+
+        PowerMock.verifyAll();
+    }
 
     private CountDownLatch expectPolls(int count) throws InterruptedException {
         final CountDownLatch latch = new CountDownLatch(count);