You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2018/03/15 22:58:04 UTC

[kafka] branch 1.1 updated: KAFKA-6661: Ensure sink connectors don’t resume consumer when task is paused

This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new ab0cbe7  KAFKA-6661: Ensure sink connectors don’t resume consumer when task is paused
ab0cbe7 is described below

commit ab0cbe7d2f7ee4db541221980e8f712fd42fd9de
Author: Randall Hauch <rh...@gmail.com>
AuthorDate: Thu Mar 15 15:52:53 2018 -0700

    KAFKA-6661: Ensure sink connectors don’t resume consumer when task is paused
    
    Changed WorkerSinkTaskContext to only resume the consumer topic partitions when the connector/task is not in the paused state.
    
    The context tracks the set of topic partitions that are explicitly paused/resumed by the connector, and when the WorkerSinkTask resumes the tasks it currently resumes all topic partitions *except* those that are still explicitly paused in the context. Therefore, the change above should result in the desired behavior.
    
    Several debug statements were added to record when the context is called by the connector.
    
    This can be backported to older releases, since this bug goes back to 0.10 or 0.9.
    
    Author: Randall Hauch <rh...@gmail.com>
    
    Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>
    
    Closes #4716 from rhauch/kafka-6661
    
    (cherry picked from commit e7ef719a5bc0d1276f0e9482d59b25406fda276b)
    Signed-off-by: Ewen Cheslack-Postava <me...@ewencp.org>
---
 .../kafka/connect/runtime/WorkerSinkTask.java      |  4 +--
 .../connect/runtime/WorkerSinkTaskContext.java     | 39 ++++++++++++++++++----
 2 files changed, 34 insertions(+), 9 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 2995a4e..2ba785c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -130,7 +130,7 @@ class WorkerSinkTask extends WorkerTask {
         try {
             this.taskConfig = taskConfig.originalsStrings();
             this.consumer = createConsumer();
-            this.context = new WorkerSinkTaskContext(consumer);
+            this.context = new WorkerSinkTaskContext(consumer, this);
         } catch (Throwable t) {
             log.error("{} Task failed initialization and will not be started.", this, t);
             onFailure(t);
@@ -601,7 +601,7 @@ class WorkerSinkTask extends WorkerTask {
     private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-            log.debug("{} Partitions assigned", WorkerSinkTask.this);
+            log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
             lastCommittedOffsets = new HashMap<>();
             currentOffsets = new HashMap<>();
             for (TopicPartition tp : partitions) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
index 64c8fff..386f992 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
@@ -20,34 +20,43 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.errors.IllegalWorkerStateException;
 import org.apache.kafka.connect.sink.SinkTaskContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
 public class WorkerSinkTaskContext implements SinkTaskContext {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
     private Map<TopicPartition, Long> offsets;
     private long timeoutMs;
     private KafkaConsumer<byte[], byte[]> consumer;
+    private final WorkerSinkTask sinkTask;
     private final Set<TopicPartition> pausedPartitions;
     private boolean commitRequested;
 
-    public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer) {
+    public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer, WorkerSinkTask sinkTask) {
         this.offsets = new HashMap<>();
         this.timeoutMs = -1L;
         this.consumer = consumer;
+        this.sinkTask = sinkTask;
         this.pausedPartitions = new HashSet<>();
     }
 
     @Override
     public void offset(Map<TopicPartition, Long> offsets) {
+        log.debug("{} Setting offsets for topic partitions {}", this, offsets);
         this.offsets.putAll(offsets);
     }
 
     @Override
     public void offset(TopicPartition tp, long offset) {
+        log.debug("{} Setting offset for topic partition {} to {}", this, tp, offset);
         offsets.put(tp, offset);
     }
 
@@ -65,6 +74,7 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
 
     @Override
     public void timeout(long timeoutMs) {
+        log.debug("{} Setting timeout to {} ms", this, timeoutMs);
         this.timeoutMs = timeoutMs;
     }
 
@@ -90,9 +100,13 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
             throw new IllegalWorkerStateException("SinkTaskContext may not be used to pause consumption until the task is initialized");
         }
         try {
-            for (TopicPartition partition : partitions)
-                pausedPartitions.add(partition);
-            consumer.pause(Arrays.asList(partitions));
+            Collections.addAll(pausedPartitions, partitions);
+            if (sinkTask.shouldPause()) {
+                log.debug("{} Connector is paused, so not pausing consumer's partitions {}", this, partitions);
+            } else {
+                consumer.pause(Arrays.asList(partitions));
+                log.debug("{} Pausing partitions {}. Connector is not paused.", this, partitions);
+            }
         } catch (IllegalStateException e) {
             throw new IllegalWorkerStateException("SinkTasks may not pause partitions that are not currently assigned to them.", e);
         }
@@ -104,9 +118,13 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
             throw new IllegalWorkerStateException("SinkTaskContext may not be used to resume consumption until the task is initialized");
         }
         try {
-            for (TopicPartition partition : partitions)
-                pausedPartitions.remove(partition);
-            consumer.resume(Arrays.asList(partitions));
+            pausedPartitions.removeAll(Arrays.asList(partitions));
+            if (sinkTask.shouldPause()) {
+                log.debug("{} Connector is paused, so not resuming consumer's partitions {}", this, partitions);
+            } else {
+                consumer.resume(Arrays.asList(partitions));
+                log.debug("{} Resuming partitions: {}", this, partitions);
+            }
         } catch (IllegalStateException e) {
             throw new IllegalWorkerStateException("SinkTasks may not resume partitions that are not currently assigned to them.", e);
         }
@@ -118,6 +136,7 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
 
     @Override
     public void requestCommit() {
+        log.debug("{} Requesting commit", this);
         commitRequested = true;
     }
 
@@ -129,4 +148,10 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
         commitRequested = false;
     }
 
+    @Override
+    public String toString() {
+        return "WorkerSinkTaskContext{" +
+               "id=" + sinkTask.id +
+               '}';
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
ewencp@apache.org.