You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/03/15 22:32:33 UTC

kafka git commit: KAFKA-3260 - Added SourceTask.commitRecord

Repository: kafka
Updated Branches:
  refs/heads/trunk fd6efbe0b -> 6eacc0de3


KAFKA-3260 - Added SourceTask.commitRecord

Added commitRecord(SourceRecord record) to SourceTask. This method is called during the callback from producer.send() when the message has been sent successfully. Added commitTaskRecord(SourceRecord record) to WorkerSourceTask to handle calling commitRecord on the SourceTask. Updated tests for calls to commitRecord.

Author: Jeremy Custenborder <jc...@gmail.com>

Reviewers: Jason Gustafson <ja...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #950 from jcustenborder/KAFKA-3260


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6eacc0de
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6eacc0de
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6eacc0de

Branch: refs/heads/trunk
Commit: 6eacc0de303e4d29e083b89c1f53615c1dfa291e
Parents: fd6efbe
Author: Jeremy Custenborder <jc...@gmail.com>
Authored: Tue Mar 15 14:32:22 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Mar 15 14:32:22 2016 -0700

----------------------------------------------------------------------
 .../apache/kafka/connect/source/SourceTask.java    | 16 ++++++++++++++++
 .../kafka/connect/runtime/WorkerSourceTask.java    |  9 +++++++++
 .../connect/runtime/WorkerSourceTaskTest.java      | 17 +++++++++++++++++
 3 files changed, 42 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6eacc0de/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
index 802fcdd..c508ec1 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
@@ -79,4 +79,20 @@ public abstract class SourceTask implements Task {
      * {@link java.nio.channels.Selector#wakeup() wakeup()} to interrupt any ongoing requests.
      */
     public abstract void stop();
+
+    /**
+     * <p>
+     * Commit an individual {@link SourceRecord} when the callback from the producer client is received.
+     * </p>
+     * <p>
+     * SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
+     * automatically. This hook is provided for systems that also need to store offsets internally
+     * in their own system. The default implementation does nothing.
+     * </p>
+     * @param record {@link SourceRecord} that was successfully sent via the producer.
+     * @throws InterruptedException if an overriding implementation is interrupted while committing the record
+     */
+    public void commitRecord(SourceRecord record) throws InterruptedException {
+        // Intentionally a no-op: overriding is optional, so the default commits nothing.
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6eacc0de/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 0014be8..3a43f96 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -207,6 +207,7 @@ class WorkerSourceTask extends WorkerTask {
                                     log.trace("Wrote record successfully: topic {} partition {} offset {}",
                                             recordMetadata.topic(), recordMetadata.partition(),
                                             recordMetadata.offset());
+                                    commitTaskRecord(record);
                                 }
                                 recordSent(producerRecord);
                             }
@@ -226,6 +227,14 @@ class WorkerSourceTask extends WorkerTask {
         return true;
     }
 
+    private void commitTaskRecord(SourceRecord record) { // forwards a successfully sent record to the task's commit hook
+        try {
+            task.commitRecord(record);
+        } catch (Throwable t) { // log and continue: a task failure must not stop the producer callback from reaching recordSent()
+            log.error("Exception thrown while calling task.commitRecord()", t);
+        }
+    }
+
     private synchronized void recordSent(final ProducerRecord<byte[], byte[]> record) {
         ProducerRecord<byte[], byte[]> removed = outstandingMessages.remove(record);
         // While flushing, we may also see callbacks for items in the backlog

http://git-wip-us.apache.org/repos/asf/kafka/blob/6eacc0de/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 9b0133a..ece2985 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -133,6 +133,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         final CountDownLatch pollLatch = expectPolls(10);
         // In this test, we don't flush, so nothing goes any further than the offset writer
 
+        expectCommitRecord(10);
+
         sourceTask.stop();
         EasyMock.expectLastCall();
         expectOffsetFlush(true);
@@ -203,10 +205,13 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         sourceTask.stop();
         EasyMock.expectLastCall();
         expectOffsetFlush(true);
+        
+        expectCommitRecord(1);
 
         statusListener.onShutdown(taskId);
         EasyMock.expectLastCall();
 
+
         PowerMock.replayAll();
 
         workerTask.initialize(EMPTY_TASK_PROPS);
@@ -233,6 +238,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         // We'll wait for some data, then trigger a flush
         final CountDownLatch pollLatch = expectPolls(1);
+        expectCommitRecord(1);
         expectOffsetFlush(true);
 
         sourceTask.stop();
@@ -254,6 +260,13 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         PowerMock.verifyAll();
     }
 
+    private void expectCommitRecord(int count) throws Exception { // records `count` expected commitRecord calls on the mock task
+        for (int remaining = count; remaining > 0; remaining--) {
+            sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class)); // any SourceRecord argument matches
+            EasyMock.expectLastCall();
+        }
+    }
+
     @Test
     public void testSendRecordsConvertsData() throws Exception {
         createWorkerTask();
@@ -264,6 +277,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes();
 
+        expectCommitRecord(records.size());
+
         PowerMock.replayAll();
 
         Whitebox.setInternalState(workerTask, "toSend", records);
@@ -292,6 +307,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         expectSendRecordOnce(true);
         expectSendRecordOnce(false);
 
+        expectCommitRecord(3);
+
         PowerMock.replayAll();
 
         // Try to send 3, make first pass, second fail. Should save last two