You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/10/06 18:36:59 UTC
[kafka] branch 2.6 updated: KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop (#8910)
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new 919dbb0 KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop (#8910)
919dbb0 is described below
commit 919dbb0f6c41e818442b1c0a1abba60a54a08e75
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Tue Oct 6 14:18:54 2020 -0400
KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop (#8910)
---
.../kafka/connect/runtime/WorkerSinkTask.java | 7 +++
.../kafka/connect/runtime/WorkerSinkTaskTest.java | 50 ++++++++++++++++++++++
2 files changed, 57 insertions(+)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 11318fd..976ecfd 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -94,6 +94,7 @@ class WorkerSinkTask extends WorkerTask {
private int commitFailures;
private boolean pausedForRedelivery;
private boolean committing;
+ private boolean taskStopped;
private final WorkerErrantRecordReporter workerErrantRecordReporter;
public WorkerSinkTask(ConnectorTaskId id,
@@ -138,6 +139,7 @@ class WorkerSinkTask extends WorkerTask {
this.sinkTaskMetricsGroup.recordOffsetSequenceNumber(commitSeqno);
this.consumer = consumer;
this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
+ this.taskStopped = false;
this.workerErrantRecordReporter = workerErrantRecordReporter;
}
@@ -168,6 +170,7 @@ class WorkerSinkTask extends WorkerTask {
} catch (Throwable t) {
log.warn("Could not stop task", t);
}
+ taskStopped = true;
Utils.closeQuietly(consumer, "consumer");
Utils.closeQuietly(transformationChain, "transformation chain");
Utils.closeQuietly(retryWithToleranceOperator, "retry operator");
@@ -689,6 +692,10 @@ class WorkerSinkTask extends WorkerTask {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+ if (taskStopped) {
+ log.trace("Skipping partition revocation callback as task has already been stopped");
+ return;
+ }
log.debug("{} Partitions revoked", WorkerSinkTask.this);
try {
closePartitions();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 12a553f..3ae9b56 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -316,6 +316,56 @@ public class WorkerSinkTaskTest {
}
@Test
+ public void testShutdown() throws Exception { // KAFKA-10188 regression test: a partition-revocation callback fired while the consumer is closing must not reach the sink task after it has been stopped
+ createTask(initialState);
+
+ expectInitializeTask();
+ expectTaskGetTopic(true);
+
+ // first iteration
+ expectPollInitialAssignment();
+
+ // second iteration
+ EasyMock.expect(sinkTask.preCommit(EasyMock.anyObject())).andReturn(Collections.emptyMap()); // the only preCommit call recorded in this test; it returns an empty offsets map
+ expectConsumerPoll(1);
+ expectConversionAndTransformation(1);
+ sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
+ EasyMock.expectLastCall();
+
+ // WorkerSinkTask::stop
+ consumer.wakeup();
+ PowerMock.expectLastCall();
+ sinkTask.stop();
+ PowerMock.expectLastCall();
+
+ // WorkerSinkTask::close
+ consumer.close();
+ PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() { // when close() is invoked on the mock consumer, run the answer below
+ @Override
+ public Object answer() throws Throwable {
+ rebalanceListener.getValue().onPartitionsRevoked( // invoke the captured rebalance listener's revocation callback from inside consumer.close()
+ asList(TOPIC_PARTITION, TOPIC_PARTITION2)
+ );
+ return null;
+ }
+ });
+ transformationChain.close();
+ PowerMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ workerTask.initialize(TASK_CONFIG);
+ workerTask.initializeAndStart();
+ workerTask.iteration();
+ sinkTaskContext.getValue().requestCommit(); // Force an offset commit
+ workerTask.iteration();
+ workerTask.stop();
+ workerTask.close(); // triggers the consumer.close() answer above, i.e. the revocation callback runs after stop()
+
+ PowerMock.verifyAll(); // NOTE(review): presumably fails if preCommit was invoked beyond the single recorded expectation - confirm against how the sinkTask mock is created
+ }
+
+ @Test
public void testPollRedelivery() throws Exception {
createTask(initialState);