You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/02/24 20:08:06 UTC

kafka git commit: MINOR: add useful debug log messages to KConnect source task execution

Repository: kafka
Updated Branches:
  refs/heads/trunk f98cb3aa2 -> 1bfaddae9


MINOR: add useful debug log messages to KConnect source task execution

Author: Gwen Shapira <cs...@gmail.com>

Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>

Closes #957 from gwenshap/source_worker_debug


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1bfaddae
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1bfaddae
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1bfaddae

Branch: refs/heads/trunk
Commit: 1bfaddae9c7d8b40801a28c4806f61e647107c93
Parents: f98cb3a
Author: Gwen Shapira <cs...@gmail.com>
Authored: Wed Feb 24 11:07:34 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Wed Feb 24 11:07:34 2016 -0800

----------------------------------------------------------------------
 .../java/org/apache/kafka/connect/runtime/WorkerSourceTask.java | 5 ++++-
 .../org/apache/kafka/connect/storage/OffsetStorageWriter.java   | 1 +
 2 files changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1bfaddae/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 8542f4c..0014be8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -139,10 +139,13 @@ class WorkerSourceTask extends WorkerTask {
             }
 
             while (!isStopping()) {
-                if (toSend == null)
+                if (toSend == null) {
+                    log.debug("Nothing to send to Kafka. Polling source for additional records");
                     toSend = task.poll();
+                }
                 if (toSend == null)
                     continue;
+                log.debug("About to send " + toSend.size() + " records to Kafka");
                 if (!sendRecords())
                     stopRequestedLatch.await(SEND_FAILED_BACKOFF_MS, TimeUnit.MILLISECONDS);
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1bfaddae/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
index 3d1e70b..45b6fad 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
@@ -161,6 +161,7 @@ public class OffsetStorageWriter {
 
         // And submit the data
         log.debug("Submitting {} entries to backing store", offsetsSerialized.size());
+        log.debug("The offsets are: " + toFlush.toString());
         return backingStore.set(offsetsSerialized, new Callback<Void>() {
             @Override
             public void onCompletion(Throwable error, Void result) {