Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/02/25 21:33:00 UTC

[jira] [Commented] (KAFKA-3462) Allow SinkTasks to disable consumer offset commit

    [ https://issues.apache.org/jira/browse/KAFKA-3462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16376257#comment-16376257 ] 

ASF GitHub Bot commented on KAFKA-3462:
---------------------------------------

hachikuji closed pull request #1139: KAFKA-3462: Allow SinkTasks to disable consumer offset commit
URL: https://github.com/apache/kafka/pull/1139

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java
index 2202cae0276..6f29c8b4ca1 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java
@@ -22,6 +22,7 @@
 
 import java.util.Map;
 import java.util.Set;
+import java.util.Collection;
 
 /**
  * Context passed to SinkTasks, allowing them to access utilities in the Kafka Connect runtime.
@@ -79,4 +80,27 @@
      * @param partitions the partitions to resume
      */
     void resume(TopicPartition... partitions);
+
+    /**
+     * Disable consumer offset commits in the framework. SinkTasks should use this if they manage offsets
+     * in the sink data store rather than using Kafka consumer offsets. For example, an HDFS connector might record
+     * offsets in HDFS to provide exactly-once delivery. When the SinkTask is started or a rebalance occurs, the task
+     * would reload offsets from HDFS. In this case, disabling consumer offset commits saves some CPU cycles and
+     * network I/O. It also avoids prefetching data from the committed offsets that would later be discarded
+     * because the connector rewinds the offsets.
+     *
+     * Kafka Connect invokes {@link SinkTask#flush(Map)} during offset commit, which flushes all records that
+     * have been passed to {@link SinkTask#put(Collection)} for the specified topic partitions. Disabling offset
+     * commits therefore has implications for connector implementations: {@link SinkTask}s are now required to either
+     * call {@link SinkTask#flush(Map)} manually or implement the flush logic in {@link SinkTask#put(Collection)} to
+     * ensure that the data and offsets are successfully written to the destination system.
+     *
+     * With manual offset management, the connector needs to make sure that offsets are written to the destination
+     * system before a rebalance or task stop, and that offsets are reset after a rebalance or task restart.
+     * {@link SinkTask#close(Collection)} is invoked before a consumer group rebalance and before the task stops,
+     * so connectors need to write the offsets to the destination system there. During restart and after a
+     * rebalance, the connector needs to reset the offsets using {@link #offset(Map)} by passing in
+     * the offset information from the destination system.
+     */
+    void disableOffsetCommit();
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index fbc2307c2b6..b825200fca4 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -149,6 +149,15 @@ public void execute() {
     }
 
     protected void iteration() {
+        long timeoutMs = Integer.MAX_VALUE;
+        if (!context.offsetCommitDisabled()) {
+            timeoutMs = maybeCommitOffsets();
+        }
+        // And process messages
+        poll(timeoutMs);
+    }
+
+    private long maybeCommitOffsets() {
         long now = time.milliseconds();
 
         // Maybe commit
@@ -159,16 +168,14 @@ protected void iteration() {
 
         // Check for timed out commits
         long commitTimeout = commitStarted + workerConfig.getLong(
-                WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
+            WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
         if (committing && now >= commitTimeout) {
             log.warn("Commit of {} offsets timed out", this);
             commitFailures++;
             committing = false;
         }
 
-        // And process messages
-        long timeoutMs = Math.max(nextCommit - now, 0);
-        poll(timeoutMs);
+        return Math.max(nextCommit - now, 0);
     }
 
     private void onCommitCompleted(Throwable error, long seqno) {
@@ -432,7 +439,9 @@ private void openPartitions(Collection<TopicPartition> partitions) {
     }
 
     private void closePartitions() {
-        commitOffsets(time.milliseconds(), true);
+        if (!context.offsetCommitDisabled()) {
+            commitOffsets(time.milliseconds(), true);
+        }
     }
 
     private class HandleRebalance implements ConsumerRebalanceListener {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
index c762bddb340..635b52d18b2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
@@ -27,12 +27,14 @@
     private long timeoutMs;
     private KafkaConsumer<byte[], byte[]> consumer;
     private final Set<TopicPartition> pausedPartitions;
+    private boolean offsetCommitDisabled;
 
     public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer) {
         this.offsets = new HashMap<>();
         this.timeoutMs = -1L;
         this.consumer = consumer;
         this.pausedPartitions = new HashSet<>();
+        this.offsetCommitDisabled = false;
     }
 
     @Override
@@ -109,4 +111,13 @@ public void resume(TopicPartition... partitions) {
     public Set<TopicPartition> pausedPartitions() {
         return pausedPartitions;
     }
+
+    @Override
+    public void disableOffsetCommit() {
+        this.offsetCommitDisabled = true;
+    }
+
+    public boolean offsetCommitDisabled() {
+        return offsetCommitDisabled;
+    }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index 846ca9572cd..2f2ebb5913f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -64,7 +64,7 @@ public ConnectorTaskId id() {
 
     /**
      * Initialize the task for execution.
-     * @param props initial configuration
+     * @param taskConfig initial configuration
      */
     public abstract void initialize(TaskConfig taskConfig);
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index dbb3f8d4886..7abea99b64f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -401,6 +401,41 @@ public void testTimestampPropagation() throws Exception {
         PowerMock.verifyAll();
     }
 
+    public void testDisableOffsetCommits() throws Exception {
+        expectInitializeTaskDisableOffsetCommit();
+        expectPollInitialAssignment();
+
+        final List<TopicPartition> partitions = asList(TOPIC_PARTITION, TOPIC_PARTITION2);
+
+        EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
+        EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);
+
+        sinkTask.open(partitions);
+        EasyMock.expectLastCall();
+
+        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
+            new IAnswer<ConsumerRecords<byte[], byte[]>>() {
+                @Override
+                public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
+                    rebalanceListener.getValue().onPartitionsRevoked(partitions);
+                    rebalanceListener.getValue().onPartitionsAssigned(partitions);
+                    return ConsumerRecords.empty();
+                }
+            });
+
+        sinkTask.put(Collections.<SinkRecord>emptyList());
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        workerTask.poll(Long.MAX_VALUE);
+        workerTask.iteration();
+
+        PowerMock.verifyAll();
+    }
+
     private void expectInitializeTask() throws Exception {
         PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer);
         consumer.subscribe(EasyMock.eq(asList(TOPIC)), EasyMock.capture(rebalanceListener));
@@ -412,6 +447,25 @@ private void expectInitializeTask() throws Exception {
         PowerMock.expectLastCall();
     }
 
+    private void expectInitializeTaskDisableOffsetCommit() throws Exception {
+        PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer);
+        consumer.subscribe(EasyMock.eq(asList(TOPIC)), EasyMock.capture(rebalanceListener));
+        PowerMock.expectLastCall();
+
+        sinkTask.initialize(EasyMock.capture(sinkTaskContext));
+        PowerMock.expectLastCall().andAnswer(
+            new IAnswer<Void>() {
+                @Override
+                public Void answer() throws Throwable {
+                    sinkTaskContext.getValue().disableOffsetCommit();
+                    return null;
+                }
+            }
+        );
+        sinkTask.start(TASK_PROPS);
+        PowerMock.expectLastCall();
+    }
+
     private void expectRebalanceRevocationError(RuntimeException e) {
         final List<TopicPartition> partitions = asList(TOPIC_PARTITION, TOPIC_PARTITION2);
 


 



> Allow SinkTasks to disable consumer offset commit 
> --------------------------------------------------
>
>                 Key: KAFKA-3462
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3462
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>    Affects Versions: 0.10.1.0
>            Reporter: Liquan Pei
>            Assignee: Liquan Pei
>            Priority: Minor
>             Fix For: 0.10.2.0
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> SinkTasks should be able to disable consumer offset commits if they manage offsets in the sink data store rather than using Kafka consumer offsets. For example, an HDFS connector might record offsets in HDFS to provide exactly-once delivery. When the SinkTask is started or a rebalance occurs, the task would reload offsets from HDFS. In this case, disabling consumer offset commits will save some CPU cycles and network I/O.
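
The pattern described above (offsets stored alongside the data, then reloaded on start or after a rebalance) can be sketched roughly as follows. This is a minimal, self-contained illustration and not code from the pull request: InMemoryStore, Rec, and Task are hypothetical stand-ins, not Kafka Connect classes, and the in-memory store only models a sink that writes data and offsets atomically. In a real connector, the map returned by open() is what the task would hand to the context's offset(Map) method.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative sketch only: none of these types are Kafka Connect classes. */
public class OffsetManagingSinkSketch {

    /** Hypothetical sink store that persists rows and offsets together. */
    static final class InMemoryStore {
        final List<String> rows = new ArrayList<>();
        // partition -> next offset the task should read
        final Map<Integer, Long> nextOffsets = new HashMap<>();

        /** Models an atomic write of a row plus the offset that follows it. */
        void write(int partition, long offset, String value) {
            rows.add(value);
            nextOffsets.put(partition, offset + 1);
        }
    }

    /** Hypothetical record carrying partition, offset, and value. */
    static final class Rec {
        final int partition;
        final long offset;
        final String value;

        Rec(int partition, long offset, String value) {
            this.partition = partition;
            this.offset = offset;
            this.value = value;
        }
    }

    /** Hypothetical task, loosely modelled on the open/put part of the SinkTask lifecycle. */
    static final class Task {
        private final InMemoryStore store;
        private final Map<Integer, Long> positions = new HashMap<>();

        Task(InMemoryStore store) {
            this.store = store;
        }

        /** On start or after a rebalance: reload positions from the store, not from Kafka. */
        Map<Integer, Long> open(List<Integer> partitions) {
            positions.clear();
            for (int p : partitions) {
                positions.put(p, store.nextOffsets.getOrDefault(p, 0L));
            }
            return new HashMap<>(positions);
        }

        /** Skips records that are already stored, so redelivery does not duplicate rows. */
        void put(List<Rec> records) {
            for (Rec r : records) {
                long expected = positions.getOrDefault(r.partition, 0L);
                if (r.offset < expected) {
                    continue; // already written before the restart
                }
                store.write(r.partition, r.offset, r.value);
                positions.put(r.partition, r.offset + 1);
            }
        }
    }

    public static void main(String[] args) {
        InMemoryStore store = new InMemoryStore();
        Task first = new Task(store);
        first.open(Collections.singletonList(0));
        first.put(Arrays.asList(new Rec(0, 0, "a"), new Rec(0, 1, "b")));

        // Simulated restart: a new task instance resumes from the stored offset.
        Task second = new Task(store);
        Map<Integer, Long> resume = second.open(Collections.singletonList(0));
        // Offset 1 is redelivered and ignored; offset 2 is new and gets written.
        second.put(Arrays.asList(new Rec(0, 1, "b"), new Rec(0, 2, "c")));

        System.out.println(resume + " " + store.rows);
    }
}
```

Because the stored offset is the only source of truth in this sketch, a consumer offset commit by the framework would add nothing, which is the saving the issue is after.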


