You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/02/17 17:13:07 UTC

kafka git commit: KAFKA-3225: Method commit() of class SourceTask never invoked

Repository: kafka
Updated Branches:
  refs/heads/trunk 3382b6db7 -> 8c90b1a98


KAFKA-3225: Method commit() of class SourceTask never invoked

1. Added a test case to prove commit() on SourceTask was not being called.
2. Added commitSourceTask() which logs potential exceptions.
3. Added after call to finishSuccessfulFlush().

Author: Jeremy Custenborder <je...@scarcemedia.com>

Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>

Closes #909 from jcustenborder/KAFKA-3225


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8c90b1a9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8c90b1a9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8c90b1a9

Branch: refs/heads/trunk
Commit: 8c90b1a98acb1222a0e907153bf52a85af0fe443
Parents: 3382b6d
Author: Jeremy Custenborder <je...@scarcemedia.com>
Authored: Wed Feb 17 11:12:41 2016 -0500
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Wed Feb 17 11:12:41 2016 -0500

----------------------------------------------------------------------
 .../kafka/connect/runtime/WorkerSourceTask.java      | 15 +++++++++++++++
 .../kafka/connect/runtime/WorkerSourceTaskTest.java  |  2 ++
 2 files changed, 17 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8c90b1a9/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 30c2262..562e03e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -290,6 +290,8 @@ class WorkerSourceTask extends WorkerTask {
                 finishSuccessfulFlush();
                 log.debug("Finished {} offset commitOffsets successfully in {} ms",
                         this, time.milliseconds() - started);
+
+                commitSourceTask();
                 return true;
             }
         }
@@ -334,9 +336,22 @@ class WorkerSourceTask extends WorkerTask {
         finishSuccessfulFlush();
         log.info("Finished {} commitOffsets successfully in {} ms",
                 this, time.milliseconds() - started);
+
+        commitSourceTask();
+
         return true;
     }
 
+    private void commitSourceTask() {
+        try {
+            this.task.commit();
+        } catch (InterruptedException ex) {
+            log.warn("Commit interrupted", ex);
+        } catch (Throwable ex) {
+            log.error("Exception thrown while calling task.commit()", ex);
+        }
+    }
+
     private synchronized void finishFailedFlush() {
         offsetWriter.cancelFlush();
         outstandingMessages.putAll(outstandingMessagesBacklog);

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c90b1a9/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 1f557e4..59e9b86 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -158,6 +158,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         final CountDownLatch pollLatch = expectPolls(1);
         expectOffsetFlush(true);
 
+        sourceTask.commit();
+        EasyMock.expectLastCall();
         sourceTask.stop();
         EasyMock.expectLastCall();
         expectOffsetFlush(true);