You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/06 23:21:57 UTC
kafka git commit: KAFKA-2482: Allow sink tasks to get their current assignment, as well as pause and resume topic partitions.
Repository: kafka
Updated Branches:
refs/heads/trunk a0ca8f642 -> 23f9afb70
KAFKA-2482: Allow sink tasks to get their current assignment, as well as pause and resume topic partitions.
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Reviewers: Guozhang Wang
Closes #249 from ewencp/kafka-2482-sink-tasks-pause-consumption
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/23f9afb7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/23f9afb7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/23f9afb7
Branch: refs/heads/trunk
Commit: 23f9afb70bc5cdbf66550a1e69161e2fe06a909a
Parents: a0ca8f6
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Tue Oct 6 14:26:08 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Oct 6 14:26:08 2015 -0700
----------------------------------------------------------------------
.../errors/IllegalWorkerStateException.java | 35 ++++++
.../kafka/copycat/sink/SinkTaskContext.java | 19 +++
.../copycat/runtime/SinkTaskContextImpl.java | 24 ----
.../kafka/copycat/runtime/WorkerSinkTask.java | 35 +++++-
.../copycat/runtime/WorkerSinkTaskTest.java | 118 ++++++++++++++++++-
5 files changed, 201 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/23f9afb7/copycat/api/src/main/java/org/apache/kafka/copycat/errors/IllegalWorkerStateException.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/IllegalWorkerStateException.java b/copycat/api/src/main/java/org/apache/kafka/copycat/errors/IllegalWorkerStateException.java
new file mode 100644
index 0000000..6f9f233
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/errors/IllegalWorkerStateException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.errors;
+
+/**
+ * Indicates that a method has been invoked illegally or at an invalid time by a connector or task.
+ */
+public class IllegalWorkerStateException extends CopycatException {
+ public IllegalWorkerStateException(String s) {
+ super(s);
+ }
+
+ public IllegalWorkerStateException(String s, Throwable throwable) {
+ super(s, throwable);
+ }
+
+ public IllegalWorkerStateException(Throwable throwable) {
+ super(throwable);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/23f9afb7/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java
index 67c045f..3ecff27 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
/**
* Context passed to SinkTasks, allowing them to access utilities in the copycat runtime.
@@ -56,4 +57,22 @@ public abstract class SinkTaskContext {
public Map<TopicPartition, Long> offsets() {
return offsets;
}
+
+ /**
+ * Get the current set of assigned TopicPartitions for this task.
+ * @return the set of currently assigned TopicPartitions
+ */
+ public abstract Set<TopicPartition> assignment();
+
+ /**
+ * Pause consumption of messages from the specified TopicPartitions.
+ * @param partitions the partitions which should be paused
+ */
+ public abstract void pause(TopicPartition... partitions);
+
+ /**
+ * Resume consumption of messages from previously paused TopicPartitions.
+ * @param partitions the partitions to resume
+ */
+ public abstract void resume(TopicPartition... partitions);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/23f9afb7/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SinkTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SinkTaskContextImpl.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SinkTaskContextImpl.java
deleted file mode 100644
index f47c984..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SinkTaskContextImpl.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.runtime;
-
-import org.apache.kafka.copycat.sink.SinkTaskContext;
-
-class SinkTaskContextImpl extends SinkTaskContext {
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/23f9afb7/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
index cbda201..edb415a 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.copycat.cli.WorkerConfig;
import org.apache.kafka.copycat.data.SchemaAndValue;
import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.errors.IllegalWorkerStateException;
import org.apache.kafka.copycat.sink.SinkRecord;
import org.apache.kafka.copycat.sink.SinkTask;
import org.apache.kafka.copycat.sink.SinkTaskContext;
@@ -59,8 +60,8 @@ class WorkerSinkTask implements WorkerTask {
this.workerConfig = workerConfig;
this.keyConverter = keyConverter;
this.valueConverter = valueConverter;
- context = new SinkTaskContextImpl();
this.time = time;
+ this.context = new WorkerSinkTaskContext();
}
@Override
@@ -234,4 +235,36 @@ class WorkerSinkTask implements WorkerTask {
}
}
}
+
+
+ private class WorkerSinkTaskContext extends SinkTaskContext {
+ @Override
+ public Set<TopicPartition> assignment() {
+ if (consumer == null)
+ throw new IllegalWorkerStateException("SinkTaskContext may not be used to look up partition assignment until the task is initialized");
+ return consumer.assignment();
+ }
+
+ @Override
+ public void pause(TopicPartition... partitions) {
+ if (consumer == null)
+ throw new IllegalWorkerStateException("SinkTaskContext may not be used to pause consumption until the task is initialized");
+ try {
+ consumer.pause(partitions);
+ } catch (IllegalStateException e) {
+ throw new IllegalWorkerStateException("SinkTasks may not pause partitions that are not currently assigned to them.", e);
+ }
+ }
+
+ @Override
+ public void resume(TopicPartition... partitions) {
+ if (consumer == null)
+ throw new IllegalWorkerStateException("SinkTaskContext may not be used to resume consumption until the task is initialized");
+ try {
+ consumer.resume(partitions);
+ } catch (IllegalStateException e) {
+ throw new IllegalWorkerStateException("SinkTasks may not resume partitions that are not currently assigned to them.", e);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/23f9afb7/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
index 687ed8f..e4d1d8e 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.copycat.cli.WorkerConfig;
import org.apache.kafka.copycat.data.Schema;
import org.apache.kafka.copycat.data.SchemaAndValue;
+import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.sink.SinkRecord;
import org.apache.kafka.copycat.sink.SinkTask;
import org.apache.kafka.copycat.sink.SinkTaskContext;
@@ -40,9 +41,15 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
@RunWith(PowerMockRunner.class)
@PrepareForTest(WorkerSinkTask.class)
@@ -53,6 +60,8 @@ public class WorkerSinkTaskTest extends ThreadedTest {
// with mix of integer/string in Copycat
private static final String TOPIC = "test";
private static final int PARTITION = 12;
+ private static final int PARTITION2 = 13;
+ private static final int PARTITION3 = 14;
private static final long FIRST_OFFSET = 45;
private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA;
private static final int KEY = 12;
@@ -62,10 +71,14 @@ public class WorkerSinkTaskTest extends ThreadedTest {
private static final byte[] RAW_VALUE = "value".getBytes();
private static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, PARTITION);
+ private static final TopicPartition TOPIC_PARTITION2 = new TopicPartition(TOPIC, PARTITION2);
+ private static final TopicPartition TOPIC_PARTITION3 = new TopicPartition(TOPIC, PARTITION3);
+ private static final TopicPartition UNASSIGNED_TOPIC_PARTITION = new TopicPartition(TOPIC, 200);
private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
private Time time;
@Mock private SinkTask sinkTask;
+ private Capture<SinkTaskContext> sinkTaskContext = EasyMock.newCapture();
private WorkerConfig workerConfig;
@Mock private Converter keyConverter;
@Mock
@@ -266,9 +279,79 @@ public class WorkerSinkTaskTest extends ThreadedTest {
PowerMock.verifyAll();
}
- private KafkaConsumer<byte[], byte[]> expectInitializeTask(Properties taskProps)
- throws Exception {
- sinkTask.initialize(EasyMock.anyObject(SinkTaskContext.class));
+ @Test
+ public void testAssignmentPauseResume() throws Exception {
+ // Just validate that the calls are passed through to the consumer, and that where appropriate errors are
+ // converted
+
+ Properties taskProps = new Properties();
+
+ expectInitializeTask(taskProps);
+
+ expectOnePoll().andAnswer(new IAnswer<Object>() {
+ @Override
+ public Object answer() throws Throwable {
+ assertEquals(new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3)),
+ sinkTaskContext.getValue().assignment());
+ return null;
+ }
+ });
+ EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3)));
+
+ expectOnePoll().andAnswer(new IAnswer<Object>() {
+ @Override
+ public Object answer() throws Throwable {
+ try {
+ sinkTaskContext.getValue().pause(UNASSIGNED_TOPIC_PARTITION);
+ fail("Trying to pause unassigned partition should have thrown an Copycat exception");
+ } catch (CopycatException e) {
+ // expected
+ }
+ sinkTaskContext.getValue().pause(TOPIC_PARTITION, TOPIC_PARTITION2);
+ return null;
+ }
+ });
+ consumer.pause(UNASSIGNED_TOPIC_PARTITION);
+ PowerMock.expectLastCall().andThrow(new IllegalStateException("unassigned topic partition"));
+ consumer.pause(TOPIC_PARTITION, TOPIC_PARTITION2);
+ PowerMock.expectLastCall();
+
+ expectOnePoll().andAnswer(new IAnswer<Object>() {
+ @Override
+ public Object answer() throws Throwable {
+ try {
+ sinkTaskContext.getValue().resume(UNASSIGNED_TOPIC_PARTITION);
+ fail("Trying to resume unassigned partition should have thrown an Copycat exception");
+ } catch (CopycatException e) {
+ // expected
+ }
+
+ sinkTaskContext.getValue().resume(TOPIC_PARTITION, TOPIC_PARTITION2);
+ return null;
+ }
+ });
+ consumer.resume(UNASSIGNED_TOPIC_PARTITION);
+ PowerMock.expectLastCall().andThrow(new IllegalStateException("unassigned topic partition"));
+ consumer.resume(TOPIC_PARTITION, TOPIC_PARTITION2);
+ PowerMock.expectLastCall();
+
+ expectStopTask(0);
+
+ PowerMock.replayAll();
+
+ workerTask.start(taskProps);
+ workerThread.iteration();
+ workerThread.iteration();
+ workerThread.iteration();
+ workerTask.stop();
+ workerTask.close();
+
+ PowerMock.verifyAll();
+ }
+
+
+ private void expectInitializeTask(Properties taskProps) throws Exception {
+ sinkTask.initialize(EasyMock.capture(sinkTaskContext));
PowerMock.expectLastCall();
sinkTask.start(taskProps);
PowerMock.expectLastCall();
@@ -282,7 +365,6 @@ public class WorkerSinkTaskTest extends ThreadedTest {
.andReturn(workerThread);
workerThread.start();
PowerMock.expectLastCall();
- return consumer;
}
private void expectStopTask(final long expectedMessages) throws Exception {
@@ -328,6 +410,32 @@ public class WorkerSinkTaskTest extends ThreadedTest {
return capturedRecords;
}
+ private IExpectationSetters<Object> expectOnePoll() {
+ // Currently the SinkTask's put() method will not be invoked unless we provide some data, so instead of
+ // returning empty data, we return one record. The expectation is that the data will be ignored by the
+ // response behavior specified using the return value of this method.
+ EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
+ new IAnswer<ConsumerRecords<byte[], byte[]>>() {
+ @Override
+ public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
+ // "Sleep" so time will progress
+ time.sleep(1L);
+ ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
+ Collections.singletonMap(
+ new TopicPartition(TOPIC, PARTITION),
+ Arrays.asList(
+ new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, RAW_KEY, RAW_VALUE)
+ )));
+ recordsReturned++;
+ return records;
+ }
+ });
+ EasyMock.expect(keyConverter.toCopycatData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY));
+ EasyMock.expect(valueConverter.toCopycatData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE));
+ sinkTask.put(EasyMock.anyObject(Collection.class));
+ return EasyMock.expectLastCall();
+ }
+
private Capture<OffsetCommitCallback> expectOffsetFlush(final long expectedMessages,
final RuntimeException flushError,
final Exception consumerCommitError,