You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/02/25 22:59:21 UTC

kafka git commit: MINOR: Connect status tracking API followup

Repository: kafka
Updated Branches:
  refs/heads/trunk 5db2c99e1 -> b3b533171


MINOR: Connect status tracking API followup

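Fixes from Ishiihara's review: rename the `lifecycleListener` parameters of `Worker.startConnector` and `Worker.startTask` to `statusListener` and document them in the Javadoc, and make the retry path in `KafkaStatusBackingStore` synchronize on the enclosing store instance (`KafkaStatusBackingStore.this`) instead of on the callback object.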

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Liquan Pei, Gwen Shapira

Closes #974 from hachikuji/status-tracking-followup


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b3b53317
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b3b53317
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b3b53317

Branch: refs/heads/trunk
Commit: b3b533171caebdfaf9e7c0adc9e62ef0f7ab0ed3
Parents: 5db2c99
Author: Jason Gustafson <ja...@confluent.io>
Authored: Thu Feb 25 13:59:17 2016 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Thu Feb 25 13:59:17 2016 -0800

----------------------------------------------------------------------
 .../java/org/apache/kafka/connect/runtime/Worker.java     | 10 ++++++----
 .../kafka/connect/storage/KafkaStatusBackingStore.java    |  2 +-
 2 files changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b3b53317/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 8e74fec..39b69a3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -156,8 +156,9 @@ public class Worker {
      * Add a new connector.
      * @param connConfig connector configuration
      * @param ctx context for the connector
+     * @param statusListener listener for notifications of connector status changes
      */
-    public void startConnector(ConnectorConfig connConfig, ConnectorContext ctx, ConnectorStatus.Listener lifecycleListener) {
+    public void startConnector(ConnectorConfig connConfig, ConnectorContext ctx, ConnectorStatus.Listener statusListener) {
         String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
         Class<? extends Connector> connClass = getConnectorClass(connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
 
@@ -167,7 +168,7 @@ public class Worker {
             throw new ConnectException("Connector with name " + connName + " already exists");
 
         final Connector connector = instantiateConnector(connClass);
-        WorkerConnector workerConnector = new WorkerConnector(connName, connector, ctx, lifecycleListener);
+        WorkerConnector workerConnector = new WorkerConnector(connName, connector, ctx, statusListener);
 
         log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connClass.getName());
         workerConnector.initialize();
@@ -290,8 +291,9 @@ public class Worker {
      * Add a new task.
      * @param id Globally unique ID for this task.
      * @param taskConfig the parsed task configuration
+     * @param statusListener listener for notifications of task status changes
      */
-    public void startTask(ConnectorTaskId id, TaskConfig taskConfig, TaskStatus.Listener lifecycleListener) {
+    public void startTask(ConnectorTaskId id, TaskConfig taskConfig, TaskStatus.Listener statusListener) {
         log.info("Creating task {}", id);
 
         if (tasks.containsKey(id)) {
@@ -305,7 +307,7 @@ public class Worker {
         final Task task = instantiateTask(taskClass);
         log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName());
 
-        final WorkerTask workerTask = buildWorkerTask(id, task, lifecycleListener);
+        final WorkerTask workerTask = buildWorkerTask(id, task, statusListener);
 
         // Start the task before adding modifying any state, any exceptions are caught higher up the
         // call chain and there's no cleanup to do here

http://git-wip-us.apache.org/repos/asf/kafka/blob/b3b53317/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index 948a325..db7ccc7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -213,7 +213,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
             public void onCompletion(RecordMetadata metadata, Exception exception) {
                 if (exception != null) {
                     if (exception instanceof RetriableException) {
-                        synchronized (this) {
+                        synchronized (KafkaStatusBackingStore.this) {
                             if (entry.isDeleted()
                                     || status.generation() != generation
                                     || (safeWrite && !entry.canWrite(status, sequence)))