You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "gharris1727 (via GitHub)" <gi...@apache.org> on 2023/02/01 23:05:12 UTC

[GitHub] [kafka] gharris1727 opened a new pull request, #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

gharris1727 opened a new pull request, #13185:
URL: https://github.com/apache/kafka/pull/13185

   [Jira](https://issues.apache.org/jira/browse/KAFKA-14670)
   This is the first part of the above ticket, applied only to SinkConnector and SourceConnector plugins.
   Additional PRs will cover the other plugins, as the refactor was too large to reasonably review at once.
   
   Design decisions:
   1. The `IsolatedPlugin<P>` class will be a common superclass for all plugin wrappers.
   2. The `IsolatedPlugin` superclass provides utility methods for subclasses to manage swapping the ThreadContextClassLoader for each call in a way that has minimal boilerplate.
   3. The `Isolated*` classes are intended to only be constructed within the plugin isolation infrastructure, and will all have package-local constructors.
   4. Testing runtime code that uses wrapped plugins will require mocking the wrappers, or instantiating a real Plugins class.
   5. Subclasses should define public methods which match the plugin class they are wrapping without being an explicit subclass. These methods should be marked with `throws Exception` to remind callers that they may throw arbitrary exceptions.
   
   Open questions/issues:
   1. The `hashCode`, `equals`, and `toString` methods are isolated, but do not have `throws Exception` as the Object class does not have these throws clauses. That means that calling code may not be forced to handle exceptions from these methods. We could change the methodNames, but the base Object methods would still be callable and would have a default implementation.
   2. The wrapper method signatures throw `Exception` and not `Throwable`. The distinction being that `Exception`s are considered by the Java Language to be reasonable to catch in an application, and `Throwable`s were not. I wasn't sure whether the Connect runtime should be forced to handle errors like OutOfMemoryError, LinkageError, etc, or just let them propagate and kill the calling thread.
   3. These wrappers do not enforce that the methods are not called on the herder thread, because I didn't come up with an elegant way to do so.
   4. I did a first-pass at propagating and handling the exceptions thrown by the connector classes, but I don't know if they are reasonable. Now that the exceptions are checked, the code enforces that exceptions are handled, but it is still up to us to determine the proper way to handle the exceptions.
   5. This PR does not remove existing loaderSwap calls that are currently ensuring isolation. Those can be moved/removed after all of this refactor lands, as it may still be necessary for the other plugins.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] gharris1727 commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1122467555


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##########
@@ -243,7 +248,11 @@ public synchronized void requestTaskReconfiguration(String connName) {
             log.error("Task that requested reconfiguration does not exist: {}", connName);
             return;
         }
-        updateConnectorTasks(connName);
+        try {
+            updateConnectorTasks(connName);
+        } catch (Exception e) {
+            log.error("Unable to generate task configs for {}", connName, e);
+        }

Review Comment:
   Yes this is a change in behavior.
   
   There is precedent for throwing ConnectException from ConnectorContext::requestTaskReconfiguration, so perhaps wrapping this in a ConnectException and propagating it would be a better behavior. I can move this to HerderConnectorContext, except it would only be effective for the standalone herder.
   
   We can also see this as an opportunity to improve the StandaloneHerder by handling reconfigurations asynchronously and retry them in the background, rather than 500'ing the REST API or dropping the failure silently.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1120754565


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##########
@@ -243,7 +248,11 @@ public synchronized void requestTaskReconfiguration(String connName) {
             log.error("Task that requested reconfiguration does not exist: {}", connName);
             return;
         }
-        updateConnectorTasks(connName);
+        try {
+            updateConnectorTasks(connName);
+        } catch (Exception e) {
+            log.error("Unable to generate task configs for {}", connName, e);
+        }

Review Comment:
   This is a change in behavior too, right? We no longer throw in `ConnectorContext::requestTaskReconfiguration` if we encounter any errors.
   
   This also seems reasonable (it aligns the behavior across standalone and distributed modes), but it does has consequences for the REST API, where restarting a connector no longer fails if we're unable to generate task configs for it (which is currently the case for both distributed and standalone modes).
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1123470094


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##########
@@ -243,7 +248,11 @@ public synchronized void requestTaskReconfiguration(String connName) {
             log.error("Task that requested reconfiguration does not exist: {}", connName);
             return;
         }
-        updateConnectorTasks(connName);
+        try {
+            updateConnectorTasks(connName);
+        } catch (Exception e) {
+            log.error("Unable to generate task configs for {}", connName, e);
+        }

Review Comment:
   I don't think it's especially likely for connectors to continually invoke `requestTaskReconfiguration` given the automatic retry logic in distributed mode, and as of https://github.com/apache/kafka/pull/13276, the impact of ongoing retries for that operation is drastically reduced.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1123318824


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+public abstract class IsolatedPlugin<P> {
+
+    private final Plugins plugins;
+    private final Class<?> pluginClass;
+    protected final P delegate;
+    private final ClassLoader classLoader;
+    private final PluginType type;
+
+    IsolatedPlugin(Plugins plugins, P delegate, PluginType type) {
+        this.plugins = Objects.requireNonNull(plugins, "plugins must be non-null");
+        this.delegate = Objects.requireNonNull(delegate, "delegate plugin must be non-null");
+        this.pluginClass = delegate.getClass();
+        ClassLoader classLoader = pluginClass.getClassLoader();
+        this.classLoader = Objects.requireNonNull(classLoader, "delegate plugin must not be a boostrap class");
+        this.type = Objects.requireNonNull(type, "plugin type must be non-null");
+    }
+
+    public PluginType type() {
+        return type;
+    }
+
+    @SuppressWarnings("unchecked")
+    public Class<? extends P> pluginClass() {
+        return (Class<? extends P>) pluginClass;
+    }
+
+    protected <V> V isolate(Callable<V> callable) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return callable.call();
+        }
+    }
+
+    protected void isolateV(ThrowingRunnable runnable) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            runnable.run();
+        }
+    }
+
+    protected <T> void isolateV(Consumer<T> consumer, T t) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            consumer.accept(t);
+        }
+    }
+
+    protected <T, U> void isolateV(BiConsumer<T, U> consumer, T t, U u) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            consumer.accept(t, u);
+        }
+    }
+
+    protected <T, R> R isolate(Function<T, R> function, T t) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return function.apply(t);
+        }
+    }
+
+    protected <T, U, R> R isolate(BiFunction<T, U, R> function, T t, U u) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return function.apply(t, u);
+        }
+    }

Review Comment:
   Ah, I misunderstood--I thought you'd found a counterexample that couldn't be covered by the minimal set of `isolate`/`isolateV` variants, instead of one that couldn't be covered by the expanded set of variants originally present in the PR. Was really scratching my head over that one!
   
   Thanks for the changes, this class LGTM now 👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] github-actions[bot] commented on pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #13185:
URL: https://github.com/apache/kafka/pull/13185#issuecomment-1709425526

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has  merge conflicts, please update it with the latest from trunk (or appropriate release branch) <p> If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1120887378


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+public abstract class IsolatedPlugin<P> {
+
+    private final Plugins plugins;
+    private final Class<?> pluginClass;
+    protected final P delegate;
+    private final ClassLoader classLoader;
+    private final PluginType type;
+
+    IsolatedPlugin(Plugins plugins, P delegate, PluginType type) {
+        this.plugins = Objects.requireNonNull(plugins, "plugins must be non-null");
+        this.delegate = Objects.requireNonNull(delegate, "delegate plugin must be non-null");
+        this.pluginClass = delegate.getClass();
+        ClassLoader classLoader = pluginClass.getClassLoader();
+        this.classLoader = Objects.requireNonNull(classLoader, "delegate plugin must not be a boostrap class");
+        this.type = Objects.requireNonNull(type, "plugin type must be non-null");
+    }
+
+    public PluginType type() {
+        return type;
+    }
+
+    @SuppressWarnings("unchecked")
+    public Class<? extends P> pluginClass() {
+        return (Class<? extends P>) pluginClass;
+    }
+
+    protected <V> V isolate(Callable<V> callable) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return callable.call();
+        }
+    }
+
+    protected void isolateV(ThrowingRunnable runnable) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            runnable.run();
+        }
+    }
+
+    protected <T> void isolateV(Consumer<T> consumer, T t) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            consumer.accept(t);
+        }
+    }
+
+    protected <T, U> void isolateV(BiConsumer<T, U> consumer, T t, U u) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            consumer.accept(t, u);
+        }
+    }
+
+    protected <T, R> R isolate(Function<T, R> function, T t) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return function.apply(t);
+        }
+    }
+
+    protected <T, U, R> R isolate(BiFunction<T, U, R> function, T t, U u) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return function.apply(t, u);
+        }
+    }

Review Comment:
   I would be really interested in knowing which case can't be covered here. If you can dig up the `Converter` counterexample, would you mind sharing it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] gharris1727 commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1120867064


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+public abstract class IsolatedPlugin<P> {
+
+    private final Plugins plugins;
+    private final Class<?> pluginClass;
+    protected final P delegate;
+    private final ClassLoader classLoader;
+    private final PluginType type;
+
+    IsolatedPlugin(Plugins plugins, P delegate, PluginType type) {
+        this.plugins = Objects.requireNonNull(plugins, "plugins must be non-null");
+        this.delegate = Objects.requireNonNull(delegate, "delegate plugin must be non-null");
+        this.pluginClass = delegate.getClass();
+        ClassLoader classLoader = pluginClass.getClassLoader();
+        this.classLoader = Objects.requireNonNull(classLoader, "delegate plugin must not be a boostrap class");
+        this.type = Objects.requireNonNull(type, "plugin type must be non-null");
+    }
+
+    public PluginType type() {
+        return type;
+    }
+
+    @SuppressWarnings("unchecked")
+    public Class<? extends P> pluginClass() {
+        return (Class<? extends P>) pluginClass;
+    }
+
+    protected <V> V isolate(Callable<V> callable) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return callable.call();
+        }
+    }
+
+    protected void isolateV(ThrowingRunnable runnable) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            runnable.run();
+        }
+    }
+
+    protected <T> void isolateV(Consumer<T> consumer, T t) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            consumer.accept(t);
+        }
+    }
+
+    protected <T, U> void isolateV(BiConsumer<T, U> consumer, T t, U u) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            consumer.accept(t, u);
+        }
+    }
+
+    protected <T, R> R isolate(Function<T, R> function, T t) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return function.apply(t);
+        }
+    }
+
+    protected <T, U, R> R isolate(BiFunction<T, U, R> function, T t, U u) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return function.apply(t, u);
+        }
+    }

Review Comment:
   To take it to the extreme, we would inline and eliminate each of the `isolate` calls entirely and put the try-with-resources swap in the methods themselves. That would eliminate all lambdas and only add one stack frame, but be super repetitive in the high 10s of methods that have to be isolated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1120754565


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##########
@@ -243,7 +248,11 @@ public synchronized void requestTaskReconfiguration(String connName) {
             log.error("Task that requested reconfiguration does not exist: {}", connName);
             return;
         }
-        updateConnectorTasks(connName);
+        try {
+            updateConnectorTasks(connName);
+        } catch (Exception e) {
+            log.error("Unable to generate task configs for {}", connName, e);
+        }

Review Comment:
   This is a change in behavior too, right? We no longer throw in `ConnectorContext::requestTaskReconfiguration` if we encounter any errors.
   
   This also seems reasonable (it aligns the behavior across standalone and distributed modes), but it does have consequences for the REST API, where restarting a connector no longer fails if we're unable to generate task configs for it (which is currently the case for both distributed and standalone modes).
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] gharris1727 commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1120880473


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+public abstract class IsolatedPlugin<P> {
+
+    private final Plugins plugins;
+    private final Class<?> pluginClass;
+    protected final P delegate;
+    private final ClassLoader classLoader;
+    private final PluginType type;
+
+    IsolatedPlugin(Plugins plugins, P delegate, PluginType type) {
+        this.plugins = Objects.requireNonNull(plugins, "plugins must be non-null");
+        this.delegate = Objects.requireNonNull(delegate, "delegate plugin must be non-null");
+        this.pluginClass = delegate.getClass();
+        ClassLoader classLoader = pluginClass.getClassLoader();
+        this.classLoader = Objects.requireNonNull(classLoader, "delegate plugin must not be a boostrap class");
+        this.type = Objects.requireNonNull(type, "plugin type must be non-null");
+    }
+
+    public PluginType type() {
+        return type;
+    }
+
+    @SuppressWarnings("unchecked")
+    public Class<? extends P> pluginClass() {
+        return (Class<? extends P>) pluginClass;
+    }
+
+    protected <V> V isolate(Callable<V> callable) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return callable.call();
+        }
+    }
+
+    protected void isolateV(ThrowingRunnable runnable) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            runnable.run();
+        }
+    }
+
+    protected <T> void isolateV(Consumer<T> consumer, T t) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            consumer.accept(t);
+        }
+    }
+
+    protected <T, U> void isolateV(BiConsumer<T, U> consumer, T t, U u) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            consumer.accept(t, u);
+        }
+    }
+
+    protected <T, R> R isolate(Function<T, R> function, T t) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return function.apply(t);
+        }
+    }
+
+    protected <T, U, R> R isolate(BiFunction<T, U, R> function, T t, U u) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return function.apply(t, u);
+        }
+    }
+
+    public interface ThrowingRunnable {
+        void run() throws Exception;
+    }
+
+    @Override
+    public int hashCode() {
+        try {
+            return isolate(delegate::hashCode);
+        } catch (Throwable e) {
+            throw new RuntimeException("unable to evaluate plugin hashCode", e);
+        }
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == null || this.getClass() != obj.getClass()) {
+            return false;
+        }

Review Comment:
   I think you're right. I could imagine a particularly silly `Connector::equals` method which always returns `true` and causes poor behavior in a hash map or other data structure which relies on the equals method.
   
   I also wonder if it is a good idea to call the delegate equals method, or whether we should use reference equality (this.delegate == other.delegate) and skip calling the equals method entirely.
   I don't believe that we're using these methods anywhere in the runtime, and I don't think that it's common for developers to override the equals/hashCode, so we get to choose what the most useful implementation of these methods is.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1120889645


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+public abstract class IsolatedPlugin<P> {
+
+    private final Plugins plugins;
+    private final Class<?> pluginClass;
+    protected final P delegate;
+    private final ClassLoader classLoader;
+    private final PluginType type;
+
+    IsolatedPlugin(Plugins plugins, P delegate, PluginType type) {
+        this.plugins = Objects.requireNonNull(plugins, "plugins must be non-null");
+        this.delegate = Objects.requireNonNull(delegate, "delegate plugin must be non-null");
+        this.pluginClass = delegate.getClass();
+        ClassLoader classLoader = pluginClass.getClassLoader();
+        this.classLoader = Objects.requireNonNull(classLoader, "delegate plugin must not be a boostrap class");
+        this.type = Objects.requireNonNull(type, "plugin type must be non-null");
+    }
+
+    public PluginType type() {
+        return type;
+    }
+
+    @SuppressWarnings("unchecked")
+    public Class<? extends P> pluginClass() {
+        return (Class<? extends P>) pluginClass;
+    }
+
+    protected <V> V isolate(Callable<V> callable) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return callable.call();
+        }
+    }
+
+    protected void isolateV(ThrowingRunnable runnable) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            runnable.run();
+        }
+    }
+
+    protected <T> void isolateV(Consumer<T> consumer, T t) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            consumer.accept(t);
+        }
+    }
+
+    protected <T, U> void isolateV(BiConsumer<T, U> consumer, T t, U u) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            consumer.accept(t, u);
+        }
+    }
+
+    protected <T, R> R isolate(Function<T, R> function, T t) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return function.apply(t);
+        }
+    }
+
+    protected <T, U, R> R isolate(BiFunction<T, U, R> function, T t, U u) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return function.apply(t, u);
+        }
+    }

Review Comment:
   If it comes down to not being able to completely satisfy all use cases with just the `isolate(Callable)` (and possibly `isolateV(ThrowingRunnable)` methods, then I think it could be alright to add more variants. But the additional stack frames, and even ugly class names, don't seem likely to be super important to users or plugin developers. In both cases (though probably more often the latter), aren't they more likely to look for the first stack frame that originates with their plugin class?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] gharris1727 commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1122473180


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##########
@@ -243,7 +248,11 @@ public synchronized void requestTaskReconfiguration(String connName) {
             log.error("Task that requested reconfiguration does not exist: {}", connName);
             return;
         }
-        updateConnectorTasks(connName);
+        try {
+            updateConnectorTasks(connName);
+        } catch (Exception e) {
+            log.error("Unable to generate task configs for {}", connName, e);
+        }

Review Comment:
   Perhaps we can also consider this a failure of the signature of Herder::requestTaskReconfiguration. The DistributedHerder makes this asynchronous, but provides no future or callback to confirm the progress of the request.
   Arguably StandaloneHerder is implementing the function signature correctly as a request that either succeeds or fails.
   
   It also makes me think that a connector which repeatedly calls requestTaskReconfiguration (and then always fails in generateTaskConfigs) could spam the herder with retried restart requests. This is such a messy situation that the old function signatures hid from us :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1120886025


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+public abstract class IsolatedPlugin<P> {
+
+    private final Plugins plugins;
+    private final Class<?> pluginClass;
+    protected final P delegate;
+    private final ClassLoader classLoader;
+    private final PluginType type;
+
+    IsolatedPlugin(Plugins plugins, P delegate, PluginType type) {
+        this.plugins = Objects.requireNonNull(plugins, "plugins must be non-null");
+        this.delegate = Objects.requireNonNull(delegate, "delegate plugin must be non-null");
+        this.pluginClass = delegate.getClass();
+        ClassLoader classLoader = pluginClass.getClassLoader();
+        this.classLoader = Objects.requireNonNull(classLoader, "delegate plugin must not be a boostrap class");
+        this.type = Objects.requireNonNull(type, "plugin type must be non-null");
+    }
+
+    public PluginType type() {
+        return type;
+    }
+
+    @SuppressWarnings("unchecked")
+    public Class<? extends P> pluginClass() {
+        return (Class<? extends P>) pluginClass;
+    }
+
+    protected <V> V isolate(Callable<V> callable) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return callable.call();
+        }
+    }
+
+    protected void isolateV(ThrowingRunnable runnable) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            runnable.run();
+        }
+    }
+
+    protected <T> void isolateV(Consumer<T> consumer, T t) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            consumer.accept(t);
+        }
+    }
+
+    protected <T, U> void isolateV(BiConsumer<T, U> consumer, T t, U u) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            consumer.accept(t, u);
+        }
+    }
+
+    protected <T, R> R isolate(Function<T, R> function, T t) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return function.apply(t);
+        }
+    }
+
+    protected <T, U, R> R isolate(BiFunction<T, U, R> function, T t, U u) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return function.apply(t, u);
+        }
+    }
+
+    public interface ThrowingRunnable {
+        void run() throws Exception;
+    }
+
+    @Override
+    public int hashCode() {
+        try {
+            return isolate(delegate::hashCode);
+        } catch (Throwable e) {
+            throw new RuntimeException("unable to evaluate plugin hashCode", e);
+        }
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == null || this.getClass() != obj.getClass()) {
+            return false;
+        }

Review Comment:
   Given that equality across `Connector` instances is likely not a developer priority, I think you're right regarding reference quality for delegates. Good point 👍



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] gharris1727 commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1120864255


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+public abstract class IsolatedPlugin<P> {
+
+    private final Plugins plugins;
+    private final Class<?> pluginClass;
+    protected final P delegate;
+    private final ClassLoader classLoader;
+    private final PluginType type;
+
+    IsolatedPlugin(Plugins plugins, P delegate, PluginType type) {
+        this.plugins = Objects.requireNonNull(plugins, "plugins must be non-null");
+        this.delegate = Objects.requireNonNull(delegate, "delegate plugin must be non-null");
+        this.pluginClass = delegate.getClass();
+        ClassLoader classLoader = pluginClass.getClassLoader();
+        this.classLoader = Objects.requireNonNull(classLoader, "delegate plugin must not be a boostrap class");
+        this.type = Objects.requireNonNull(type, "plugin type must be non-null");
+    }
+
+    public PluginType type() {
+        return type;
+    }
+
+    @SuppressWarnings("unchecked")
+    public Class<? extends P> pluginClass() {
+        return (Class<? extends P>) pluginClass;
+    }
+
+    protected <V> V isolate(Callable<V> callable) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return callable.call();
+        }
+    }
+
+    protected void isolateV(ThrowingRunnable runnable) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            runnable.run();
+        }
+    }
+
+    protected <T> void isolateV(Consumer<T> consumer, T t) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            consumer.accept(t);
+        }
+    }
+
+    protected <T, U> void isolateV(BiConsumer<T, U> consumer, T t, U u) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            consumer.accept(t, u);
+        }
+    }
+
+    protected <T, R> R isolate(Function<T, R> function, T t) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return function.apply(t);
+        }
+    }
+
+    protected <T, U, R> R isolate(BiFunction<T, U, R> function, T t, U u) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return function.apply(t, u);
+        }
+    }

Review Comment:
   I think this is a question of readability, but it's important to consider the readability for operators of these connectors. With the current design that uses mostly method references, exceptions have the following stacktrace:
   ```
   org.apache.kafka.connect.errors.ConnectException:
   	at org.apache.kafka.connect.runtime.isolation.IsolatedPluginTest.lambda$testSinkConnector$0(IsolatedPluginTest.java:43)
           ...
   	at org.apache.kafka.connect.connector.Connector.initialize(Connector.java:57)
   	at org.apache.kafka.connect.runtime.isolation.IsolatedPlugin.isolateV(IsolatedPlugin.java:66)
   	at org.apache.kafka.connect.runtime.isolation.IsolatedConnector.initialize(IsolatedConnector.java:39)
   	at org.apache.kafka.connect.runtime.isolation.IsolatedPluginTest.testSinkConnector(IsolatedPluginTest.java:46)
   	...
   ```
   
   There's two stack frames associated with the isolation infrastructure that did not exist befor, and neither of them are anonymous lambdas (the `IsolatedPluginTest.lambda$testSinkConnector$0` is an anonymous lambda in my test, not part of the IsolatedPlugin).
   
   Compare that to the style of making everything into a callable:
   ```
   org.apache.kafka.connect.errors.ConnectException:
   	at org.apache.kafka.connect.runtime.isolation.IsolatedPluginTest.lambda$testSinkConnector$0(IsolatedPluginTest.java:43)
           ...
   	at org.apache.kafka.connect.connector.Connector.initialize(Connector.java:57)
   	at org.apache.kafka.connect.runtime.isolation.IsolatedConnector.lambda$initialize$0(IsolatedConnector.java:39)
   	at org.apache.kafka.connect.runtime.isolation.IsolatedPlugin.isolateV(IsolatedPlugin.java:60)
   	at org.apache.kafka.connect.runtime.isolation.IsolatedConnector.initialize(IsolatedConnector.java:39)
   	at org.apache.kafka.connect.runtime.isolation.IsolatedPluginTest.testSinkConnector(IsolatedPluginTest.java:46)
   	...
   ```
   
   This has one additional stacktrace for the lambda mapping from the function to a callable, and it has a bit of an ugly name.
   
   Also it doesn't appear in this PR, but I did find some plugin methods which cannot be reduced to one of these signatures, and did have to be reduced to a Callable (I think it was the converters?).
   So this strategy is not perfect, does not cover all of the cases, but does provide slightly cleaner stacktraces in the situations it does cover.
   I think i could go either way on this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] gharris1727 commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1120897608


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+public abstract class IsolatedPlugin<P> {
+
+    private final Plugins plugins;
+    private final Class<?> pluginClass;
+    protected final P delegate;
+    private final ClassLoader classLoader;
+    private final PluginType type;
+
+    IsolatedPlugin(Plugins plugins, P delegate, PluginType type) {
+        this.plugins = Objects.requireNonNull(plugins, "plugins must be non-null");
+        this.delegate = Objects.requireNonNull(delegate, "delegate plugin must be non-null");
+        this.pluginClass = delegate.getClass();
+        ClassLoader classLoader = pluginClass.getClassLoader();
+        this.classLoader = Objects.requireNonNull(classLoader, "delegate plugin must not be a boostrap class");
+        this.type = Objects.requireNonNull(type, "plugin type must be non-null");
+    }
+
+    public PluginType type() {
+        return type;
+    }
+
+    @SuppressWarnings("unchecked")
+    public Class<? extends P> pluginClass() {
+        return (Class<? extends P>) pluginClass;
+    }
+
+    protected <V> V isolate(Callable<V> callable) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return callable.call();
+        }
+    }
+
+    protected void isolateV(ThrowingRunnable runnable) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            runnable.run();
+        }
+    }
+
+    protected <T> void isolateV(Consumer<T> consumer, T t) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            consumer.accept(t);
+        }
+    }
+
+    protected <T, U> void isolateV(BiConsumer<T, U> consumer, T t, U u) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            consumer.accept(t, u);
+        }
+    }
+
+    protected <T, R> R isolate(Function<T, R> function, T t) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return function.apply(t);
+        }
+    }
+
+    protected <T, U, R> R isolate(BiFunction<T, U, R> function, T t, U u) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return function.apply(t, u);
+        }
+    }

Review Comment:
   > I would be really interested in knowing which case can't be covered here. If you can dig up the Converter counterexample, would you mind sharing it?
   
   Here's a snippet from the IsolatedConverter, which is a few PRs in the future from now.
   These methods take 3 or 4 arguments, and there's no builtin java functional type that covers them (I guess they would be TriFunction and QuadFunction?)
   ```
       public byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value) throws Exception {
           return isolate(() -> delegate.fromConnectData(topic, headers, schema, value));
       }
   
       public SchemaAndValue toConnectData(String topic, Headers headers, byte[] value) throws Exception {
           return isolate(() -> delegate.toConnectData(topic, headers, value));
       }
   
       public byte[] fromConnectData(String topic, Schema schema, Object value) throws Exception {
           return isolate(() -> delegate.fromConnectData(topic, schema, value));
       }
       
       public SchemaAndValue toConnectData(String topic, byte[] value) throws Exception {
           return isolate(delegate::toConnectData, topic, value);
       }
   ```
   
   The 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] gharris1727 commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1120897608


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+public abstract class IsolatedPlugin<P> {
+
+    private final Plugins plugins;
+    private final Class<?> pluginClass;
+    protected final P delegate;
+    private final ClassLoader classLoader;
+    private final PluginType type;
+
+    IsolatedPlugin(Plugins plugins, P delegate, PluginType type) {
+        this.plugins = Objects.requireNonNull(plugins, "plugins must be non-null");
+        this.delegate = Objects.requireNonNull(delegate, "delegate plugin must be non-null");
+        this.pluginClass = delegate.getClass();
+        ClassLoader classLoader = pluginClass.getClassLoader();
+        this.classLoader = Objects.requireNonNull(classLoader, "delegate plugin must not be a boostrap class");
+        this.type = Objects.requireNonNull(type, "plugin type must be non-null");
+    }
+
+    public PluginType type() {
+        return type;
+    }
+
+    @SuppressWarnings("unchecked")
+    public Class<? extends P> pluginClass() {
+        return (Class<? extends P>) pluginClass;
+    }
+
+    protected <V> V isolate(Callable<V> callable) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return callable.call();
+        }
+    }
+
+    protected void isolateV(ThrowingRunnable runnable) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            runnable.run();
+        }
+    }
+
+    protected <T> void isolateV(Consumer<T> consumer, T t) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            consumer.accept(t);
+        }
+    }
+
+    protected <T, U> void isolateV(BiConsumer<T, U> consumer, T t, U u) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            consumer.accept(t, u);
+        }
+    }
+
+    protected <T, R> R isolate(Function<T, R> function, T t) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return function.apply(t);
+        }
+    }
+
+    protected <T, U, R> R isolate(BiFunction<T, U, R> function, T t, U u) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return function.apply(t, u);
+        }
+    }

Review Comment:
   > I would be really interested in knowing which case can't be covered here. If you can dig up the Converter counterexample, would you mind sharing it?
   
   Here's a snippet from the IsolatedConverter, which is a few PRs in the future from now.
   These methods take 3 or 4 arguments, and there's no builtin java functional type that covers them (I guess they would be TriFunction and QuadFunction?)
   ```
       public byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value) throws Exception {
           return isolate(() -> delegate.fromConnectData(topic, headers, schema, value));
       }
   
       public SchemaAndValue toConnectData(String topic, Headers headers, byte[] value) throws Exception {
           return isolate(() -> delegate.toConnectData(topic, headers, value));
       }
   
       public byte[] fromConnectData(String topic, Schema schema, Object value) throws Exception {
           return isolate(() -> delegate.fromConnectData(topic, schema, value));
       }
       
       public SchemaAndValue toConnectData(String topic, byte[] value) throws Exception {
           return isolate(delegate::toConnectData, topic, value);
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1120685793


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+public abstract class IsolatedPlugin<P> {
+
+    private final Plugins plugins;
+    private final Class<?> pluginClass;
+    protected final P delegate;
+    private final ClassLoader classLoader;
+    private final PluginType type;
+
+    IsolatedPlugin(Plugins plugins, P delegate, PluginType type) {
+        this.plugins = Objects.requireNonNull(plugins, "plugins must be non-null");
+        this.delegate = Objects.requireNonNull(delegate, "delegate plugin must be non-null");
+        this.pluginClass = delegate.getClass();
+        ClassLoader classLoader = pluginClass.getClassLoader();
+        this.classLoader = Objects.requireNonNull(classLoader, "delegate plugin must not be a boostrap class");
+        this.type = Objects.requireNonNull(type, "plugin type must be non-null");
+    }
+
+    public PluginType type() {
+        return type;
+    }
+
+    @SuppressWarnings("unchecked")
+    public Class<? extends P> pluginClass() {
+        return (Class<? extends P>) pluginClass;
+    }
+
+    protected <V> V isolate(Callable<V> callable) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return callable.call();
+        }
+    }
+
+    protected void isolateV(ThrowingRunnable runnable) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            runnable.run();
+        }

Review Comment:
   We can reduce duplication here by piggybacking off of the non-void `isolate` method:
   ```suggestion
           isolate(() -> {
               runnable.run();
               return null;
           });
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##########
@@ -419,8 +420,14 @@ public ConnectorMetricsGroup(ConnectMetrics connectMetrics, AbstractStatus.State
             metricGroup.close();
 
             metricGroup.addImmutableValueMetric(registry.connectorType, connectorType());
-            metricGroup.addImmutableValueMetric(registry.connectorClass, connector.getClass().getName());
-            metricGroup.addImmutableValueMetric(registry.connectorVersion, connector.version());
+            metricGroup.addImmutableValueMetric(registry.connectorClass, connector.pluginClass().getName());
+            String version;
+            try {
+                version = connector.version();
+            } catch (Exception e) {
+                version = DelegatingClassLoader.UNDEFINED_VERSION;
+            }

Review Comment:
   This technically changes behavior, right? We go from failing the connector on startup to just assigning it an undefined version.
   
   I think this is probably fine (the benefits outweigh the costs, and I have a hard time imagining any use case that would favor failing here), just want to make sure I'm gauging the impact here correctly.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+public abstract class IsolatedPlugin<P> {
+
+    private final Plugins plugins;
+    private final Class<?> pluginClass;
+    protected final P delegate;
+    private final ClassLoader classLoader;
+    private final PluginType type;
+
+    IsolatedPlugin(Plugins plugins, P delegate, PluginType type) {
+        this.plugins = Objects.requireNonNull(plugins, "plugins must be non-null");
+        this.delegate = Objects.requireNonNull(delegate, "delegate plugin must be non-null");
+        this.pluginClass = delegate.getClass();
+        ClassLoader classLoader = pluginClass.getClassLoader();
+        this.classLoader = Objects.requireNonNull(classLoader, "delegate plugin must not be a boostrap class");
+        this.type = Objects.requireNonNull(type, "plugin type must be non-null");
+    }
+
+    public PluginType type() {
+        return type;
+    }
+
+    @SuppressWarnings("unchecked")
+    public Class<? extends P> pluginClass() {
+        return (Class<? extends P>) pluginClass;
+    }
+
+    protected <V> V isolate(Callable<V> callable) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return callable.call();
+        }
+    }
+
+    protected void isolateV(ThrowingRunnable runnable) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            runnable.run();
+        }
+    }
+
+    protected <T> void isolateV(Consumer<T> consumer, T t) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            consumer.accept(t);
+        }
+    }
+
+    protected <T, U> void isolateV(BiConsumer<T, U> consumer, T t, U u) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            consumer.accept(t, u);
+        }
+    }
+
+    protected <T, R> R isolate(Function<T, R> function, T t) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return function.apply(t);
+        }
+    }
+
+    protected <T, U, R> R isolate(BiFunction<T, U, R> function, T t, U u) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return function.apply(t, u);
+        }
+    }

Review Comment:
   I don't think it's worth it to maintain all of these variants. Strictly speaking, we only need `T isolate(Callable<V>)`, and everything else can be implemented on top of it by callers.
   
   For example, instead of:
   
   ```java
   public void initialize(ConnectorContext ctx) throws Exception {
       isolateV(delegate::initialize, ctx);
   }
   ```
   
   we can use:
   
   ```java
   public void initialize(ConnectorContext ctx) throws Exception {
       isolateV(() -> delegate.initialize(ctx));
   }
   ```
   
   
   And instead of:
   
   ```java
   public void stop() throws Exception {
       isolateV(delegate::stop);
   }
   ```
   
   we can use:
   
   ```java
   public void stop() throws Exception {
       isolate(() -> {
           delegate.stop();
           return null;
       });
   }
   ```
   
   Since it requires two lines instead of one, I think the second type of alternative (replacing `isolateV` with `isolate`) is probably too aggressive. But the rest seem fine and, to some degree, more readable, so I'd opt to stick with just `void isolateV(ThrowingRunnable)` and `V isolate(Callable<V>)`.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+public abstract class IsolatedPlugin<P> {
+
+    private final Plugins plugins;
+    private final Class<?> pluginClass;
+    protected final P delegate;
+    private final ClassLoader classLoader;
+    private final PluginType type;
+
+    IsolatedPlugin(Plugins plugins, P delegate, PluginType type) {
+        this.plugins = Objects.requireNonNull(plugins, "plugins must be non-null");
+        this.delegate = Objects.requireNonNull(delegate, "delegate plugin must be non-null");
+        this.pluginClass = delegate.getClass();
+        ClassLoader classLoader = pluginClass.getClassLoader();

Review Comment:
   Is this always correct? Don't we want to still use the `DelegatingClassLoader` as the context class loader when loading plugins that are present on the worker's classpath?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+public abstract class IsolatedPlugin<P> {
+
+    private final Plugins plugins;
+    private final Class<?> pluginClass;
+    protected final P delegate;
+    private final ClassLoader classLoader;
+    private final PluginType type;
+
+    IsolatedPlugin(Plugins plugins, P delegate, PluginType type) {
+        this.plugins = Objects.requireNonNull(plugins, "plugins must be non-null");
+        this.delegate = Objects.requireNonNull(delegate, "delegate plugin must be non-null");
+        this.pluginClass = delegate.getClass();
+        ClassLoader classLoader = pluginClass.getClassLoader();
+        this.classLoader = Objects.requireNonNull(classLoader, "delegate plugin must not be a boostrap class");
+        this.type = Objects.requireNonNull(type, "plugin type must be non-null");
+    }
+
+    public PluginType type() {
+        return type;
+    }
+
+    @SuppressWarnings("unchecked")
+    public Class<? extends P> pluginClass() {
+        return (Class<? extends P>) pluginClass;
+    }
+
+    protected <V> V isolate(Callable<V> callable) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return callable.call();
+        }
+    }
+
+    protected void isolateV(ThrowingRunnable runnable) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            runnable.run();
+        }
+    }
+
+    protected <T> void isolateV(Consumer<T> consumer, T t) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            consumer.accept(t);
+        }
+    }
+
+    protected <T, U> void isolateV(BiConsumer<T, U> consumer, T t, U u) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            consumer.accept(t, u);
+        }
+    }
+
+    protected <T, R> R isolate(Function<T, R> function, T t) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return function.apply(t);
+        }
+    }
+
+    protected <T, U, R> R isolate(BiFunction<T, U, R> function, T t, U u) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return function.apply(t, u);
+        }
+    }
+
+    public interface ThrowingRunnable {
+        void run() throws Exception;
+    }
+
+    @Override
+    public int hashCode() {
+        try {
+            return isolate(delegate::hashCode);
+        } catch (Throwable e) {
+            throw new RuntimeException("unable to evaluate plugin hashCode", e);
+        }
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == null || this.getClass() != obj.getClass()) {
+            return false;
+        }

Review Comment:
   IMO we should distinguish between an `IsolatedPlugin` instance and the actual underlying plugin in `equals` and `hashCode`.
   
   So in these methods we should not just take into account the underlying plugin's methods, we should also take into account the `Plugins`, `ClassLoader`, and `PluginType` fields as well.
   
   This could be especially useful during testing if we want to make sure that an `IsolatedPlugin` instance has the expected class loader, for example.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##########
@@ -243,7 +248,11 @@ public synchronized void requestTaskReconfiguration(String connName) {
             log.error("Task that requested reconfiguration does not exist: {}", connName);
             return;
         }
-        updateConnectorTasks(connName);
+        try {
+            updateConnectorTasks(connName);
+        } catch (Exception e) {
+            log.error("Unable to generate task configs for {}", connName, e);
+        }

Review Comment:
   This is a change in behavior too, right? We no longer throw in `ConnectorContext::requestTaskReconfiguration` if we encounter any errors.
   
   This also seems reasonable (it aligns the behavior across standalone and distributed modes), but it also has consequences for the REST API, where restarting a connector no longer fails if we're unable to generate task configs for it (which is currently the case for both distributed and standalone modes).
   
   



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -220,7 +220,18 @@ public Object newPlugin(String classOrAlias) throws ClassNotFoundException {
         return newPlugin(klass);
     }
 
-    public Connector newConnector(String connectorClassOrAlias) {
+    public IsolatedConnector<?> newConnector(String connectorClassOrAlias) {
+        Connector connector = newRawConnector(connectorClassOrAlias);
+        if (connector instanceof SourceConnector) {
+            return new IsolatedSourceConnector(this, (SourceConnector) connector);
+        } else if (connector instanceof SinkConnector) {
+            return new IsolatedSinkConnector(this, (SinkConnector) connector);
+        } else {
+            throw new IllegalArgumentException("taskClass must be either a SourceTask or a SinkTask");
+        }
+    }
+
+    Connector newRawConnector(String connectorClassOrAlias) {

Review Comment:
   Any reason not to make this `private`?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -486,21 +488,43 @@ ConfigInfos validateConnectorConfig(Map<String, String> connectorProps, boolean
             Set<String> allGroups = new LinkedHashSet<>(enrichedConfigDef.groups());
 
             // do custom connector-specific validation
-            ConfigDef configDef = connector.config();
+            ConfigDef configDef;
+            try {
+                configDef = connector.config();
+            } catch (Throwable e) {
+                throw new ConnectException(
+                        String.format(
+                                "unable to evaluate %s.config()",
+                                connector.pluginClass().getName()
+                        ),
+                        e
+                );
+            }

Review Comment:
   Would it be cleaner to add a `throws Exception` clause to this variant of `validateConnectorConfig`? Right now it's only called from [one (non-testing) location](https://github.com/apache/kafka/blob/7322f4cd55dc08abdc6ccf51ed33f7f0d869dd0e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L382-L387), which already has the necessary try/catch wrapper.
   
   The message also doesn't seem terribly useful here, considering the method and class name are already going to be present in the stack trace for the exception.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1123467255


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##########
@@ -243,7 +248,11 @@ public synchronized void requestTaskReconfiguration(String connName) {
             log.error("Task that requested reconfiguration does not exist: {}", connName);
             return;
         }
-        updateConnectorTasks(connName);
+        try {
+            updateConnectorTasks(connName);
+        } catch (Exception e) {
+            log.error("Unable to generate task configs for {}", connName, e);
+        }

Review Comment:
   Okay, a lot to unpack here!
   
   The more I think about it, the more I like the existing behavior for handling failures in task config generation. We automatically retry in distributed mode in order to absorb the risk of writing to the config topic or issuing a REST request to the leader, but since neither of those take place in standalone mode, it's fine to just throw the exception back to the caller (either a connector invoking `ConnectorContext::requestTaskReconfiguration`, or a REST API call to restart the connector) since the likeliest cause is a failed call to `Connector::taskConfigs` and automatic retries are less likely to be useful.
   
   I think we should basically just preserve existing behavior here, with the one exception of fixing how we handle failed calls to `requestTaskReconfiguration`  that occur during a call to `restartConnector`. Right now we don't handle any of those and, IIUC, just cause the REST request to time out after 90 seconds. Instead of timing out, we should return a 500 response in that case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org