You are viewing a plain-text version of this content. The canonical link for it is here (the link itself is not preserved in this plain-text version).
Posted to commits@flink.apache.org by ch...@apache.org on 2021/12/04 13:24:59 UTC

[flink] branch master updated: [FLINK-25022][rest] Run jars in separate threads

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f212f2  [FLINK-25022][rest] Run jars in separate threads
1f212f2 is described below

commit 1f212f2ef04e36e0248098a26e7db43a6d65796a
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Sat Dec 4 14:24:26 2021 +0100

    [FLINK-25022][rest] Run jars in separate threads
    
    Use a dedicated thread to run each jar, so that pooled threads can't keep references to user-code (e.g., in a ThreadLocal).
---
 .../util/concurrent/SeparateThreadExecutor.java    |  39 +++++++
 .../runtime/webmonitor/WebSubmissionExtension.java |  57 ++++++++-
 .../webmonitor/handlers/JarMessageParameters.java  |   4 +-
 .../runtime/webmonitor/handlers/JarRunHandler.java |   4 +-
 .../webmonitor/handlers/JarUploadHandler.java      |   4 +-
 .../webmonitor/WebSubmissionExtensionTest.java     | 129 +++++++++++++++++++++
 6 files changed, 229 insertions(+), 8 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/concurrent/SeparateThreadExecutor.java b/flink-core/src/main/java/org/apache/flink/util/concurrent/SeparateThreadExecutor.java
new file mode 100644
index 0000000..3d1ecba
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/concurrent/SeparateThreadExecutor.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util.concurrent;
+
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * An {@link Executor} that runs every runnable in a separate thread.
+ *
+ * <p>Each call to {@link #execute(Runnable)} obtains a brand-new thread from the configured
+ * {@link ThreadFactory} and starts it. The executor keeps no reference to the threads it creates,
+ * so no thread is ever reused for a second task.
+ */
+public final class SeparateThreadExecutor implements Executor {
+    private final ThreadFactory threadFactory;
+
+    /**
+     * Creates an executor that asks the given factory for a new thread for every submitted task.
+     *
+     * @param threadFactory factory used to create the thread that runs each task
+     * @throws NullPointerException if {@code threadFactory} is null
+     */
+    public SeparateThreadExecutor(ThreadFactory threadFactory) {
+        this.threadFactory = Preconditions.checkNotNull(threadFactory);
+    }
+
+    /**
+     * Runs the given command in a newly created and started thread.
+     *
+     * @param command the task to run
+     * @throws NullPointerException if {@code command} is null
+     * @throws RejectedExecutionException if the thread factory declines to create a thread
+     */
+    @Override
+    public void execute(@Nonnull Runnable command) {
+        Preconditions.checkNotNull(command);
+        final Thread thread = threadFactory.newThread(command);
+        if (thread == null) {
+            // ThreadFactory#newThread may return null to reject a request; report that through
+            // the Executor contract instead of failing with a NullPointerException on start().
+            throw new RejectedExecutionException(
+                    "Thread factory did not create a thread for task " + command);
+        }
+        thread.start();
+    }
+}
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java
index 3d411c7..0829502 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java
@@ -18,8 +18,10 @@
 
 package org.apache.flink.runtime.webmonitor;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.deployment.application.ApplicationRunner;
 import org.apache.flink.client.deployment.application.DetachedApplicationRunner;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
@@ -36,6 +38,8 @@ import org.apache.flink.runtime.webmonitor.handlers.JarRunHeaders;
 import org.apache.flink.runtime.webmonitor.handlers.JarUploadHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JarUploadHeaders;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.SeparateThreadExecutor;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 
@@ -45,6 +49,7 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.function.Supplier;
 
 /** Container for the web submission handlers. */
 public class WebSubmissionExtension implements WebMonitorExtension {
@@ -52,6 +57,10 @@ public class WebSubmissionExtension implements WebMonitorExtension {
     private final ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>>
             webSubmissionHandlers;
 
+    // for easier access during testing
+    private final JarUploadHandler jarUploadHandler;
+    private final JarRunHandler jarRunHandler;
+
     public WebSubmissionExtension(
             Configuration configuration,
             GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
@@ -61,10 +70,38 @@ public class WebSubmissionExtension implements WebMonitorExtension {
             Executor executor,
             Time timeout)
             throws Exception {
+        this(
+                configuration,
+                leaderRetriever,
+                responseHeaders,
+                localAddressFuture,
+                jarDir,
+                executor,
+                timeout,
+                () -> new DetachedApplicationRunner(true));
+    }
+
+    @VisibleForTesting
+    WebSubmissionExtension(
+            Configuration configuration,
+            GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
+            Map<String, String> responseHeaders,
+            CompletableFuture<String> localAddressFuture,
+            Path jarDir,
+            Executor executor,
+            Time timeout,
+            Supplier<ApplicationRunner> applicationRunnerSupplier)
+            throws Exception {
 
         webSubmissionHandlers = new ArrayList<>();
 
-        final JarUploadHandler jarUploadHandler =
+        final Executor jarRunExecutor =
+                new SeparateThreadExecutor(
+                        new ExecutorThreadFactory.Builder()
+                                .setPoolName("flink-jar-runner")
+                                .build());
+
+        jarUploadHandler =
                 new JarUploadHandler(
                         leaderRetriever,
                         timeout,
@@ -84,7 +121,7 @@ public class WebSubmissionExtension implements WebMonitorExtension {
                         configuration,
                         executor);
 
-        final JarRunHandler jarRunHandler =
+        jarRunHandler =
                 new JarRunHandler(
                         leaderRetriever,
                         timeout,
@@ -92,8 +129,8 @@ public class WebSubmissionExtension implements WebMonitorExtension {
                         JarRunHeaders.getInstance(),
                         jarDir,
                         configuration,
-                        executor,
-                        () -> new DetachedApplicationRunner(true));
+                        jarRunExecutor,
+                        applicationRunnerSupplier);
 
         final JarDeleteHandler jarDeleteHandler =
                 new JarDeleteHandler(
@@ -112,7 +149,7 @@ public class WebSubmissionExtension implements WebMonitorExtension {
                         JarPlanGetHeaders.getInstance(),
                         jarDir,
                         configuration,
-                        executor);
+                        jarRunExecutor);
 
         final JarPlanHandler postJarPlanHandler =
                 new JarPlanHandler(
@@ -141,4 +178,14 @@ public class WebSubmissionExtension implements WebMonitorExtension {
     public Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> getHandlers() {
         return webSubmissionHandlers;
     }
+
+    /** Returns the {@link JarUploadHandler} created by this extension, for direct use in tests. */
+    @VisibleForTesting
+    JarUploadHandler getJarUploadHandler() {
+        return this.jarUploadHandler;
+    }
+
+    /** Returns the {@link JarRunHandler} created by this extension, for direct use in tests. */
+    @VisibleForTesting
+    JarRunHandler getJarRunHandler() {
+        return this.jarRunHandler;
+    }
 }
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarMessageParameters.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarMessageParameters.java
index 6201d04..280387f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarMessageParameters.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarMessageParameters.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.rest.messages.MessageParameters;
 import org.apache.flink.runtime.rest.messages.MessagePathParameter;
 import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
@@ -29,7 +30,8 @@ import java.util.Collections;
 /** Base class of {@link MessageParameters} for {@link JarRunHandler} and {@link JarPlanHandler}. */
 abstract class JarMessageParameters extends MessageParameters {
 
-    final JarIdPathParameter jarIdPathParameter = new JarIdPathParameter();
+    @VisibleForTesting
+    public final JarIdPathParameter jarIdPathParameter = new JarIdPathParameter();
 
     final EntryClassQueryParameter entryClassQueryParameter = new EntryClassQueryParameter();
 
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index f18046b..ad258d6 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.client.deployment.application.ApplicationRunner;
 import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
@@ -82,7 +83,8 @@ public class JarRunHandler
     }
 
     @Override
-    protected CompletableFuture<JarRunResponseBody> handleRequest(
+    @VisibleForTesting
+    public CompletableFuture<JarRunResponseBody> handleRequest(
             @Nonnull final HandlerRequest<JarRunRequestBody> request,
             @Nonnull final DispatcherGateway gateway)
             throws RestHandlerException {
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
index 31b3c77..d582858 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
@@ -68,7 +69,8 @@ public class JarUploadHandler
     }
 
     @Override
-    protected CompletableFuture<JarUploadResponseBody> handleRequest(
+    @VisibleForTesting
+    public CompletableFuture<JarUploadResponseBody> handleRequest(
             @Nonnull final HandlerRequest<EmptyRequestBody> request,
             @Nonnull final RestfulGateway gateway)
             throws RestHandlerException {
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtensionTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtensionTest.java
new file mode 100644
index 0000000..3ba45a1
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtensionTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.deployment.application.ApplicationRunner;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.webmonitor.handlers.JarRunHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JarRunMessageParameters;
+import org.apache.flink.runtime.webmonitor.handlers.JarRunRequestBody;
+import org.apache.flink.runtime.webmonitor.handlers.JarUploadHandler;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link WebSubmissionExtension}. */
+class WebSubmissionExtensionTest {
+
+    /** File name of the test program jar, resolved against the {@code targetDir} system property. */
+    private static final String JAR_NAME = "output-test-program.jar";
+
+    /**
+     * Uploads a jar once, runs it several times, and verifies that every run was executed on its
+     * own thread, none of which is the calling test thread.
+     */
+    @Test
+    void applicationsRunInSeparateThreads(@TempDir Path tempDir) throws Exception {
+        final Path uploadDir = Files.createDirectories(tempDir.resolve("uploadDir"));
+        // Work on a copy: the upload handler moves the uploaded jar, because it assumes the file
+        // is temporary.
+        final Path jarFile =
+                Files.copy(
+                        Paths.get(System.getProperty("targetDir")).resolve(JAR_NAME),
+                        tempDir.resolve("app.jar"));
+
+        final DispatcherGateway dispatcherGateway = new TestingDispatcherGateway.Builder().build();
+
+        final ThreadCapturingApplicationRunner threadCapturingApplicationRunner =
+                new ThreadCapturingApplicationRunner();
+
+        final WebSubmissionExtension webSubmissionExtension =
+                new WebSubmissionExtension(
+                        new Configuration(),
+                        () -> CompletableFuture.completedFuture(dispatcherGateway),
+                        Collections.emptyMap(),
+                        new CompletableFuture<>(),
+                        uploadDir,
+                        Executors.directExecutor(),
+                        Time.of(5, TimeUnit.SECONDS),
+                        () -> threadCapturingApplicationRunner);
+
+        final String jarId = uploadJar(webSubmissionExtension, jarFile, dispatcherGateway);
+
+        final JarRunHandler jarRunHandler = webSubmissionExtension.getJarRunHandler();
+
+        final JarRunMessageParameters parameters = new JarRunMessageParameters();
+        parameters.jarIdPathParameter.resolve(jarId);
+        final HandlerRequest<JarRunRequestBody> runRequest =
+                HandlerRequest.create(new JarRunRequestBody(), parameters);
+
+        // Run several applications in sequence, waiting for each request to complete before
+        // starting the next one.
+        final int numApplications = 20;
+        for (int i = 0; i < numApplications; i++) {
+            jarRunHandler.handleRequest(runRequest, dispatcherGateway).get();
+        }
+
+        final Collection<Thread> threads = threadCapturingApplicationRunner.getThreads();
+        // every run happened on a distinct thread ...
+        assertThat(threads).hasSize(numApplications);
+        // ... and none of them was the calling test thread
+        assertThat(threads).doesNotContain(Thread.currentThread());
+    }
+
+    /**
+     * Uploads the given jar through the extension's upload handler.
+     *
+     * @return the file name reported by the upload handler, used as the jar id for later requests
+     */
+    private static String uploadJar(
+            WebSubmissionExtension extension, Path jarFile, DispatcherGateway dispatcherGateway)
+            throws Exception {
+        final JarUploadHandler jarUploadHandler = extension.getJarUploadHandler();
+
+        final HandlerRequest<EmptyRequestBody> uploadRequest =
+                HandlerRequest.create(
+                        EmptyRequestBody.getInstance(),
+                        EmptyMessageParameters.getInstance(),
+                        Collections.singletonList(jarFile.toFile()));
+
+        return jarUploadHandler.handleRequest(uploadRequest, dispatcherGateway).get().getFilename();
+    }
+
+    /**
+     * {@link ApplicationRunner} that records the thread each invocation runs on and returns a
+     * single new {@link JobID} without submitting anything to the dispatcher.
+     */
+    private static class ThreadCapturingApplicationRunner implements ApplicationRunner {
+
+        // identity-based, so every distinct Thread instance is counted separately
+        private final Set<Thread> threads = Collections.newSetFromMap(new IdentityHashMap<>());
+
+        @Override
+        public List<JobID> run(
+                DispatcherGateway dispatcherGateway,
+                PackagedProgram program,
+                Configuration configuration) {
+            threads.add(Thread.currentThread());
+            return Collections.singletonList(new JobID());
+        }
+
+        /** Returns a read-only view of the threads on which {@link #run} was invoked. */
+        public Collection<Thread> getThreads() {
+            return Collections.unmodifiableSet(threads);
+        }
+    }
+}