You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/06 23:21:57 UTC

kafka git commit: KAFKA-2482: Allow sink tasks to get their current assignment, as well as pause and resume topic partitions.

Repository: kafka
Updated Branches:
  refs/heads/trunk a0ca8f642 -> 23f9afb70


KAFKA-2482: Allow sink tasks to get their current assignment, as well as pause and resume topic partitions.

Author: Ewen Cheslack-Postava <me...@ewencp.org>

Reviewers: Guozhang Wang

Closes #249 from ewencp/kafka-2482-sink-tasks-pause-consumption


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/23f9afb7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/23f9afb7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/23f9afb7

Branch: refs/heads/trunk
Commit: 23f9afb70bc5cdbf66550a1e69161e2fe06a909a
Parents: a0ca8f6
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Tue Oct 6 14:26:08 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Oct 6 14:26:08 2015 -0700

----------------------------------------------------------------------
 .../errors/IllegalWorkerStateException.java     |  35 ++++++
 .../kafka/copycat/sink/SinkTaskContext.java     |  19 +++
 .../copycat/runtime/SinkTaskContextImpl.java    |  24 ----
 .../kafka/copycat/runtime/WorkerSinkTask.java   |  35 +++++-
 .../copycat/runtime/WorkerSinkTaskTest.java     | 118 ++++++++++++++++++-
 5 files changed, 201 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/23f9afb7/copycat/api/src/main/java/org/apache/kafka/copycat/errors/IllegalWorkerStateException.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/IllegalWorkerStateException.java b/copycat/api/src/main/java/org/apache/kafka/copycat/errors/IllegalWorkerStateException.java
new file mode 100644
index 0000000..6f9f233
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/errors/IllegalWorkerStateException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.errors;
+
+/**
+ * Indicates that a connector or task invoked a method illegally or at an invalid time, e.g. used its context before the task was initialized.
+ */
+public class IllegalWorkerStateException extends CopycatException {
+    public IllegalWorkerStateException(String s) {
+        super(s);
+    }
+
+    public IllegalWorkerStateException(String s, Throwable throwable) {
+        super(s, throwable);
+    }
+
+    public IllegalWorkerStateException(Throwable throwable) {
+        super(throwable);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/23f9afb7/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java
index 67c045f..3ecff27 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Context passed to SinkTasks, allowing them to access utilities in the copycat runtime.
@@ -56,4 +57,22 @@ public abstract class SinkTaskContext {
     public Map<TopicPartition, Long> offsets() {
         return offsets;
     }
+
+    /**
+     * Get the current set of assigned TopicPartitions for this task.
+     * @return the set of currently assigned TopicPartitions
+     */
+    public abstract Set<TopicPartition> assignment();
+
+    /**
+     * Pause consumption of messages from the specified TopicPartitions.
+     * @param partitions the partitions which should be paused
+     */
+    public abstract void pause(TopicPartition... partitions);
+
+    /**
+     * Resume consumption of messages from previously paused TopicPartitions.
+     * @param partitions the partitions to resume
+     */
+    public abstract void resume(TopicPartition... partitions);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/23f9afb7/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SinkTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SinkTaskContextImpl.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SinkTaskContextImpl.java
deleted file mode 100644
index f47c984..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SinkTaskContextImpl.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.runtime;
-
-import org.apache.kafka.copycat.sink.SinkTaskContext;
-
-class SinkTaskContextImpl extends SinkTaskContext {
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/23f9afb7/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
index cbda201..edb415a 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.copycat.cli.WorkerConfig;
 import org.apache.kafka.copycat.data.SchemaAndValue;
 import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.errors.IllegalWorkerStateException;
 import org.apache.kafka.copycat.sink.SinkRecord;
 import org.apache.kafka.copycat.sink.SinkTask;
 import org.apache.kafka.copycat.sink.SinkTaskContext;
@@ -59,8 +60,8 @@ class WorkerSinkTask implements WorkerTask {
         this.workerConfig = workerConfig;
         this.keyConverter = keyConverter;
         this.valueConverter = valueConverter;
-        context = new SinkTaskContextImpl();
         this.time = time;
+        this.context = new WorkerSinkTaskContext();
     }
 
     @Override
@@ -234,4 +235,36 @@ class WorkerSinkTask implements WorkerTask {
             }
         }
     }
+
+
+    private class WorkerSinkTaskContext extends SinkTaskContext { // SinkTaskContext backed by the enclosing task's consumer
+        @Override
+        public Set<TopicPartition> assignment() {
+            if (consumer == null) // no consumer to query yet
+                throw new IllegalWorkerStateException("SinkTaskContext may not be used to look up partition assignment until the task is initialized");
+            return consumer.assignment();
+        }
+
+        @Override
+        public void pause(TopicPartition... partitions) {
+            if (consumer == null) // no consumer to pause yet
+                throw new IllegalWorkerStateException("SinkTaskContext may not be used to pause consumption until the task is initialized");
+            try {
+                consumer.pause(partitions);
+            } catch (IllegalStateException e) { // surface the consumer's error as a Copycat exception, keeping the cause
+                throw new IllegalWorkerStateException("SinkTasks may not pause partitions that are not currently assigned to them.", e);
+            }
+        }
+
+        @Override
+        public void resume(TopicPartition... partitions) {
+            if (consumer == null) // no consumer to resume yet
+                throw new IllegalWorkerStateException("SinkTaskContext may not be used to resume consumption until the task is initialized");
+            try {
+                consumer.resume(partitions);
+            } catch (IllegalStateException e) { // surface the consumer's error as a Copycat exception, keeping the cause
+                throw new IllegalWorkerStateException("SinkTasks may not resume partitions that are not currently assigned to them.", e);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/23f9afb7/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
index 687ed8f..e4d1d8e 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.copycat.cli.WorkerConfig;
 import org.apache.kafka.copycat.data.Schema;
 import org.apache.kafka.copycat.data.SchemaAndValue;
+import org.apache.kafka.copycat.errors.CopycatException;
 import org.apache.kafka.copycat.sink.SinkRecord;
 import org.apache.kafka.copycat.sink.SinkTask;
 import org.apache.kafka.copycat.sink.SinkTaskContext;
@@ -40,9 +41,15 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.powermock.reflect.Whitebox;
 
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(WorkerSinkTask.class)
@@ -53,6 +60,8 @@ public class WorkerSinkTaskTest extends ThreadedTest {
     // with mix of integer/string in Copycat
     private static final String TOPIC = "test";
     private static final int PARTITION = 12;
+    private static final int PARTITION2 = 13;
+    private static final int PARTITION3 = 14;
     private static final long FIRST_OFFSET = 45;
     private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA;
     private static final int KEY = 12;
@@ -62,10 +71,14 @@ public class WorkerSinkTaskTest extends ThreadedTest {
     private static final byte[] RAW_VALUE = "value".getBytes();
 
     private static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, PARTITION);
+    private static final TopicPartition TOPIC_PARTITION2 = new TopicPartition(TOPIC, PARTITION2);
+    private static final TopicPartition TOPIC_PARTITION3 = new TopicPartition(TOPIC, PARTITION3);
+    private static final TopicPartition UNASSIGNED_TOPIC_PARTITION = new TopicPartition(TOPIC, 200);
 
     private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
     private Time time;
     @Mock private SinkTask sinkTask;
+    private Capture<SinkTaskContext> sinkTaskContext = EasyMock.newCapture();
     private WorkerConfig workerConfig;
     @Mock private Converter keyConverter;
     @Mock
@@ -266,9 +279,79 @@ public class WorkerSinkTaskTest extends ThreadedTest {
         PowerMock.verifyAll();
     }
 
-    private KafkaConsumer<byte[], byte[]> expectInitializeTask(Properties taskProps)
-            throws Exception {
-        sinkTask.initialize(EasyMock.anyObject(SinkTaskContext.class));
+    @Test
+    public void testAssignmentPauseResume() throws Exception {
+        // Validate that assignment(), pause() and resume() on the context are passed through to the consumer, and that
+        // the consumer's IllegalStateException for an unassigned partition is converted to a CopycatException
+
+        Properties taskProps = new Properties();
+
+        expectInitializeTask(taskProps);
+
+        expectOnePoll().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                assertEquals(new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3)),
+                        sinkTaskContext.getValue().assignment());
+                return null;
+            }
+        });
+        EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3)));
+
+        expectOnePoll().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                try {
+                    sinkTaskContext.getValue().pause(UNASSIGNED_TOPIC_PARTITION);
+                    fail("Trying to pause unassigned partition should have thrown a Copycat exception");
+                } catch (CopycatException e) {
+                    // expected
+                }
+                sinkTaskContext.getValue().pause(TOPIC_PARTITION, TOPIC_PARTITION2);
+                return null;
+            }
+        });
+        consumer.pause(UNASSIGNED_TOPIC_PARTITION);
+        PowerMock.expectLastCall().andThrow(new IllegalStateException("unassigned topic partition"));
+        consumer.pause(TOPIC_PARTITION, TOPIC_PARTITION2);
+        PowerMock.expectLastCall();
+
+        expectOnePoll().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                try {
+                    sinkTaskContext.getValue().resume(UNASSIGNED_TOPIC_PARTITION);
+                    fail("Trying to resume unassigned partition should have thrown a Copycat exception");
+                } catch (CopycatException e) {
+                    // expected
+                }
+
+                sinkTaskContext.getValue().resume(TOPIC_PARTITION, TOPIC_PARTITION2);
+                return null;
+            }
+        });
+        consumer.resume(UNASSIGNED_TOPIC_PARTITION);
+        PowerMock.expectLastCall().andThrow(new IllegalStateException("unassigned topic partition"));
+        consumer.resume(TOPIC_PARTITION, TOPIC_PARTITION2);
+        PowerMock.expectLastCall();
+
+        expectStopTask(0);
+
+        PowerMock.replayAll();
+
+        workerTask.start(taskProps);
+        workerThread.iteration(); // presumably one poll per iteration, matching the three expectOnePoll() calls above
+        workerThread.iteration();
+        workerThread.iteration();
+        workerTask.stop();
+        workerTask.close();
+
+        PowerMock.verifyAll();
+    }
+
+
+    private void expectInitializeTask(Properties taskProps) throws Exception {
+        sinkTask.initialize(EasyMock.capture(sinkTaskContext));
         PowerMock.expectLastCall();
         sinkTask.start(taskProps);
         PowerMock.expectLastCall();
@@ -282,7 +365,6 @@ public class WorkerSinkTaskTest extends ThreadedTest {
                 .andReturn(workerThread);
         workerThread.start();
         PowerMock.expectLastCall();
-        return consumer;
     }
 
     private void expectStopTask(final long expectedMessages) throws Exception {
@@ -328,6 +410,32 @@ public class WorkerSinkTaskTest extends ThreadedTest {
         return capturedRecords;
     }
 
+    private IExpectationSetters<Object> expectOnePoll() {
+        // Currently the SinkTask's put() method is not invoked unless the poll provides some data, so rather than
+        // returning empty data we return exactly one record. Callers are expected to ignore that data and attach
+        // the behavior they want to run during put() to the expectation setter returned by this method.
+        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
+                new IAnswer<ConsumerRecords<byte[], byte[]>>() {
+                    @Override
+                    public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
+                        // "Sleep" so the test's time source advances on every poll
+                        time.sleep(1L);
+                        ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
+                                Collections.singletonMap(
+                                        new TopicPartition(TOPIC, PARTITION),
+                                        Arrays.asList(
+                                                new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, RAW_KEY, RAW_VALUE)
+                                        )));
+                        recordsReturned++;
+                        return records;
+                    }
+                });
+        EasyMock.expect(keyConverter.toCopycatData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY));
+        EasyMock.expect(valueConverter.toCopycatData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE));
+        sinkTask.put(EasyMock.anyObject(Collection.class));
+        return EasyMock.expectLastCall();
+    }
+
     private Capture<OffsetCommitCallback> expectOffsetFlush(final long expectedMessages,
                                                               final RuntimeException flushError,
                                                               final Exception consumerCommitError,