You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/10/06 18:58:10 UTC

[kafka] branch 2.5 updated: KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop (#8910)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 47c5179  KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop (#8910)
47c5179 is described below

commit 47c517931cc7433bfad0eb1c4d2f800766c02fae
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Tue Oct 6 14:18:54 2020 -0400

    KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop (#8910)
---
 .../kafka/connect/runtime/WorkerSinkTask.java      | 15 ++++---
 .../kafka/connect/runtime/WorkerSinkTaskTest.java  | 50 ++++++++++++++++++++++
 2 files changed, 58 insertions(+), 7 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 5db4030..54b19b6 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -93,6 +93,7 @@ class WorkerSinkTask extends WorkerTask {
     private int commitFailures;
     private boolean pausedForRedelivery;
     private boolean committing;
+    private boolean taskStopped;
 
     public WorkerSinkTask(ConnectorTaskId id,
                           SinkTask task,
@@ -135,6 +136,7 @@ class WorkerSinkTask extends WorkerTask {
         this.sinkTaskMetricsGroup.recordOffsetSequenceNumber(commitSeqno);
         this.consumer = consumer;
         this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
+        this.taskStopped = false;
     }
 
     @Override
@@ -164,13 +166,8 @@ class WorkerSinkTask extends WorkerTask {
         } catch (Throwable t) {
             log.warn("Could not stop task", t);
         }
-        if (consumer != null) {
-            try {
-                consumer.close();
-            } catch (Throwable t) {
-                log.warn("Could not close consumer", t);
-            }
-        }
+        taskStopped = true;
+        Utils.closeQuietly(consumer, "consumer");
         try {
             transformationChain.close();
         } catch (Throwable t) {
@@ -672,6 +669,10 @@ class WorkerSinkTask extends WorkerTask {
 
         @Override
         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+            if (taskStopped) {
+                log.trace("Skipping partition revocation callback as task has already been stopped");
+                return;
+            }
             log.debug("{} Partitions revoked", WorkerSinkTask.this);
             try {
                 closePartitions();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 5dc2f44..98cfa7e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -316,6 +316,56 @@ public class WorkerSinkTaskTest {
     }
 
     @Test
+    public void testShutdown() throws Exception {
+        createTask(initialState);
+
+        expectInitializeTask();
+        expectTaskGetTopic(true);
+
+        // first iteration
+        expectPollInitialAssignment();
+
+        // second iteration
+        EasyMock.expect(sinkTask.preCommit(EasyMock.anyObject())).andReturn(Collections.emptyMap());
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
+        EasyMock.expectLastCall();
+
+        // WorkerSinkTask::stop
+        consumer.wakeup();
+        PowerMock.expectLastCall();
+        sinkTask.stop();
+        PowerMock.expectLastCall();
+
+        // WorkerSinkTask::close
+        consumer.close();
+        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                rebalanceListener.getValue().onPartitionsRevoked(
+                    asList(TOPIC_PARTITION, TOPIC_PARTITION2)
+                );
+                return null;
+            }
+        });
+        transformationChain.close();
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        workerTask.iteration();
+        sinkTaskContext.getValue().requestCommit(); // Force an offset commit
+        workerTask.iteration();
+        workerTask.stop();
+        workerTask.close();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testPollRedelivery() throws Exception {
         createTask(initialState);