You are viewing a plain-text version of this content; the canonical link to it (a hyperlink in the original page) was not preserved in this copy.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/05 17:39:00 UTC
kafka git commit: KAFKA-2741: Make SourceTaskContext and SinkTaskContext interfaces and keep implementations in runtime jar.
Repository: kafka
Updated Branches:
refs/heads/trunk 4a9e7607b -> 7d6515fb8
KAFKA-2741: Make SourceTaskContext and SinkTaskContext interfaces and keep implementations in runtime jar.
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Reviewers: Guozhang Wang
Closes #420 from ewencp/task-context-interfaces
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7d6515fb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7d6515fb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7d6515fb
Branch: refs/heads/trunk
Commit: 7d6515fb8f6141f5c34fe8434e97ea6ebd65941f
Parents: 4a9e760
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Thu Nov 5 08:44:44 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Nov 5 08:44:44 2015 -0800
----------------------------------------------------------------------
.../kafka/copycat/sink/SinkTaskContext.java | 28 ++++------------
.../kafka/copycat/source/SourceTaskContext.java | 12 ++-----
.../copycat/file/FileStreamSourceTaskTest.java | 5 ++-
.../kafka/copycat/runtime/WorkerSinkTask.java | 20 ++++-------
.../copycat/runtime/WorkerSinkTaskContext.java | 29 ++++++++++++++--
.../kafka/copycat/runtime/WorkerSourceTask.java | 3 +-
.../runtime/WorkerSourceTaskContext.java | 35 ++++++++++++++++++++
.../copycat/runtime/WorkerSinkTaskTest.java | 7 ++--
8 files changed, 87 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6515fb/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java
index 399dcef..763b9a4 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java
@@ -20,7 +20,6 @@ package org.apache.kafka.copycat.sink;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
-import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -28,14 +27,7 @@ import java.util.Set;
* Context passed to SinkTasks, allowing them to access utilities in the copycat runtime.
*/
@InterfaceStability.Unstable
-public abstract class SinkTaskContext {
- protected Map<TopicPartition, Long> offsets;
- protected long timeoutMs = -1L;
-
- public SinkTaskContext() {
- offsets = new HashMap<>();
- }
-
+public interface SinkTaskContext {
/**
* Reset the consumer offsets for the given topic partitions. SinkTasks should use this if they manage offsets
* in the sink data store rather than using Kafka consumer offsets. For example, an HDFS connector might record
@@ -46,9 +38,7 @@ public abstract class SinkTaskContext {
*
* @param offsets map of offsets for topic partitions
*/
- public void offset(Map<TopicPartition, Long> offsets) {
- this.offsets = offsets;
- }
+ void offset(Map<TopicPartition, Long> offsets);
/**
* Reset the consumer offsets for the given topic partition. SinkTasks should use if they manage offsets
@@ -61,9 +51,7 @@ public abstract class SinkTaskContext {
* @param tp the topic partition to reset offset.
* @param offset the offset to reset to.
*/
- public void offset(TopicPartition tp, long offset) {
- offsets.put(tp, offset);
- }
+ void offset(TopicPartition tp, long offset);
/**
* Set the timeout in milliseconds. SinkTasks should use this to indicate that they need to retry certain
@@ -72,25 +60,23 @@ public abstract class SinkTaskContext {
* issues. SinkTasks use this method to set how long to wait before retrying.
* @param timeoutMs the backoff timeout in milliseconds.
*/
- public void timeout(long timeoutMs) {
- this.timeoutMs = timeoutMs;
- }
+ void timeout(long timeoutMs);
/**
* Get the current set of assigned TopicPartitions for this task.
* @return the set of currently assigned TopicPartitions
*/
- public abstract Set<TopicPartition> assignment();
+ Set<TopicPartition> assignment();
/**
* Pause consumption of messages from the specified TopicPartitions.
* @param partitions the partitions which should be paused
*/
- public abstract void pause(TopicPartition... partitions);
+ void pause(TopicPartition... partitions);
/**
* Resume consumption of messages from previously paused TopicPartitions.
* @param partitions the partitions to resume
*/
- public abstract void resume(TopicPartition... partitions);
+ void resume(TopicPartition... partitions);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6515fb/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java
index a3875e7..bc18c30 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java
@@ -24,17 +24,9 @@ import org.apache.kafka.copycat.storage.OffsetStorageReader;
* runtime.
*/
@InterfaceStability.Unstable
-public class SourceTaskContext {
- private final OffsetStorageReader reader;
-
- public SourceTaskContext(OffsetStorageReader reader) {
- this.reader = reader;
- }
-
+public interface SourceTaskContext {
/**
* Get the OffsetStorageReader for this SourceTask.
*/
- public OffsetStorageReader offsetStorageReader() {
- return reader;
- }
+ OffsetStorageReader offsetStorageReader();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6515fb/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java
index d2781c9..4365def 100644
--- a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java
+++ b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java
@@ -44,6 +44,7 @@ public class FileStreamSourceTaskTest {
private File tempFile;
private Properties config;
private OffsetStorageReader offsetStorageReader;
+ private SourceTaskContext context;
private FileStreamSourceTask task;
private boolean verifyMocks = false;
@@ -56,7 +57,8 @@ public class FileStreamSourceTaskTest {
config.setProperty(FileStreamSourceConnector.TOPIC_CONFIG, TOPIC);
task = new FileStreamSourceTask();
offsetStorageReader = PowerMock.createMock(OffsetStorageReader.class);
- task.initialize(new SourceTaskContext(offsetStorageReader));
+ context = PowerMock.createMock(SourceTaskContext.class);
+ task.initialize(context);
}
@After
@@ -142,6 +144,7 @@ public class FileStreamSourceTaskTest {
private void expectOffsetLookupReturnNone() {
+ EasyMock.expect(context.offsetStorageReader()).andReturn(offsetStorageReader);
EasyMock.expect(offsetStorageReader.offset(EasyMock.anyObject(Map.class))).andReturn(null);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6515fb/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
index e9aa055..439a1f5 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
@@ -77,8 +77,14 @@ class WorkerSinkTask implements WorkerTask {
public void start(Properties props) {
consumer = createConsumer(props);
context = new WorkerSinkTaskContext(consumer);
+
+ // Ensure we're in the group so that if start() wants to rewind offsets, it will have an assignment of partitions
+ // to work with. Any rewinding will be handled immediately when polling starts.
+ consumer.poll(0);
+
task.initialize(context);
task.start(props);
+
workThread = createWorkerThread();
workThread.start();
}
@@ -207,18 +213,6 @@ class WorkerSinkTask implements WorkerTask {
log.debug("Task {} subscribing to topics {}", id, topics);
newConsumer.subscribe(Arrays.asList(topics), new HandleRebalance());
- // Seek to any user-provided offsets. This is useful if offsets are tracked in the downstream system (e.g., to
- // enable exactly once delivery to that system).
- //
- // To do this correctly, we need to first make sure we have been assigned partitions, which poll() will guarantee.
- // We ask for offsets after this poll to make sure any offsets committed before the rebalance are picked up correctly.
- newConsumer.poll(0);
- Map<TopicPartition, Long> offsets = context.offsets();
- for (TopicPartition tp : newConsumer.assignment()) {
- Long offset = offsets.get(tp);
- if (offset != null)
- newConsumer.seek(tp, offset);
- }
return newConsumer;
}
@@ -264,7 +258,7 @@ class WorkerSinkTask implements WorkerTask {
consumer.seek(tp, offset);
}
}
- offsets.clear();
+ context.clearOffsets();
}
private class HandleRebalance implements ConsumerRebalanceListener {
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6515fb/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java
index b8d7d54..b474589 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java
@@ -16,17 +16,35 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.copycat.errors.IllegalWorkerStateException;
import org.apache.kafka.copycat.sink.SinkTaskContext;
+import java.util.HashMap;
import java.util.Map;
import java.util.Set;
-public class WorkerSinkTaskContext extends SinkTaskContext {
-
- KafkaConsumer<byte[], byte[]> consumer;
+public class WorkerSinkTaskContext implements SinkTaskContext {
+ private Map<TopicPartition, Long> offsets;
+ private long timeoutMs;
+ private KafkaConsumer<byte[], byte[]> consumer;
public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer) {
+ this.offsets = new HashMap<>();
+ this.timeoutMs = -1L;
this.consumer = consumer;
}
+ @Override
+ public void offset(Map<TopicPartition, Long> offsets) {
+ this.offsets.putAll(offsets);
+ }
+
+ @Override
+ public void offset(TopicPartition tp, long offset) {
+ offsets.put(tp, offset);
+ }
+
+ public void clearOffsets() {
+ offsets.clear();
+ }
+
/**
* Get offsets that the SinkTask has submitted to be reset. Used by the Copycat framework.
* @return the map of offsets
@@ -35,6 +53,11 @@ public class WorkerSinkTaskContext extends SinkTaskContext {
return offsets;
}
+ @Override
+ public void timeout(long timeoutMs) {
+ this.timeoutMs = timeoutMs;
+ }
+
/**
* Get the timeout in milliseconds set by SinkTasks. Used by the Copycat framework.
* @return the backoff timeout in milliseconds.
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6515fb/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
index 78b588c..9740933 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
@@ -24,7 +24,6 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.copycat.source.SourceRecord;
import org.apache.kafka.copycat.source.SourceTask;
-import org.apache.kafka.copycat.source.SourceTaskContext;
import org.apache.kafka.copycat.storage.Converter;
import org.apache.kafka.copycat.storage.OffsetStorageReader;
import org.apache.kafka.copycat.storage.OffsetStorageWriter;
@@ -87,7 +86,7 @@ class WorkerSourceTask implements WorkerTask {
@Override
public void start(Properties props) {
- task.initialize(new SourceTaskContext(offsetReader));
+ task.initialize(new WorkerSourceTaskContext(offsetReader));
task.start(props);
workThread = new WorkerSourceTaskThread("WorkerSourceTask-" + id);
workThread.start();
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6515fb/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskContext.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskContext.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskContext.java
new file mode 100644
index 0000000..f19e4e6
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskContext.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime;
+
+import org.apache.kafka.copycat.source.SourceTaskContext;
+import org.apache.kafka.copycat.storage.OffsetStorageReader;
+
+public class WorkerSourceTaskContext implements SourceTaskContext {
+
+ private final OffsetStorageReader reader;
+
+ public WorkerSourceTaskContext(OffsetStorageReader reader) {
+ this.reader = reader;
+ }
+
+ @Override
+ public OffsetStorageReader offsetStorageReader() {
+ return reader;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6515fb/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
index 08707c9..acc1179 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
@@ -400,13 +400,16 @@ public class WorkerSinkTaskTest extends ThreadedTest {
}
private void expectInitializeTask(Properties taskProps) throws Exception {
+ PowerMock.expectPrivate(workerTask, "createConsumer", taskProps)
+ .andReturn(consumer);
+
+ EasyMock.expect(consumer.poll(EasyMock.anyLong())).andReturn(ConsumerRecords.<byte[], byte[]>empty());
+
sinkTask.initialize(EasyMock.capture(sinkTaskContext));
PowerMock.expectLastCall();
sinkTask.start(taskProps);
PowerMock.expectLastCall();
- PowerMock.expectPrivate(workerTask, "createConsumer", taskProps)
- .andReturn(consumer);
workerThread = PowerMock.createPartialMock(WorkerSinkTaskThread.class, new String[]{"start"},
workerTask, "mock-worker-thread", time,
workerConfig);