You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/10/06 18:36:59 UTC

[kafka] branch 2.6 updated: KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop (#8910)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 919dbb0  KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop (#8910)
919dbb0 is described below

commit 919dbb0f6c41e818442b1c0a1abba60a54a08e75
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Tue Oct 6 14:18:54 2020 -0400

    KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop (#8910)
---
 .../kafka/connect/runtime/WorkerSinkTask.java      |  7 +++
 .../kafka/connect/runtime/WorkerSinkTaskTest.java  | 50 ++++++++++++++++++++++
 2 files changed, 57 insertions(+)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 11318fd..976ecfd 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -94,6 +94,7 @@ class WorkerSinkTask extends WorkerTask {
     private int commitFailures;
     private boolean pausedForRedelivery;
     private boolean committing;
+    private boolean taskStopped;
     private final WorkerErrantRecordReporter workerErrantRecordReporter;
 
     public WorkerSinkTask(ConnectorTaskId id,
@@ -138,6 +139,7 @@ class WorkerSinkTask extends WorkerTask {
         this.sinkTaskMetricsGroup.recordOffsetSequenceNumber(commitSeqno);
         this.consumer = consumer;
         this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
+        this.taskStopped = false;
         this.workerErrantRecordReporter = workerErrantRecordReporter;
     }
 
@@ -168,6 +170,7 @@ class WorkerSinkTask extends WorkerTask {
         } catch (Throwable t) {
             log.warn("Could not stop task", t);
         }
+        taskStopped = true;
         Utils.closeQuietly(consumer, "consumer");
         Utils.closeQuietly(transformationChain, "transformation chain");
         Utils.closeQuietly(retryWithToleranceOperator, "retry operator");
@@ -689,6 +692,10 @@ class WorkerSinkTask extends WorkerTask {
 
         @Override
         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+            if (taskStopped) {
+                log.trace("Skipping partition revocation callback as task has already been stopped");
+                return;
+            }
             log.debug("{} Partitions revoked", WorkerSinkTask.this);
             try {
                 closePartitions();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 12a553f..3ae9b56 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -316,6 +316,56 @@ public class WorkerSinkTaskTest {
     }
 
     @Test
+    public void testShutdown() throws Exception {
+        createTask(initialState);
+
+        expectInitializeTask();
+        expectTaskGetTopic(true);
+
+        // first iteration: expectations set up by expectPollInitialAssignment()
+        expectPollInitialAssignment();
+
+        // second iteration: a single preCommit is expected (for the commit requested below), then one record is polled and put
+        EasyMock.expect(sinkTask.preCommit(EasyMock.anyObject())).andReturn(Collections.emptyMap());
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
+        EasyMock.expectLastCall();
+
+        // WorkerSinkTask::stop
+        consumer.wakeup();
+        PowerMock.expectLastCall();
+        sinkTask.stop();
+        PowerMock.expectLastCall();
+
+        // WorkerSinkTask::close; the mocked consumer.close() invokes the captured rebalance listener's revocation callback
+        consumer.close();
+        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                rebalanceListener.getValue().onPartitionsRevoked(
+                    asList(TOPIC_PARTITION, TOPIC_PARTITION2)
+                );
+                return null;
+            }
+        });
+        transformationChain.close();
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        workerTask.iteration();
+        sinkTaskContext.getValue().requestCommit(); // Force an offset commit on the next iteration
+        workerTask.iteration();
+        workerTask.stop();
+        workerTask.close();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testPollRedelivery() throws Exception {
         createTask(initialState);