Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/12/03 17:45:12 UTC

[GitHub] [kafka] yashmayya opened a new pull request, #12947: KAFKA-6586: Refactor ConnectDistributed and ConnectStandalone to re-use shared logic

yashmayya opened a new pull request, #12947:
URL: https://github.com/apache/kafka/pull/12947

   - From the [JIRA ticket](https://issues.apache.org/jira/browse/KAFKA-6586):
   > The main methods in ConnectDistributed and ConnectStandalone have a lot of duplication, and it'd be good to refactor to centralize the logic. We can pull most of this logic into an abstract class that ConnectStandalone and ConnectDistributed both extend. At a glance, the differences between the two are different config and Herder implementations and some different initialization logic.
   
   - This refactor also allows for a straightforward implementation of https://issues.apache.org/jira/browse/KAFKA-3815 if that were to be pursued.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #12947: KAFKA-6586: Refactor ConnectDistributed and ConnectStandalone to re-use shared logic

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12947:
URL: https://github.com/apache/kafka/pull/12947#discussion_r1044687594


##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java:
##########
@@ -21,41 +21,51 @@
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.runtime.Connect;
-import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.Worker;
-import org.apache.kafka.connect.runtime.WorkerConfig;
-import org.apache.kafka.connect.runtime.WorkerInfo;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.RestClient;
 import org.apache.kafka.connect.runtime.rest.RestServer;
-import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
 import org.apache.kafka.connect.storage.FileOffsetBackingStore;
 import org.apache.kafka.connect.storage.OffsetBackingStore;
-import org.apache.kafka.connect.util.FutureCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URI;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 
 /**
  * <p>
- * Command line utility that runs Kafka Connect as a standalone process. In this mode, work is not
- * distributed. Instead, all the normal Connect machinery works within a single process. This is
- * useful for ad hoc, small, or experimental jobs.
- * </p>
- * <p>
- * By default, no job configs or offset data is persistent. You can make jobs persistent and
- * fault tolerant by overriding the settings to use file storage for both.
+ * Command line utility that runs Kafka Connect as a standalone process. In this mode, work (connectors and tasks) is not
+ * distributed. Instead, all the normal Connect machinery works within a single process. This is useful for for development
+ * and testing Kafka Connect on a local machine.
  * </p>
  */
-public class ConnectStandalone {
+public class ConnectStandalone extends AbstractConnectCli<StandaloneConfig> {
     private static final Logger log = LoggerFactory.getLogger(ConnectStandalone.class);
 
+    @Override
+    protected Herder createHerder(StandaloneConfig config, String workerId, Plugins plugins,
+                                  ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                                  RestServer restServer, RestClient restClient) {
+
+        OffsetBackingStore offsetBackingStore = new FileOffsetBackingStore();
+        offsetBackingStore.configure(config);
+
+        Worker worker = new Worker(workerId, Time.SYSTEM, plugins, config, offsetBackingStore,
+                connectorClientConfigOverridePolicy);
+
+        return new StandaloneHerder(worker, config.kafkaClusterId(), connectorClientConfigOverridePolicy);
+    }
+
+    @Override
+    protected StandaloneConfig createConfig(Map<String, String> workerProps) {
+        return new StandaloneConfig(workerProps);
+    }
+
     public static void main(String[] args) {

Review Comment:
   I think if we're going to call this an abstract CLI, we should do the actual command-line argument processing in this class.
   
   One possible approach is to restructure the `AbstractConnectCli` class to have a protected constructor that accepts the command-line arguments:
   ```java
   abstract class AbstractConnectCli {
       private final String[] args;
   
       protected AbstractConnectCli(String[] args) {
           this.args = args;
       }
   }
   ```
   
   Expose a `run` method that processes those arguments, with some hook-in logic for subclasses to define custom usage info and handle extra command-line arguments:
   
   ```java
   protected abstract String usage();
   
   protected abstract void processExtraArgs(Herder herder, String[] extraArgs) throws Throwable;
   
   public void run() {
       if (args.length < 1 || Arrays.asList(args).contains("--help")) {
           log.info("Usage: {}", usage());
           Exit.exit(1);
       }
   
       try {
           String workerPropsFile = args[0];
           Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
                   Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.emptyMap();
           String[] extraArgs = Arrays.copyOfRange(args, 1, args.length);
           Connect connect = startConnect(workerProps, extraArgs);
   
           // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
           connect.awaitStop();
   
       } catch (Throwable t) {
           log.error("Stopping due to error", t);
           Exit.exit(2);
       }
   }
   ```
   
   And tweak `startConnect` to invoke the extra args hook after it's created and started the `Connect` instance:
   
   ```java
   // This part is unchanged; only added for context
   try {
       connect.start();
   } catch (Exception e) {
       log.error("Failed to start Connect", e);
       connect.stop();
       Exit.exit(3);
   }
   
   try {
       // This is the noteworthy change, which replaces the current logic for starting connectors based off of command-line properties files
       processExtraArgs(herder, extraArgs);
   } catch (Throwable t) {
       connect.stop();
       Exit.exit(3);
   }
   
   // This part is unchanged; only added for context
   return connect;
   ```
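   The three fragments above can be combined into one small, Kafka-free sketch of the proposed shape: a base class that owns argument parsing and subclasses that only supply hooks. Everything here is an illustrative assumption rather than the PR's actual code: the names `CliSketch`, `AbstractCli` and `StandaloneCli` are hypothetical, `Herder`/`Connect` are left out entirely, and `run()` returns an exit code instead of calling `Exit.exit` so that the flow is easy to exercise.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;

   // Kafka-free sketch: Herder, Connect and Exit are replaced by plain Java stand-ins.
   public class CliSketch {

       /** Base class that owns command-line parsing; subclasses only supply the hooks. */
       abstract static class AbstractCli {
           private final String[] args;

           protected AbstractCli(String... args) {
               this.args = args;
           }

           /** Usage string printed when arguments are missing or --help is passed. */
           protected abstract String usage();

           /** Hook for the arguments that follow the worker properties file. */
           protected abstract void processExtraArgs(String[] extraArgs) throws Exception;

           /** Returns an exit code (assumption: the real code would call Exit.exit instead). */
           public int run() {
               if (args.length < 1 || Arrays.asList(args).contains("--help")) {
                   System.out.println("Usage: " + usage());
                   return 1;
               }
               try {
                   // args[0] is the worker properties file; the rest go to the subclass hook.
                   String[] extraArgs = Arrays.copyOfRange(args, 1, args.length);
                   processExtraArgs(extraArgs);
                   return 0;
               } catch (Exception e) {
                   System.err.println("Stopping due to error: " + e.getMessage());
                   return 2;
               }
           }
       }

       /** Standalone-style subclass: treats each extra argument as a connector properties file. */
       static class StandaloneCli extends AbstractCli {
           final List<String> connectorFiles = new ArrayList<>();

           StandaloneCli(String... args) {
               super(args);
           }

           @Override
           protected String usage() {
               return "StandaloneCli worker.properties [connector1.properties connector2.properties ...]";
           }

           @Override
           protected void processExtraArgs(String[] extraArgs) {
               // The real hook would load each file and submit it to the herder.
               connectorFiles.addAll(Arrays.asList(extraArgs));
           }
       }

       public static void main(String[] args) {
           StandaloneCli cli = new StandaloneCli("worker.properties", "file-source.properties");
           int exitCode = cli.run();
           System.out.println("exit code " + exitCode + ", connector files " + cli.connectorFiles);
       }
   }
   ```

   With this shape, a distributed-mode subclass could implement `processExtraArgs` as a no-op (or reject extra arguments), and each concrete `main` would reduce to constructing the subclass and calling `run()`.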





[GitHub] [kafka] C0urante commented on a diff in pull request #12947: KAFKA-6586: Refactor ConnectDistributed and ConnectStandalone to re-use shared logic

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12947:
URL: https://github.com/apache/kafka/pull/12947#discussion_r1048932674


##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java:
##########
@@ -37,98 +36,75 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.Map;
 
 /**
  * <p>
- * Command line utility that runs Kafka Connect as a standalone process. In this mode, work is not
- * distributed. Instead, all the normal Connect machinery works within a single process. This is
- * useful for ad hoc, small, or experimental jobs.
+ * Command line utility that runs Kafka Connect as a standalone process. In this mode, work (connectors and tasks) is not
+ * distributed. Instead, all the normal Connect machinery works within a single process. This is useful for ad hoc,
+ * small, or experimental jobs.
  * </p>
  * <p>
- * By default, no job configs or offset data is persistent. You can make jobs persistent and
- * fault tolerant by overriding the settings to use file storage for both.
+ * Connector and task configs are stored in memory and are not persistent. However, connector offset data is persistent
+ * since it uses file storage (configurable via {@link StandaloneConfig#OFFSET_STORAGE_FILE_FILENAME_CONFIG})
  * </p>
  */
-public class ConnectStandalone {
+public class ConnectStandalone extends AbstractConnectCli<StandaloneConfig> {
     private static final Logger log = LoggerFactory.getLogger(ConnectStandalone.class);
 
-    public static void main(String[] args) {
+    protected ConnectStandalone(String... args) {
+        super(args);
+    }
 
-        if (args.length < 1 || Arrays.asList(args).contains("--help")) {
-            log.info("Usage: ConnectStandalone worker.properties [connector1.properties connector2.properties ...]");
-            Exit.exit(1);
-        }
+    @Override
+    protected String usage() {
+        return "ConnectStandalone worker.properties [connector1.properties connector2.properties ...]";
+    }
 
+    @Override
+    protected void processExtraArgs(Herder herder, Connect connect, String[] extraArgs) {
         try {
-            Time time = Time.SYSTEM;
-            log.info("Kafka Connect standalone worker initializing ...");
-            long initStart = time.hiResClockMs();
-            WorkerInfo initInfo = new WorkerInfo();
-            initInfo.logAll();
-
-            String workerPropsFile = args[0];
-            Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
-                    Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.emptyMap();
-
-            log.info("Scanning for plugin classes. This might take a moment ...");
-            Plugins plugins = new Plugins(workerProps);
-            plugins.compareAndSwapWithDelegatingLoader();
-            StandaloneConfig config = new StandaloneConfig(workerProps);
-
-            String kafkaClusterId = config.kafkaClusterId();
-            log.debug("Kafka cluster ID: {}", kafkaClusterId);
-
-            // Do not initialize a RestClient because the ConnectorsResource will not use it in standalone mode.
-            RestServer rest = new RestServer(config, null);
-            rest.initializeServer();
-
-            URI advertisedUrl = rest.advertisedUrl();
-            String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
+            for (final String connectorPropsFile : extraArgs) {
+                Map<String, String> connectorProps = Utils.propsToStringMap(Utils.loadProps(connectorPropsFile));
+                FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>((error, info) -> {
+                    if (error != null)
+                        log.error("Failed to create connector for {}", connectorPropsFile);
+                    else
+                        log.info("Created connector {}", info.result().name());
+                });
+                herder.putConnectorConfig(
+                        connectorProps.get(ConnectorConfig.NAME_CONFIG),
+                        connectorProps, false, cb);
+                cb.get();
+            }
+        } catch (Throwable t) {
+            log.error("Stopping Connect due to an error while attempting to create a connector", t);

Review Comment:
   Can we please keep [the existing wording](https://github.com/apache/kafka/blob/de088a2e9758e36efe60b1d8acb18b4881b5a9fc/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java#L121) for this error message?





[GitHub] [kafka] yashmayya commented on pull request #12947: KAFKA-6586: Refactor ConnectDistributed and ConnectStandalone to re-use shared logic

Posted by GitBox <gi...@apache.org>.
yashmayya commented on PR #12947:
URL: https://github.com/apache/kafka/pull/12947#issuecomment-1336336173

   Hi @C0urante, would you be able to take a look at this whenever possible?




[GitHub] [kafka] C0urante commented on a diff in pull request #12947: KAFKA-6586: Refactor ConnectDistributed and ConnectStandalone to re-use shared logic

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12947:
URL: https://github.com/apache/kafka/pull/12947#discussion_r1047260096


##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java:
##########
@@ -37,98 +34,65 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.Map;
 
 /**
  * <p>
- * Command line utility that runs Kafka Connect as a standalone process. In this mode, work is not
- * distributed. Instead, all the normal Connect machinery works within a single process. This is
- * useful for ad hoc, small, or experimental jobs.
- * </p>
- * <p>
- * By default, no job configs or offset data is persistent. You can make jobs persistent and
- * fault tolerant by overriding the settings to use file storage for both.
+ * Command line utility that runs Kafka Connect as a standalone process. In this mode, work (connectors and tasks) is not
+ * distributed. Instead, all the normal Connect machinery works within a single process. This is useful for for development
+ * and testing Kafka Connect on a local machine.
  * </p>
  */
-public class ConnectStandalone {
+public class ConnectStandalone extends AbstractConnectCli<StandaloneConfig> {
     private static final Logger log = LoggerFactory.getLogger(ConnectStandalone.class);
 
-    public static void main(String[] args) {
-
-        if (args.length < 1 || Arrays.asList(args).contains("--help")) {
-            log.info("Usage: ConnectStandalone worker.properties [connector1.properties connector2.properties ...]");
-            Exit.exit(1);
-        }
-
-        try {
-            Time time = Time.SYSTEM;
-            log.info("Kafka Connect standalone worker initializing ...");
-            long initStart = time.hiResClockMs();
-            WorkerInfo initInfo = new WorkerInfo();
-            initInfo.logAll();
-
-            String workerPropsFile = args[0];
-            Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
-                    Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.emptyMap();
-
-            log.info("Scanning for plugin classes. This might take a moment ...");
-            Plugins plugins = new Plugins(workerProps);
-            plugins.compareAndSwapWithDelegatingLoader();
-            StandaloneConfig config = new StandaloneConfig(workerProps);
-
-            String kafkaClusterId = config.kafkaClusterId();
-            log.debug("Kafka cluster ID: {}", kafkaClusterId);
+    protected ConnectStandalone(String... args) {
+        super(args);
+    }
 
-            // Do not initialize a RestClient because the ConnectorsResource will not use it in standalone mode.
-            RestServer rest = new RestServer(config, null);
-            rest.initializeServer();
+    @Override
+    protected String usage() {
+        return "ConnectStandalone worker.properties [connector1.properties connector2.properties ...]";
+    }
 
-            URI advertisedUrl = rest.advertisedUrl();
-            String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
+    @Override
+    protected void processExtraArgs(Herder herder, String[] extraArgs) throws Throwable {
+        for (final String connectorPropsFile : extraArgs) {
+            Map<String, String> connectorProps = Utils.propsToStringMap(Utils.loadProps(connectorPropsFile));
+            FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>((error, info) -> {
+                if (error != null)
+                    log.error("Failed to create connector for {}", connectorPropsFile);
+                else
+                    log.info("Created connector {}", info.result().name());
+            });
+            herder.putConnectorConfig(
+                    connectorProps.get(ConnectorConfig.NAME_CONFIG),
+                    connectorProps, false, cb);
+            cb.get();

Review Comment:
   Can we keep the [existing error message](https://github.com/apache/kafka/blob/526af63cfe57eccb93d9cb79542e87c16b669a15/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java#L121) if any of these fail?
   
   This can probably be accomplished by wrapping the body of this method in a try block and, in the catch body, logging the error message, stopping the `Connect` instance, and exiting.
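
   A minimal sketch of that try/catch shape, using plain Java stand-ins rather than the real `Connect` and `Herder` types. `FakeConnect`, `ConnectorCreator` and the returned exit code are hypothetical, and the message text assumes the existing wording is the "Stopping after connector error" string that appears in the first revision of `AbstractConnectCli` quoted in this thread:

   ```java
   import java.util.ArrayList;
   import java.util.List;

   public class ExtraArgsFailureSketch {

       /** Hypothetical stand-in for the Connect instance; only stop() matters here. */
       static class FakeConnect {
           boolean stopped = false;

           void stop() {
               stopped = true;
           }
       }

       /** Hypothetical stand-in for "load this properties file and create the connector". */
       interface ConnectorCreator {
           void create(String connectorPropsFile) throws Exception;
       }

       /**
        * Wraps the whole loop in one try block. On any failure it records the error
        * message, stops Connect, and returns exit code 3 (assumption: the real code
        * would call log.error and Exit.exit(3) here).
        */
       static int processExtraArgs(FakeConnect connect, ConnectorCreator creator,
                                   String[] extraArgs, List<String> errorLog) {
           try {
               for (String connectorPropsFile : extraArgs) {
                   creator.create(connectorPropsFile);
               }
               return 0;
           } catch (Throwable t) {
               errorLog.add("Stopping after connector error");
               connect.stop();
               return 3;
           }
       }

       public static void main(String[] args) {
           FakeConnect connect = new FakeConnect();
           List<String> errorLog = new ArrayList<>();
           int exitCode = processExtraArgs(connect,
                   file -> { throw new IllegalStateException("cannot read " + file); },
                   new String[] {"missing.properties"}, errorLog);
           System.out.println(exitCode + " " + connect.stopped + " " + errorLog);
       }
   }
   ```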



##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java:
##########
@@ -37,98 +34,65 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.Map;
 
 /**
  * <p>
- * Command line utility that runs Kafka Connect as a standalone process. In this mode, work is not
- * distributed. Instead, all the normal Connect machinery works within a single process. This is
- * useful for ad hoc, small, or experimental jobs.
- * </p>
- * <p>
- * By default, no job configs or offset data is persistent. You can make jobs persistent and
- * fault tolerant by overriding the settings to use file storage for both.
+ * Command line utility that runs Kafka Connect as a standalone process. In this mode, work (connectors and tasks) is not
+ * distributed. Instead, all the normal Connect machinery works within a single process. This is useful for for development
+ * and testing Kafka Connect on a local machine.

Review Comment:
   I'd prefer to keep the existing wording of "ad hoc, small, or experimental jobs". I know it's generally recommended to run Connect in distributed mode, but standalone mode is still a legitimate part of the project, and we make that clear in [our docs](https://kafka.apache.org/33/documentation.html#connect_running):
   
   > In standalone mode all work is performed in a single process. This configuration is simpler to setup and get started with and may be useful in situations where only one worker makes sense (e.g. collecting log files), but it does not benefit from some of the features of Kafka Connect such as fault tolerance.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java:
##########
@@ -21,41 +21,51 @@
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.runtime.Connect;
-import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.Worker;
-import org.apache.kafka.connect.runtime.WorkerConfig;
-import org.apache.kafka.connect.runtime.WorkerInfo;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.RestClient;
 import org.apache.kafka.connect.runtime.rest.RestServer;
-import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
 import org.apache.kafka.connect.storage.FileOffsetBackingStore;
 import org.apache.kafka.connect.storage.OffsetBackingStore;
-import org.apache.kafka.connect.util.FutureCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URI;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 
 /**
  * <p>
- * Command line utility that runs Kafka Connect as a standalone process. In this mode, work is not
- * distributed. Instead, all the normal Connect machinery works within a single process. This is
- * useful for ad hoc, small, or experimental jobs.
- * </p>
- * <p>
- * By default, no job configs or offset data is persistent. You can make jobs persistent and
- * fault tolerant by overriding the settings to use file storage for both.
+ * Command line utility that runs Kafka Connect as a standalone process. In this mode, work (connectors and tasks) is not
+ * distributed. Instead, all the normal Connect machinery works within a single process. This is useful for for development
+ * and testing Kafka Connect on a local machine.
  * </p>
  */
-public class ConnectStandalone {
+public class ConnectStandalone extends AbstractConnectCli<StandaloneConfig> {
     private static final Logger log = LoggerFactory.getLogger(ConnectStandalone.class);
 
+    @Override
+    protected Herder createHerder(StandaloneConfig config, String workerId, Plugins plugins,
+                                  ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                                  RestServer restServer, RestClient restClient) {
+
+        OffsetBackingStore offsetBackingStore = new FileOffsetBackingStore();
+        offsetBackingStore.configure(config);
+
+        Worker worker = new Worker(workerId, Time.SYSTEM, plugins, config, offsetBackingStore,
+                connectorClientConfigOverridePolicy);
+
+        return new StandaloneHerder(worker, config.kafkaClusterId(), connectorClientConfigOverridePolicy);
+    }
+
+    @Override
+    protected StandaloneConfig createConfig(Map<String, String> workerProps) {
+        return new StandaloneConfig(workerProps);
+    }
+
     public static void main(String[] args) {

Review Comment:
   Agreed that the integration testing framework's use of the CLI class to instantiate workers is strange. IMO that part is no worse with these changes than it is currently though, so we don't have to block on cleaning that up.





[GitHub] [kafka] yashmayya commented on a diff in pull request #12947: KAFKA-6586: Refactor ConnectDistributed and ConnectStandalone to re-use shared logic

Posted by GitBox <gi...@apache.org>.
yashmayya commented on code in PR #12947:
URL: https://github.com/apache/kafka/pull/12947#discussion_r1042258502


##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.cli;
+
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.runtime.Connect;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.WorkerInfo;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.util.FutureCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.Map;
+
+/**
+ * Connect initialization common logic that can be leveraged by concrete implementations of command line utilities
+ *
+ * @param <T> the type of {@link WorkerConfig} to be used
+ */
+public abstract class AbstractConnectCli<T extends WorkerConfig> {
+
+    private static final Logger log = LoggerFactory.getLogger(AbstractConnectCli.class);
+    private final Time time = Time.SYSTEM;
+
+    /**
+     * @param config an instance of {@link WorkerConfig}
+     */
+    protected abstract Herder createHerder(T config, String workerId, Plugins plugins,
+                                           ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                                           RestServer restServer, RestClient restClient);
+
+    protected abstract T createConfig(Map<String, String> workerProps);
+
+    /**
+     * @param workerProps the worker properties map
+     * @param connectorPropsFiles zero or more connector property files for connectors that are to be created after
+     *                            Connect is successfully started
+     * @return a started instance of {@link Connect}
+     */
+    public Connect startConnect(Map<String, String> workerProps, String... connectorPropsFiles) {
+        log.info("Kafka Connect worker initializing ...");
+        long initStart = time.hiResClockMs();
+
+        WorkerInfo initInfo = new WorkerInfo();
+        initInfo.logAll();
+
+        log.info("Scanning for plugin classes. This might take a moment ...");
+        Plugins plugins = new Plugins(workerProps);
+        plugins.compareAndSwapWithDelegatingLoader();
+        T config = createConfig(workerProps);
+        log.debug("Kafka cluster ID: {}", config.kafkaClusterId());
+
+        RestClient restClient = new RestClient(config);
+
+        RestServer restServer = new RestServer(config, restClient);
+        restServer.initializeServer();
+
+        URI advertisedUrl = restServer.advertisedUrl();
+        String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
+
+        ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(
+                config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
+                config, ConnectorClientConfigOverridePolicy.class);
+
+        Herder herder = createHerder(config, workerId, plugins, connectorClientConfigOverridePolicy, restServer, restClient);
+
+        final Connect connect = new Connect(herder, restServer);
+        log.info("Kafka Connect worker initialization took {}ms", time.hiResClockMs() - initStart);
+        try {
+            connect.start();
+        } catch (Exception e) {
+            log.error("Failed to start Connect", e);
+            connect.stop();
+            Exit.exit(3);
+        }
+
+        try {
+            for (final String connectorPropsFile : connectorPropsFiles) {
+                Map<String, String> connectorProps = Utils.propsToStringMap(Utils.loadProps(connectorPropsFile));
+                FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>((error, info) -> {
+                    if (error != null)
+                        log.error("Failed to create connector for {}", connectorPropsFile);
+                    else
+                        log.info("Created connector {}", info.result().name());
+                });
+                herder.putConnectorConfig(
+                        connectorProps.get(ConnectorConfig.NAME_CONFIG),
+                        connectorProps, false, cb);
+                cb.get();
+            }
+        } catch (Throwable t) {
+            log.error("Stopping after connector error", t);
+            connect.stop();
+            Exit.exit(3);
+        }

Review Comment:
   I did think about this one and felt that it might be better to include it in the common base class in case https://issues.apache.org/jira/browse/KAFKA-3815 were to be implemented. I can move it back to the standalone-specific implementation if you like; I guess it could always be refactored later if required.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java:
##########
@@ -21,41 +21,51 @@
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.runtime.Connect;
-import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.Worker;
-import org.apache.kafka.connect.runtime.WorkerConfig;
-import org.apache.kafka.connect.runtime.WorkerInfo;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.RestClient;
 import org.apache.kafka.connect.runtime.rest.RestServer;
-import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
 import org.apache.kafka.connect.storage.FileOffsetBackingStore;
 import org.apache.kafka.connect.storage.OffsetBackingStore;
-import org.apache.kafka.connect.util.FutureCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URI;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 
 /**
  * <p>
- * Command line utility that runs Kafka Connect as a standalone process. In this mode, work is not
- * distributed. Instead, all the normal Connect machinery works within a single process. This is
- * useful for ad hoc, small, or experimental jobs.
- * </p>
- * <p>
- * By default, no job configs or offset data is persistent. You can make jobs persistent and
- * fault tolerant by overriding the settings to use file storage for both.
+ * Command line utility that runs Kafka Connect as a standalone process. In this mode, work (connectors and tasks) is not
+ * distributed. Instead, all the normal Connect machinery works within a single process. This is useful for for development
+ * and testing Kafka Connect on a local machine.
  * </p>
  */
-public class ConnectStandalone {
+public class ConnectStandalone extends AbstractConnectCli<StandaloneConfig> {
     private static final Logger log = LoggerFactory.getLogger(ConnectStandalone.class);
 
+    @Override
+    protected Herder createHerder(StandaloneConfig config, String workerId, Plugins plugins,
+                                  ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                                  RestServer restServer, RestClient restClient) {
+
+        OffsetBackingStore offsetBackingStore = new FileOffsetBackingStore();
+        offsetBackingStore.configure(config);
+
+        Worker worker = new Worker(workerId, Time.SYSTEM, plugins, config, offsetBackingStore,
+                connectorClientConfigOverridePolicy);
+
+        return new StandaloneHerder(worker, config.kafkaClusterId(), connectorClientConfigOverridePolicy);
+    }
+
+    @Override
+    protected StandaloneConfig createConfig(Map<String, String> workerProps) {
+        return new StandaloneConfig(workerProps);
+    }
+
     public static void main(String[] args) {

Review Comment:
   While I agree that they both do look pretty similar, I'm not sure there's a way to refactor them to re-use common code while still being clean / readable. Were you thinking something along the lines of a static helper method which takes an argument indicating whether the mode is standalone or distributed (perhaps even a `Class<>` instance)? Although this might be messy too in case we don't want to keep the connector creation logic in the base `startConnect` method and need to move it back to `ConnectStandalone`'s startup logic (https://github.com/apache/kafka/pull/12947/files#r1039705222).



##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.cli;
+
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.runtime.Connect;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.WorkerInfo;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.util.FutureCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.Map;
+
+/**
+ * Connect initialization common logic that can be leveraged by concrete implementations of command line utilities
+ *
+ * @param <T> the type of {@link WorkerConfig} to be used
+ */
+public abstract class AbstractConnectCli<T extends WorkerConfig> {
+
+    private static final Logger log = LoggerFactory.getLogger(AbstractConnectCli.class);
+    private final Time time = Time.SYSTEM;
+
+    /**
+     * @param config an instance of {@link WorkerConfig}
+     */

Review Comment:
   Fair enough, I've removed the doc comment. Most of the param descriptions as well as the method description itself seemed fairly redundant.





[GitHub] [kafka] C0urante commented on a diff in pull request #12947: KAFKA-6586: Refactor ConnectDistributed and ConnectStandalone to re-use shared logic

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12947:
URL: https://github.com/apache/kafka/pull/12947#discussion_r1044687594


##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java:
##########
@@ -21,41 +21,51 @@
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.runtime.Connect;
-import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.Worker;
-import org.apache.kafka.connect.runtime.WorkerConfig;
-import org.apache.kafka.connect.runtime.WorkerInfo;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.RestClient;
 import org.apache.kafka.connect.runtime.rest.RestServer;
-import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
 import org.apache.kafka.connect.storage.FileOffsetBackingStore;
 import org.apache.kafka.connect.storage.OffsetBackingStore;
-import org.apache.kafka.connect.util.FutureCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URI;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 
 /**
  * <p>
- * Command line utility that runs Kafka Connect as a standalone process. In this mode, work is not
- * distributed. Instead, all the normal Connect machinery works within a single process. This is
- * useful for ad hoc, small, or experimental jobs.
- * </p>
- * <p>
- * By default, no job configs or offset data is persistent. You can make jobs persistent and
- * fault tolerant by overriding the settings to use file storage for both.
+ * Command line utility that runs Kafka Connect as a standalone process. In this mode, work (connectors and tasks) is not
+ * distributed. Instead, all the normal Connect machinery works within a single process. This is useful for development
+ * and testing Kafka Connect on a local machine.
  * </p>
  */
-public class ConnectStandalone {
+public class ConnectStandalone extends AbstractConnectCli<StandaloneConfig> {
     private static final Logger log = LoggerFactory.getLogger(ConnectStandalone.class);
 
+    @Override
+    protected Herder createHerder(StandaloneConfig config, String workerId, Plugins plugins,
+                                  ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                                  RestServer restServer, RestClient restClient) {
+
+        OffsetBackingStore offsetBackingStore = new FileOffsetBackingStore();
+        offsetBackingStore.configure(config);
+
+        Worker worker = new Worker(workerId, Time.SYSTEM, plugins, config, offsetBackingStore,
+                connectorClientConfigOverridePolicy);
+
+        return new StandaloneHerder(worker, config.kafkaClusterId(), connectorClientConfigOverridePolicy);
+    }
+
+    @Override
+    protected StandaloneConfig createConfig(Map<String, String> workerProps) {
+        return new StandaloneConfig(workerProps);
+    }
+
     public static void main(String[] args) {

Review Comment:
   I think if we're going to call this an abstract CLI, we should do the actual command-line argument processing in this class.
   
   One possible approach is to restructure the `AbstractConnectCli` class to have a protected constructor that accepts the command-line arguments:
   ```java
   abstract class AbstractConnectCli {
       private final String[] args;
   
       protected AbstractConnectCli(String[] args) {
           this.args = args;
       }
   }
   ```
   
   Expose a `run` method that processes those arguments, with some hook-in logic for subclasses to define custom usage info and handle extra command-line arguments:
   
   ```java
   protected abstract String usage();
   
   protected abstract void processExtraArgs(Herder herder, String[] extraArgs) throws Throwable;
   
   public void run() {
       if (args.length < 1 || Arrays.asList(args).contains("--help")) {
           log.info("Usage: {}", usage());
           Exit.exit(1);
       }
   
       try {
           String workerPropsFile = args[0];
           Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
                   Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.emptyMap();
           String[] extraArgs = Arrays.copyOfRange(args, 1, args.length);
           Connect connect = startConnect(workerProps, extraArgs);
   
           // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
           connect.awaitStop();
   
       } catch (Throwable t) {
           log.error("Stopping due to error", t);
           Exit.exit(2);
       }
   }
   ```
   
   And tweak `startConnect` to invoke the extra args hook after it's created and started the `Connect` instance:
   
   ```java
   // This part is unchanged; only added for context
   try {
       connect.start();
   } catch (Exception e) {
       log.error("Failed to start Connect", e);
       connect.stop();
       Exit.exit(3);
   }
   
   try {
       // This is the noteworthy change, which replaces the current logic for starting connectors based off of command-line properties files
       processExtraArgs(herder, extraArgs);
   } catch (Throwable t) {
       connect.stop();
       Exit.exit(3);
   }
   
   // This part is unchanged; only added for context
   return connect;
   ```
   
   This may be complicated by usage in other places (like the integration testing framework), but hopefully you get the idea.
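   
   To make the shape of this concrete, here is a minimal, self-contained sketch of the hook-based structure. It is only an illustration: `SketchCli` and `SketchStandaloneCli` are hypothetical stand-ins rather than the real classes, the hook takes no `Herder`, and `run` returns an exit status instead of calling `Exit.exit` so that it can be exercised in-process.
   
   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;
   
   // Hypothetical stand-in for the base class discussed above; not the real AbstractConnectCli.
   abstract class SketchCli {
       private final String[] args;
   
       protected SketchCli(String... args) {
           this.args = args;
       }
   
       protected abstract String usage();
   
       // Hook for subclasses; receives everything after the worker properties file.
       protected abstract void processExtraArgs(String[] extraArgs) throws Exception;
   
       // Returns an exit status instead of exiting the JVM (an assumption made for this sketch).
       public int run() {
           if (args.length < 1 || Arrays.asList(args).contains("--help")) {
               System.out.println("Usage: " + usage());
               return 1;
           }
           try {
               String[] extraArgs = Arrays.copyOfRange(args, 1, args.length);
               processExtraArgs(extraArgs);
               return 0;
           } catch (Exception e) {
               System.err.println("Stopping due to error: " + e);
               return 2;
           }
       }
   }
   
   // Hypothetical standalone-style subclass that just records the connector files it was given.
   class SketchStandaloneCli extends SketchCli {
       final List<String> connectorFiles = new ArrayList<>();
   
       SketchStandaloneCli(String... args) {
           super(args);
       }
   
       @Override
       protected String usage() {
           return "SketchStandaloneCli worker.properties [connector1.properties ...]";
       }
   
       @Override
       protected void processExtraArgs(String[] extraArgs) {
           connectorFiles.addAll(Arrays.asList(extraArgs));
       }
   }
   ```
   
   A distributed-style subclass would presumably implement `processExtraArgs` as a no-op, since it takes no connector properties files on the command line.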





[GitHub] [kafka] yashmayya commented on a diff in pull request #12947: KAFKA-6586: Refactor ConnectDistributed and ConnectStandalone to re-use shared logic

Posted by GitBox <gi...@apache.org>.
yashmayya commented on code in PR #12947:
URL: https://github.com/apache/kafka/pull/12947#discussion_r1047409620


##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java:
##########
@@ -37,98 +34,65 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.Map;
 
 /**
  * <p>
- * Command line utility that runs Kafka Connect as a standalone process. In this mode, work is not
- * distributed. Instead, all the normal Connect machinery works within a single process. This is
- * useful for ad hoc, small, or experimental jobs.
- * </p>
- * <p>
- * By default, no job configs or offset data is persistent. You can make jobs persistent and
- * fault tolerant by overriding the settings to use file storage for both.
+ * Command line utility that runs Kafka Connect as a standalone process. In this mode, work (connectors and tasks) is not
+ * distributed. Instead, all the normal Connect machinery works within a single process. This is useful for development
+ * and testing Kafka Connect on a local machine.
  * </p>
  */
-public class ConnectStandalone {
+public class ConnectStandalone extends AbstractConnectCli<StandaloneConfig> {
     private static final Logger log = LoggerFactory.getLogger(ConnectStandalone.class);
 
-    public static void main(String[] args) {
-
-        if (args.length < 1 || Arrays.asList(args).contains("--help")) {
-            log.info("Usage: ConnectStandalone worker.properties [connector1.properties connector2.properties ...]");
-            Exit.exit(1);
-        }
-
-        try {
-            Time time = Time.SYSTEM;
-            log.info("Kafka Connect standalone worker initializing ...");
-            long initStart = time.hiResClockMs();
-            WorkerInfo initInfo = new WorkerInfo();
-            initInfo.logAll();
-
-            String workerPropsFile = args[0];
-            Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
-                    Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.emptyMap();
-
-            log.info("Scanning for plugin classes. This might take a moment ...");
-            Plugins plugins = new Plugins(workerProps);
-            plugins.compareAndSwapWithDelegatingLoader();
-            StandaloneConfig config = new StandaloneConfig(workerProps);
-
-            String kafkaClusterId = config.kafkaClusterId();
-            log.debug("Kafka cluster ID: {}", kafkaClusterId);
+    protected ConnectStandalone(String... args) {
+        super(args);
+    }
 
-            // Do not initialize a RestClient because the ConnectorsResource will not use it in standalone mode.
-            RestServer rest = new RestServer(config, null);
-            rest.initializeServer();
+    @Override
+    protected String usage() {
+        return "ConnectStandalone worker.properties [connector1.properties connector2.properties ...]";
+    }
 
-            URI advertisedUrl = rest.advertisedUrl();
-            String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
+    @Override
+    protected void processExtraArgs(Herder herder, String[] extraArgs) throws Throwable {
+        for (final String connectorPropsFile : extraArgs) {
+            Map<String, String> connectorProps = Utils.propsToStringMap(Utils.loadProps(connectorPropsFile));
+            FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>((error, info) -> {
+                if (error != null)
+                    log.error("Failed to create connector for {}", connectorPropsFile);
+                else
+                    log.info("Created connector {}", info.result().name());
+            });
+            herder.putConnectorConfig(
+                    connectorProps.get(ConnectorConfig.NAME_CONFIG),
+                    connectorProps, false, cb);
+            cb.get();

Review Comment:
   Hm, this is already handled here (https://github.com/apache/kafka/blob/86df7c561029f44f7c2a1292f6eaba17e5d7d8d7/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java#L148-L154). Is your suggestion to move that handling from `AbstractConnectCli` to its implementations, also passing the `Connect` instance to `processExtraArgs`, so that we can have more relevant error messages than `Stopping Connect due to an error while processing CLI arguments`?





[GitHub] [kafka] C0urante commented on a diff in pull request #12947: KAFKA-6586: Refactor ConnectDistributed and ConnectStandalone to re-use shared logic

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12947:
URL: https://github.com/apache/kafka/pull/12947#discussion_r1048931944


##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java:
##########
@@ -37,98 +34,65 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.Map;
 
 /**
  * <p>
- * Command line utility that runs Kafka Connect as a standalone process. In this mode, work is not
- * distributed. Instead, all the normal Connect machinery works within a single process. This is
- * useful for ad hoc, small, or experimental jobs.
- * </p>
- * <p>
- * By default, no job configs or offset data is persistent. You can make jobs persistent and
- * fault tolerant by overriding the settings to use file storage for both.
+ * Command line utility that runs Kafka Connect as a standalone process. In this mode, work (connectors and tasks) is not
+ * distributed. Instead, all the normal Connect machinery works within a single process. This is useful for development
+ * and testing Kafka Connect on a local machine.

Review Comment:
   We'd definitely need a KIP for that; standalone mode is still part of the public interface for the project.
   
   I'm also not sure about the assertion that it's not used for production systems--it may not be supported for production use by companies that offer support or other services for Kafka, but AFAIK Apache doesn't have an official stance one way or the other beyond the recommendations we lay out in our docs, none of which go so far as to state that standalone mode is not intended for production use.





[GitHub] [kafka] yashmayya commented on a diff in pull request #12947: KAFKA-6586: Refactor ConnectDistributed and ConnectStandalone to re-use shared logic

Posted by GitBox <gi...@apache.org>.
yashmayya commented on code in PR #12947:
URL: https://github.com/apache/kafka/pull/12947#discussion_r1042468179


##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java:
##########
@@ -21,41 +21,51 @@
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.runtime.Connect;
-import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.Worker;
-import org.apache.kafka.connect.runtime.WorkerConfig;
-import org.apache.kafka.connect.runtime.WorkerInfo;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.RestClient;
 import org.apache.kafka.connect.runtime.rest.RestServer;
-import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
 import org.apache.kafka.connect.storage.FileOffsetBackingStore;
 import org.apache.kafka.connect.storage.OffsetBackingStore;
-import org.apache.kafka.connect.util.FutureCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URI;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 
 /**
  * <p>
- * Command line utility that runs Kafka Connect as a standalone process. In this mode, work is not
- * distributed. Instead, all the normal Connect machinery works within a single process. This is
- * useful for ad hoc, small, or experimental jobs.
- * </p>
- * <p>
- * By default, no job configs or offset data is persistent. You can make jobs persistent and
- * fault tolerant by overriding the settings to use file storage for both.
+ * Command line utility that runs Kafka Connect as a standalone process. In this mode, work (connectors and tasks) is not
+ * distributed. Instead, all the normal Connect machinery works within a single process. This is useful for development
+ * and testing Kafka Connect on a local machine.
  * </p>
  */
-public class ConnectStandalone {
+public class ConnectStandalone extends AbstractConnectCli<StandaloneConfig> {
     private static final Logger log = LoggerFactory.getLogger(ConnectStandalone.class);
 
+    @Override
+    protected Herder createHerder(StandaloneConfig config, String workerId, Plugins plugins,
+                                  ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                                  RestServer restServer, RestClient restClient) {
+
+        OffsetBackingStore offsetBackingStore = new FileOffsetBackingStore();
+        offsetBackingStore.configure(config);
+
+        Worker worker = new Worker(workerId, Time.SYSTEM, plugins, config, offsetBackingStore,
+                connectorClientConfigOverridePolicy);
+
+        return new StandaloneHerder(worker, config.kafkaClusterId(), connectorClientConfigOverridePolicy);
+    }
+
+    @Override
+    protected StandaloneConfig createConfig(Map<String, String> workerProps) {
+        return new StandaloneConfig(workerProps);
+    }
+
     public static void main(String[] args) {

Review Comment:
   While I agree that they both do look pretty similar, I'm not sure there's a way to refactor them to re-use common code while still being clean / readable. Were you thinking something along the lines of a static helper method which takes an argument indicating whether the mode is standalone or distributed (perhaps even a `Class<>` instance)? Although this might be messy too in case we don't want to keep the connector creation logic in the base `startConnect` method and need to move it back to `ConnectStandalone`'s startup logic (https://github.com/apache/kafka/pull/12947/files#r1039705222).
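   
   For illustration only, the static-helper alternative might look roughly like the sketch below. All names here (`ModeCli`, `StandaloneModeCli`, `DistributedModeCli`, `CliLauncher`, `launch`) are hypothetical stand-ins and not classes from this PR.
   
   ```java
   // Hypothetical stand-ins for the two CLI entry points; not the real Connect classes.
   abstract class ModeCli {
       abstract String mode();
   }
   
   class StandaloneModeCli extends ModeCli {
       @Override
       String mode() {
           return "standalone";
       }
   }
   
   class DistributedModeCli extends ModeCli {
       @Override
       String mode() {
           return "distributed";
       }
   }
   
   final class CliLauncher {
       private CliLauncher() { }
   
       // Static helper that takes the concrete CLI class and instantiates it reflectively.
       static <T extends ModeCli> T launch(Class<T> cliClass) {
           try {
               T cli = cliClass.getDeclaredConstructor().newInstance();
               System.out.println("Starting Connect in " + cli.mode() + " mode");
               return cli;
           } catch (ReflectiveOperationException e) {
               throw new IllegalStateException("Could not instantiate " + cliClass.getName(), e);
           }
       }
   }
   ```
   
   Anything mode-specific, such as creating connectors from properties files in standalone mode, would still need a branch or a hook on top of this helper, which is the messiness mentioned above.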





[GitHub] [kafka] C0urante merged pull request #12947: KAFKA-6586: Refactor ConnectDistributed and ConnectStandalone to re-use shared logic

Posted by GitBox <gi...@apache.org>.
C0urante merged PR #12947:
URL: https://github.com/apache/kafka/pull/12947




[GitHub] [kafka] C0urante commented on a diff in pull request #12947: KAFKA-6586: Refactor ConnectDistributed and ConnectStandalone to re-use shared logic

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12947:
URL: https://github.com/apache/kafka/pull/12947#discussion_r1047798678


##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java:
##########
@@ -37,98 +34,65 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.Map;
 
 /**
  * <p>
- * Command line utility that runs Kafka Connect as a standalone process. In this mode, work is not
- * distributed. Instead, all the normal Connect machinery works within a single process. This is
- * useful for ad hoc, small, or experimental jobs.
- * </p>
- * <p>
- * By default, no job configs or offset data is persistent. You can make jobs persistent and
- * fault tolerant by overriding the settings to use file storage for both.
+ * Command line utility that runs Kafka Connect as a standalone process. In this mode, work (connectors and tasks) is not
+ * distributed. Instead, all the normal Connect machinery works within a single process. This is useful for development
+ * and testing Kafka Connect on a local machine.
  * </p>
  */
-public class ConnectStandalone {
+public class ConnectStandalone extends AbstractConnectCli<StandaloneConfig> {
     private static final Logger log = LoggerFactory.getLogger(ConnectStandalone.class);
 
-    public static void main(String[] args) {
-
-        if (args.length < 1 || Arrays.asList(args).contains("--help")) {
-            log.info("Usage: ConnectStandalone worker.properties [connector1.properties connector2.properties ...]");
-            Exit.exit(1);
-        }
-
-        try {
-            Time time = Time.SYSTEM;
-            log.info("Kafka Connect standalone worker initializing ...");
-            long initStart = time.hiResClockMs();
-            WorkerInfo initInfo = new WorkerInfo();
-            initInfo.logAll();
-
-            String workerPropsFile = args[0];
-            Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
-                    Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.emptyMap();
-
-            log.info("Scanning for plugin classes. This might take a moment ...");
-            Plugins plugins = new Plugins(workerProps);
-            plugins.compareAndSwapWithDelegatingLoader();
-            StandaloneConfig config = new StandaloneConfig(workerProps);
-
-            String kafkaClusterId = config.kafkaClusterId();
-            log.debug("Kafka cluster ID: {}", kafkaClusterId);
+    protected ConnectStandalone(String... args) {
+        super(args);
+    }
 
-            // Do not initialize a RestClient because the ConnectorsResource will not use it in standalone mode.
-            RestServer rest = new RestServer(config, null);
-            rest.initializeServer();
+    @Override
+    protected String usage() {
+        return "ConnectStandalone worker.properties [connector1.properties connector2.properties ...]";
+    }
 
-            URI advertisedUrl = rest.advertisedUrl();
-            String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
+    @Override
+    protected void processExtraArgs(Herder herder, String[] extraArgs) throws Throwable {
+        for (final String connectorPropsFile : extraArgs) {
+            Map<String, String> connectorProps = Utils.propsToStringMap(Utils.loadProps(connectorPropsFile));
+            FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>((error, info) -> {
+                if (error != null)
+                    log.error("Failed to create connector for {}", connectorPropsFile);
+                else
+                    log.info("Created connector {}", info.result().name());
+            });
+            herder.putConnectorConfig(
+                    connectorProps.get(ConnectorConfig.NAME_CONFIG),
+                    connectorProps, false, cb);
+            cb.get();

Review Comment:
   Really, anything that preserves the error message and exit status should be fine. We could keep the call to `Connect::stop` in the catch block here if that's easier; I just don't want to change the message to something more vague without a good reason.
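   
   A minimal sketch of that constraint, using hypothetical stand-ins (`StoppableConnect`, `ExtraArgsGuard`, `runHook`) instead of the real Connect classes. The exit status is returned rather than passed to `Exit.exit` so the sketch can be run in-process; the value 3 follows the earlier suggestion in this thread, and the failure message is supplied by the caller so each implementation keeps its specific wording.
   
   ```java
   // Hypothetical stand-in for the Connect instance; only stop() matters here.
   interface StoppableConnect {
       void stop();
   }
   
   final class ExtraArgsGuard {
       private ExtraArgsGuard() { }
   
       // Hypothetical hook type standing in for processExtraArgs.
       interface ExtraArgsHook {
           void run() throws Exception;
       }
   
       // Runs the hook; on failure logs the caller's message, stops Connect, and reports status 3.
       static int runHook(StoppableConnect connect, ExtraArgsHook hook, String failureMessage) {
           try {
               hook.run();
               return 0;
           } catch (Exception e) {
               System.err.println(failureMessage + ": " + e);
               connect.stop();
               return 3;
           }
       }
   }
   ```
   
   The sketch catches `Exception` for brevity, whereas the snippets earlier in this thread catch `Throwable`.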





[GitHub] [kafka] C0urante commented on a diff in pull request #12947: KAFKA-6586: Refactor ConnectDistributed and ConnectStandalone to re-use shared logic

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12947:
URL: https://github.com/apache/kafka/pull/12947#discussion_r1047800524


##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java:
##########
@@ -37,98 +34,65 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.Map;
 
 /**
  * <p>
- * Command line utility that runs Kafka Connect as a standalone process. In this mode, work is not
- * distributed. Instead, all the normal Connect machinery works within a single process. This is
- * useful for ad hoc, small, or experimental jobs.
- * </p>
- * <p>
- * By default, no job configs or offset data is persistent. You can make jobs persistent and
- * fault tolerant by overriding the settings to use file storage for both.
+ * Command line utility that runs Kafka Connect as a standalone process. In this mode, work (connectors and tasks) is not
+ * distributed. Instead, all the normal Connect machinery works within a single process. This is useful for development
+ * and testing Kafka Connect on a local machine.

Review Comment:
   I don't know the history here, but if I had to guess, two possibilities come to mind (which are not mutually exclusive):
   - Offsets are persisted for source connectors in order to provide symmetry between source and sink (since sink connector offsets are stored by default as consumer offsets in both standalone and distributed mode, which causes them to persist beyond the lifetime of any single worker)
   - Standalone mode may originally not have supported the REST API, in which case, the assumption would be that the configs are already stored on disk somewhere in order to be passed to the worker on startup as filenames





[GitHub] [kafka] yashmayya commented on a diff in pull request #12947: KAFKA-6586: Refactor ConnectDistributed and ConnectStandalone to re-use shared logic

Posted by GitBox <gi...@apache.org>.
yashmayya commented on code in PR #12947:
URL: https://github.com/apache/kafka/pull/12947#discussion_r1048085888


##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java:
##########
@@ -37,98 +34,65 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.Map;
 
 /**
  * <p>
- * Command line utility that runs Kafka Connect as a standalone process. In this mode, work is not
- * distributed. Instead, all the normal Connect machinery works within a single process. This is
- * useful for ad hoc, small, or experimental jobs.
- * </p>
- * <p>
- * By default, no job configs or offset data is persistent. You can make jobs persistent and
- * fault tolerant by overriding the settings to use file storage for both.
+ * Command line utility that runs Kafka Connect as a standalone process. In this mode, work (connectors and tasks) is not
+ * distributed. Instead, all the normal Connect machinery works within a single process. This is useful for development
+ * and testing Kafka Connect on a local machine.

Review Comment:
   Thanks, that makes a lot of sense. Just curious, would adding a `FileConfigBackingStore` to the standalone mode require a KIP since we would be adding a new worker configuration for the file name (although we could default to falling back on the MemoryConfigBackingStore)? Or can we get away without one because it's the standalone mode which isn't used for production systems?
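   
   To illustrate the fallback idea, a rough sketch follows. Everything in it is hypothetical: there is no `FileConfigBackingStore` today, the property name `config.storage.file.filename` is invented for this example, and the selector returns a plain description string instead of constructing a real store.
   
   ```java
   import java.util.Map;
   
   final class ConfigStoreSelector {
       // Hypothetical worker property; not an existing Connect configuration.
       static final String FILE_CONFIG = "config.storage.file.filename";
   
       private ConfigStoreSelector() { }
   
       // Chooses file-backed storage when the hypothetical property is set, else falls back to memory.
       static String select(Map<String, String> workerProps) {
           String filename = workerProps.get(FILE_CONFIG);
           if (filename == null || filename.trim().isEmpty()) {
               return "memory";
           }
           return "file:" + filename.trim();
       }
   }
   ```
   
   With this shape, existing worker properties files that omit the new property would keep today's in-memory behaviour.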





[GitHub] [kafka] yashmayya commented on a diff in pull request #12947: KAFKA-6586: Refactor ConnectDistributed and ConnectStandalone to re-use shared logic

Posted by GitBox <gi...@apache.org>.
yashmayya commented on code in PR #12947:
URL: https://github.com/apache/kafka/pull/12947#discussion_r1047422514


##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java:
##########
@@ -37,98 +34,65 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.Map;
 
 /**
  * <p>
- * Command line utility that runs Kafka Connect as a standalone process. In this mode, work is not
- * distributed. Instead, all the normal Connect machinery works within a single process. This is
- * useful for ad hoc, small, or experimental jobs.
- * </p>
- * <p>
- * By default, no job configs or offset data is persistent. You can make jobs persistent and
- * fault tolerant by overriding the settings to use file storage for both.
+ * Command line utility that runs Kafka Connect as a standalone process. In this mode, work (connectors and tasks) is not
+ * distributed. Instead, all the normal Connect machinery works within a single process. This is useful for development
+ * and testing Kafka Connect on a local machine.

Review Comment:
   I've added it back 👍

   Off topic, but I just realized that standalone mode, interestingly, stores configs in memory but offsets in a file by default. By any chance, do you know why this is the case, i.e. why aren't configs made persistent via file storage too?





[GitHub] [kafka] C0urante commented on a diff in pull request #12947: KAFKA-6586: Refactor ConnectDistributed and ConnectStandalone to re-use shared logic

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12947:
URL: https://github.com/apache/kafka/pull/12947#discussion_r1044671346


##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.cli;
+
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.runtime.Connect;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.WorkerInfo;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.util.FutureCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.Map;
+
+/**
+ * Connect initialization common logic that can be leveraged by concrete implementations of command line utilities
+ *
+ * @param <T> the type of {@link WorkerConfig} to be used
+ */
+public abstract class AbstractConnectCli<T extends WorkerConfig> {
+
+    private static final Logger log = LoggerFactory.getLogger(AbstractConnectCli.class);
+    private final Time time = Time.SYSTEM;
+
+    /**
+     * @param config an instance of {@link WorkerConfig}
+     */
+    protected abstract Herder createHerder(T config, String workerId, Plugins plugins,
+                                           ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                                           RestServer restServer, RestClient restClient);
+
+    protected abstract T createConfig(Map<String, String> workerProps);
+
+    /**
+     * @param workerProps the worker properties map
+     * @param connectorPropsFiles zero or more connector property files for connectors that are to be created after
+     *                            Connect is successfully started
+     * @return a started instance of {@link Connect}
+     */
+    public Connect startConnect(Map<String, String> workerProps, String... connectorPropsFiles) {
+        log.info("Kafka Connect worker initializing ...");
+        long initStart = time.hiResClockMs();
+
+        WorkerInfo initInfo = new WorkerInfo();
+        initInfo.logAll();
+
+        log.info("Scanning for plugin classes. This might take a moment ...");
+        Plugins plugins = new Plugins(workerProps);
+        plugins.compareAndSwapWithDelegatingLoader();
+        T config = createConfig(workerProps);
+        log.debug("Kafka cluster ID: {}", config.kafkaClusterId());
+
+        RestClient restClient = new RestClient(config);
+
+        RestServer restServer = new RestServer(config, restClient);
+        restServer.initializeServer();
+
+        URI advertisedUrl = restServer.advertisedUrl();
+        String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
+
+        ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(
+                config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
+                config, ConnectorClientConfigOverridePolicy.class);
+
+        Herder herder = createHerder(config, workerId, plugins, connectorClientConfigOverridePolicy, restServer, restClient);
+
+        final Connect connect = new Connect(herder, restServer);
+        log.info("Kafka Connect worker initialization took {}ms", time.hiResClockMs() - initStart);
+        try {
+            connect.start();
+        } catch (Exception e) {
+            log.error("Failed to start Connect", e);
+            connect.stop();
+            Exit.exit(3);
+        }
+
+        try {
+            for (final String connectorPropsFile : connectorPropsFiles) {
+                Map<String, String> connectorProps = Utils.propsToStringMap(Utils.loadProps(connectorPropsFile));
+                FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>((error, info) -> {
+                    if (error != null)
+                        log.error("Failed to create connector for {}", connectorPropsFile);
+                    else
+                        log.info("Created connector {}", info.result().name());
+                });
+                herder.putConnectorConfig(
+                        connectorProps.get(ConnectorConfig.NAME_CONFIG),
+                        connectorProps, false, cb);
+                cb.get();
+            }
+        } catch (Throwable t) {
+            log.error("Stopping after connector error", t);
+            connect.stop();
+            Exit.exit(3);
+        }

Review Comment:
   It's low-cost to implement this logic if/when we address KAFKA-3815; we don't have to add it here before then. It's also unclear whether an implementation of KAFKA-3815 would work with Java properties files or JSON files.
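
For reference, the properties-file path that the quoted `startConnect` loop relies on can be approximated with the JDK alone. The sketch below is an assumption-laden stand-in for `Utils.propsToStringMap(Utils.loadProps(...))`: `ConnectorPropsSketch` and `parseProperties` are hypothetical names, and the input is an inline string, not a file, so that the example is self-contained. A JSON-based variant would need a JSON parser and is not shown.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper, not part of Kafka Connect: parses connector config text
// in Java properties format into a sorted string map.
final class ConnectorPropsSketch {

    static Map<String, String> parseProperties(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> result = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            result.put(key, props.getProperty(key));
        }
        return result;
    }

    public static void main(String[] args) {
        // Example connector config; the values are made up for illustration.
        String text = "name=local-file-source\n"
                + "tasks.max=1\n"
                + "topic=connect-test\n";
        Map<String, String> config = parseProperties(text);
        System.out.println("Connector " + config.get("name") + " has config " + config);
    }
}
```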





[GitHub] [kafka] C0urante commented on a diff in pull request #12947: KAFKA-6586: Refactor ConnectDistributed and ConnectStandalone to re-use shared logic

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12947:
URL: https://github.com/apache/kafka/pull/12947#discussion_r1039714073


##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java:
##########
@@ -21,41 +21,51 @@
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.runtime.Connect;
-import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.Worker;
-import org.apache.kafka.connect.runtime.WorkerConfig;
-import org.apache.kafka.connect.runtime.WorkerInfo;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.RestClient;
 import org.apache.kafka.connect.runtime.rest.RestServer;
-import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
 import org.apache.kafka.connect.storage.FileOffsetBackingStore;
 import org.apache.kafka.connect.storage.OffsetBackingStore;
-import org.apache.kafka.connect.util.FutureCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URI;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 
 /**
  * <p>
- * Command line utility that runs Kafka Connect as a standalone process. In this mode, work is not
- * distributed. Instead, all the normal Connect machinery works within a single process. This is
- * useful for ad hoc, small, or experimental jobs.
- * </p>
- * <p>
- * By default, no job configs or offset data is persistent. You can make jobs persistent and
- * fault tolerant by overriding the settings to use file storage for both.
+ * Command line utility that runs Kafka Connect as a standalone process. In this mode, work (connectors and tasks) is not
+ * distributed. Instead, all the normal Connect machinery works within a single process. This is useful for development
+ * and testing Kafka Connect on a local machine.
  * </p>
  */
-public class ConnectStandalone {
+public class ConnectStandalone extends AbstractConnectCli<StandaloneConfig> {
     private static final Logger log = LoggerFactory.getLogger(ConnectStandalone.class);
 
+    @Override
+    protected Herder createHerder(StandaloneConfig config, String workerId, Plugins plugins,
+                                  ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                                  RestServer restServer, RestClient restClient) {
+
+        OffsetBackingStore offsetBackingStore = new FileOffsetBackingStore();
+        offsetBackingStore.configure(config);
+
+        Worker worker = new Worker(workerId, Time.SYSTEM, plugins, config, offsetBackingStore,
+                connectorClientConfigOverridePolicy);
+
+        return new StandaloneHerder(worker, config.kafkaClusterId(), connectorClientConfigOverridePolicy);
+    }
+
+    @Override
+    protected StandaloneConfig createConfig(Map<String, String> workerProps) {
+        return new StandaloneConfig(workerProps);
+    }
+
     public static void main(String[] args) {

Review Comment:
   The main methods of both classes are almost entirely identical. Can we add that logic to the abstract class as well?
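
One way to picture the suggestion is a template method: the abstract class owns argument handling and the call order, and each mode supplies only its usage string, config, and start logic. The sketch below uses hypothetical stand-in types (`CliSketch`, `StandaloneCliSketch`, `DistributedCliSketch`) and returns strings where the real code would start a worker; it is not the PR's `AbstractConnectCli`.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for an abstract CLI base class; not a Kafka class.
abstract class CliSketch<T> {
    protected abstract String usage();
    protected abstract T createConfig(Map<String, String> workerProps);
    protected abstract String start(T config, String[] extraArgs);

    // Shared entry point: validates args, builds the config, then delegates to the mode.
    public final String run(String[] args) {
        if (args.length < 1 || Arrays.asList(args).contains("--help")) {
            return "usage: " + usage();
        }
        Map<String, String> workerProps = new LinkedHashMap<>();
        workerProps.put("worker.props.file", args[0]); // assumption: first arg names the worker config
        T config = createConfig(workerProps);
        String[] extraArgs = Arrays.copyOfRange(args, 1, args.length);
        return start(config, extraArgs);
    }
}

final class StandaloneCliSketch extends CliSketch<Map<String, String>> {
    @Override protected String usage() { return "standalone worker.properties [connector.properties ...]"; }
    @Override protected Map<String, String> createConfig(Map<String, String> workerProps) { return workerProps; }
    @Override protected String start(Map<String, String> config, String[] extraArgs) {
        return "standalone started with " + extraArgs.length + " connector file(s)";
    }
}

final class DistributedCliSketch extends CliSketch<Map<String, String>> {
    @Override protected String usage() { return "distributed worker.properties"; }
    @Override protected Map<String, String> createConfig(Map<String, String> workerProps) { return workerProps; }
    @Override protected String start(Map<String, String> config, String[] extraArgs) {
        return "distributed started";
    }
}
```

Each concrete `main` would then shrink to something like `new StandaloneCliSketch().run(args)`.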



##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.cli;
+
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.runtime.Connect;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.WorkerInfo;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.util.FutureCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.Map;
+
+/**
+ * Connect initialization common logic that can be leveraged by concrete implementations of command line utilities

Review Comment:
   ```suggestion
    * Common initialization logic for Kafka Connect, intended for use by command line utilities
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.cli;
+
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.runtime.Connect;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.WorkerInfo;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.util.FutureCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.Map;
+
+/**
+ * Connect initialization common logic that can be leveraged by concrete implementations of command line utilities
+ *
+ * @param <T> the type of {@link WorkerConfig} to be used
+ */
+public abstract class AbstractConnectCli<T extends WorkerConfig> {
+
+    private static final Logger log = LoggerFactory.getLogger(AbstractConnectCli.class);
+    private final Time time = Time.SYSTEM;
+
+    /**
+     * @param config an instance of {@link WorkerConfig}
+     */

Review Comment:
   I don't think this method has to be annotated with Javadocs, but if we're going to add them, best to add all the usual information, including a brief description of the method and information on all parameters (and whether they will or will not be null).



##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.cli;
+
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.runtime.Connect;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.WorkerInfo;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.util.FutureCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.Map;
+
+/**
+ * Connect initialization common logic that can be leveraged by concrete implementations of command line utilities
+ *
+ * @param <T> the type of {@link WorkerConfig} to be used
+ */
+public abstract class AbstractConnectCli<T extends WorkerConfig> {
+
+    private static final Logger log = LoggerFactory.getLogger(AbstractConnectCli.class);
+    private final Time time = Time.SYSTEM;
+
+    /**
+     * @param config an instance of {@link WorkerConfig}
+     */
+    protected abstract Herder createHerder(T config, String workerId, Plugins plugins,
+                                           ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                                           RestServer restServer, RestClient restClient);
+
+    protected abstract T createConfig(Map<String, String> workerProps);
+
+    /**
+     * @param workerProps the worker properties map
+     * @param connectorPropsFiles zero or more connector property files for connectors that are to be created after
+     *                            Connect is successfully started
+     * @return a started instance of {@link Connect}
+     */
+    public Connect startConnect(Map<String, String> workerProps, String... connectorPropsFiles) {
+        log.info("Kafka Connect worker initializing ...");
+        long initStart = time.hiResClockMs();
+
+        WorkerInfo initInfo = new WorkerInfo();
+        initInfo.logAll();
+
+        log.info("Scanning for plugin classes. This might take a moment ...");
+        Plugins plugins = new Plugins(workerProps);
+        plugins.compareAndSwapWithDelegatingLoader();
+        T config = createConfig(workerProps);
+        log.debug("Kafka cluster ID: {}", config.kafkaClusterId());
+
+        RestClient restClient = new RestClient(config);
+
+        RestServer restServer = new RestServer(config, restClient);
+        restServer.initializeServer();
+
+        URI advertisedUrl = restServer.advertisedUrl();
+        String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
+
+        ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(
+                config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
+                config, ConnectorClientConfigOverridePolicy.class);
+
+        Herder herder = createHerder(config, workerId, plugins, connectorClientConfigOverridePolicy, restServer, restClient);
+
+        final Connect connect = new Connect(herder, restServer);
+        log.info("Kafka Connect worker initialization took {}ms", time.hiResClockMs() - initStart);
+        try {
+            connect.start();
+        } catch (Exception e) {
+            log.error("Failed to start Connect", e);
+            connect.stop();
+            Exit.exit(3);
+        }
+
+        try {
+            for (final String connectorPropsFile : connectorPropsFiles) {
+                Map<String, String> connectorProps = Utils.propsToStringMap(Utils.loadProps(connectorPropsFile));
+                FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>((error, info) -> {
+                    if (error != null)
+                        log.error("Failed to create connector for {}", connectorPropsFile);
+                    else
+                        log.info("Created connector {}", info.result().name());
+                });
+                herder.putConnectorConfig(
+                        connectorProps.get(ConnectorConfig.NAME_CONFIG),
+                        connectorProps, false, cb);
+                cb.get();
+            }
+        } catch (Throwable t) {
+            log.error("Stopping after connector error", t);
+            connect.stop();
+            Exit.exit(3);
+        }

Review Comment:
   I don't think this part should be in the abstract class. It's specific to standalone mode and it's a bit misleading to give both modes logic for instantiating connectors based on command-line properties files.
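
A possible shape for that separation is a hook with a no-op default: the shared startup calls it, and only the standalone subclass overrides it to create connectors from the extra arguments. Everything in the sketch below is a hypothetical stand-in (`StartupSketch`, `processExtraArgs`, and the recorded event strings); it illustrates the pattern and is not the code that was merged.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; none of these types are Kafka classes.
abstract class StartupSketch {
    protected final List<String> events = new ArrayList<>();

    // Shared startup used by every mode; returns a log of what happened.
    public final List<String> start(String... extraArgs) {
        events.add("worker started");
        processExtraArgs(extraArgs);
        return events;
    }

    // Hook: by default, extra command-line arguments are ignored.
    protected void processExtraArgs(String[] extraArgs) {
    }
}

// Distributed mode keeps the no-op default, so it has no connector-file logic at all.
final class DistributedStartupSketch extends StartupSketch {
}

// Standalone mode treats each extra argument as a connector properties file.
final class StandaloneStartupSketch extends StartupSketch {
    @Override
    protected void processExtraArgs(String[] extraArgs) {
        for (String file : extraArgs) {
            events.add("created connector from " + file);
        }
    }
}
```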





[GitHub] [kafka] yashmayya commented on a diff in pull request #12947: KAFKA-6586: Refactor ConnectDistributed and ConnectStandalone to re-use shared logic

Posted by GitBox <gi...@apache.org>.
yashmayya commented on code in PR #12947:
URL: https://github.com/apache/kafka/pull/12947#discussion_r1045537542


##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java:
##########
@@ -21,41 +21,51 @@
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.runtime.Connect;
-import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.Worker;
-import org.apache.kafka.connect.runtime.WorkerConfig;
-import org.apache.kafka.connect.runtime.WorkerInfo;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.RestClient;
 import org.apache.kafka.connect.runtime.rest.RestServer;
-import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
 import org.apache.kafka.connect.storage.FileOffsetBackingStore;
 import org.apache.kafka.connect.storage.OffsetBackingStore;
-import org.apache.kafka.connect.util.FutureCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URI;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 
 /**
  * <p>
- * Command line utility that runs Kafka Connect as a standalone process. In this mode, work is not
- * distributed. Instead, all the normal Connect machinery works within a single process. This is
- * useful for ad hoc, small, or experimental jobs.
- * </p>
- * <p>
- * By default, no job configs or offset data is persistent. You can make jobs persistent and
- * fault tolerant by overriding the settings to use file storage for both.
+ * Command line utility that runs Kafka Connect as a standalone process. In this mode, work (connectors and tasks) is not
+ * distributed. Instead, all the normal Connect machinery works within a single process. This is useful for development
+ * and testing Kafka Connect on a local machine.
  * </p>
  */
-public class ConnectStandalone {
+public class ConnectStandalone extends AbstractConnectCli<StandaloneConfig> {
     private static final Logger log = LoggerFactory.getLogger(ConnectStandalone.class);
 
+    @Override
+    protected Herder createHerder(StandaloneConfig config, String workerId, Plugins plugins,
+                                  ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                                  RestServer restServer, RestClient restClient) {
+
+        OffsetBackingStore offsetBackingStore = new FileOffsetBackingStore();
+        offsetBackingStore.configure(config);
+
+        Worker worker = new Worker(workerId, Time.SYSTEM, plugins, config, offsetBackingStore,
+                connectorClientConfigOverridePolicy);
+
+        return new StandaloneHerder(worker, config.kafkaClusterId(), connectorClientConfigOverridePolicy);
+    }
+
+    @Override
+    protected StandaloneConfig createConfig(Map<String, String> workerProps) {
+        return new StandaloneConfig(workerProps);
+    }
+
     public static void main(String[] args) {

Review Comment:
   Nice! This looks much cleaner and I've made the changes (largely the same, with minor tweaks). My only small qualm is that the integration test framework also uses `startConnect`, which is a bit odd because it initializes a CLI class with no args (it passes the worker properties map directly to `startConnect`).


