You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/12/05 15:06:49 UTC

[GitHub] [kafka] C0urante commented on a diff in pull request #12947: KAFKA-6586: Refactor ConnectDistributed and ConnectStandalone to re-use shared logic

C0urante commented on code in PR #12947:
URL: https://github.com/apache/kafka/pull/12947#discussion_r1039714073


##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java:
##########
@@ -21,41 +21,51 @@
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.runtime.Connect;
-import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.Worker;
-import org.apache.kafka.connect.runtime.WorkerConfig;
-import org.apache.kafka.connect.runtime.WorkerInfo;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.RestClient;
 import org.apache.kafka.connect.runtime.rest.RestServer;
-import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
 import org.apache.kafka.connect.storage.FileOffsetBackingStore;
 import org.apache.kafka.connect.storage.OffsetBackingStore;
-import org.apache.kafka.connect.util.FutureCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URI;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 
 /**
  * <p>
- * Command line utility that runs Kafka Connect as a standalone process. In this mode, work is not
- * distributed. Instead, all the normal Connect machinery works within a single process. This is
- * useful for ad hoc, small, or experimental jobs.
- * </p>
- * <p>
- * By default, no job configs or offset data is persistent. You can make jobs persistent and
- * fault tolerant by overriding the settings to use file storage for both.
+ * Command line utility that runs Kafka Connect as a standalone process. In this mode, work (connectors and tasks) is not
+ * distributed. Instead, all the normal Connect machinery works within a single process. This is useful for development
+ * and testing Kafka Connect on a local machine.
  * </p>
  */
-public class ConnectStandalone {
+public class ConnectStandalone extends AbstractConnectCli<StandaloneConfig> {
     private static final Logger log = LoggerFactory.getLogger(ConnectStandalone.class);
 
+    @Override
+    protected Herder createHerder(StandaloneConfig config, String workerId, Plugins plugins,
+                                  ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                                  RestServer restServer, RestClient restClient) {
+
+        OffsetBackingStore offsetBackingStore = new FileOffsetBackingStore();
+        offsetBackingStore.configure(config);
+
+        Worker worker = new Worker(workerId, Time.SYSTEM, plugins, config, offsetBackingStore,
+                connectorClientConfigOverridePolicy);
+
+        return new StandaloneHerder(worker, config.kafkaClusterId(), connectorClientConfigOverridePolicy);
+    }
+
+    @Override
+    protected StandaloneConfig createConfig(Map<String, String> workerProps) {
+        return new StandaloneConfig(workerProps);
+    }
+
     public static void main(String[] args) {

Review Comment:
   The main methods of both classes are almost entirely identical. Can we add that logic to the abstract class as well?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.cli;
+
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.runtime.Connect;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.WorkerInfo;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.util.FutureCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.Map;
+
+/**
+ * Connect initialization common logic that can be leveraged by concrete implementations of command line utilities

Review Comment:
   ```suggestion
    * Common initialization logic for Kafka Connect, intended for use by command line utilities
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.cli;
+
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.runtime.Connect;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.WorkerInfo;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.util.FutureCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.Map;
+
+/**
+ * Connect initialization common logic that can be leveraged by concrete implementations of command line utilities
+ *
+ * @param <T> the type of {@link WorkerConfig} to be used
+ */
+public abstract class AbstractConnectCli<T extends WorkerConfig> {
+
+    private static final Logger log = LoggerFactory.getLogger(AbstractConnectCli.class);
+    private final Time time = Time.SYSTEM;
+
+    /**
+     * @param config an instance of {@link WorkerConfig}
+     */

Review Comment:
   I don't think this method needs Javadoc, but if we're going to add it, it's best to include all the usual information: a brief description of the method and documentation for every parameter (including whether each one may or may not be null).



##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.cli;
+
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.runtime.Connect;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.WorkerInfo;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.util.FutureCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.Map;
+
+/**
+ * Connect initialization common logic that can be leveraged by concrete implementations of command line utilities
+ *
+ * @param <T> the type of {@link WorkerConfig} to be used
+ */
+public abstract class AbstractConnectCli<T extends WorkerConfig> {
+
+    private static final Logger log = LoggerFactory.getLogger(AbstractConnectCli.class);
+    private final Time time = Time.SYSTEM;
+
+    /**
+     * @param config an instance of {@link WorkerConfig}
+     */
+    protected abstract Herder createHerder(T config, String workerId, Plugins plugins,
+                                           ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                                           RestServer restServer, RestClient restClient);
+
+    protected abstract T createConfig(Map<String, String> workerProps);
+
+    /**
+     * @param workerProps the worker properties map
+     * @param connectorPropsFiles zero or more connector property files for connectors that are to be created after
+     *                            Connect is successfully started
+     * @return a started instance of {@link Connect}
+     */
+    public Connect startConnect(Map<String, String> workerProps, String... connectorPropsFiles) {
+        log.info("Kafka Connect worker initializing ...");
+        long initStart = time.hiResClockMs();
+
+        WorkerInfo initInfo = new WorkerInfo();
+        initInfo.logAll();
+
+        log.info("Scanning for plugin classes. This might take a moment ...");
+        Plugins plugins = new Plugins(workerProps);
+        plugins.compareAndSwapWithDelegatingLoader();
+        T config = createConfig(workerProps);
+        log.debug("Kafka cluster ID: {}", config.kafkaClusterId());
+
+        RestClient restClient = new RestClient(config);
+
+        RestServer restServer = new RestServer(config, restClient);
+        restServer.initializeServer();
+
+        URI advertisedUrl = restServer.advertisedUrl();
+        String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
+
+        ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(
+                config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
+                config, ConnectorClientConfigOverridePolicy.class);
+
+        Herder herder = createHerder(config, workerId, plugins, connectorClientConfigOverridePolicy, restServer, restClient);
+
+        final Connect connect = new Connect(herder, restServer);
+        log.info("Kafka Connect worker initialization took {}ms", time.hiResClockMs() - initStart);
+        try {
+            connect.start();
+        } catch (Exception e) {
+            log.error("Failed to start Connect", e);
+            connect.stop();
+            Exit.exit(3);
+        }
+
+        try {
+            for (final String connectorPropsFile : connectorPropsFiles) {
+                Map<String, String> connectorProps = Utils.propsToStringMap(Utils.loadProps(connectorPropsFile));
+                FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>((error, info) -> {
+                    if (error != null)
+                        log.error("Failed to create connector for {}", connectorPropsFile);
+                    else
+                        log.info("Created connector {}", info.result().name());
+                });
+                herder.putConnectorConfig(
+                        connectorProps.get(ConnectorConfig.NAME_CONFIG),
+                        connectorProps, false, cb);
+                cb.get();
+            }
+        } catch (Throwable t) {
+            log.error("Stopping after connector error", t);
+            connect.stop();
+            Exit.exit(3);
+        }

Review Comment:
   I don't think this part should be in the abstract class. It's specific to standalone mode and it's a bit misleading to give both modes logic for instantiating connectors based on command-line properties files.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org