You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/05/17 18:45:03 UTC

[GitHub] [kafka] rhauch commented on a change in pull request #8259: KAFKA-7421: Ensure Connect's PluginClassLoader is truly parallel capable and resolve deadlock occurrences

rhauch commented on a change in pull request #8259:
URL: https://github.com/apache/kafka/pull/8259#discussion_r633778524



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
##########
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import static org.junit.Assert.fail;
+
+import java.lang.management.LockInfo;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MonitorInfo;
+import java.lang.management.ThreadInfo;
+import java.net.URL;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SynchronizationTest {
+
+    public static final Logger log = LoggerFactory.getLogger(SynchronizationTest.class);
+
+    @Rule
+    public final TestName testName = new TestName();
+
+    private String threadPrefix;
+    private Plugins plugins;
+    private ThreadPoolExecutor exec;
+    private Breakpoint<String> dclBreakpoint;
+    private Breakpoint<String> pclBreakpoint;
+
+    @Before
+    public void setup() {
+        TestPlugins.assertAvailable();
+        Map<String, String> pluginProps = Collections.singletonMap(
+            WorkerConfig.PLUGIN_PATH_CONFIG,
+            String.join(",", TestPlugins.pluginPath())
+        );
+        threadPrefix = SynchronizationTest.class.getSimpleName()
+            + "." + testName.getMethodName() + "-";
+        dclBreakpoint = new Breakpoint<>();
+        pclBreakpoint = new Breakpoint<>();
+        plugins = new Plugins(pluginProps) {
+            @Override
+            protected DelegatingClassLoader newDelegatingClassLoader(List<String> paths) {
+                return AccessController.doPrivileged(
+                    (PrivilegedAction<DelegatingClassLoader>) () ->
+                        new SynchronizedDelegatingClassLoader(paths)
+                );
+            }
+        };
+        exec = new ThreadPoolExecutor(
+            2,
+            2,
+            1000L,
+            TimeUnit.MILLISECONDS,
+            new LinkedBlockingDeque<>(),
+            threadFactoryWithNamedThreads(threadPrefix)
+        );
+
+    }
+
+    @After
+    public void tearDown() throws InterruptedException {
+        dclBreakpoint.clear();
+        pclBreakpoint.clear();
+        exec.shutdown();
+        exec.awaitTermination(1L, TimeUnit.SECONDS);
+    }
+
+    private static class Breakpoint<T> {
+
+        private Predicate<T> predicate;
+        private CyclicBarrier barrier;
+
+        public synchronized void clear() {
+            if (barrier != null) {
+                barrier.reset();
+            }
+            predicate = null;
+            barrier = null;
+        }
+
+        public synchronized void set(Predicate<T> predicate) {
+            clear();
+            this.predicate = predicate;
+            // As soon as the barrier is tripped, the barrier will be reset for the next round.
+            barrier = new CyclicBarrier(2);
+        }
+
+        /**
+         * From a thread under test, await for the test orchestrator to continue execution
+         * @param obj Object to test with the breakpoint's current predicate
+         */
+        public void await(T obj) {
+            Predicate<T> predicate;
+            CyclicBarrier barrier;
+            synchronized (this) {
+                predicate  = this.predicate;
+                barrier = this.barrier;
+            }
+            if (predicate != null && !predicate.test(obj)) {
+                return;
+            }
+            if (barrier != null) {
+                try {
+                    barrier.await();
+                } catch (InterruptedException | BrokenBarrierException e) {
+                    throw new RuntimeException("Interrupted while waiting for load gate", e);
+                }
+            }
+        }
+
+        /**
+         * From the test orchestrating thread, await for the test thread to continue execution
+         * @throws InterruptedException If the current thread is interrupted while waiting
+         * @throws BrokenBarrierException If the test thread is interrupted while waiting
+         * @throws TimeoutException If the barrier is not reached before 1s passes.
+         */
+        public void testAwait()
+            throws InterruptedException, BrokenBarrierException, TimeoutException {
+            CyclicBarrier barrier;
+            synchronized (this) {
+                barrier = this.barrier;
+            }
+            Objects.requireNonNull(barrier, "Barrier must be set up before awaiting");
+            barrier.await(1L, TimeUnit.SECONDS);
+        }
+    }
+
+    private class SynchronizedDelegatingClassLoader extends DelegatingClassLoader {
+        // NOTE: the DelegatingClassLoader is not parallel capable, and neither is this subclass.

Review comment:
       IIUC, this PR changes the `PluginClassLoader` to be parallel-capable, but changes `DelegatingClassLoader` to be only thread-safe and not parallel-capable. Is that right?
   
   Can we modify the Javadoc in `DelegatingClassLoader` and `PluginClassLoader` to state whether or not we intend for each of them to be parallel-capable, linking to the relevant portion of the language guide (e.g. the `ClassLoader` API documentation on parallel-capable class loaders)?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##########
@@ -48,25 +48,25 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Enumeration;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.stream.Collectors;
 
 public class DelegatingClassLoader extends URLClassLoader {
     private static final Logger log = LoggerFactory.getLogger(DelegatingClassLoader.class);
     private static final String CLASSPATH_NAME = "classpath";
     private static final String UNDEFINED_VERSION = "undefined";
 
-    private final Map<String, SortedMap<PluginDesc<?>, ClassLoader>> pluginLoaders;
-    private final Map<String, String> aliases;
+    private final ConcurrentMap<String, SortedMap<PluginDesc<?>, ClassLoader>> pluginLoaders;
+    private final ConcurrentMap<String, String> aliases;

Review comment:
       I understand why the implementations need to be `ConcurrentMap`, but IIUC we're not using any of the methods specific to `ConcurrentMap`. Is there a reason we don't want to keep `Map` for the type of these fields?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org