Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/22 02:59:47 UTC

[GitHub] [kafka] C0urante opened a new pull request #8910: KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop

C0urante opened a new pull request #8910:
URL: https://github.com/apache/kafka/pull/8910


   [Jira](https://issues.apache.org/jira/browse/KAFKA-10188)
   
   The general lifecycle for a sink task is:
   
   1. Instantiate the `SinkTask` object
   2. Invoke `SinkTask::initialize`
   3. Invoke `SinkTask::start`
   4. While the task is still running:
   - Poll Kafka for records
   - Give those records to the task via `SinkTask::put`
   - Periodically commit offsets, which involves calling `SinkTask::preCommit` and committing the resulting `TopicPartition`-to-offset map to Kafka
   5. Commit offsets a penultimate time (including the call to `SinkTask::preCommit`)
   6. Invoke `SinkTask::stop`
   7. Close the consumer for the task
   8. Commit offsets a final time (also including the call to `SinkTask::preCommit`)
   
   This final offset commit happens indirectly: closing the consumer for a sink task causes the rebalance listener for that consumer to be triggered, and the rebalance listener the framework uses for its consumers performs an offset commit for the task when partitions are revoked.
   
   This is a bit of a problem because the framework calls `SinkTask::stop` before closing the consumer for the task. It's possible and even likely that tasks will have de-allocated resources necessary for their `preCommit` method and will fail unexpectedly at this point.
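
The ordering problem can be reproduced without Kafka. The following is a minimal sketch, not Connect code: `FakeSinkTask`, `FakeConsumer`, and `runShutdown` are hypothetical stand-ins invented for illustration. It assumes only what is described above, namely that closing the consumer fires the revocation callback and that the callback commits offsets via `preCommit`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; none of these are real Kafka Connect classes.
class ShutdownOrderSketch {

    /** Models a sink task whose preCommit needs a resource that stop() releases. */
    static class FakeSinkTask {
        private boolean resourceOpen = true;
        final List<String> calls = new ArrayList<>();

        void preCommit() {
            calls.add("preCommit");
            if (!resourceOpen) {
                throw new IllegalStateException("preCommit called after stop");
            }
        }

        void stop() {
            calls.add("stop");
            resourceOpen = false;
        }
    }

    /** Models a consumer that fires the revocation callback from inside close(). */
    static class FakeConsumer {
        private final Runnable onPartitionsRevoked;

        FakeConsumer(Runnable onPartitionsRevoked) {
            this.onPartitionsRevoked = onPartitionsRevoked;
        }

        void close() {
            onPartitionsRevoked.run();
        }
    }

    /** Runs steps 5 through 8 of the lifecycle and returns the observed call order. */
    static List<String> runShutdown() {
        FakeSinkTask task = new FakeSinkTask();
        // The rebalance listener commits offsets on revocation, which calls preCommit.
        FakeConsumer consumer = new FakeConsumer(task::preCommit);

        task.preCommit();      // step 5: penultimate offset commit
        task.stop();           // step 6: the task releases its resources
        try {
            consumer.close();  // steps 7 and 8: closing triggers one more preCommit
        } catch (IllegalStateException e) {
            task.calls.add("failed: " + e.getMessage());
        }
        return task.calls;
    }

    public static void main(String[] args) {
        System.out.println(runShutdown());
    }
}
```

In this model the last `preCommit` runs after `stop` and fails, which mirrors the unexpected failure described above.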
   
   Since the framework already [ensures that offsets are committed](https://github.com/apache/kafka/blob/199f375b546c201289d2b15084e0a95598093fe0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L194-L196) after the last call to `SinkTask::put`, it should be fine to remove this extra offset commit. There is still a chance that some data may be dropped in the case that a task performs completely asynchronous writes to Kafka and has written data between the pre-stop call to `SinkTask::preCommit` and the post-stop one, but there will be no loss of delivery guarantees provided by the framework, and this change will adhere to the publicly-stated API for sink tasks.
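
A rough sketch of the guard, under stated assumptions: only the `taskStopped` flag and the `if (taskStopped)` check in `onPartitionsRevoked` appear in the diff under review. The early return, the point at which the flag is set, and the `commitOffsets` and `run` helpers are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model. Only the taskStopped flag and the guard in onPartitionsRevoked
// come from the diff; everything else is a hypothetical stand-in.
class GuardedRevocationSketch {
    private boolean taskStopped = false;
    final List<String> calls = new ArrayList<>();

    private void commitOffsets() {
        calls.add("preCommit");
    }

    void onPartitionsRevoked() {
        if (taskStopped) {
            // Assumption: the guarded branch skips the offset commit entirely.
            return;
        }
        commitOffsets();
    }

    void close() {
        calls.add("stop");
        taskStopped = true;       // assumption: set once the task has been stopped
        onPartitionsRevoked();    // models the callback fired by closing the consumer
    }

    /** Simulates one rebalance while running, followed by shutdown. */
    static List<String> run() {
        GuardedRevocationSketch worker = new GuardedRevocationSketch();
        worker.onPartitionsRevoked(); // a rebalance while running still commits
        worker.close();               // the revocation during close is skipped
        return worker.calls;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

A revocation that happens while the task is running still commits offsets; only the revocation triggered by shutdown is skipped.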
   
   A unit test is added that covers the internal `WorkerSinkTask::close` method and ensures that `SinkTask::preCommit` is not called during that method.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 commented on a change in pull request #8910: KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #8910:
URL: https://github.com/apache/kafka/pull/8910#discussion_r446496984



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -138,6 +139,7 @@ public WorkerSinkTask(ConnectorTaskId id,
         this.sinkTaskMetricsGroup.recordOffsetSequenceNumber(commitSeqno);
         this.consumer = consumer;
         this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
+        this.taskStopped = false;

Review comment:
       Could it be replaced by `isStopping()`? It seems to me that both flags are similar, so we don't need to add a duplicate.







[GitHub] [kafka] gharris1727 commented on a change in pull request #8910: KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop

Posted by GitBox <gi...@apache.org>.
gharris1727 commented on a change in pull request #8910:
URL: https://github.com/apache/kafka/pull/8910#discussion_r447380292



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -689,6 +692,10 @@ else if (!context.pausedPartitions().isEmpty())
 
         @Override
         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+            if (taskStopped) {

Review comment:
       I wasn't aware that `onPartitionsRevoked` was called by `close` on the same thread; good to know for the future.
   I'm fine with this as-is.
   







[GitHub] [kafka] C0urante commented on a change in pull request #8910: KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #8910:
URL: https://github.com/apache/kafka/pull/8910#discussion_r446586612



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -138,6 +139,7 @@ public WorkerSinkTask(ConnectorTaskId id,
         this.sinkTaskMetricsGroup.recordOffsetSequenceNumber(commitSeqno);
         this.consumer = consumer;
         this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
+        this.taskStopped = false;

Review comment:
       I originally considered this approach, but it won't work for cases where the task stops on its own due to failure instead of being stopped externally by the worker.
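
To illustrate the distinction, here is a hypothetical model (not the actual `WorkerTask` code): it assumes the flag behind `isStopping()` is set only by an external stop request, while `taskStopped` is set whenever the task is closed, whatever the cause.

```java
// Hypothetical model of the two flags; field and method bodies are assumptions.
class StopFlagSketch {
    private boolean stopping = false;     // models the flag behind isStopping()
    private boolean taskStopped = false;  // models the flag added in this PR

    /** External stop request from the worker. */
    void stop() {
        stopping = true;
    }

    boolean isStopping() {
        return stopping;
    }

    boolean isTaskStopped() {
        return taskStopped;
    }

    /** Runs the task once; optionally simulates a failure inside the task itself. */
    void run(boolean failOnItsOwn) {
        try {
            if (failOnItsOwn) {
                throw new RuntimeException("task failed");
            }
        } catch (RuntimeException e) {
            // The task shuts down here without anyone having called stop().
        } finally {
            close();
        }
    }

    private void close() {
        taskStopped = true; // set regardless of why the task is shutting down
    }

    public static void main(String[] args) {
        StopFlagSketch failed = new StopFlagSketch();
        failed.run(true);
        System.out.println("isStopping=" + failed.isStopping()
                + ", taskStopped=" + failed.isTaskStopped());
    }
}
```

In the self-inflicted failure case the model reports `isStopping` as false and `taskStopped` as true, so a guard based on `isStopping()` alone would not skip the commit.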







[GitHub] [kafka] rhauch commented on a change in pull request #8910: KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #8910:
URL: https://github.com/apache/kafka/pull/8910#discussion_r496239771



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -138,6 +139,7 @@ public WorkerSinkTask(ConnectorTaskId id,
         this.sinkTaskMetricsGroup.recordOffsetSequenceNumber(commitSeqno);
         this.consumer = consumer;
         this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
+        this.taskStopped = false;

Review comment:
       Shouldn't this be volatile?
   
   Yes, it's true that `WorkerSinkTask.close()` is always and only called from within the `WorkerTask.doRun()` after the task determines it will stop. However, the `onPartitionsRevoked(...)` method is called from the consumer thread, and making the field volatile is the only way to ensure that the consumer thread reads a non-cached value.







[GitHub] [kafka] C0urante commented on a change in pull request #8910: KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #8910:
URL: https://github.com/apache/kafka/pull/8910#discussion_r447162953



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -138,6 +139,7 @@ public WorkerSinkTask(ConnectorTaskId id,
         this.sinkTaskMetricsGroup.recordOffsetSequenceNumber(commitSeqno);
         this.consumer = consumer;
         this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
+        this.taskStopped = false;

Review comment:
       Given the complexity of the unit tests for the `WorkerSinkTask` class and the guarantees we get from the coverage of the existing tests, I'm not sure it's really worth the effort. The code path that's modified in this PR is agnostic about the cause of shutdown for the task, and we won't really get any more coverage by simulating a shutdown triggered by an exception from the `SinkTask` instance instead of an external request from the `Worker` instance.







[GitHub] [kafka] C0urante commented on a change in pull request #8910: KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #8910:
URL: https://github.com/apache/kafka/pull/8910#discussion_r446490310



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -689,6 +692,10 @@ else if (!context.pausedPartitions().isEmpty())
 
         @Override
         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+            if (taskStopped) {

Review comment:
       The callback gets invoked on the same thread as the one that `KafkaConsumer::close` is invoked on, so `volatile` isn't strictly necessary. If you (or others) think it'd be good to include just in case that changes or this callback gets invoked after the task is stopped on a different thread (which afaik is not possible atm), I don't have any major objections to adding it. Just didn't want to add it unnecessarily as it might be misleading to people reading the code base down the road.







[GitHub] [kafka] gharris1727 commented on a change in pull request #8910: KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop

Posted by GitBox <gi...@apache.org>.
gharris1727 commented on a change in pull request #8910:
URL: https://github.com/apache/kafka/pull/8910#discussion_r446379953



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -689,6 +692,10 @@ else if (!context.pausedPartitions().isEmpty())
 
         @Override
         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+            if (taskStopped) {

Review comment:
       I think this gets called by the consumer thread, which is different from the thread which calls `close()`. I think that it may be necessary to mark this variable as volatile.







[GitHub] [kafka] C0urante commented on a change in pull request #8910: KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #8910:
URL: https://github.com/apache/kafka/pull/8910#discussion_r497049737



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -138,6 +139,7 @@ public WorkerSinkTask(ConnectorTaskId id,
         this.sinkTaskMetricsGroup.recordOffsetSequenceNumber(commitSeqno);
         this.consumer = consumer;
         this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
+        this.taskStopped = false;

Review comment:
       The [Javadocs](https://kafka.apache.org/26/javadoc/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html) for the `ConsumerRebalanceListener` state that the callback "will only execute in the user thread as part of the `poll(long)` call"; I think we have a guarantee here that `onPartitionsRevoked` will be called on the same thread that sets `taskStopped` to `false`. A fun way to verify this is to view the exceptions that get thrown by this bug; the stack traces include these lines:
   
   ```
   	at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsRevoked(WorkerSinkTask.java:695)
   	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:312)
   	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeavePrepare(ConsumerCoordinator.java:744)
   	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:976)
   	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:888)
   	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2368)
   	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2335)
   	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2285)
   	at org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:933)
   	at org.apache.kafka.connect.runtime.WorkerSinkTask.close(WorkerSinkTask.java:174)
   	at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:164)
   	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:191)
   	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
   ```
   
   The only edge case I can think of might be with asynchronous offset commits, but fwict those don't trigger asynchronous rebalance listener callbacks (if they trigger rebalances or rebalance listener callbacks at all).
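
Another way to check the same-thread claim is to record the thread inside the callback. The sketch below uses a hypothetical `FakeConsumer` whose `close()` invokes the listener synchronously, which is what the stack trace above suggests the real consumer does; against a real consumer the equivalent check would be to log `Thread.currentThread()` from the rebalance listener.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model; FakeConsumer is not a Kafka class.
class CallbackThreadSketch {

    /** Models a consumer that invokes its listener synchronously from close(). */
    static class FakeConsumer {
        private final Runnable listener;

        FakeConsumer(Runnable listener) {
            this.listener = listener;
        }

        void close() {
            listener.run();
        }
    }

    /** Returns true if the callback ran on the thread that called close(). */
    static boolean callbackRunsOnClosingThread() {
        AtomicReference<Thread> callbackThread = new AtomicReference<>();
        FakeConsumer consumer =
                new FakeConsumer(() -> callbackThread.set(Thread.currentThread()));

        // Close the consumer from a dedicated thread, as a task thread would.
        Thread taskThread = new Thread(consumer::close, "task-thread");
        taskThread.start();
        try {
            taskThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for close", e);
        }
        return callbackThread.get() == taskThread;
    }

    public static void main(String[] args) {
        System.out.println(callbackRunsOnClosingThread());
    }
}
```

When the write to the flag and the read in the callback happen on one thread, program order alone makes the write visible to the read, which is why `volatile` is not strictly required in that case.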







[GitHub] [kafka] rhauch commented on a change in pull request #8910: KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #8910:
URL: https://github.com/apache/kafka/pull/8910#discussion_r496239771



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
##########
@@ -315,6 +315,56 @@ public void testPause() throws Exception {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testShutdown() throws Exception {
+        createTask(initialState);
+
+        expectInitializeTask();
+        expectTaskGetTopic(true);
+
+        // first iteration
+        expectPollInitialAssignment();
+
+        // second iteration
+        EasyMock.expect(sinkTask.preCommit(EasyMock.anyObject())).andReturn(Collections.emptyMap());
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
+        EasyMock.expectLastCall();
+
+        // WorkerSinkTask::stop
+        consumer.wakeup();
+        PowerMock.expectLastCall();
+        sinkTask.stop();
+        PowerMock.expectLastCall();
+
+        // WorkerSinkTask::close
+        consumer.close();
+        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                rebalanceListener.getValue().onPartitionsRevoked(
+                    asList(TOPIC_PARTITION, TOPIC_PARTITION2)
+                );
+                return null;
+            }
+        });
+        transformationChain.close();
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        workerTask.iteration();
+        sinkTaskContext.getValue().requestCommit(); // Force an offset commit
+        workerTask.iteration();
+        workerTask.stop();
+        workerTask.close();
+
+        PowerMock.verifyAll();

Review comment:
       Verified locally that this test fails when the additions to the `onPartitionsRevoked(...)` method above are removed. Nice work, @C0urante.










[GitHub] [kafka] rhauch merged pull request #8910: KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop

Posted by GitBox <gi...@apache.org>.
rhauch merged pull request #8910:
URL: https://github.com/apache/kafka/pull/8910


   








[GitHub] [kafka] chia7712 commented on a change in pull request #8910: KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #8910:
URL: https://github.com/apache/kafka/pull/8910#discussion_r446600419



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -138,6 +139,7 @@ public WorkerSinkTask(ConnectorTaskId id,
         this.sinkTaskMetricsGroup.recordOffsetSequenceNumber(commitSeqno);
         this.consumer = consumer;
         this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
+        this.taskStopped = false;

Review comment:
       > it won't work for cases where the task stops on its own due to failure instead of being stopped externally by the worker.
   
   Thanks for the explanation. The case you mentioned is that we don't call `onFailure` before closing the task, so `isStopping` still returns true. Could you add a unit test for that case?



