You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/04/30 00:23:29 UTC

[GitHub] [kafka] kkonstantine commented on a change in pull request #8069: KAFKA-9374: Make connector interactions asynchronous

kkonstantine commented on a change in pull request #8069:
URL: https://github.com/apache/kafka/pull/8069#discussion_r377915173



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -808,38 +813,45 @@ public ConnectMetrics metrics() {
         return metrics;
     }
 
-    public void setTargetState(String connName, TargetState state) {
+    public void setTargetState(String connName, TargetState state, Callback<TargetState> stateChangeCallback) {
         log.info("Setting connector {} state to {}", connName, state);
 
         WorkerConnector workerConnector = connectors.get(connName);
         if (workerConnector != null) {
             ClassLoader connectorLoader =
                     plugins.delegatingLoader().connectorLoader(workerConnector.connector());
-            transitionTo(workerConnector, state, connectorLoader);
+            executeStateTransition(
+                new Runnable() {

Review comment:
       how far is this intended to be backported? If it's not before AK 2.0 then it'd be better to use lambda notation here and avoid the Runnable declaration boilerplate. 

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderConnectorContext.java
##########
@@ -16,30 +16,64 @@
  */
 package org.apache.kafka.connect.runtime;
 
-import org.apache.kafka.connect.connector.ConnectorContext;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * ConnectorContext for use with a Herder
  */
-public class HerderConnectorContext implements ConnectorContext {
+public class HerderConnectorContext implements CloseableConnectorContext {
+
+    private static final Logger log = LoggerFactory.getLogger(HerderConnectorContext.class);
 
     private final AbstractHerder herder;
     private final String connectorName;
+    private volatile boolean closed;
 
     public HerderConnectorContext(AbstractHerder herder, String connectorName) {
         this.herder = herder;
         this.connectorName = connectorName;
+        this.closed = false;
     }
 
     @Override
     public void requestTaskReconfiguration() {
+        synchronized (this) {

Review comment:
       why use locking if the variable is `volatile` and can change only to `true` during the lifetime of this object?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
##########
@@ -191,32 +192,71 @@ public synchronized void putConnectorConfig(String connName,
                                                 boolean allowReplace,
                                                 final Callback<Created<ConnectorInfo>> callback) {
         try {
-            if (maybeAddConfigErrors(validateConnectorConfig(config), callback)) {
+            validateConnectorConfig(config, new Callback<ConfigInfos>() {
+                @Override
+                public void onCompletion(Throwable error, ConfigInfos configInfos) {
+                    if (error != null) {
+                        callback.onCompletion(error, null);
+                        return;
+                    }
+
+                    requestExecutorService.submit(
+                        () -> putConnectorConfig(connName, config, allowReplace, callback, configInfos)
+                    );
+                }
+            });
+        } catch (Throwable t) {
+            callback.onCompletion(t, null);
+        }
+    }
+
+    private synchronized void putConnectorConfig(String connName,
+                                                 final Map<String, String> config,
+                                                 boolean allowReplace,
+                                                 final Callback<Created<ConnectorInfo>> callback,
+                                                 ConfigInfos configInfos) {
+        try {
+            if (maybeAddConfigErrors(configInfos, callback)) {
                 return;
             }
 
-            boolean created = false;
+            final boolean created;
             if (configState.contains(connName)) {
                 if (!allowReplace) {
                     callback.onCompletion(new AlreadyExistsException("Connector " + connName + " already exists"), null);
                     return;
                 }
-                worker.stopConnector(connName);
+                worker.stopAndAwaitConnector(connName);
+                created = false;
             } else {
                 created = true;
             }
 
             configBackingStore.putConnectorConfig(connName, config);
 
-            if (!startConnector(connName)) {
-                callback.onCompletion(new ConnectException("Failed to start connector: " + connName), null);
-                return;
-            }
+            Callback<TargetState> onStart = new Callback<TargetState>() {
+                @Override
+                public void onCompletion(Throwable error, TargetState result) {
+                    if (error != null) {
+                        callback.onCompletion(error, null);
+                        return;
+                    }
+
+                    requestExecutorService.submit(new Runnable() {

Review comment:
       I don't think this is worth backporting before AK 2.0, and maybe not even as far back. Given that I'd suggest using  lambda notation whenever a new Runnable is needed to avoid the Runnable declaration boilerplate.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
##########
@@ -266,31 +422,51 @@ public void close() {
         @Override
         public void onStartup(String connector) {
             state = AbstractStatus.State.RUNNING;
-            delegate.onStartup(connector);
+            synchronized (this) {

Review comment:
       same question here and below about locking around a `volatile` variable. Is this the only reason to lock here? One would think so based on previous usage. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org