You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/05 17:39:00 UTC

kafka git commit: KAFKA-2741: Make SourceTaskContext and SinkTaskContext interfaces and keep implementations in runtime jar.

Repository: kafka
Updated Branches:
  refs/heads/trunk 4a9e7607b -> 7d6515fb8


KAFKA-2741: Make SourceTaskContext and SinkTaskContext interfaces and keep implementations in runtime jar.

Author: Ewen Cheslack-Postava <me...@ewencp.org>

Reviewers: Guozhang Wang

Closes #420 from ewencp/task-context-interfaces


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7d6515fb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7d6515fb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7d6515fb

Branch: refs/heads/trunk
Commit: 7d6515fb8f6141f5c34fe8434e97ea6ebd65941f
Parents: 4a9e760
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Thu Nov 5 08:44:44 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Nov 5 08:44:44 2015 -0800

----------------------------------------------------------------------
 .../kafka/copycat/sink/SinkTaskContext.java     | 28 ++++------------
 .../kafka/copycat/source/SourceTaskContext.java | 12 ++-----
 .../copycat/file/FileStreamSourceTaskTest.java  |  5 ++-
 .../kafka/copycat/runtime/WorkerSinkTask.java   | 20 ++++-------
 .../copycat/runtime/WorkerSinkTaskContext.java  | 29 ++++++++++++++--
 .../kafka/copycat/runtime/WorkerSourceTask.java |  3 +-
 .../runtime/WorkerSourceTaskContext.java        | 35 ++++++++++++++++++++
 .../copycat/runtime/WorkerSinkTaskTest.java     |  7 ++--
 8 files changed, 87 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6515fb/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java
index 399dcef..763b9a4 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java
@@ -20,7 +20,6 @@ package org.apache.kafka.copycat.sink;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
 
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
@@ -28,14 +27,7 @@ import java.util.Set;
  * Context passed to SinkTasks, allowing them to access utilities in the copycat runtime.
  */
 @InterfaceStability.Unstable
-public abstract class SinkTaskContext {
-    protected Map<TopicPartition, Long> offsets;
-    protected long timeoutMs = -1L;
-
-    public SinkTaskContext() {
-        offsets = new HashMap<>();
-    }
-
+public interface SinkTaskContext {
     /**
      * Reset the consumer offsets for the given topic partitions. SinkTasks should use this if they manage offsets
      * in the sink data store rather than using Kafka consumer offsets. For example, an HDFS connector might record
@@ -46,9 +38,7 @@ public abstract class SinkTaskContext {
      *
      * @param offsets map of offsets for topic partitions
      */
-    public void offset(Map<TopicPartition, Long> offsets) {
-        this.offsets = offsets;
-    }
+    void offset(Map<TopicPartition, Long> offsets);
 
     /**
      * Reset the consumer offsets for the given topic partition. SinkTasks should use if they manage offsets
@@ -61,9 +51,7 @@ public abstract class SinkTaskContext {
      * @param tp the topic partition to reset offset.
      * @param offset the offset to reset to.
      */
-    public void offset(TopicPartition tp, long offset) {
-        offsets.put(tp, offset);
-    }
+    void offset(TopicPartition tp, long offset);
 
     /**
      * Set the timeout in milliseconds. SinkTasks should use this to indicate that they need to retry certain
@@ -72,25 +60,23 @@ public abstract class SinkTaskContext {
      * issues. SinkTasks use this method to set how long to wait before retrying.
      * @param timeoutMs the backoff timeout in milliseconds.
      */
-    public void timeout(long timeoutMs) {
-        this.timeoutMs = timeoutMs;
-    }
+    void timeout(long timeoutMs);
 
     /**
      * Get the current set of assigned TopicPartitions for this task.
      * @return the set of currently assigned TopicPartitions
      */
-    public abstract Set<TopicPartition> assignment();
+    Set<TopicPartition> assignment();
 
     /**
      * Pause consumption of messages from the specified TopicPartitions.
      * @param partitions the partitions which should be paused
      */
-    public abstract void pause(TopicPartition... partitions);
+    void pause(TopicPartition... partitions);
 
     /**
      * Resume consumption of messages from previously paused TopicPartitions.
      * @param partitions the partitions to resume
      */
-    public abstract void resume(TopicPartition... partitions);
+    void resume(TopicPartition... partitions);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6515fb/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java
index a3875e7..bc18c30 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java
@@ -24,17 +24,9 @@ import org.apache.kafka.copycat.storage.OffsetStorageReader;
  * runtime.
  */
 @InterfaceStability.Unstable
-public class SourceTaskContext {
-    private final OffsetStorageReader reader;
-
-    public SourceTaskContext(OffsetStorageReader reader) {
-        this.reader = reader;
-    }
-
+public interface SourceTaskContext {
     /**
      * Get the OffsetStorageReader for this SourceTask.
      */
-    public OffsetStorageReader offsetStorageReader() {
-        return reader;
-    }
+    OffsetStorageReader offsetStorageReader();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6515fb/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java
index d2781c9..4365def 100644
--- a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java
+++ b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java
@@ -44,6 +44,7 @@ public class FileStreamSourceTaskTest {
     private File tempFile;
     private Properties config;
     private OffsetStorageReader offsetStorageReader;
+    private SourceTaskContext context;
     private FileStreamSourceTask task;
 
     private boolean verifyMocks = false;
@@ -56,7 +57,8 @@ public class FileStreamSourceTaskTest {
         config.setProperty(FileStreamSourceConnector.TOPIC_CONFIG, TOPIC);
         task = new FileStreamSourceTask();
         offsetStorageReader = PowerMock.createMock(OffsetStorageReader.class);
-        task.initialize(new SourceTaskContext(offsetStorageReader));
+        context = PowerMock.createMock(SourceTaskContext.class);
+        task.initialize(context);
     }
 
     @After
@@ -142,6 +144,7 @@ public class FileStreamSourceTaskTest {
 
 
     private void expectOffsetLookupReturnNone() {
+        EasyMock.expect(context.offsetStorageReader()).andReturn(offsetStorageReader);
         EasyMock.expect(offsetStorageReader.offset(EasyMock.anyObject(Map.class))).andReturn(null);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6515fb/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
index e9aa055..439a1f5 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
@@ -77,8 +77,14 @@ class WorkerSinkTask implements WorkerTask {
     public void start(Properties props) {
         consumer = createConsumer(props);
         context = new WorkerSinkTaskContext(consumer);
+
+        // Ensure we're in the group so that if start() wants to rewind offsets, it will have an assignment of partitions
+        // to work with. Any rewinding will be handled immediately when polling starts.
+        consumer.poll(0);
+
         task.initialize(context);
         task.start(props);
+
         workThread = createWorkerThread();
         workThread.start();
     }
@@ -207,18 +213,6 @@ class WorkerSinkTask implements WorkerTask {
         log.debug("Task {} subscribing to topics {}", id, topics);
         newConsumer.subscribe(Arrays.asList(topics), new HandleRebalance());
 
-        // Seek to any user-provided offsets. This is useful if offsets are tracked in the downstream system (e.g., to
-        // enable exactly once delivery to that system).
-        //
-        // To do this correctly, we need to first make sure we have been assigned partitions, which poll() will guarantee.
-        // We ask for offsets after this poll to make sure any offsets committed before the rebalance are picked up correctly.
-        newConsumer.poll(0);
-        Map<TopicPartition, Long> offsets = context.offsets();
-        for (TopicPartition tp : newConsumer.assignment()) {
-            Long offset = offsets.get(tp);
-            if (offset != null)
-                newConsumer.seek(tp, offset);
-        }
         return newConsumer;
     }
 
@@ -264,7 +258,7 @@ class WorkerSinkTask implements WorkerTask {
                 consumer.seek(tp, offset);
             }
         }
-        offsets.clear();
+        context.clearOffsets();
     }
 
     private class HandleRebalance implements ConsumerRebalanceListener {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6515fb/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java
index b8d7d54..b474589 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java
@@ -16,17 +16,35 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.copycat.errors.IllegalWorkerStateException;
 import org.apache.kafka.copycat.sink.SinkTaskContext;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
-public class WorkerSinkTaskContext extends SinkTaskContext {
-
-    KafkaConsumer<byte[], byte[]> consumer;
+public class WorkerSinkTaskContext implements SinkTaskContext {
+    private Map<TopicPartition, Long> offsets;
+    private long timeoutMs;
+    private KafkaConsumer<byte[], byte[]> consumer;
 
     public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer) {
+        this.offsets = new HashMap<>();
+        this.timeoutMs = -1L;
         this.consumer = consumer;
     }
 
+    @Override
+    public void offset(Map<TopicPartition, Long> offsets) {
+        this.offsets.putAll(offsets);
+    }
+
+    @Override
+    public void offset(TopicPartition tp, long offset) {
+        offsets.put(tp, offset);
+    }
+
+    public void clearOffsets() {
+        offsets.clear();
+    }
+
     /**
      * Get offsets that the SinkTask has submitted to be reset. Used by the Copycat framework.
      * @return the map of offsets
@@ -35,6 +53,11 @@ public class WorkerSinkTaskContext extends SinkTaskContext {
         return offsets;
     }
 
+    @Override
+    public void timeout(long timeoutMs) {
+        this.timeoutMs = timeoutMs;
+    }
+
     /**
      * Get the timeout in milliseconds set by SinkTasks. Used by the Copycat framework.
      * @return the backoff timeout in milliseconds.

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6515fb/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
index 78b588c..9740933 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
@@ -24,7 +24,6 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.copycat.source.SourceRecord;
 import org.apache.kafka.copycat.source.SourceTask;
-import org.apache.kafka.copycat.source.SourceTaskContext;
 import org.apache.kafka.copycat.storage.Converter;
 import org.apache.kafka.copycat.storage.OffsetStorageReader;
 import org.apache.kafka.copycat.storage.OffsetStorageWriter;
@@ -87,7 +86,7 @@ class WorkerSourceTask implements WorkerTask {
 
     @Override
     public void start(Properties props) {
-        task.initialize(new SourceTaskContext(offsetReader));
+        task.initialize(new WorkerSourceTaskContext(offsetReader));
         task.start(props);
         workThread = new WorkerSourceTaskThread("WorkerSourceTask-" + id);
         workThread.start();

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6515fb/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskContext.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskContext.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskContext.java
new file mode 100644
index 0000000..f19e4e6
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskContext.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime;
+
+import org.apache.kafka.copycat.source.SourceTaskContext;
+import org.apache.kafka.copycat.storage.OffsetStorageReader;
+/** Runtime implementation of {@link SourceTaskContext}; exposes the OffsetStorageReader supplied by the worker. */
+public class WorkerSourceTaskContext implements SourceTaskContext {
+    // Set once at construction and returned unchanged by offsetStorageReader().
+    private final OffsetStorageReader reader;
+
+    public WorkerSourceTaskContext(OffsetStorageReader reader) {
+        this.reader = reader;
+    }
+
+    @Override
+    public OffsetStorageReader offsetStorageReader() {
+        return reader;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6515fb/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
index 08707c9..acc1179 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
@@ -400,13 +400,16 @@ public class WorkerSinkTaskTest extends ThreadedTest {
     }
 
     private void expectInitializeTask(Properties taskProps) throws Exception {
+        PowerMock.expectPrivate(workerTask, "createConsumer", taskProps)
+                .andReturn(consumer);
+
+        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andReturn(ConsumerRecords.<byte[], byte[]>empty());
+
         sinkTask.initialize(EasyMock.capture(sinkTaskContext));
         PowerMock.expectLastCall();
         sinkTask.start(taskProps);
         PowerMock.expectLastCall();
 
-        PowerMock.expectPrivate(workerTask, "createConsumer", taskProps)
-                .andReturn(consumer);
         workerThread = PowerMock.createPartialMock(WorkerSinkTaskThread.class, new String[]{"start"},
                 workerTask, "mock-worker-thread", time,
                 workerConfig);