You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/04/04 04:05:06 UTC
kafka git commit: MINOR: Clean up of SourceTaskOffsetCommitter
Repository: kafka
Updated Branches:
refs/heads/trunk 3a20ba305 -> 83cf38545
MINOR: Clean up of SourceTaskOffsetCommitter
Author: Liquan Pei <li...@gmail.com>
Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>
Closes #1170 from Ishiihara/minor-cleanup
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/83cf3854
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/83cf3854
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/83cf3854
Branch: refs/heads/trunk
Commit: 83cf38545be4614bd1f6b1759ada851fb38d63b0
Parents: 3a20ba3
Author: Liquan Pei <li...@gmail.com>
Authored: Sun Apr 3 19:04:48 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Sun Apr 3 19:04:48 2016 -0700
----------------------------------------------------------------------
.../kafka/connect/runtime/SourceTaskOffsetCommitter.java | 9 +++------
.../main/java/org/apache/kafka/connect/runtime/Worker.java | 2 +-
2 files changed, 4 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/83cf3854/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
index bee24e7..c7f869e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
@@ -17,7 +17,6 @@
package org.apache.kafka.connect.runtime;
-import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
@@ -45,13 +44,11 @@ import java.util.concurrent.TimeUnit;
class SourceTaskOffsetCommitter {
private static final Logger log = LoggerFactory.getLogger(SourceTaskOffsetCommitter.class);
- private Time time;
private WorkerConfig config;
private ScheduledExecutorService commitExecutorService = null;
- private HashMap<ConnectorTaskId, ScheduledCommitTask> committers = new HashMap<>();
+ private final HashMap<ConnectorTaskId, ScheduledCommitTask> committers = new HashMap<>();
- SourceTaskOffsetCommitter(Time time, WorkerConfig config) {
- this.time = time;
+ SourceTaskOffsetCommitter(WorkerConfig config) {
this.config = config;
commitExecutorService = Executors.newSingleThreadScheduledExecutor();
}
@@ -96,7 +93,7 @@ class SourceTaskOffsetCommitter {
}
}
- public void commit(ConnectorTaskId id, WorkerSourceTask workerTask) {
+ private void commit(ConnectorTaskId id, WorkerSourceTask workerTask) {
final ScheduledCommitTask task;
synchronized (committers) {
task = committers.get(id);
http://git-wip-us.apache.org/repos/asf/kafka/blob/83cf3854/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 1a9ff11..e1a806a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -120,7 +120,7 @@ public class Worker {
producer = new KafkaProducer<>(producerProps);
offsetBackingStore.start();
- sourceTaskOffsetCommitter = new SourceTaskOffsetCommitter(time, config);
+ sourceTaskOffsetCommitter = new SourceTaskOffsetCommitter(config);
log.info("Worker started");
}