You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/02/25 22:59:21 UTC
kafka git commit: MINOR: Connect status tracking API followup
Repository: kafka
Updated Branches:
refs/heads/trunk 5db2c99e1 -> b3b533171
MINOR: Connect status tracking API followup
Fixes from Ishiihara's review.
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Liquan Pei, Gwen Shapira
Closes #974 from hachikuji/status-tracking-followup
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b3b53317
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b3b53317
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b3b53317
Branch: refs/heads/trunk
Commit: b3b533171caebdfaf9e7c0adc9e62ef0f7ab0ed3
Parents: 5db2c99
Author: Jason Gustafson <ja...@confluent.io>
Authored: Thu Feb 25 13:59:17 2016 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Thu Feb 25 13:59:17 2016 -0800
----------------------------------------------------------------------
.../java/org/apache/kafka/connect/runtime/Worker.java | 10 ++++++----
.../kafka/connect/storage/KafkaStatusBackingStore.java | 2 +-
2 files changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b3b53317/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 8e74fec..39b69a3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -156,8 +156,9 @@ public class Worker {
* Add a new connector.
* @param connConfig connector configuration
* @param ctx context for the connector
+ * @param statusListener listener for notifications of connector status changes
*/
- public void startConnector(ConnectorConfig connConfig, ConnectorContext ctx, ConnectorStatus.Listener lifecycleListener) {
+ public void startConnector(ConnectorConfig connConfig, ConnectorContext ctx, ConnectorStatus.Listener statusListener) {
String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
Class<? extends Connector> connClass = getConnectorClass(connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
@@ -167,7 +168,7 @@ public class Worker {
throw new ConnectException("Connector with name " + connName + " already exists");
final Connector connector = instantiateConnector(connClass);
- WorkerConnector workerConnector = new WorkerConnector(connName, connector, ctx, lifecycleListener);
+ WorkerConnector workerConnector = new WorkerConnector(connName, connector, ctx, statusListener);
log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connClass.getName());
workerConnector.initialize();
@@ -290,8 +291,9 @@ public class Worker {
* Add a new task.
* @param id Globally unique ID for this task.
* @param taskConfig the parsed task configuration
+ * @param statusListener listener for notifications of task status changes
*/
- public void startTask(ConnectorTaskId id, TaskConfig taskConfig, TaskStatus.Listener lifecycleListener) {
+ public void startTask(ConnectorTaskId id, TaskConfig taskConfig, TaskStatus.Listener statusListener) {
log.info("Creating task {}", id);
if (tasks.containsKey(id)) {
@@ -305,7 +307,7 @@ public class Worker {
final Task task = instantiateTask(taskClass);
log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName());
- final WorkerTask workerTask = buildWorkerTask(id, task, lifecycleListener);
+ final WorkerTask workerTask = buildWorkerTask(id, task, statusListener);
// Start the task before adding modifying any state, any exceptions are caught higher up the
// call chain and there's no cleanup to do here
http://git-wip-us.apache.org/repos/asf/kafka/blob/b3b53317/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index 948a325..db7ccc7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -213,7 +213,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
if (exception instanceof RetriableException) {
- synchronized (this) {
+ synchronized (KafkaStatusBackingStore.this) {
if (entry.isDeleted()
|| status.generation() != generation
|| (safeWrite && !entry.canWrite(status, sequence)))