You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/10/06 18:58:10 UTC
[kafka] branch 2.5 updated: KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop (#8910)
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new 47c5179 KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop (#8910)
47c5179 is described below
commit 47c517931cc7433bfad0eb1c4d2f800766c02fae
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Tue Oct 6 14:18:54 2020 -0400
KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop (#8910)
---
.../kafka/connect/runtime/WorkerSinkTask.java | 15 ++++---
.../kafka/connect/runtime/WorkerSinkTaskTest.java | 50 ++++++++++++++++++++++
2 files changed, 58 insertions(+), 7 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 5db4030..54b19b6 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -93,6 +93,7 @@ class WorkerSinkTask extends WorkerTask {
private int commitFailures;
private boolean pausedForRedelivery;
private boolean committing;
+ private boolean taskStopped;
public WorkerSinkTask(ConnectorTaskId id,
SinkTask task,
@@ -135,6 +136,7 @@ class WorkerSinkTask extends WorkerTask {
this.sinkTaskMetricsGroup.recordOffsetSequenceNumber(commitSeqno);
this.consumer = consumer;
this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
+ this.taskStopped = false;
}
@Override
@@ -164,13 +166,8 @@ class WorkerSinkTask extends WorkerTask {
} catch (Throwable t) {
log.warn("Could not stop task", t);
}
- if (consumer != null) {
- try {
- consumer.close();
- } catch (Throwable t) {
- log.warn("Could not close consumer", t);
- }
- }
+ taskStopped = true;
+ Utils.closeQuietly(consumer, "consumer");
try {
transformationChain.close();
} catch (Throwable t) {
@@ -672,6 +669,10 @@ class WorkerSinkTask extends WorkerTask {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+ if (taskStopped) {
+ log.trace("Skipping partition revocation callback as task has already been stopped");
+ return;
+ }
log.debug("{} Partitions revoked", WorkerSinkTask.this);
try {
closePartitions();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 5dc2f44..98cfa7e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -316,6 +316,56 @@ public class WorkerSinkTaskTest {
}
@Test
+ public void testShutdown() throws Exception {
+ createTask(initialState);
+
+ expectInitializeTask();
+ expectTaskGetTopic(true);
+
+ // first iteration
+ expectPollInitialAssignment();
+
+ // second iteration
+ EasyMock.expect(sinkTask.preCommit(EasyMock.anyObject())).andReturn(Collections.emptyMap());
+ expectConsumerPoll(1);
+ expectConversionAndTransformation(1);
+ sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
+ EasyMock.expectLastCall();
+
+ // WorkerSinkTask::stop
+ consumer.wakeup();
+ PowerMock.expectLastCall();
+ sinkTask.stop();
+ PowerMock.expectLastCall();
+
+ // WorkerSinkTask::close
+ consumer.close();
+ PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+ @Override
+ public Object answer() throws Throwable {
+ rebalanceListener.getValue().onPartitionsRevoked(
+ asList(TOPIC_PARTITION, TOPIC_PARTITION2)
+ );
+ return null;
+ }
+ });
+ transformationChain.close();
+ PowerMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ workerTask.initialize(TASK_CONFIG);
+ workerTask.initializeAndStart();
+ workerTask.iteration();
+ sinkTaskContext.getValue().requestCommit(); // Force an offset commit
+ workerTask.iteration();
+ workerTask.stop();
+ workerTask.close();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
public void testPollRedelivery() throws Exception {
createTask(initialState);