You are viewing a plain text version of this content. (The hyperlink to the canonical version was not preserved in this plain-text copy.)
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/11/06 19:27:29 UTC
kafka git commit: KAFKA-2713: Run task start and stop methods in worker threads so they execute in parallel and cannot block the herder thread.
Repository: kafka
Updated Branches:
refs/heads/trunk b1eaa46a5 -> a76660ac8
KAFKA-2713: Run task start and stop methods in worker threads so they execute in parallel and cannot block the herder thread.
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Reviewers: Gwen Shapira
Closes #443 from ewencp/kafka-2713-task-start-stop-threaded
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a76660ac
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a76660ac
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a76660ac
Branch: refs/heads/trunk
Commit: a76660ac811eb6e2cdf36a110ed4afb5bf3a617a
Parents: b1eaa46
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Fri Nov 6 10:27:18 2015 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Fri Nov 6 10:27:18 2015 -0800
----------------------------------------------------------------------
.../org/apache/kafka/copycat/sink/SinkTask.java | 16 ++++++
.../apache/kafka/copycat/source/SourceTask.java | 20 ++++++++
.../kafka/copycat/runtime/WorkerSinkTask.java | 52 ++++++++++++++------
.../copycat/runtime/WorkerSinkTaskThread.java | 5 ++
.../kafka/copycat/runtime/WorkerSourceTask.java | 47 ++++++++++++++----
.../copycat/runtime/WorkerSinkTaskTest.java | 26 +++++++---
.../copycat/runtime/WorkerSourceTaskTest.java | 29 +++++++++++
7 files changed, 161 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/a76660ac/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
index c6cd12f..7c03cda 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
@@ -23,6 +23,7 @@ import org.apache.kafka.copycat.connector.Task;
import java.util.Collection;
import java.util.Map;
+import java.util.Properties;
/**
* SinkTask is a Task takes records loaded from Kafka and sends them to another system. In
@@ -47,6 +48,13 @@ public abstract class SinkTask implements Task {
}
/**
+ * Start the Task. This should handle any configuration parsing and one-time setup of the task.
+ * @param props initial configuration
+ */
+ @Override
+ public abstract void start(Properties props);
+
+ /**
* Put the records in the sink. Usually this should send the records to the sink asynchronously
* and immediately return.
*
@@ -84,4 +92,12 @@ public abstract class SinkTask implements Task {
*/
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
}
+
+ /**
+ * Perform any cleanup to stop this task. In SinkTasks, this method is invoked only once outstanding calls to other
+ * methods have completed (e.g., {@link #put(Collection)} has returned) and a final {@link #flush(Map)} and offset
+ * commit has completed. Implementations of this method should only need to perform final cleanup operations, such
+ * as closing network connections to the sink system.
+ */
+ public abstract void stop();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a76660ac/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java
index 1e1da34..30cbf16 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.copycat.connector.Task;
import java.util.List;
+import java.util.Properties;
/**
* SourceTask is a Task that pulls records from another system for storage in Kafka.
@@ -38,6 +39,13 @@ public abstract class SourceTask implements Task {
}
/**
+ * Start the Task. This should handle any configuration parsing and one-time setup of the task.
+ * @param props initial configuration
+ */
+ @Override
+ public abstract void start(Properties props);
+
+ /**
* Poll this SourceTask for new records. This method should block if no data is currently
* available.
*
@@ -59,4 +67,16 @@ public abstract class SourceTask implements Task {
public void commit() throws InterruptedException {
// This space intentionally left blank.
}
+
+ /**
+ * Signal this SourceTask to stop. In SourceTasks, this method only needs to signal to the task that it should stop
+ * trying to poll for new data and interrupt any outstanding poll() requests. It is not required that the task has
+ * fully stopped. Note that this method necessarily may be invoked from a different thread than {@link #poll()} and
+ * {@link #commit()}.
+ *
+ * For example, if a task uses a {@link java.nio.channels.Selector} to receive data over the network, this method
+ * could set a flag that will force {@link #poll()} to exit immediately and invoke
+ * {@link java.nio.channels.Selector#wakeup()} to interrupt any ongoing requests.
+ */
+ public abstract void stop();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a76660ac/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
index 55a67c0..dc51730 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
@@ -60,8 +60,10 @@ class WorkerSinkTask implements WorkerTask {
private final Converter keyConverter;
private final Converter valueConverter;
private WorkerSinkTaskThread workThread;
+ private Properties taskProps;
private KafkaConsumer<byte[], byte[]> consumer;
private WorkerSinkTaskContext context;
+ private boolean started;
private Map<TopicPartition, OffsetAndMetadata> lastCommittedOffsets;
public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConfig,
@@ -72,26 +74,15 @@ class WorkerSinkTask implements WorkerTask {
this.keyConverter = keyConverter;
this.valueConverter = valueConverter;
this.time = time;
+ this.started = false;
}
@Override
public void start(Properties props) {
+ taskProps = props;
consumer = createConsumer();
context = new WorkerSinkTaskContext(consumer);
- // Ensure we're in the group so that if start() wants to rewind offsets, it will have an assignment of partitions
- // to work with. Any rewinding will be handled immediately when polling starts.
- String topicsStr = props.getProperty(SinkTask.TOPICS_CONFIG);
- if (topicsStr == null || topicsStr.isEmpty())
- throw new CopycatException("Sink tasks require a list of topics.");
- String[] topics = topicsStr.split(",");
- log.debug("Task {} subscribing to topics {}", id, topics);
- consumer.subscribe(Arrays.asList(topics), new HandleRebalance());
- consumer.poll(0);
-
- task.initialize(context);
- task.start(props);
-
workThread = createWorkerThread();
workThread.start();
}
@@ -128,6 +119,35 @@ class WorkerSinkTask implements WorkerTask {
consumer.close();
}
+ /**
+ * Preforms initial join process for consumer group, ensures we have an assignment, and initializes + starts the
+ * SinkTask.
+ *
+ * @returns true if successful, false if joining the consumer group was interrupted
+ */
+ public boolean joinConsumerGroupAndStart() {
+ String topicsStr = taskProps.getProperty(SinkTask.TOPICS_CONFIG);
+ if (topicsStr == null || topicsStr.isEmpty())
+ throw new CopycatException("Sink tasks require a list of topics.");
+ String[] topics = topicsStr.split(",");
+ log.debug("Task {} subscribing to topics {}", id, topics);
+ consumer.subscribe(Arrays.asList(topics), new HandleRebalance());
+
+ // Ensure we're in the group so that if start() wants to rewind offsets, it will have an assignment of partitions
+ // to work with. Any rewinding will be handled immediately when polling starts.
+ try {
+ consumer.poll(0);
+ } catch (WakeupException e) {
+ log.error("Sink task {} was stopped before completing join group. Task initialization and start is being skipped", this);
+ return false;
+ }
+ task.initialize(context);
+ task.start(taskProps);
+ log.info("Sink task {} finished initialization and start", this);
+ started = true;
+ return true;
+ }
+
/** Poll for new messages with the given timeout. Should only be invoked by the worker thread. */
public void poll(long timeoutMs) {
try {
@@ -156,7 +176,7 @@ class WorkerSinkTask implements WorkerTask {
for (TopicPartition tp : consumer.assignment()) {
long pos = consumer.position(tp);
offsets.put(tp, new OffsetAndMetadata(pos));
- log.trace("{} committing {} offset {}", id, tp, pos);
+ log.debug("{} committing {} offset {}", id, tp, pos);
}
try {
@@ -273,12 +293,12 @@ class WorkerSinkTask implements WorkerTask {
for (TopicPartition tp : partitions) {
long pos = consumer.position(tp);
lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));
- log.trace("{} assigned topic partition {} with offset {}", id, tp, pos);
+ log.debug("{} assigned topic partition {} with offset {}", id, tp, pos);
}
// Instead of invoking the assignment callback on initialization, we guarantee the consumer is ready upon
// task start. Since this callback gets invoked during that initial setup before we've started the task, we
// need to guard against invoking the user's callback method during that period.
- if (workThread != null)
+ if (started)
task.onPartitionsAssigned(partitions);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a76660ac/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java
index 486407d..ab3f1fe 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java
@@ -50,9 +50,14 @@ class WorkerSinkTaskThread extends ShutdownableThread {
@Override
public void execute() {
+ // Try to join and start. If we're interrupted before this completes, bail.
+ if (!task.joinConsumerGroupAndStart())
+ return;
+
while (getRunning()) {
iteration();
}
+
// Make sure any uncommitted data has committed
task.commitOffsets(true, -1);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a76660ac/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
index 9740933..1f96c78 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
@@ -46,14 +46,14 @@ import java.util.concurrent.TimeoutException;
class WorkerSourceTask implements WorkerTask {
private static final Logger log = LoggerFactory.getLogger(WorkerSourceTask.class);
- private ConnectorTaskId id;
- private SourceTask task;
+ private final ConnectorTaskId id;
+ private final SourceTask task;
private final Converter keyConverter;
private final Converter valueConverter;
private KafkaProducer<byte[], byte[]> producer;
private WorkerSourceTaskThread workThread;
- private OffsetStorageReader offsetReader;
- private OffsetStorageWriter offsetWriter;
+ private final OffsetStorageReader offsetReader;
+ private final OffsetStorageWriter offsetWriter;
private final WorkerConfig workerConfig;
private final Time time;
@@ -86,15 +86,12 @@ class WorkerSourceTask implements WorkerTask {
@Override
public void start(Properties props) {
- task.initialize(new WorkerSourceTaskContext(offsetReader));
- task.start(props);
- workThread = new WorkerSourceTaskThread("WorkerSourceTask-" + id);
+ workThread = new WorkerSourceTaskThread("WorkerSourceTask-" + id, props);
workThread.start();
}
@Override
public void stop() {
- task.stop();
if (workThread != null)
workThread.startGracefulShutdown();
}
@@ -111,7 +108,6 @@ class WorkerSourceTask implements WorkerTask {
success = false;
}
}
- commitOffsets();
return success;
}
@@ -277,13 +273,31 @@ class WorkerSourceTask implements WorkerTask {
private class WorkerSourceTaskThread extends ShutdownableThread {
- public WorkerSourceTaskThread(String name) {
+ private Properties workerProps;
+ private boolean finishedStart;
+ private boolean startedShutdownBeforeStartCompleted;
+
+ public WorkerSourceTaskThread(String name, Properties workerProps) {
super(name);
+ this.workerProps = workerProps;
+ this.finishedStart = false;
+ this.startedShutdownBeforeStartCompleted = false;
}
@Override
public void execute() {
try {
+ task.initialize(new WorkerSourceTaskContext(offsetReader));
+ task.start(workerProps);
+ log.info("Source task {} finished initialization and start", this);
+ synchronized (this) {
+ if (startedShutdownBeforeStartCompleted) {
+ task.stop();
+ return;
+ }
+ finishedStart = true;
+ }
+
while (getRunning()) {
List<SourceRecord> records = task.poll();
if (records == null)
@@ -293,6 +307,19 @@ class WorkerSourceTask implements WorkerTask {
} catch (InterruptedException e) {
// Ignore and allow to exit.
}
+
+ commitOffsets();
+ }
+
+ @Override
+ public void startGracefulShutdown() {
+ super.startGracefulShutdown();
+ synchronized (this) {
+ if (finishedStart)
+ task.stop();
+ else
+ startedShutdownBeforeStartCompleted = true;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a76660ac/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
index 28e9e2e..177f7a6 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
@@ -136,6 +136,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
PowerMock.replayAll();
workerTask.start(TASK_PROPS);
+ workerTask.joinConsumerGroupAndStart();
for (int i = 0; i < 10; i++) {
workerThread.iteration();
}
@@ -202,6 +203,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
PowerMock.replayAll();
workerTask.start(TASK_PROPS);
+ workerTask.joinConsumerGroupAndStart();
// First iteration gets one record
workerThread.iteration();
// Second triggers commit, gets a second offset
@@ -236,6 +238,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
PowerMock.replayAll();
workerTask.start(TASK_PROPS);
+ workerTask.joinConsumerGroupAndStart();
// Second iteration triggers commit
workerThread.iteration();
workerThread.iteration();
@@ -267,6 +270,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
PowerMock.replayAll();
workerTask.start(TASK_PROPS);
+ workerTask.joinConsumerGroupAndStart();
// Second iteration triggers first commit, third iteration triggers second (failing) commit
workerThread.iteration();
workerThread.iteration();
@@ -292,6 +296,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
PowerMock.replayAll();
workerTask.start(TASK_PROPS);
+ workerTask.joinConsumerGroupAndStart();
// Second iteration triggers commit
workerThread.iteration();
workerThread.iteration();
@@ -318,6 +323,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
PowerMock.replayAll();
workerTask.start(TASK_PROPS);
+ workerTask.joinConsumerGroupAndStart();
// Third iteration triggers commit, fourth gives a chance to trigger the timeout but doesn't
// trigger another commit
workerThread.iteration();
@@ -393,6 +399,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
PowerMock.replayAll();
workerTask.start(TASK_PROPS);
+ workerTask.joinConsumerGroupAndStart();
workerThread.iteration();
workerThread.iteration();
workerThread.iteration();
@@ -436,6 +443,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
PowerMock.replayAll();
workerTask.start(TASK_PROPS);
+ workerTask.joinConsumerGroupAndStart();
workerThread.iteration();
workerThread.iteration();
workerTask.stop();
@@ -448,7 +456,17 @@ public class WorkerSinkTaskTest extends ThreadedTest {
private void expectInitializeTask() throws Exception {
PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer);
+ workerThread = PowerMock.createPartialMock(WorkerSinkTaskThread.class, new String[]{"start", "awaitShutdown"},
+ workerTask, "mock-worker-thread", time,
+ workerConfig);
+ PowerMock.expectPrivate(workerTask, "createWorkerThread")
+ .andReturn(workerThread);
+ workerThread.start();
+ PowerMock.expectLastCall();
+
consumer.subscribe(EasyMock.eq(Arrays.asList(TOPIC)), EasyMock.capture(rebalanceListener));
+ PowerMock.expectLastCall();
+
EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(new IAnswer<ConsumerRecords<byte[], byte[]>>() {
@Override
public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
@@ -464,14 +482,6 @@ public class WorkerSinkTaskTest extends ThreadedTest {
PowerMock.expectLastCall();
sinkTask.start(TASK_PROPS);
PowerMock.expectLastCall();
-
- workerThread = PowerMock.createPartialMock(WorkerSinkTaskThread.class, new String[]{"start", "awaitShutdown"},
- workerTask, "mock-worker-thread", time,
- workerConfig);
- PowerMock.expectPrivate(workerTask, "createWorkerThread")
- .andReturn(workerThread);
- workerThread.start();
- PowerMock.expectLastCall();
}
private void expectStopTask(final long expectedMessages) throws Exception {
http://git-wip-us.apache.org/repos/asf/kafka/blob/a76660ac/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
index 566391d..452c5cb 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.copycat.data.Schema;
import org.apache.kafka.copycat.runtime.standalone.StandaloneConfig;
import org.apache.kafka.copycat.source.SourceRecord;
@@ -208,6 +209,34 @@ public class WorkerSourceTaskTest extends ThreadedTest {
PowerMock.verifyAll();
}
+ @Test
+ public void testSlowTaskStart() throws Exception {
+ createWorkerTask();
+
+ sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+ EasyMock.expectLastCall();
+ sourceTask.start(EMPTY_TASK_PROPS);
+ EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+ @Override
+ public Object answer() throws Throwable {
+ Utils.sleep(100);
+ return null;
+ }
+ });
+ sourceTask.stop();
+ EasyMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ workerTask.start(EMPTY_TASK_PROPS);
+ // Stopping immediately while the other thread has work to do should result in no polling, no offset commits,
+ // exiting the work thread immediately, and the stop() method will be invoked in the background thread since it
+ // cannot be invoked immediately in the thread trying to stop the task.
+ workerTask.stop();
+ assertEquals(true, workerTask.awaitStop(1000));
+
+ PowerMock.verifyAll();
+ }
private CountDownLatch expectPolls(int count) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(count);