You are viewing a plain-text version of this content; the canonical link was not preserved in this copy.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/11/10 05:43:22 UTC

kafka git commit: KAFKA-2786: Only respond to SinkTask onPartitionsRevoked after the WorkerSinkTask has finished starting up.

Repository: kafka
Updated Branches:
  refs/heads/trunk bce664b42 -> 590a4616a


KAFKA-2786: Only respond to SinkTask onPartitionsRevoked after the WorkerSinkTask has finished starting up.

Author: Ewen Cheslack-Postava <me...@ewencp.org>

Reviewers: Gwen Shapira

Closes #476 from ewencp/kafka-2786-on-partitions-assigned-only-after-start


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/590a4616
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/590a4616
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/590a4616

Branch: refs/heads/trunk
Commit: 590a4616a1030b3175ba5a548b5f1e1b49c323c2
Parents: bce664b
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Mon Nov 9 20:43:07 2015 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Mon Nov 9 20:43:07 2015 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/connect/runtime/WorkerSinkTask.java  | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/590a4616/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index a4d4093..643b10e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -191,7 +191,7 @@ class WorkerSinkTask implements WorkerTask {
         try {
             task.flush(offsets);
         } catch (Throwable t) {
-            log.error("Commit of {} offsets failed due to exception while flushing: {}", this, t);
+            log.error("Commit of {} offsets failed due to exception while flushing:", this, t);
             log.error("Rewinding offsets to last committed offsets");
             for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : lastCommittedOffsets.entrySet()) {
                 log.debug("{} Rewinding topic partition {} to offset {}", id, entry.getKey(), entry.getValue().offset());
@@ -288,7 +288,7 @@ class WorkerSinkTask implements WorkerTask {
                 pausedForRedelivery = false;
             }
         } catch (RetriableException e) {
-            log.error("RetriableException from SinkTask {}: {}", id, e);
+            log.error("RetriableException from SinkTask {}:", id, e);
             // If we're retrying a previous batch, make sure we've paused all topic partitions so we don't get new data,
             // but will still be able to poll in order to handle user-requested timeouts, keep group membership, etc.
             pausedForRedelivery = true;
@@ -361,8 +361,10 @@ class WorkerSinkTask implements WorkerTask {
 
         @Override
         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-            task.onPartitionsRevoked(partitions);
-            commitOffsets(true, -1);
+            if (started) {
+                task.onPartitionsRevoked(partitions);
+                commitOffsets(true, -1);
+            }
             // Make sure we don't have any leftover data since offsets will be reset to committed positions
             messageBatch.clear();
         }