You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/12/04 13:24:59 UTC
[flink] branch master updated: [FLINK-25022][rest] Run jars in separate threads
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1f212f2 [FLINK-25022][rest] Run jars in separate threads
1f212f2 is described below
commit 1f212f2ef04e36e0248098a26e7db43a6d65796a
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Sat Dec 4 14:24:26 2021 +0100
[FLINK-25022][rest] Run jars in separate threads
Use a dedicated thread to run each jar, so that pooled threads can't keep references to user-code (e.g., in a ThreadLocal).
---
.../util/concurrent/SeparateThreadExecutor.java | 39 +++++++
.../runtime/webmonitor/WebSubmissionExtension.java | 57 ++++++++-
.../webmonitor/handlers/JarMessageParameters.java | 4 +-
.../runtime/webmonitor/handlers/JarRunHandler.java | 4 +-
.../webmonitor/handlers/JarUploadHandler.java | 4 +-
.../webmonitor/WebSubmissionExtensionTest.java | 129 +++++++++++++++++++++
6 files changed, 229 insertions(+), 8 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/util/concurrent/SeparateThreadExecutor.java b/flink-core/src/main/java/org/apache/flink/util/concurrent/SeparateThreadExecutor.java
new file mode 100644
index 0000000..3d1ecba
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/concurrent/SeparateThreadExecutor.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util.concurrent;
+
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadFactory;
+
+/** An {@link Executor} that runs every runnable in a separate thread. */
+public final class SeparateThreadExecutor implements Executor {
+ private final ThreadFactory threadFactory;
+
+ public SeparateThreadExecutor(ThreadFactory threadFactory) {
+ this.threadFactory = Preconditions.checkNotNull(threadFactory);
+ }
+
+ @Override
+ public void execute(@Nonnull Runnable command) {
+ threadFactory.newThread(command).start();
+ }
+}
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java
index 3d411c7..0829502 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java
@@ -18,8 +18,10 @@
package org.apache.flink.runtime.webmonitor;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.deployment.application.ApplicationRunner;
import org.apache.flink.client.deployment.application.DetachedApplicationRunner;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
@@ -36,6 +38,8 @@ import org.apache.flink.runtime.webmonitor.handlers.JarRunHeaders;
import org.apache.flink.runtime.webmonitor.handlers.JarUploadHandler;
import org.apache.flink.runtime.webmonitor.handlers.JarUploadHeaders;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.SeparateThreadExecutor;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
@@ -45,6 +49,7 @@ import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
+import java.util.function.Supplier;
/** Container for the web submission handlers. */
public class WebSubmissionExtension implements WebMonitorExtension {
@@ -52,6 +57,10 @@ public class WebSubmissionExtension implements WebMonitorExtension {
private final ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>>
webSubmissionHandlers;
+ // for easier access during testing
+ private final JarUploadHandler jarUploadHandler;
+ private final JarRunHandler jarRunHandler;
+
public WebSubmissionExtension(
Configuration configuration,
GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
@@ -61,10 +70,38 @@ public class WebSubmissionExtension implements WebMonitorExtension {
Executor executor,
Time timeout)
throws Exception {
+ this(
+ configuration,
+ leaderRetriever,
+ responseHeaders,
+ localAddressFuture,
+ jarDir,
+ executor,
+ timeout,
+ () -> new DetachedApplicationRunner(true));
+ }
+
+ @VisibleForTesting
+ WebSubmissionExtension(
+ Configuration configuration,
+ GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
+ Map<String, String> responseHeaders,
+ CompletableFuture<String> localAddressFuture,
+ Path jarDir,
+ Executor executor,
+ Time timeout,
+ Supplier<ApplicationRunner> applicationRunnerSupplier)
+ throws Exception {
webSubmissionHandlers = new ArrayList<>();
- final JarUploadHandler jarUploadHandler =
+ final Executor jarRunExecutor =
+ new SeparateThreadExecutor(
+ new ExecutorThreadFactory.Builder()
+ .setPoolName("flink-jar-runner")
+ .build());
+
+ jarUploadHandler =
new JarUploadHandler(
leaderRetriever,
timeout,
@@ -84,7 +121,7 @@ public class WebSubmissionExtension implements WebMonitorExtension {
configuration,
executor);
- final JarRunHandler jarRunHandler =
+ jarRunHandler =
new JarRunHandler(
leaderRetriever,
timeout,
@@ -92,8 +129,8 @@ public class WebSubmissionExtension implements WebMonitorExtension {
JarRunHeaders.getInstance(),
jarDir,
configuration,
- executor,
- () -> new DetachedApplicationRunner(true));
+ jarRunExecutor,
+ applicationRunnerSupplier);
final JarDeleteHandler jarDeleteHandler =
new JarDeleteHandler(
@@ -112,7 +149,7 @@ public class WebSubmissionExtension implements WebMonitorExtension {
JarPlanGetHeaders.getInstance(),
jarDir,
configuration,
- executor);
+ jarRunExecutor);
final JarPlanHandler postJarPlanHandler =
new JarPlanHandler(
@@ -141,4 +178,14 @@ public class WebSubmissionExtension implements WebMonitorExtension {
public Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> getHandlers() {
return webSubmissionHandlers;
}
+
+ @VisibleForTesting
+ JarUploadHandler getJarUploadHandler() {
+ return jarUploadHandler;
+ }
+
+ @VisibleForTesting
+ JarRunHandler getJarRunHandler() {
+ return jarRunHandler;
+ }
}
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarMessageParameters.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarMessageParameters.java
index 6201d04..280387f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarMessageParameters.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarMessageParameters.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.MessagePathParameter;
import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
@@ -29,7 +30,8 @@ import java.util.Collections;
/** Base class of {@link MessageParameters} for {@link JarRunHandler} and {@link JarPlanHandler}. */
abstract class JarMessageParameters extends MessageParameters {
- final JarIdPathParameter jarIdPathParameter = new JarIdPathParameter();
+ @VisibleForTesting
+ public final JarIdPathParameter jarIdPathParameter = new JarIdPathParameter();
final EntryClassQueryParameter entryClassQueryParameter = new EntryClassQueryParameter();
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index f18046b..ad258d6 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.client.deployment.application.ApplicationRunner;
import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
@@ -82,7 +83,8 @@ public class JarRunHandler
}
@Override
- protected CompletableFuture<JarRunResponseBody> handleRequest(
+ @VisibleForTesting
+ public CompletableFuture<JarRunResponseBody> handleRequest(
@Nonnull final HandlerRequest<JarRunRequestBody> request,
@Nonnull final DispatcherGateway gateway)
throws RestHandlerException {
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
index 31b3c77..d582858 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
@@ -68,7 +69,8 @@ public class JarUploadHandler
}
@Override
- protected CompletableFuture<JarUploadResponseBody> handleRequest(
+ @VisibleForTesting
+ public CompletableFuture<JarUploadResponseBody> handleRequest(
@Nonnull final HandlerRequest<EmptyRequestBody> request,
@Nonnull final RestfulGateway gateway)
throws RestHandlerException {
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtensionTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtensionTest.java
new file mode 100644
index 0000000..3ba45a1
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtensionTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.deployment.application.ApplicationRunner;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.webmonitor.handlers.JarRunHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JarRunMessageParameters;
+import org.apache.flink.runtime.webmonitor.handlers.JarRunRequestBody;
+import org.apache.flink.runtime.webmonitor.handlers.JarUploadHandler;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class WebSubmissionExtensionTest {
+
+ private static final String JAR_NAME = "output-test-program.jar";
+
+ @Test
+ void applicationsRunInSeparateThreads(@TempDir Path tempDir) throws Exception {
+ final Path uploadDir = Files.createDirectories(tempDir.resolve("uploadDir"));
+ // create a copy because the upload handler moves uploaded jars (because it assumes it to be
+ // a temporary file)
+ final Path jarFile =
+ Files.copy(
+ Paths.get(System.getProperty("targetDir")).resolve(JAR_NAME),
+ tempDir.resolve("app.jar"));
+
+ final DispatcherGateway dispatcherGateway = new TestingDispatcherGateway.Builder().build();
+
+ final ThreadCapturingApplicationRunner threadCapturingApplicationRunner =
+ new ThreadCapturingApplicationRunner();
+
+ final WebSubmissionExtension webSubmissionExtension =
+ new WebSubmissionExtension(
+ new Configuration(),
+ () -> CompletableFuture.completedFuture(dispatcherGateway),
+ Collections.emptyMap(),
+ new CompletableFuture<>(),
+ uploadDir,
+ Executors.directExecutor(),
+ Time.of(5, TimeUnit.SECONDS),
+ () -> threadCapturingApplicationRunner);
+
+ final String jarId = uploadJar(webSubmissionExtension, jarFile, dispatcherGateway);
+
+ final JarRunHandler jarRunHandler = webSubmissionExtension.getJarRunHandler();
+
+ final JarRunMessageParameters parameters = new JarRunMessageParameters();
+ parameters.jarIdPathParameter.resolve(jarId);
+ final HandlerRequest<JarRunRequestBody> runRequest =
+ HandlerRequest.create(new JarRunRequestBody(), parameters);
+
+ // run several applications in sequence, and verify that each thread is unique
+ int numApplications = 20;
+ for (int i = 0; i < numApplications; i++) {
+ jarRunHandler.handleRequest(runRequest, dispatcherGateway).get();
+ }
+ assertThat(threadCapturingApplicationRunner.getThreads().size()).isEqualTo(numApplications);
+ }
+
+ private static String uploadJar(
+ WebSubmissionExtension extension, Path jarFile, DispatcherGateway dispatcherGateway)
+ throws Exception {
+ final JarUploadHandler jarUploadHandler = extension.getJarUploadHandler();
+
+ final HandlerRequest<EmptyRequestBody> uploadRequest =
+ HandlerRequest.create(
+ EmptyRequestBody.getInstance(),
+ EmptyMessageParameters.getInstance(),
+ Collections.singletonList(jarFile.toFile()));
+
+ return jarUploadHandler.handleRequest(uploadRequest, dispatcherGateway).get().getFilename();
+ }
+
+ private static class ThreadCapturingApplicationRunner implements ApplicationRunner {
+
+ private final Set<Thread> threads = Collections.newSetFromMap(new IdentityHashMap<>());
+
+ @Override
+ public List<JobID> run(
+ DispatcherGateway dispatcherGateway,
+ PackagedProgram program,
+ Configuration configuration) {
+ threads.add(Thread.currentThread());
+ return Collections.singletonList(new JobID());
+ }
+
+ public Collection<Thread> getThreads() {
+ return threads;
+ }
+ }
+}