You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/12/16 13:22:03 UTC

[flink] branch release-1.12 updated (b6ae903 -> 9664c17)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from b6ae903  [FLINK-24077][HBase/IT] use MiniClusterWithClientResource as @ClassRule.
     new fdd52a7  [FLINK-25022][rest] Run jars in separate threads
     new 9664c17  Update for 1.12.7

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/_config.yml                                   |   2 +-
 docs/dev/project-configuration.md                  |   2 +-
 docs/dev/project-configuration.zh.md               |   2 +-
 .../util/concurrent/SeparateThreadExecutor.java    |  25 ++--
 .../runtime/webmonitor/WebSubmissionExtension.java |  57 +++++++-
 .../webmonitor/handlers/JarMessageParameters.java  |   4 +-
 .../runtime/webmonitor/handlers/JarRunHandler.java |   4 +-
 .../webmonitor/handlers/JarUploadHandler.java      |   4 +-
 .../webmonitor/WebSubmissionExtensionTest.java     | 145 +++++++++++++++++++++
 9 files changed, 221 insertions(+), 24 deletions(-)
 copy flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockFunctionSnapshotContext.java => flink-core/src/main/java/org/apache/flink/util/concurrent/SeparateThreadExecutor.java (57%)
 create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtensionTest.java

[flink] 01/02: [FLINK-25022][rest] Run jars in separate threads

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fdd52a787260e2d4dd97473a74e7e45222dbd099
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Sat Dec 4 14:24:26 2021 +0100

    [FLINK-25022][rest] Run jars in separate threads
    
    Use a dedicated thread to run each jar, so that pooled threads can't keep references to user-code (e.g., in a ThreadLocal).
---
 .../util/concurrent/SeparateThreadExecutor.java    |  39 ++++++
 .../runtime/webmonitor/WebSubmissionExtension.java |  57 +++++++-
 .../webmonitor/handlers/JarMessageParameters.java  |   4 +-
 .../runtime/webmonitor/handlers/JarRunHandler.java |   4 +-
 .../webmonitor/handlers/JarUploadHandler.java      |   4 +-
 .../webmonitor/WebSubmissionExtensionTest.java     | 145 +++++++++++++++++++++
 6 files changed, 245 insertions(+), 8 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/concurrent/SeparateThreadExecutor.java b/flink-core/src/main/java/org/apache/flink/util/concurrent/SeparateThreadExecutor.java
new file mode 100644
index 0000000..3d1ecba
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/concurrent/SeparateThreadExecutor.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util.concurrent;
+
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadFactory;
+
+/** An {@link Executor} that runs every runnable in a separate thread. */
+public final class SeparateThreadExecutor implements Executor {
+    private final ThreadFactory threadFactory;
+
+    public SeparateThreadExecutor(ThreadFactory threadFactory) {
+        this.threadFactory = Preconditions.checkNotNull(threadFactory);
+    }
+
+    @Override
+    public void execute(@Nonnull Runnable command) {
+        threadFactory.newThread(command).start();
+    }
+}
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java
index 3d411c7..cb2ba62 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java
@@ -18,12 +18,15 @@
 
 package org.apache.flink.runtime.webmonitor;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.deployment.application.ApplicationRunner;
 import org.apache.flink.client.deployment.application.DetachedApplicationRunner;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHeaders;
 import org.apache.flink.runtime.webmonitor.handlers.JarListHandler;
@@ -36,6 +39,7 @@ import org.apache.flink.runtime.webmonitor.handlers.JarRunHeaders;
 import org.apache.flink.runtime.webmonitor.handlers.JarUploadHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JarUploadHeaders;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.concurrent.SeparateThreadExecutor;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 
@@ -45,6 +49,7 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.function.Supplier;
 
 /** Container for the web submission handlers. */
 public class WebSubmissionExtension implements WebMonitorExtension {
@@ -52,6 +57,10 @@ public class WebSubmissionExtension implements WebMonitorExtension {
     private final ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>>
             webSubmissionHandlers;
 
+    // for easier access during testing
+    private final JarUploadHandler jarUploadHandler;
+    private final JarRunHandler jarRunHandler;
+
     public WebSubmissionExtension(
             Configuration configuration,
             GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
@@ -61,10 +70,38 @@ public class WebSubmissionExtension implements WebMonitorExtension {
             Executor executor,
             Time timeout)
             throws Exception {
+        this(
+                configuration,
+                leaderRetriever,
+                responseHeaders,
+                localAddressFuture,
+                jarDir,
+                executor,
+                timeout,
+                () -> new DetachedApplicationRunner(true));
+    }
+
+    @VisibleForTesting
+    WebSubmissionExtension(
+            Configuration configuration,
+            GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
+            Map<String, String> responseHeaders,
+            CompletableFuture<String> localAddressFuture,
+            Path jarDir,
+            Executor executor,
+            Time timeout,
+            Supplier<ApplicationRunner> applicationRunnerSupplier)
+            throws Exception {
 
         webSubmissionHandlers = new ArrayList<>();
 
-        final JarUploadHandler jarUploadHandler =
+        final Executor jarRunExecutor =
+                new SeparateThreadExecutor(
+                        new ExecutorThreadFactory.Builder()
+                                .setPoolName("flink-jar-runner")
+                                .build());
+
+        jarUploadHandler =
                 new JarUploadHandler(
                         leaderRetriever,
                         timeout,
@@ -84,7 +121,7 @@ public class WebSubmissionExtension implements WebMonitorExtension {
                         configuration,
                         executor);
 
-        final JarRunHandler jarRunHandler =
+        jarRunHandler =
                 new JarRunHandler(
                         leaderRetriever,
                         timeout,
@@ -92,8 +129,8 @@ public class WebSubmissionExtension implements WebMonitorExtension {
                         JarRunHeaders.getInstance(),
                         jarDir,
                         configuration,
-                        executor,
-                        () -> new DetachedApplicationRunner(true));
+                        jarRunExecutor,
+                        applicationRunnerSupplier);
 
         final JarDeleteHandler jarDeleteHandler =
                 new JarDeleteHandler(
@@ -112,7 +149,7 @@ public class WebSubmissionExtension implements WebMonitorExtension {
                         JarPlanGetHeaders.getInstance(),
                         jarDir,
                         configuration,
-                        executor);
+                        jarRunExecutor);
 
         final JarPlanHandler postJarPlanHandler =
                 new JarPlanHandler(
@@ -141,4 +178,14 @@ public class WebSubmissionExtension implements WebMonitorExtension {
     public Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> getHandlers() {
         return webSubmissionHandlers;
     }
+
+    @VisibleForTesting
+    JarUploadHandler getJarUploadHandler() {
+        return jarUploadHandler;
+    }
+
+    @VisibleForTesting
+    JarRunHandler getJarRunHandler() {
+        return jarRunHandler;
+    }
 }
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarMessageParameters.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarMessageParameters.java
index 6201d04..280387f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarMessageParameters.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarMessageParameters.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.rest.messages.MessageParameters;
 import org.apache.flink.runtime.rest.messages.MessagePathParameter;
 import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
@@ -29,7 +30,8 @@ import java.util.Collections;
 /** Base class of {@link MessageParameters} for {@link JarRunHandler} and {@link JarPlanHandler}. */
 abstract class JarMessageParameters extends MessageParameters {
 
-    final JarIdPathParameter jarIdPathParameter = new JarIdPathParameter();
+    @VisibleForTesting
+    public final JarIdPathParameter jarIdPathParameter = new JarIdPathParameter();
 
     final EntryClassQueryParameter entryClassQueryParameter = new EntryClassQueryParameter();
 
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 4573300..894b882 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.client.deployment.application.ApplicationRunner;
 import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
@@ -82,7 +83,8 @@ public class JarRunHandler
     }
 
     @Override
-    protected CompletableFuture<JarRunResponseBody> handleRequest(
+    @VisibleForTesting
+    public CompletableFuture<JarRunResponseBody> handleRequest(
             @Nonnull final HandlerRequest<JarRunRequestBody, JarRunMessageParameters> request,
             @Nonnull final DispatcherGateway gateway)
             throws RestHandlerException {
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
index 3dfe94f..4ac3e89 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
@@ -68,7 +69,8 @@ public class JarUploadHandler
     }
 
     @Override
-    protected CompletableFuture<JarUploadResponseBody> handleRequest(
+    @VisibleForTesting
+    public CompletableFuture<JarUploadResponseBody> handleRequest(
             @Nonnull final HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request,
             @Nonnull final RestfulGateway gateway)
             throws RestHandlerException {
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtensionTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtensionTest.java
new file mode 100644
index 0000000..7f99d29
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtensionTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.deployment.application.ApplicationRunner;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.webmonitor.handlers.JarIdPathParameter;
+import org.apache.flink.runtime.webmonitor.handlers.JarRunHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JarRunMessageParameters;
+import org.apache.flink.runtime.webmonitor.handlers.JarRunRequestBody;
+import org.apache.flink.runtime.webmonitor.handlers.JarUploadHandler;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests for the {@link WebSubmissionExtension}. */
+public class WebSubmissionExtensionTest {
+
+    private static final String JAR_NAME = "output-test-program.jar";
+
+    @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @Test
+    public void applicationsRunInSeparateThreads() throws Exception {
+        final Path tempDir = temporaryFolder.getRoot().toPath();
+
+        final Path uploadDir = Files.createDirectories(tempDir.resolve("uploadDir"));
+        // create a copy because the upload handler moves uploaded jars (because it assumes it to be
+        // a temporary file)
+        final Path jarFile =
+                Files.copy(
+                        Paths.get(System.getProperty("targetDir")).resolve(JAR_NAME),
+                        tempDir.resolve("app.jar"));
+
+        final DispatcherGateway dispatcherGateway = new TestingDispatcherGateway.Builder().build();
+
+        final ThreadCapturingApplicationRunner threadCapturingApplicationRunner =
+                new ThreadCapturingApplicationRunner();
+
+        final WebSubmissionExtension webSubmissionExtension =
+                new WebSubmissionExtension(
+                        new Configuration(),
+                        () -> CompletableFuture.completedFuture(dispatcherGateway),
+                        Collections.emptyMap(),
+                        new CompletableFuture<>(),
+                        uploadDir,
+                        Executors.directExecutor(),
+                        Time.of(5, TimeUnit.SECONDS),
+                        () -> threadCapturingApplicationRunner);
+
+        final String jarPath = uploadJar(webSubmissionExtension, jarFile, dispatcherGateway);
+        final String jarId = Paths.get(jarPath).getFileName().toString();
+
+        final JarRunHandler jarRunHandler = webSubmissionExtension.getJarRunHandler();
+
+        final Map<String, String> pathParameters = new HashMap<>();
+        pathParameters.put(JarIdPathParameter.KEY, jarId);
+        final HandlerRequest<JarRunRequestBody, JarRunMessageParameters> runRequest =
+                new HandlerRequest(
+                        new JarRunRequestBody(),
+                        new JarRunMessageParameters(),
+                        pathParameters,
+                        Collections.emptyMap());
+
+        // run several applications in sequence, and verify that each thread is unique
+        int numApplications = 20;
+        for (int i = 0; i < numApplications; i++) {
+            jarRunHandler.handleRequest(runRequest, dispatcherGateway).get();
+        }
+        assertEquals(numApplications, threadCapturingApplicationRunner.getThreads().size());
+    }
+
+    private static String uploadJar(
+            WebSubmissionExtension extension, Path jarFile, DispatcherGateway dispatcherGateway)
+            throws Exception {
+        final JarUploadHandler jarUploadHandler = extension.getJarUploadHandler();
+
+        final HandlerRequest<EmptyRequestBody, EmptyMessageParameters> uploadRequest =
+                new HandlerRequest<>(
+                        EmptyRequestBody.getInstance(),
+                        EmptyMessageParameters.getInstance(),
+                        Collections.emptyMap(),
+                        Collections.emptyMap(),
+                        Collections.singletonList(jarFile.toFile()));
+
+        return jarUploadHandler.handleRequest(uploadRequest, dispatcherGateway).get().getFilename();
+    }
+
+    private static class ThreadCapturingApplicationRunner implements ApplicationRunner {
+
+        private final Set<Thread> threads = Collections.newSetFromMap(new IdentityHashMap<>());
+
+        @Override
+        public List<JobID> run(
+                DispatcherGateway dispatcherGateway,
+                PackagedProgram program,
+                Configuration configuration) {
+            threads.add(Thread.currentThread());
+            return Collections.singletonList(new JobID());
+        }
+
+        public Collection<Thread> getThreads() {
+            return threads;
+        }
+    }
+}

[flink] 02/02: Update for 1.12.7

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9664c17813ef03397c7500f371c5bec499244233
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Dec 16 14:19:27 2021 +0100

    Update for 1.12.7
---
 docs/_config.yml                     | 2 +-
 docs/dev/project-configuration.md    | 2 +-
 docs/dev/project-configuration.zh.md | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/docs/_config.yml b/docs/_config.yml
index 9326fed..0d6ceb3 100644
--- a/docs/_config.yml
+++ b/docs/_config.yml
@@ -27,7 +27,7 @@
 # we change the version for the complete docs when forking of a release branch
 # etc.
 # The full version string as referenced in Maven (e.g. 1.2.1)
-version: "1.12.3"
+version: "1.12.7"
 # For stable releases, leave the bugfix version out (e.g. 1.2). For snapshot
 # release this should be the same as the regular version
 version_title: 1.12
diff --git a/docs/dev/project-configuration.md b/docs/dev/project-configuration.md
index 558f915..36d4610 100644
--- a/docs/dev/project-configuration.md
+++ b/docs/dev/project-configuration.md
@@ -326,7 +326,7 @@ ext {
     flinkVersion = '{{ site.version }}'
     scalaBinaryVersion = '{{ site.scala_version }}'
     slf4jVersion = '1.7.15'
-    log4jVersion = '2.12.1'
+    log4jVersion = '2.16.0'
 }
 
 
diff --git a/docs/dev/project-configuration.zh.md b/docs/dev/project-configuration.zh.md
index 5633c3d..3f48dce 100644
--- a/docs/dev/project-configuration.zh.md
+++ b/docs/dev/project-configuration.zh.md
@@ -326,7 +326,7 @@ ext {
     flinkVersion = '{{ site.version }}'
     scalaBinaryVersion = '{{ site.scala_version }}'
     slf4jVersion = '1.7.15'
-    log4jVersion = '2.12.1'
+    log4jVersion = '2.16.0'
 }