You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/04/04 04:05:06 UTC

kafka git commit: MINOR: Clean up of SourceTaskOffsetCommiter

Repository: kafka
Updated Branches:
  refs/heads/trunk 3a20ba305 -> 83cf38545


MINOR: Clean up of SourceTaskOffsetCommiter

Author: Liquan Pei <li...@gmail.com>

Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1170 from Ishiihara/minor-cleanup


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/83cf3854
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/83cf3854
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/83cf3854

Branch: refs/heads/trunk
Commit: 83cf38545be4614bd1f6b1759ada851fb38d63b0
Parents: 3a20ba3
Author: Liquan Pei <li...@gmail.com>
Authored: Sun Apr 3 19:04:48 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Sun Apr 3 19:04:48 2016 -0700

----------------------------------------------------------------------
 .../kafka/connect/runtime/SourceTaskOffsetCommitter.java    | 9 +++------
 .../main/java/org/apache/kafka/connect/runtime/Worker.java  | 2 +-
 2 files changed, 4 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/83cf3854/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
index bee24e7..c7f869e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.connect.runtime;
 
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.slf4j.Logger;
@@ -45,13 +44,11 @@ import java.util.concurrent.TimeUnit;
 class SourceTaskOffsetCommitter {
     private static final Logger log = LoggerFactory.getLogger(SourceTaskOffsetCommitter.class);
 
-    private Time time;
     private WorkerConfig config;
     private ScheduledExecutorService commitExecutorService = null;
-    private HashMap<ConnectorTaskId, ScheduledCommitTask> committers = new HashMap<>();
+    private final HashMap<ConnectorTaskId, ScheduledCommitTask> committers = new HashMap<>();
 
-    SourceTaskOffsetCommitter(Time time, WorkerConfig config) {
-        this.time = time;
+    SourceTaskOffsetCommitter(WorkerConfig config) {
         this.config = config;
         commitExecutorService = Executors.newSingleThreadScheduledExecutor();
     }
@@ -96,7 +93,7 @@ class SourceTaskOffsetCommitter {
         }
     }
 
-    public void commit(ConnectorTaskId id, WorkerSourceTask workerTask) {
+    private void commit(ConnectorTaskId id, WorkerSourceTask workerTask) {
         final ScheduledCommitTask task;
         synchronized (committers) {
             task = committers.get(id);

http://git-wip-us.apache.org/repos/asf/kafka/blob/83cf3854/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 1a9ff11..e1a806a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -120,7 +120,7 @@ public class Worker {
         producer = new KafkaProducer<>(producerProps);
 
         offsetBackingStore.start();
-        sourceTaskOffsetCommitter = new SourceTaskOffsetCommitter(time, config);
+        sourceTaskOffsetCommitter = new SourceTaskOffsetCommitter(config);
 
         log.info("Worker started");
     }