You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/12/07 17:18:50 UTC

[GitHub] [kafka] yashmayya commented on a diff in pull request #12947: KAFKA-6586: Refactor ConnectDistributed and ConnectStandalone to re-use shared logic

yashmayya commented on code in PR #12947:
URL: https://github.com/apache/kafka/pull/12947#discussion_r1042258502


##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.cli;
+
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.runtime.Connect;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.WorkerInfo;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.util.FutureCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.Map;
+
+/**
+ * Connect initialization common logic that can be leveraged by concrete implementations of command line utilities
+ *
+ * @param <T> the type of {@link WorkerConfig} to be used
+ */
+public abstract class AbstractConnectCli<T extends WorkerConfig> {
+
+    private static final Logger log = LoggerFactory.getLogger(AbstractConnectCli.class);
+    private final Time time = Time.SYSTEM;
+
+    /**
+     * @param config an instance of {@link WorkerConfig}
+     */
+    protected abstract Herder createHerder(T config, String workerId, Plugins plugins,
+                                           ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                                           RestServer restServer, RestClient restClient);
+
+    protected abstract T createConfig(Map<String, String> workerProps);
+
+    /**
+     * @param workerProps the worker properties map
+     * @param connectorPropsFiles zero or more connector property files for connectors that are to be created after
+     *                            Connect is successfully started
+     * @return a started instance of {@link Connect}
+     */
+    public Connect startConnect(Map<String, String> workerProps, String... connectorPropsFiles) {
+        log.info("Kafka Connect worker initializing ...");
+        long initStart = time.hiResClockMs();
+
+        WorkerInfo initInfo = new WorkerInfo();
+        initInfo.logAll();
+
+        log.info("Scanning for plugin classes. This might take a moment ...");
+        Plugins plugins = new Plugins(workerProps);
+        plugins.compareAndSwapWithDelegatingLoader();
+        T config = createConfig(workerProps);
+        log.debug("Kafka cluster ID: {}", config.kafkaClusterId());
+
+        RestClient restClient = new RestClient(config);
+
+        RestServer restServer = new RestServer(config, restClient);
+        restServer.initializeServer();
+
+        URI advertisedUrl = restServer.advertisedUrl();
+        String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
+
+        ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(
+                config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
+                config, ConnectorClientConfigOverridePolicy.class);
+
+        Herder herder = createHerder(config, workerId, plugins, connectorClientConfigOverridePolicy, restServer, restClient);
+
+        final Connect connect = new Connect(herder, restServer);
+        log.info("Kafka Connect worker initialization took {}ms", time.hiResClockMs() - initStart);
+        try {
+            connect.start();
+        } catch (Exception e) {
+            log.error("Failed to start Connect", e);
+            connect.stop();
+            Exit.exit(3);
+        }
+
+        try {
+            for (final String connectorPropsFile : connectorPropsFiles) {
+                Map<String, String> connectorProps = Utils.propsToStringMap(Utils.loadProps(connectorPropsFile));
+                FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>((error, info) -> {
+                    if (error != null)
+                        log.error("Failed to create connector for {}", connectorPropsFile);
+                    else
+                        log.info("Created connector {}", info.result().name());
+                });
+                herder.putConnectorConfig(
+                        connectorProps.get(ConnectorConfig.NAME_CONFIG),
+                        connectorProps, false, cb);
+                cb.get();
+            }
+        } catch (Throwable t) {
+            log.error("Stopping after connector error", t);
+            connect.stop();
+            Exit.exit(3);
+        }

Review Comment:
   I did think about this one and I felt like it might be better to include it in the common base class in case https://issues.apache.org/jira/browse/KAFKA-3815 were to be implemented. I can move it back to the standalone-specific implementation if you like; it could always be refactored later if required.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java:
##########
@@ -21,41 +21,51 @@
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.runtime.Connect;
-import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.Worker;
-import org.apache.kafka.connect.runtime.WorkerConfig;
-import org.apache.kafka.connect.runtime.WorkerInfo;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.RestClient;
 import org.apache.kafka.connect.runtime.rest.RestServer;
-import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
 import org.apache.kafka.connect.storage.FileOffsetBackingStore;
 import org.apache.kafka.connect.storage.OffsetBackingStore;
-import org.apache.kafka.connect.util.FutureCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URI;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 
 /**
  * <p>
- * Command line utility that runs Kafka Connect as a standalone process. In this mode, work is not
- * distributed. Instead, all the normal Connect machinery works within a single process. This is
- * useful for ad hoc, small, or experimental jobs.
- * </p>
- * <p>
- * By default, no job configs or offset data is persistent. You can make jobs persistent and
- * fault tolerant by overriding the settings to use file storage for both.
+ * Command line utility that runs Kafka Connect as a standalone process. In this mode, work (connectors and tasks) is not
+ * distributed. Instead, all the normal Connect machinery works within a single process. This is useful for for development
+ * and testing Kafka Connect on a local machine.
  * </p>
  */
-public class ConnectStandalone {
+public class ConnectStandalone extends AbstractConnectCli<StandaloneConfig> {
     private static final Logger log = LoggerFactory.getLogger(ConnectStandalone.class);
 
+    @Override
+    protected Herder createHerder(StandaloneConfig config, String workerId, Plugins plugins,
+                                  ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                                  RestServer restServer, RestClient restClient) {
+
+        OffsetBackingStore offsetBackingStore = new FileOffsetBackingStore();
+        offsetBackingStore.configure(config);
+
+        Worker worker = new Worker(workerId, Time.SYSTEM, plugins, config, offsetBackingStore,
+                connectorClientConfigOverridePolicy);
+
+        return new StandaloneHerder(worker, config.kafkaClusterId(), connectorClientConfigOverridePolicy);
+    }
+
+    @Override
+    protected StandaloneConfig createConfig(Map<String, String> workerProps) {
+        return new StandaloneConfig(workerProps);
+    }
+
     public static void main(String[] args) {

Review Comment:
   While I agree that they both do look pretty similar, I'm not sure there's a way to refactor them to re-use common code while still being clean / readable. Were you thinking something along the lines of a static helper method which takes an argument indicating whether the mode is standalone or distributed (perhaps even a `Class<>` instance)? Although this might be messy too in case we don't want to keep the connector creation logic in the base `startConnect` method and need to move it back to `ConnectStandalone`'s startup logic (https://github.com/apache/kafka/pull/12947/files#r1039705222).



##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.cli;
+
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.runtime.Connect;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.WorkerInfo;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.util.FutureCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.Map;
+
+/**
+ * Connect initialization common logic that can be leveraged by concrete implementations of command line utilities
+ *
+ * @param <T> the type of {@link WorkerConfig} to be used
+ */
+public abstract class AbstractConnectCli<T extends WorkerConfig> {
+
+    private static final Logger log = LoggerFactory.getLogger(AbstractConnectCli.class);
+    private final Time time = Time.SYSTEM;
+
+    /**
+     * @param config an instance of {@link WorkerConfig}
+     */

Review Comment:
   Fair enough, I've removed the doc comment. Most of the param descriptions as well as the method description itself seemed fairly redundant.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org