Posted to jira@kafka.apache.org by "C0urante (via GitHub)" <gi...@apache.org> on 2023/02/28 20:59:06 UTC

[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

C0urante commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1120685793


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+public abstract class IsolatedPlugin<P> {
+
+    private final Plugins plugins;
+    private final Class<?> pluginClass;
+    protected final P delegate;
+    private final ClassLoader classLoader;
+    private final PluginType type;
+
+    IsolatedPlugin(Plugins plugins, P delegate, PluginType type) {
+        this.plugins = Objects.requireNonNull(plugins, "plugins must be non-null");
+        this.delegate = Objects.requireNonNull(delegate, "delegate plugin must be non-null");
+        this.pluginClass = delegate.getClass();
+        ClassLoader classLoader = pluginClass.getClassLoader();
+        this.classLoader = Objects.requireNonNull(classLoader, "delegate plugin must not be a bootstrap class");
+        this.type = Objects.requireNonNull(type, "plugin type must be non-null");
+    }
+
+    public PluginType type() {
+        return type;
+    }
+
+    @SuppressWarnings("unchecked")
+    public Class<? extends P> pluginClass() {
+        return (Class<? extends P>) pluginClass;
+    }
+
+    protected <V> V isolate(Callable<V> callable) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return callable.call();
+        }
+    }
+
+    protected void isolateV(ThrowingRunnable runnable) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            runnable.run();
+        }

Review Comment:
   We can reduce duplication here by piggybacking on the non-void `isolate` method:
   ```suggestion
           isolate(() -> {
               runnable.run();
               return null;
           });
   ```
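Here is a minimal, self-contained sketch of that suggestion. It assumes the loader swap does nothing more than set and then restore the thread's context class loader. `IsolationSketch` and its nested `LoaderSwap` are hypothetical stand-ins for `IsolatedPlugin` and the runtime's `LoaderSwap`/`Plugins.withClassLoader`, not the real classes:

```java
import java.util.concurrent.Callable;

class IsolationSketch {

    /** Same shape as the ThrowingRunnable in the diff. */
    interface ThrowingRunnable {
        void run() throws Exception;
    }

    /** Hypothetical stand-in for LoaderSwap: swaps the context loader, restores it on close. */
    static final class LoaderSwap implements AutoCloseable {
        private final ClassLoader saved;

        LoaderSwap(ClassLoader loader) {
            this.saved = Thread.currentThread().getContextClassLoader();
            Thread.currentThread().setContextClassLoader(loader);
        }

        @Override
        public void close() {
            Thread.currentThread().setContextClassLoader(saved);
        }
    }

    private final ClassLoader classLoader;

    IsolationSketch(ClassLoader classLoader) {
        this.classLoader = classLoader;
    }

    /** The only method that actually performs the swap. */
    <V> V isolate(Callable<V> callable) throws Exception {
        try (LoaderSwap swap = new LoaderSwap(classLoader)) {
            return callable.call();
        }
    }

    /** The void variant piggybacks on isolate(Callable) instead of duplicating the try block. */
    void isolateV(ThrowingRunnable runnable) throws Exception {
        isolate(() -> {
            runnable.run();
            return null;
        });
    }
}
```

With this shape, only `isolate` contains the try-with-resources block. Any future change to how the swap works only has to be made in one place.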



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##########
@@ -419,8 +420,14 @@ public ConnectorMetricsGroup(ConnectMetrics connectMetrics, AbstractStatus.State
             metricGroup.close();
 
             metricGroup.addImmutableValueMetric(registry.connectorType, connectorType());
-            metricGroup.addImmutableValueMetric(registry.connectorClass, connector.getClass().getName());
-            metricGroup.addImmutableValueMetric(registry.connectorVersion, connector.version());
+            metricGroup.addImmutableValueMetric(registry.connectorClass, connector.pluginClass().getName());
+            String version;
+            try {
+                version = connector.version();
+            } catch (Exception e) {
+                version = DelegatingClassLoader.UNDEFINED_VERSION;
+            }

Review Comment:
   This technically changes behavior, right? We go from failing the connector on startup to just assigning it an undefined version.
   
   I think this is probably fine (the benefits outweigh the costs, and I have a hard time imagining any use case that would favor failing here); I just want to make sure I'm gauging the impact correctly.
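To make the behavior change concrete, here is a hedged sketch of the new fallback. `VersionFallback` and `versionOrUndefined` are hypothetical names, and the `"undefined"` literal is only assumed to match `DelegatingClassLoader.UNDEFINED_VERSION`:

```java
import java.util.concurrent.Callable;

class VersionFallback {

    // Assumed to mirror DelegatingClassLoader.UNDEFINED_VERSION.
    static final String UNDEFINED_VERSION = "undefined";

    /**
     * Old behavior: an exception from version() propagated and failed connector startup.
     * New behavior: the exception is swallowed and the metric gets a placeholder version.
     */
    static String versionOrUndefined(Callable<String> version) {
        try {
            return version.call();
        } catch (Exception e) {
            return UNDEFINED_VERSION;
        }
    }
}
```

Note that, as in the diff, only `Exception` is caught, so an `Error` thrown by `version()` (for example a `NoClassDefFoundError`) would still fail startup.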



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java:
##########
@@ -0,0 +1,122 @@
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+public abstract class IsolatedPlugin<P> {
+
+    private final Plugins plugins;
+    private final Class<?> pluginClass;
+    protected final P delegate;
+    private final ClassLoader classLoader;
+    private final PluginType type;
+
+    IsolatedPlugin(Plugins plugins, P delegate, PluginType type) {
+        this.plugins = Objects.requireNonNull(plugins, "plugins must be non-null");
+        this.delegate = Objects.requireNonNull(delegate, "delegate plugin must be non-null");
+        this.pluginClass = delegate.getClass();
+        ClassLoader classLoader = pluginClass.getClassLoader();
+        this.classLoader = Objects.requireNonNull(classLoader, "delegate plugin must not be a bootstrap class");
+        this.type = Objects.requireNonNull(type, "plugin type must be non-null");
+    }
+
+    public PluginType type() {
+        return type;
+    }
+
+    @SuppressWarnings("unchecked")
+    public Class<? extends P> pluginClass() {
+        return (Class<? extends P>) pluginClass;
+    }
+
+    protected <V> V isolate(Callable<V> callable) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return callable.call();
+        }
+    }
+
+    protected void isolateV(ThrowingRunnable runnable) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            runnable.run();
+        }
+    }
+
+    protected <T> void isolateV(Consumer<T> consumer, T t) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            consumer.accept(t);
+        }
+    }
+
+    protected <T, U> void isolateV(BiConsumer<T, U> consumer, T t, U u) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            consumer.accept(t, u);
+        }
+    }
+
+    protected <T, R> R isolate(Function<T, R> function, T t) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return function.apply(t);
+        }
+    }
+
+    protected <T, U, R> R isolate(BiFunction<T, U, R> function, T t, U u) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return function.apply(t, u);
+        }
+    }

Review Comment:
   I don't think it's worth it to maintain all of these variants. Strictly speaking, we only need `V isolate(Callable<V>)`, and everything else can be implemented on top of it by callers.
   
   For example, instead of:
   
   ```java
   public void initialize(ConnectorContext ctx) throws Exception {
       isolateV(delegate::initialize, ctx);
   }
   ```
   
   we can use:
   
   ```java
   public void initialize(ConnectorContext ctx) throws Exception {
       isolateV(() -> delegate.initialize(ctx));
   }
   ```
   
   
   And instead of:
   
   ```java
   public void stop() throws Exception {
       isolateV(delegate::stop);
   }
   ```
   
   we can use:
   
   ```java
   public void stop() throws Exception {
       isolate(() -> {
           delegate.stop();
           return null;
       });
   }
   ```
   
   Since it turns a one-line method body into four lines, I think the second type of alternative (replacing `isolateV` with `isolate`) is probably too aggressive. But the rest seem fine and, to some degree, more readable, so I'd opt to stick with just `void isolateV(ThrowingRunnable)` and `V isolate(Callable<V>)`.
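With only those two variants left, callers capture their arguments in lambdas. Below is a hedged sketch. `Plugin` and `IsolatedPluginSketch` are hypothetical stand-ins for a connector delegate and its wrapper, and the loader swap itself is elided:

```java
import java.util.concurrent.Callable;

class ReducedApiSketch {

    interface ThrowingRunnable {
        void run() throws Exception;
    }

    /** Hypothetical plugin interface standing in for Connector. */
    interface Plugin {
        void initialize(String ctx);
        void stop();
        String config();
    }

    /** Only the two variants the review proposes keeping; the class loader swap is elided. */
    static class IsolatedPluginSketch {
        protected final Plugin delegate;

        IsolatedPluginSketch(Plugin delegate) {
            this.delegate = delegate;
        }

        protected <V> V isolate(Callable<V> callable) throws Exception {
            return callable.call(); // real code would wrap this in a LoaderSwap
        }

        protected void isolateV(ThrowingRunnable runnable) throws Exception {
            runnable.run(); // real code would wrap this in a LoaderSwap
        }

        // The argument is captured by the lambda instead of using isolateV(Consumer<T>, T).
        public void initialize(String ctx) throws Exception {
            isolateV(() -> delegate.initialize(ctx));
        }

        // A no-arg void method: a method reference still fits ThrowingRunnable.
        public void stop() throws Exception {
            isolateV(delegate::stop);
        }

        // A value-returning method: a method reference fits Callable<V>.
        public String config() throws Exception {
            return isolate(delegate::config);
        }
    }
}
```

The method references in `stop()` and `config()` still work because `ThrowingRunnable` and `Callable<V>` are both functional interfaces. Only calls that pass arguments need an explicit lambda.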



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java:
##########
@@ -0,0 +1,122 @@
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+public abstract class IsolatedPlugin<P> {
+
+    private final Plugins plugins;
+    private final Class<?> pluginClass;
+    protected final P delegate;
+    private final ClassLoader classLoader;
+    private final PluginType type;
+
+    IsolatedPlugin(Plugins plugins, P delegate, PluginType type) {
+        this.plugins = Objects.requireNonNull(plugins, "plugins must be non-null");
+        this.delegate = Objects.requireNonNull(delegate, "delegate plugin must be non-null");
+        this.pluginClass = delegate.getClass();
+        ClassLoader classLoader = pluginClass.getClassLoader();

Review Comment:
   Is this always correct? Don't we still want to use the `DelegatingClassLoader` as the context class loader when loading plugins that are present on the worker's classpath?
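One possible approach, sketched under assumptions: use the plugin's own loader only when it is an isolated plugin loader, and otherwise fall back to the delegating loader. `ContextLoaderChoice`, `contextLoaderFor`, and the `isPluginLoader` predicate are hypothetical. Real runtime code would presumably check for its own `PluginClassLoader` type instead of taking a predicate:

```java
import java.util.Objects;
import java.util.function.Predicate;

class ContextLoaderChoice {

    /**
     * Picks the context class loader to use while invoking a plugin.
     * A plugin loaded from an isolated plugin path keeps its own loader. Anything else
     * (a class on the worker classpath, or a bootstrap class whose loader is null)
     * falls back to the delegating loader.
     */
    static ClassLoader contextLoaderFor(Class<?> pluginClass,
                                        ClassLoader delegatingLoader,
                                        Predicate<ClassLoader> isPluginLoader) {
        Objects.requireNonNull(delegatingLoader, "delegatingLoader must be non-null");
        ClassLoader own = pluginClass.getClassLoader();
        if (own != null && isPluginLoader.test(own)) {
            return own;
        }
        return delegatingLoader;
    }
}
```

Unlike the constructor in the diff, this sketch tolerates bootstrap classes (a null loader) by also falling back. That is a choice of the sketch, not something the review proposes.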



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java:
##########
@@ -0,0 +1,122 @@
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+public abstract class IsolatedPlugin<P> {
+
+    private final Plugins plugins;
+    private final Class<?> pluginClass;
+    protected final P delegate;
+    private final ClassLoader classLoader;
+    private final PluginType type;
+
+    IsolatedPlugin(Plugins plugins, P delegate, PluginType type) {
+        this.plugins = Objects.requireNonNull(plugins, "plugins must be non-null");
+        this.delegate = Objects.requireNonNull(delegate, "delegate plugin must be non-null");
+        this.pluginClass = delegate.getClass();
+        ClassLoader classLoader = pluginClass.getClassLoader();
+        this.classLoader = Objects.requireNonNull(classLoader, "delegate plugin must not be a bootstrap class");
+        this.type = Objects.requireNonNull(type, "plugin type must be non-null");
+    }
+
+    public PluginType type() {
+        return type;
+    }
+
+    @SuppressWarnings("unchecked")
+    public Class<? extends P> pluginClass() {
+        return (Class<? extends P>) pluginClass;
+    }
+
+    protected <V> V isolate(Callable<V> callable) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return callable.call();
+        }
+    }
+
+    protected void isolateV(ThrowingRunnable runnable) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            runnable.run();
+        }
+    }
+
+    protected <T> void isolateV(Consumer<T> consumer, T t) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            consumer.accept(t);
+        }
+    }
+
+    protected <T, U> void isolateV(BiConsumer<T, U> consumer, T t, U u) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            consumer.accept(t, u);
+        }
+    }
+
+    protected <T, R> R isolate(Function<T, R> function, T t) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return function.apply(t);
+        }
+    }
+
+    protected <T, U, R> R isolate(BiFunction<T, U, R> function, T t, U u) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return function.apply(t, u);
+        }
+    }
+
+    public interface ThrowingRunnable {
+        void run() throws Exception;
+    }
+
+    @Override
+    public int hashCode() {
+        try {
+            return isolate(delegate::hashCode);
+        } catch (Throwable e) {
+            throw new RuntimeException("unable to evaluate plugin hashCode", e);
+        }
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == null || this.getClass() != obj.getClass()) {
+            return false;
+        }

Review Comment:
   IMO we should distinguish between an `IsolatedPlugin` instance and the actual underlying plugin in `equals` and `hashCode`.
   
   So in these methods we should consider not just the underlying plugin's `equals` and `hashCode`, but also the `Plugins`, `ClassLoader`, and `PluginType` fields.
   
   This could be especially useful during testing if we want to make sure that an `IsolatedPlugin` instance has the expected class loader, for example.
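Here is a hedged sketch of what that could look like. `PluginType` is a stand-in enum and `plugins` is typed as `Object` rather than the runtime's `Plugins`. The delegate's own `equals`/`hashCode` are called directly here instead of inside the loader swap:

```java
import java.util.Objects;

class IsolatedEquality {

    /** Stand-in for the runtime's PluginType. */
    enum PluginType { SOURCE, SINK }

    static final class Wrapper<P> {
        private final Object plugins;           // stand-in for Plugins
        private final P delegate;
        private final ClassLoader classLoader;
        private final PluginType type;

        Wrapper(Object plugins, P delegate, ClassLoader classLoader, PluginType type) {
            this.plugins = Objects.requireNonNull(plugins);
            this.delegate = Objects.requireNonNull(delegate);
            this.classLoader = Objects.requireNonNull(classLoader);
            this.type = Objects.requireNonNull(type);
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            Wrapper<?> that = (Wrapper<?>) obj;
            // Equal only if every wrapper field matches, not just the delegate.
            return plugins.equals(that.plugins)
                    && delegate.equals(that.delegate)
                    && classLoader.equals(that.classLoader)
                    && type == that.type;
        }

        @Override
        public int hashCode() {
            return Objects.hash(plugins, delegate, classLoader, type);
        }
    }
}
```

Two wrappers around equal delegates but different class loaders compare unequal. That is the property that would help when a test needs to check which loader an instance carries.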



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##########
@@ -243,7 +248,11 @@ public synchronized void requestTaskReconfiguration(String connName) {
             log.error("Task that requested reconfiguration does not exist: {}", connName);
             return;
         }
-        updateConnectorTasks(connName);
+        try {
+            updateConnectorTasks(connName);
+        } catch (Exception e) {
+            log.error("Unable to generate task configs for {}", connName, e);
+        }

Review Comment:
   This is a change in behavior too, right? We no longer throw in `ConnectorContext::requestTaskReconfiguration` if we encounter any errors.
   
   This also seems reasonable (it aligns the behavior across standalone and distributed modes), but it also has consequences for the REST API: restarting a connector would no longer fail if we're unable to generate task configs for it, whereas today it does fail in both distributed and standalone modes.
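The behavior change can be shown with a small sketch. `ReconfigurationSketch`, its `updateConnectorTasks`, and the list used as a log are hypothetical stand-ins for `StandaloneHerder` and its logger:

```java
import java.util.ArrayList;
import java.util.List;

class ReconfigurationSketch {

    final List<String> errorLog = new ArrayList<>();
    private final RuntimeException taskConfigFailure;

    ReconfigurationSketch(RuntimeException taskConfigFailure) {
        this.taskConfigFailure = taskConfigFailure; // null means task config generation succeeds
    }

    /** Stand-in for the herder method that asks the connector for task configs. */
    void updateConnectorTasks(String connName) {
        if (taskConfigFailure != null) {
            throw taskConfigFailure;
        }
    }

    /** Mirrors the diff: failures are logged instead of being thrown back to the caller. */
    synchronized void requestTaskReconfiguration(String connName) {
        try {
            updateConnectorTasks(connName);
        } catch (Exception e) {
            errorLog.add("Unable to generate task configs for " + connName + ": " + e.getMessage());
        }
    }
}
```

A caller that relies on this method to surface failures, such as the REST restart path mentioned above, would no longer see the exception. It would only appear in the log.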
   
   



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -220,7 +220,18 @@ public Object newPlugin(String classOrAlias) throws ClassNotFoundException {
         return newPlugin(klass);
     }
 
-    public Connector newConnector(String connectorClassOrAlias) {
+    public IsolatedConnector<?> newConnector(String connectorClassOrAlias) {
+        Connector connector = newRawConnector(connectorClassOrAlias);
+        if (connector instanceof SourceConnector) {
+            return new IsolatedSourceConnector(this, (SourceConnector) connector);
+        } else if (connector instanceof SinkConnector) {
+            return new IsolatedSinkConnector(this, (SinkConnector) connector);
+        } else {
+            throw new IllegalArgumentException("connector must be either a SourceConnector or a SinkConnector");
+        }
+    }
+
+    Connector newRawConnector(String connectorClassOrAlias) {

Review Comment:
   Any reason not to make this `private`?
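If nothing outside `Plugins` needs the unwrapped instance, the helper could be `private`. Here is a hedged, self-contained sketch of that shape. Every type is a hypothetical stand-in for the Connect classes, and a `Supplier` replaces the real class-or-alias lookup:

```java
import java.util.function.Supplier;

class ConnectorFactorySketch {

    // Hypothetical stand-ins for the Connect API types.
    interface Connector { }
    interface SourceConnector extends Connector { }
    interface SinkConnector extends Connector { }

    abstract static class IsolatedConnector<C extends Connector> {
        final C delegate;
        IsolatedConnector(C delegate) { this.delegate = delegate; }
    }

    static final class IsolatedSourceConnector extends IsolatedConnector<SourceConnector> {
        IsolatedSourceConnector(SourceConnector delegate) { super(delegate); }
    }

    static final class IsolatedSinkConnector extends IsolatedConnector<SinkConnector> {
        IsolatedSinkConnector(SinkConnector delegate) { super(delegate); }
    }

    /** Public entry point: only wrapped connectors leave this class. */
    static IsolatedConnector<?> newConnector(Supplier<Connector> rawFactory) {
        Connector connector = newRawConnector(rawFactory);
        if (connector instanceof SourceConnector) {
            return new IsolatedSourceConnector((SourceConnector) connector);
        } else if (connector instanceof SinkConnector) {
            return new IsolatedSinkConnector((SinkConnector) connector);
        }
        throw new IllegalArgumentException("connector must be either a SourceConnector or a SinkConnector");
    }

    /** Private, so callers cannot obtain an unwrapped (non-isolated) connector. */
    private static Connector newRawConnector(Supplier<Connector> rawFactory) {
        return rawFactory.get();
    }
}
```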



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -486,21 +488,43 @@ ConfigInfos validateConnectorConfig(Map<String, String> connectorProps, boolean
             Set<String> allGroups = new LinkedHashSet<>(enrichedConfigDef.groups());
 
             // do custom connector-specific validation
-            ConfigDef configDef = connector.config();
+            ConfigDef configDef;
+            try {
+                configDef = connector.config();
+            } catch (Throwable e) {
+                throw new ConnectException(
+                        String.format(
+                                "unable to evaluate %s.config()",
+                                connector.pluginClass().getName()
+                        ),
+                        e
+                );
+            }

Review Comment:
   Would it be cleaner to add a `throws Exception` clause to this variant of `validateConnectorConfig`? Right now it's only called from [one (non-testing) location](https://github.com/apache/kafka/blob/7322f4cd55dc08abdc6ccf51ed33f7f0d869dd0e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L382-L387), which already has the necessary try/catch wrapper.
   
   The message also doesn't seem terribly useful here, considering the class and method names will already be present in the exception's stack trace.
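Here is a hedged sketch of the suggested shape. The validation method simply declares `throws Exception`, and the single caller's existing try/catch reports the failure. `ValidationSketch` and `validateAsync` are hypothetical names, and `Callback` is modeled loosely on Connect's `Callback<V>`:

```java
import java.util.concurrent.Callable;

class ValidationSketch {

    /** Hypothetical completion callback, modeled loosely on Connect's Callback<V>. */
    interface Callback<V> {
        void onCompletion(Throwable error, V result);
    }

    /** connectorConfig stands in for connector.config(), which may throw anything. */
    static String validateConnectorConfig(Callable<String> connectorConfig) throws Exception {
        // No local try/catch or re-wrapping: the original exception and stack trace propagate.
        String configDef = connectorConfig.call();
        return "validated " + configDef;
    }

    /** The single caller already has a try/catch wrapper that reports errors via the callback. */
    static void validateAsync(Callable<String> connectorConfig, Callback<String> callback) {
        String result;
        try {
            result = validateConnectorConfig(connectorConfig);
        } catch (Throwable t) {
            callback.onCompletion(t, null);
            return;
        }
        callback.onCompletion(null, result);
    }
}
```

The exception is not re-wrapped, so the frame for the connector's `config()` still appears in its stack trace. The callback is invoked exactly once on either path.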



-- 
This is an automated message from the Apache Git Service.