You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/02/17 17:13:07 UTC
kafka git commit: KAFKA-3225: Method commit() of class SourceTask
never invoked
Repository: kafka
Updated Branches:
refs/heads/trunk 3382b6db7 -> 8c90b1a98
KAFKA-3225: Method commit() of class SourceTask never invoked
1. Added a test case to prove commit() on SourceTask was not being called.
2. Added commitSourceTask() which logs potential exceptions.
3. Added a call to commitSourceTask() after each call to finishSuccessfulFlush().
Author: Jeremy Custenborder <je...@scarcemedia.com>
Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>
Closes #909 from jcustenborder/KAFKA-3225
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8c90b1a9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8c90b1a9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8c90b1a9
Branch: refs/heads/trunk
Commit: 8c90b1a98acb1222a0e907153bf52a85af0fe443
Parents: 3382b6d
Author: Jeremy Custenborder <je...@scarcemedia.com>
Authored: Wed Feb 17 11:12:41 2016 -0500
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Wed Feb 17 11:12:41 2016 -0500
----------------------------------------------------------------------
.../kafka/connect/runtime/WorkerSourceTask.java | 15 +++++++++++++++
.../kafka/connect/runtime/WorkerSourceTaskTest.java | 2 ++
2 files changed, 17 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/8c90b1a9/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 30c2262..562e03e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -290,6 +290,8 @@ class WorkerSourceTask extends WorkerTask {
finishSuccessfulFlush();
log.debug("Finished {} offset commitOffsets successfully in {} ms",
this, time.milliseconds() - started);
+
+ commitSourceTask();
return true;
}
}
@@ -334,9 +336,22 @@ class WorkerSourceTask extends WorkerTask {
finishSuccessfulFlush();
log.info("Finished {} commitOffsets successfully in {} ms",
this, time.milliseconds() - started);
+
+ commitSourceTask();
+
return true;
}
+ private void commitSourceTask() {
+ try {
+ this.task.commit();
+ } catch (InterruptedException ex) {
+ log.warn("Commit interrupted", ex);
+ } catch (Throwable ex) {
+ log.error("Exception thrown while calling task.commit()", ex);
+ }
+ }
+
private synchronized void finishFailedFlush() {
offsetWriter.cancelFlush();
outstandingMessages.putAll(outstandingMessagesBacklog);
http://git-wip-us.apache.org/repos/asf/kafka/blob/8c90b1a9/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 1f557e4..59e9b86 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -158,6 +158,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
final CountDownLatch pollLatch = expectPolls(1);
expectOffsetFlush(true);
+ sourceTask.commit();
+ EasyMock.expectLastCall();
sourceTask.stop();
EasyMock.expectLastCall();
expectOffsetFlush(true);