You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/11/03 22:17:39 UTC

kafka git commit: KAFKA-2480: Add backoff timeout and support rewinds

Repository: kafka
Updated Branches:
  refs/heads/trunk edddc41b3 -> 5aa5f19d3


KAFKA-2480: Add backoff timeout and support rewinds

Author: Liquan Pei <li...@gmail.com>

Reviewers: Ewen Cheslack-Postava, Gwen Shapira

Closes #340 from Ishiihara/backoff


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5aa5f19d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5aa5f19d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5aa5f19d

Branch: refs/heads/trunk
Commit: 5aa5f19d38eda33f32e170e14bcd4fd0d2835fc0
Parents: edddc41
Author: Liquan Pei <li...@gmail.com>
Authored: Tue Nov 3 13:17:20 2015 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Nov 3 13:17:20 2015 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/copycat/sink/SinkTask.java | 22 ++++++
 .../kafka/copycat/sink/SinkTaskContext.java     | 38 +++++++---
 .../kafka/copycat/runtime/WorkerSinkTask.java   | 73 +++++++++++--------
 .../copycat/runtime/WorkerSinkTaskContext.java  | 77 ++++++++++++++++++++
 .../copycat/runtime/WorkerSinkTaskTest.java     | 59 +++++++++++++--
 5 files changed, 223 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5aa5f19d/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
index bf5d152..c6cd12f 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
@@ -62,4 +62,26 @@ public abstract class SinkTask implements Task {
      * @param offsets mapping of TopicPartition to committed offset
      */
     public abstract void flush(Map<TopicPartition, OffsetAndMetadata> offsets);
+
+    /**
+     * The SinkTask uses this method to create writers for newly assigned partitions in case of partition
+     * re-assignment. In partition re-assignment, some new partitions may be assigned to the SinkTask.
+     * The SinkTask needs to create writers and perform necessary recovery for the newly assigned partitions.
+     * This method will be called after partition re-assignment completes and before the SinkTask starts
+     * fetching data.
+     * @param partitions The list of partitions that are now assigned to the task (may include
+     *                   partitions previously assigned to the task)
+     */
+    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+    }
+
+    /**
+     * The SinkTask uses this method to close writers and commit offsets for partitions that are
+     * no longer assigned to the SinkTask. This method will be called before a rebalance operation starts
+     * and after the SinkTask stops fetching data.
+     * @param partitions The list of partitions that were assigned to the consumer on the last
+     *                   rebalance
+     */
+    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5aa5f19d/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java
index 3ecff27..399dcef 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java
@@ -29,18 +29,18 @@ import java.util.Set;
  */
 @InterfaceStability.Unstable
 public abstract class SinkTaskContext {
-    private Map<TopicPartition, Long> offsets;
+    protected Map<TopicPartition, Long> offsets;
+    protected long timeoutMs = -1L;
 
     public SinkTaskContext() {
         offsets = new HashMap<>();
     }
 
     /**
-     * Reset the consumer offsets for the given topic partitions. SinkTasks should use this when they are started
-     * if they manage offsets in the sink data store rather than using Kafka consumer offsets. For example, an HDFS
-     * connector might record offsets in HDFS to provide exactly once delivery. When the SinkTask is started or
-     * a rebalance occurs, the task would reload offsets from HDFS and use this method to reset the consumer to those
-     * offsets.
+     * Reset the consumer offsets for the given topic partitions. SinkTasks should use this if they manage offsets
+     * in the sink data store rather than using Kafka consumer offsets. For example, an HDFS connector might record
+     * offsets in HDFS to provide exactly once delivery. When the SinkTask is started or a rebalance occurs, the task
+     * would reload offsets from HDFS and use this method to reset the consumer to those offsets.
      *
      * SinkTasks that do not manage their own offsets do not need to use this method.
      *
@@ -51,11 +51,29 @@ public abstract class SinkTaskContext {
     }
 
     /**
-     * Get offsets that the SinkTask has submitted to be reset. Used by the Copycat framework.
-     * @return the map of offsets
+     * Reset the consumer offset for the given topic partition. SinkTasks should use this if they manage offsets
+     * in the sink data store rather than using Kafka consumer offsets. For example, an HDFS connector might record
+     * offsets in HDFS to provide exactly once delivery. When the topic partition is recovered, the task
+     * would reload offsets from HDFS and use this method to reset the consumer to the offset.
+     *
+     * SinkTasks that do not manage their own offsets do not need to use this method.
+     *
+     * @param tp the topic partition to reset offset.
+     * @param offset the offset to reset to.
+     */
+    public void offset(TopicPartition tp, long offset) {
+        offsets.put(tp, offset);
+    }
+
+    /**
+     * Set the timeout in milliseconds. SinkTasks should use this to indicate that they need to retry certain
+     * operations after the timeout. SinkTasks may have operations on external systems that may need to be
+     * retried in case of failures. For example, appending a record to an HDFS file may fail due to temporary
+     * network issues. SinkTasks use this method to set how long to wait before retrying.
+     * @param timeoutMs the backoff timeout in milliseconds.
      */
-    public Map<TopicPartition, Long> offsets() {
-        return offsets;
+    public void timeout(long timeoutMs) {
+        this.timeoutMs = timeoutMs;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/5aa5f19d/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
index 70b99d0..e9aa055 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
@@ -17,24 +17,34 @@
 
 package org.apache.kafka.copycat.runtime;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.copycat.data.SchemaAndValue;
 import org.apache.kafka.copycat.errors.CopycatException;
-import org.apache.kafka.copycat.errors.IllegalWorkerStateException;
 import org.apache.kafka.copycat.sink.SinkRecord;
 import org.apache.kafka.copycat.sink.SinkTask;
-import org.apache.kafka.copycat.sink.SinkTaskContext;
 import org.apache.kafka.copycat.storage.Converter;
 import org.apache.kafka.copycat.util.ConnectorTaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -51,7 +61,7 @@ class WorkerSinkTask implements WorkerTask {
     private final Converter valueConverter;
     private WorkerSinkTaskThread workThread;
     private KafkaConsumer<byte[], byte[]> consumer;
-    private final SinkTaskContext context;
+    private WorkerSinkTaskContext context;
 
     public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConfig,
                           Converter keyConverter, Converter valueConverter, Time time) {
@@ -61,14 +71,14 @@ class WorkerSinkTask implements WorkerTask {
         this.keyConverter = keyConverter;
         this.valueConverter = valueConverter;
         this.time = time;
-        this.context = new WorkerSinkTaskContext();
     }
 
     @Override
     public void start(Properties props) {
+        consumer = createConsumer(props);
+        context = new WorkerSinkTaskContext(consumer);
         task.initialize(context);
         task.start(props);
-        consumer = createConsumer(props);
         workThread = createWorkerThread();
         workThread.start();
     }
@@ -108,6 +118,12 @@ class WorkerSinkTask implements WorkerTask {
     /** Poll for new messages with the given timeout. Should only be invoked by the worker thread. */
     public void poll(long timeoutMs) {
         try {
+            rewind();
+            long retryTimeout = context.timeout();
+            if (retryTimeout > 0) {
+                timeoutMs = Math.min(timeoutMs, retryTimeout);
+                context.timeout(-1L);
+            }
             log.trace("{} polling consumer with timeout {} ms", id, timeoutMs);
             ConsumerRecords<byte[], byte[]> msgs = consumer.poll(timeoutMs);
             log.trace("{} polling returned {} messages", id, msgs.count());
@@ -189,7 +205,7 @@ class WorkerSinkTask implements WorkerTask {
         }
 
         log.debug("Task {} subscribing to topics {}", id, topics);
-        newConsumer.subscribe(Arrays.asList(topics));
+        newConsumer.subscribe(Arrays.asList(topics), new HandleRebalance());
 
         // Seek to any user-provided offsets. This is useful if offsets are tracked in the downstream system (e.g., to
         // enable exactly once delivery to that system).
@@ -236,35 +252,30 @@ class WorkerSinkTask implements WorkerTask {
         }
     }
 
-
-    private class WorkerSinkTaskContext extends SinkTaskContext {
-        @Override
-        public Set<TopicPartition> assignment() {
-            if (consumer == null)
-                throw new IllegalWorkerStateException("SinkTaskContext may not be used to look up partition assignment until the task is initialized");
-            return consumer.assignment();
+    private void rewind() {
+        Map<TopicPartition, Long> offsets = context.offsets();
+        if (offsets.isEmpty()) {
+            return;
+        }
+        for (TopicPartition tp: offsets.keySet()) {
+            Long offset = offsets.get(tp);
+            if (offset != null) {
+                log.trace("Rewind {} to offset {}.", tp, offset);
+                consumer.seek(tp, offset);
+            }
         }
+        offsets.clear();
+    }
 
+    private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
-        public void pause(TopicPartition... partitions) {
-            if (consumer == null)
-                throw new IllegalWorkerStateException("SinkTaskContext may not be used to pause consumption until the task is initialized");
-            try {
-                consumer.pause(partitions);
-            } catch (IllegalStateException e) {
-                throw new IllegalWorkerStateException("SinkTasks may not pause partitions that are not currently assigned to them.", e);
-            }
+        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+            task.onPartitionsAssigned(partitions);
         }
 
         @Override
-        public void resume(TopicPartition... partitions) {
-            if (consumer == null)
-                throw new IllegalWorkerStateException("SinkTaskContext may not be used to resume consumption until the task is initialized");
-            try {
-                consumer.resume(partitions);
-            } catch (IllegalStateException e) {
-                throw new IllegalWorkerStateException("SinkTasks may not resume partitions that are not currently assigned to them.", e);
-            }
+        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+            task.onPartitionsRevoked(partitions);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5aa5f19d/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java
new file mode 100644
index 0000000..b8d7d54
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at <p/> http://www.apache.org/licenses/LICENSE-2.0 <p/> Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.copycat.errors.IllegalWorkerStateException;
+import org.apache.kafka.copycat.sink.SinkTaskContext;
+
+import java.util.Map;
+import java.util.Set;
+
+public class WorkerSinkTaskContext extends SinkTaskContext {
+
+    KafkaConsumer<byte[], byte[]> consumer;
+
+    public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer) {
+        this.consumer = consumer;
+    }
+
+    /**
+     * Get offsets that the SinkTask has submitted to be reset. Used by the Copycat framework.
+     * @return the map of offsets
+     */
+    public Map<TopicPartition, Long> offsets() {
+        return offsets;
+    }
+
+    /**
+     * Get the timeout in milliseconds set by SinkTasks. Used by the Copycat framework.
+     * @return the backoff timeout in milliseconds.
+     */
+    public long timeout() {
+        return timeoutMs;
+    }
+
+    @Override
+    public Set<TopicPartition> assignment() {
+        if (consumer == null) {
+            throw new IllegalWorkerStateException("SinkTaskContext may not be used to look up partition assignment until the task is initialized");
+        }
+        return consumer.assignment();
+    }
+
+    @Override
+    public void pause(TopicPartition... partitions) {
+        if (consumer == null) {
+            throw new IllegalWorkerStateException("SinkTaskContext may not be used to pause consumption until the task is initialized");
+        }
+        try {
+            consumer.pause(partitions);
+        } catch (IllegalStateException e) {
+            throw new IllegalWorkerStateException("SinkTasks may not pause partitions that are not currently assigned to them.", e);
+        }
+    }
+
+    @Override
+    public void resume(TopicPartition... partitions) {
+        if (consumer == null) {
+            throw new IllegalWorkerStateException("SinkTaskContext may not be used to resume consumption until the task is initialized");
+        }
+        try {
+            consumer.resume(partitions);
+        } catch (IllegalStateException e) {
+            throw new IllegalWorkerStateException("SinkTasks may not resume partitions that are not currently assigned to them.", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5aa5f19d/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
index e5e5b85..08707c9 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
@@ -17,21 +17,28 @@
 
 package org.apache.kafka.copycat.runtime;
 
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.copycat.data.Schema;
 import org.apache.kafka.copycat.data.SchemaAndValue;
 import org.apache.kafka.copycat.errors.CopycatException;
 import org.apache.kafka.copycat.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.copycat.sink.SinkRecord;
 import org.apache.kafka.copycat.sink.SinkTask;
-import org.apache.kafka.copycat.sink.SinkTaskContext;
 import org.apache.kafka.copycat.storage.Converter;
 import org.apache.kafka.copycat.util.ConnectorTaskId;
 import org.apache.kafka.copycat.util.MockTime;
 import org.apache.kafka.copycat.util.ThreadedTest;
-import org.easymock.*;
+import org.easymock.Capture;
+import org.easymock.CaptureType;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.easymock.IExpectationSetters;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.api.easymock.PowerMock;
@@ -44,6 +51,7 @@ import org.powermock.reflect.Whitebox;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Properties;
@@ -78,7 +86,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
     private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
     private Time time;
     @Mock private SinkTask sinkTask;
-    private Capture<SinkTaskContext> sinkTaskContext = EasyMock.newCapture();
+    private Capture<WorkerSinkTaskContext> sinkTaskContext = EasyMock.newCapture();
     private WorkerConfig workerConfig;
     @Mock private Converter keyConverter;
     @Mock
@@ -349,6 +357,47 @@ public class WorkerSinkTaskTest extends ThreadedTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testRewind() throws Exception {
+        Properties taskProps = new Properties();
+        expectInitializeTask(taskProps);
+        final long startOffset = 40L;
+        final Map<TopicPartition, Long> offsets = new HashMap<>();
+
+        expectOnePoll().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                offsets.put(TOPIC_PARTITION, startOffset);
+                sinkTaskContext.getValue().offset(offsets);
+                return null;
+            }
+        });
+
+        consumer.seek(TOPIC_PARTITION, startOffset);
+        EasyMock.expectLastCall();
+
+        expectOnePoll().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                Map<TopicPartition, Long> offsets = sinkTaskContext.getValue().offsets();
+                assertEquals(0, offsets.size());
+                return null;
+            }
+        });
+
+        expectStopTask(3);
+
+        PowerMock.replayAll();
+
+        workerTask.start(taskProps);
+        workerThread.iteration();
+        workerThread.iteration();
+        workerTask.stop();
+        // No need for awaitStop since the thread is mocked
+        workerTask.close();
+
+        PowerMock.verifyAll();
+    }
 
     private void expectInitializeTask(Properties taskProps) throws Exception {
         sinkTask.initialize(EasyMock.capture(sinkTaskContext));