You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/11/10 05:43:22 UTC
kafka git commit: KAFKA-2786: Only respond to SinkTask
onPartitionsRevoked after the WorkerSinkTask has finished starting up.
Repository: kafka
Updated Branches:
refs/heads/trunk bce664b42 -> 590a4616a
KAFKA-2786: Only respond to SinkTask onPartitionsRevoked after the WorkerSinkTask has finished starting up.
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Reviewers: Gwen Shapira
Closes #476 from ewencp/kafka-2786-on-partitions-assigned-only-after-start
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/590a4616
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/590a4616
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/590a4616
Branch: refs/heads/trunk
Commit: 590a4616a1030b3175ba5a548b5f1e1b49c323c2
Parents: bce664b
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Mon Nov 9 20:43:07 2015 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Mon Nov 9 20:43:07 2015 -0800
----------------------------------------------------------------------
.../org/apache/kafka/connect/runtime/WorkerSinkTask.java | 10 ++++++----
1 file changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/590a4616/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index a4d4093..643b10e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -191,7 +191,7 @@ class WorkerSinkTask implements WorkerTask {
try {
task.flush(offsets);
} catch (Throwable t) {
- log.error("Commit of {} offsets failed due to exception while flushing: {}", this, t);
+ log.error("Commit of {} offsets failed due to exception while flushing:", this, t);
log.error("Rewinding offsets to last committed offsets");
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : lastCommittedOffsets.entrySet()) {
log.debug("{} Rewinding topic partition {} to offset {}", id, entry.getKey(), entry.getValue().offset());
@@ -288,7 +288,7 @@ class WorkerSinkTask implements WorkerTask {
pausedForRedelivery = false;
}
} catch (RetriableException e) {
- log.error("RetriableException from SinkTask {}: {}", id, e);
+ log.error("RetriableException from SinkTask {}:", id, e);
// If we're retrying a previous batch, make sure we've paused all topic partitions so we don't get new data,
// but will still be able to poll in order to handle user-requested timeouts, keep group membership, etc.
pausedForRedelivery = true;
@@ -361,8 +361,10 @@ class WorkerSinkTask implements WorkerTask {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
- task.onPartitionsRevoked(partitions);
- commitOffsets(true, -1);
+ if (started) {
+ task.onPartitionsRevoked(partitions);
+ commitOffsets(true, -1);
+ }
// Make sure we don't have any leftover data since offsets will be reset to committed positions
messageBatch.clear();
}