Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/06 05:52:42 UTC

[GitHub] [kafka] gharris1727 commented on a change in pull request #8069: KAFKA-9374: Make connector interactions asynchronous

gharris1727 commented on a change in pull request #8069:
URL: https://github.com/apache/kafka/pull/8069#discussion_r420541836



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -246,15 +251,16 @@ public boolean startConnector(
             final WorkerConnector workerConnector;
             ClassLoader savedLoader = plugins.currentThreadLoader();
             try {
+                final String connClass = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);

Review comment:
       :+1:

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -1182,28 +1251,47 @@ public Void call() throws Exception {
 
     // Helper for starting a connector with the given name, which will extract & parse the config, generate connector
     // context and add to the worker. This needs to be called from within the main worker thread for this herder.
-    private boolean startConnector(String connectorName) {
+    // The callback is invoked after the connector has finished startup and generated task configs, or failed in the process.
+    private void startConnector(String connectorName, Callback<Void> callback) {
         log.info("Starting connector {}", connectorName);
         final Map<String, String> configProps = configState.connectorConfig(connectorName);
-        final ConnectorContext ctx = new HerderConnectorContext(this, connectorName);
+        final CloseableConnectorContext ctx = new HerderConnectorContext(this, connectorName);
         final TargetState initialState = configState.targetState(connectorName);
-        boolean started = worker.startConnector(connectorName, configProps, ctx, this, initialState);
-
-        // Immediately request configuration since this could be a brand new connector. However, also only update those
-        // task configs if they are actually different from the existing ones to avoid unnecessary updates when this is
-        // just restoring an existing connector.
-        if (started && initialState == TargetState.STARTED)
-            reconfigureConnectorTasksWithRetry(time.milliseconds(), connectorName);
+        final Callback<TargetState> onInitialStateChange = (error, newState) -> {
+            if (error != null) {
+                callback.onCompletion(new ConnectException("Failed to start connector: " + connectorName), null);
+                return;
+            }
 
-        return started;
+            // newState should be equal to initialState, but use it just in case
+            if (newState == TargetState.STARTED) {
+                addRequest(
+                    new Callable<Void>() {
+                        @Override
+                        public Void call() {
+                            // Request configuration since this could be a brand new connector. However, also only update those
+                            // task configs if they are actually different from the existing ones to avoid unnecessary updates when this is
+                            // just restoring an existing connector.
+                            reconfigureConnectorTasksWithRetry(time.milliseconds(), connectorName);
+                            callback.onCompletion(null, null);
+                            return null;
+                        }
+                    },
+                    forwardErrorCallback(callback)
+                );
+            } else {
+                callback.onCompletion(null, null);
+            }
+        };
+        worker.startConnector(connectorName, configProps, ctx, this, initialState, onInitialStateChange);
     }
 
     private Callable<Void> getConnectorStartingCallable(final String connectorName) {
         return new Callable<Void>() {
             @Override
             public Void call() throws Exception {
                 try {
-                    startConnector(connectorName);
+                    startConnector(connectorName, (error, result) -> { });
                 } catch (Throwable t) {
                     log.error("Couldn't instantiate connector " + connectorName + " because it has an invalid connector " +

Review comment:
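       Is it still possible to reach this log statement now that `startConnector` takes a callback?
   If startup failures are only reported through the callback, should this logging be moved into the callback?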
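
   To illustrate the concern, here is a minimal sketch with hypothetical names (`SketchCallback`, `AsyncStartSketch`); it is not the actual herder code and assumes that startup failures are delivered only through the callback. Under that assumption the `catch` block never runs, while logging inside the callback does see the error.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Connect's Callback interface; not the real class.
interface SketchCallback<T> {
    void onCompletion(Throwable error, T result);
}

class AsyncStartSketch {
    // Collected "log lines" so the behaviour can be inspected.
    final List<String> log = new ArrayList<>();

    // Assumed async-style start: failures are handed to the callback, never thrown.
    void startConnector(String name, SketchCallback<Void> callback) {
        if (name.isEmpty()) {
            callback.onCompletion(new IllegalArgumentException("invalid connector"), null);
            return;
        }
        callback.onCompletion(null, null);
    }

    // Same shape as the diff: a no-op callback wrapped in try/catch.
    void startIgnoringErrors(String name) {
        try {
            startConnector(name, (error, result) -> { });
        } catch (Throwable t) {
            // Not reached in this sketch, because nothing is thrown.
            log.add("catch: " + t.getMessage());
        }
    }

    // Alternative: do the logging where the error actually arrives.
    void startLoggingInCallback(String name) {
        startConnector(name, (error, result) -> {
            if (error != null)
                log.add("callback: " + error.getMessage());
        });
    }
}
```

   If the real `startConnector` can still throw synchronously on some paths, keeping the `catch` alongside the callback logging may be reasonable.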

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
##########
@@ -166,27 +244,105 @@ private void pause() {
         }
     }
 
+    /**
+     * Stop this connector. This method does not block, it only triggers shutdown. Use
+     * #{@link #awaitShutdown} to block until completion.
+     */
     public void shutdown() {
+        synchronized (this) {
+            log.info("Scheduled shutdown for {}", this);
+            stopping = true;
+            notify();
+        }
+    }
+
+    void doShutdown() {
         try {
             if (state == State.STARTED)
                 connector.stop();
-            this.state = State.STOPPED;
+            WorkerConnector.this.state = State.STOPPED;

Review comment:
       nit: the `WorkerConnector.this.` qualifier seems unnecessary here?
   Same for the `State.FAILED` assignment below.
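
   For context, a small sketch with a hypothetical class (`QualifiedThisSketch`, not the real `WorkerConnector`): the qualified form only matters inside an inner or anonymous class, where a bare `this` would refer to the inner instance. Directly in a method of the outer class, plain `this.state` does the same thing.

```java
// Hypothetical class used only to illustrate qualified `this`.
class QualifiedThisSketch {
    private String state = "STARTED";

    // Directly in an outer-class method, `this` is already the outer instance.
    void stopDirect() {
        this.state = "STOPPED";
    }

    // Inside an anonymous class, `this` is the Runnable, so the outer
    // instance has to be named explicitly (or the prefix dropped entirely).
    void stopViaInner() {
        Runnable r = new Runnable() {
            @Override
            public void run() {
                QualifiedThisSketch.this.state = "STOPPED";
            }
        };
        r.run();
    }

    String state() {
        return state;
    }
}
```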

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
##########
@@ -82,6 +82,9 @@
     // we need to consider all possible scenarios this could fail. It might be ok to fail with a timeout in rare cases,
     // but currently a worker simply leaving the group can take this long as well.
     public static final long REQUEST_TIMEOUT_MS = 90 * 1000;
+    // Mutable for integration testing; otherwise, some tests would take at least REQUEST_TIMEOUT_MS

Review comment:
       If only time were mocked.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
##########
@@ -97,6 +100,15 @@ public ConnectorsResource(Herder herder, WorkerConfig config) {
         isTopicTrackingResetDisabled = !config.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG);
     }
 
+    // For testing purposes only
+    public static void setRequestTimeout(long requestTimeoutMs) {

Review comment:
       Can this be knocked down to protected / package-private?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
##########
@@ -191,32 +192,61 @@ public synchronized void putConnectorConfig(String connName,
                                                 boolean allowReplace,
                                                 final Callback<Created<ConnectorInfo>> callback) {
         try {
-            if (maybeAddConfigErrors(validateConnectorConfig(config), callback)) {
+            validateConnectorConfig(config, (error, configInfos) -> {
+                if (error != null) {
+                    callback.onCompletion(error, null);
+                    return;
+                }
+
+                requestExecutorService.submit(
+                    () -> putConnectorConfig(connName, config, allowReplace, callback, configInfos)
+                );
+            });
+        } catch (Throwable t) {
+            callback.onCompletion(t, null);
+        }
+    }
+
+    private synchronized void putConnectorConfig(String connName,
+                                                 final Map<String, String> config,
+                                                 boolean allowReplace,
+                                                 final Callback<Created<ConnectorInfo>> callback,
+                                                 ConfigInfos configInfos) {
+        try {
+            if (maybeAddConfigErrors(configInfos, callback)) {
                 return;
             }
 
-            boolean created = false;
+            final boolean created;
             if (configState.contains(connName)) {
                 if (!allowReplace) {
                     callback.onCompletion(new AlreadyExistsException("Connector " + connName + " already exists"), null);
                     return;
                 }
-                worker.stopConnector(connName);
+                worker.stopAndAwaitConnector(connName);
+                created = false;
             } else {
                 created = true;
             }
 
             configBackingStore.putConnectorConfig(connName, config);
 
-            if (!startConnector(connName)) {
-                callback.onCompletion(new ConnectException("Failed to start connector: " + connName), null);
-                return;
-            }
+            // startConnector(connName, onStart);
+            startConnector(connName, (error, result) -> {
+                if (error != null) {
+                    callback.onCompletion(error, null);
+                    return;
+                }
 
-            updateConnectorTasks(connName);
-            callback.onCompletion(null, new Created<>(created, createConnectorInfo(connName)));
-        } catch (ConnectException e) {
-            callback.onCompletion(e, null);
+                requestExecutorService.submit(() -> {
+                    synchronized (this) {

Review comment:
       All of the call sites for this function are wrapped in `synchronized` blocks. Can you just add the `synchronized` keyword to the method instead?
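
   A minimal sketch of the suggestion, using a hypothetical class and method (`SynchronizedMethodSketch`, `updateTasks`) rather than the real herder code: a `synchronized` instance method locks on `this`, just like a `synchronized (this)` block at the call site, and because intrinsic locks are reentrant, a caller that already holds the lock can still call it.

```java
// Hypothetical class; the names are illustrative only.
class SynchronizedMethodSketch {
    private int updates = 0;
    private boolean lockWasHeld = false;

    // The lock is part of the method, so call sites need no explicit block.
    synchronized void updateTasks() {
        // Records whether the current thread holds this object's monitor.
        lockWasHeld = Thread.holdsLock(this);
        updates++;
    }

    synchronized int updates() {
        return updates;
    }

    synchronized boolean lockWasHeld() {
        return lockWasHeld;
    }
}
```

   Existing call sites that still wrap the call in `synchronized (this)` would keep working, so the blocks could be removed incrementally.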



