You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/09/29 13:00:58 UTC

[flink] branch release-1.15 updated: [FLINK-28733][scripts] jobmanager.sh supports dynamic parameters

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new dbfdebaa1c0 [FLINK-28733][scripts] jobmanager.sh supports dynamic parameters
dbfdebaa1c0 is described below

commit dbfdebaa1c0eac5346871e023a02662b8301c295
Author: pvary <pe...@gmail.com>
AuthorDate: Thu Sep 29 15:00:50 2022 +0200

    [FLINK-28733][scripts] jobmanager.sh supports dynamic parameters
---
 flink-dist/src/main/flink-bin/bin/jobmanager.sh    |  22 ++-
 .../flink/tests/util/flink/FlinkDistribution.java  |  36 ++++-
 .../apache/flink/dist/DynamicParameterITCase.java  | 175 +++++++++++++++++++++
 .../src/test/resources/log4j2-test.properties      |  28 ++++
 .../runtime/entrypoint/ClusterEntrypoint.java      |   2 +-
 5 files changed, 249 insertions(+), 14 deletions(-)

diff --git a/flink-dist/src/main/flink-bin/bin/jobmanager.sh b/flink-dist/src/main/flink-bin/bin/jobmanager.sh
index 35fbe2c37ff..76b4423af7a 100755
--- a/flink-dist/src/main/flink-bin/bin/jobmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/jobmanager.sh
@@ -18,11 +18,23 @@
 ################################################################################
 
 # Start/stop a Flink JobManager.
-USAGE="Usage: jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all"
+USAGE="Usage: jobmanager.sh ((start|start-foreground) [host] [webui-port] [args])|stop|stop-all"
 
 STARTSTOP=$1
-HOST=$2 # optional when starting multiple instances
-WEBUIPORT=$3 # optional when starting multiple instances
+
+if [ -z $2 ] || [[ $2 == "-D" ]]; then
+    # start [-D ...]
+    args=("${@:2}")
+elif [ -z $3 ] || [[ $3 == "-D" ]]; then
+    # start <host> [-D ...]
+    HOST=$2
+    args=("${@:3}")
+else
+    # start <host> <port> [-D ...]
+    HOST=$2
+    WEBUIPORT=$3
+    args=("${@:4}")
+fi
 
 if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
   echo $USAGE
@@ -41,7 +53,7 @@ if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
     export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}"
     parseJmArgsAndExportLogs "${ARGS[@]}"
 
-    args=("--configDir" "${FLINK_CONF_DIR}" "--executionMode" "cluster")
+    args=("--configDir" "${FLINK_CONF_DIR}" "--executionMode" "cluster" "${args[@]}")
     if [ ! -z $HOST ]; then
         args+=("--host")
         args+=("${HOST}")
@@ -53,7 +65,7 @@ if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
     fi
 
     if [ ! -z "${DYNAMIC_PARAMETERS}" ]; then
-        args+=(${DYNAMIC_PARAMETERS[@]})
+        args=(${DYNAMIC_PARAMETERS[@]} "${args[@]}")
     fi
 fi
 
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
index 0988e437c0a..f8b6b94e899 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
@@ -48,6 +48,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
@@ -62,7 +63,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /** A wrapper around a Flink distribution. */
-final class FlinkDistribution {
+public final class FlinkDistribution {
 
     private static final Logger LOG = LoggerFactory.getLogger(FlinkDistribution.class);
 
@@ -79,7 +80,7 @@ final class FlinkDistribution {
 
     private final Configuration defaultConfig;
 
-    FlinkDistribution(Path distributionDir) {
+    public FlinkDistribution(Path distributionDir) {
         bin = distributionDir.resolve("bin");
         opt = distributionDir.resolve("opt");
         lib = distributionDir.resolve("lib");
@@ -94,14 +95,33 @@ final class FlinkDistribution {
 
     public void startJobManager() throws IOException {
         LOG.info("Starting Flink JobManager.");
-        AutoClosableProcess.runBlocking(
-                bin.resolve("jobmanager.sh").toAbsolutePath().toString(), "start");
+        internalCallJobManagerScript("start");
+    }
+
+    public void callJobManagerScript(String... args) throws IOException {
+        LOG.info("Calling Flink JobManager script with {}.", Arrays.toString(args));
+        internalCallJobManagerScript(args);
+    }
+
+    private void internalCallJobManagerScript(String... args) throws IOException {
+        List<String> arguments = new ArrayList<>();
+        arguments.add(bin.resolve("jobmanager.sh").toAbsolutePath().toString());
+        arguments.addAll(Arrays.asList(args));
+        AutoClosableProcess.create(arguments.toArray(new String[0]))
+                // ignore the variable, we assume we log to the distribution directory
+                // and we copy the logs over in case of failure
+                .setEnv(env -> env.remove("FLINK_LOG_DIR"))
+                .runBlocking();
     }
 
     public void startTaskManager() throws IOException {
         LOG.info("Starting Flink TaskManager.");
-        AutoClosableProcess.runBlocking(
-                bin.resolve("taskmanager.sh").toAbsolutePath().toString(), "start");
+        AutoClosableProcess.create(
+                        bin.resolve("taskmanager.sh").toAbsolutePath().toString(), "start")
+                // ignore the variable, we assume we log to the distribution directory
+                // and we copy the logs over in case of failure
+                .setEnv(env -> env.remove("FLINK_LOG_DIR"))
+                .runBlocking();
     }
 
     public void setRootLogLevel(Level logLevel) throws IOException {
@@ -309,9 +329,9 @@ final class FlinkDistribution {
         Files.write(conf.resolve("workers"), taskExecutorHosts);
     }
 
-    public Stream<String> searchAllLogs(Pattern pattern, Function<Matcher, String> matchProcessor)
+    public <T> Stream<T> searchAllLogs(Pattern pattern, Function<Matcher, T> matchProcessor)
             throws IOException {
-        final List<String> matches = new ArrayList<>(2);
+        final List<T> matches = new ArrayList<>(2);
 
         try (Stream<Path> logFilesStream = Files.list(log)) {
             final Iterator<Path> logFiles = logFilesStream.iterator();
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/dist/DynamicParameterITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/dist/DynamicParameterITCase.java
new file mode 100644
index 00000000000..2dcd7b5d5f4
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/dist/DynamicParameterITCase.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dist;
+
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
+import org.apache.flink.runtime.entrypoint.EntrypointClusterConfiguration;
+import org.apache.flink.runtime.entrypoint.FlinkParseException;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.FlinkDistribution;
+import org.apache.flink.tests.util.util.FileUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class DynamicParameterITCase {
+
+    private static final Pattern ENTRYPOINT_LOG_PATTERN =
+            Pattern.compile(".*ClusterEntrypoint +\\[] - +(.*)");
+    private static final Pattern ENTRYPOINT_CLASSPATH_LOG_PATTERN =
+            Pattern.compile(".*ClusterEntrypoint +\\[] - +Classpath:.*");
+
+    private static final String HOST = "localhost";
+    private static final int PORT = 8081;
+
+    private static final String DYNAMIC_KEY = "hello";
+    private static final String DYNAMIC_VALUE = "world";
+    private static final String DYNAMIC_PROPERTY = DYNAMIC_KEY + "=" + DYNAMIC_VALUE;
+
+    private static final Path originalDist = FileUtils.findFlinkDist();
+
+    private FlinkDistribution dist;
+
+    @BeforeEach
+    private void setup(@TempDir Path tmp) throws IOException {
+        TestUtils.copyDirectory(originalDist, tmp);
+        dist = new FlinkDistribution(tmp);
+    }
+
+    @AfterEach
+    private void cleanup() throws IOException {
+        if (dist != null) {
+            dist.stopFlinkCluster();
+        }
+    }
+
+    @Test
+    void testWithoutAnyParameter() throws Exception {
+        assertParameterPassing(dist, false, false, false);
+    }
+
+    @Test
+    void testWithHost() throws Exception {
+        assertParameterPassing(dist, true, false, false);
+    }
+
+    @Test
+    void testWithHostAndPort() throws Exception {
+        assertParameterPassing(dist, true, true, false);
+    }
+
+    @Test
+    void testWithDynamicParameter() throws Exception {
+        assertParameterPassing(dist, false, false, true);
+    }
+
+    @Test
+    void testWithDynamicParameterAndHost() throws Exception {
+        assertParameterPassing(dist, true, false, true);
+    }
+
+    @Test
+    void testWithDynamicParameterAndHostAndPort() throws Exception {
+        assertParameterPassing(dist, true, true, true);
+    }
+
+    private static void assertParameterPassing(
+            FlinkDistribution dist, boolean withHost, boolean withPort, boolean withDynamicProperty)
+            throws Exception {
+
+        final List<String> args = new ArrayList<>();
+        args.add("start");
+
+        if (withHost) {
+            args.add(HOST);
+        }
+        if (withPort) {
+            Preconditions.checkState(withHost, "port may only be supplied with a host");
+            args.add(String.valueOf(PORT));
+        }
+        if (withDynamicProperty) {
+            args.add("-D");
+            args.add(DYNAMIC_PROPERTY);
+        }
+
+        dist.callJobManagerScript(args.toArray(new String[0]));
+
+        while (!allProgramArgumentsLogged(dist)) {
+            Thread.sleep(500);
+        }
+
+        try (Stream<String> lines =
+                dist.searchAllLogs(ENTRYPOINT_LOG_PATTERN, matcher -> matcher.group(1))) {
+
+            final EntrypointClusterConfiguration entrypointConfig =
+                    ClusterEntrypoint.parseArguments(
+                            lines.filter(new ProgramArgumentsFilter()).toArray(String[]::new));
+
+            assertThat(entrypointConfig.getHostname()).isEqualTo(withHost ? HOST : null);
+            assertThat(entrypointConfig.getRestPort()).isEqualTo(withPort ? PORT : -1);
+
+            if (withDynamicProperty) {
+                assertThat(entrypointConfig.getDynamicProperties())
+                        .containsEntry(DYNAMIC_KEY, DYNAMIC_VALUE);
+            } else {
+                assertThat(entrypointConfig.getDynamicProperties())
+                        .doesNotContainEntry(DYNAMIC_KEY, DYNAMIC_VALUE);
+            }
+        } catch (FlinkParseException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static boolean allProgramArgumentsLogged(FlinkDistribution dist) throws IOException {
+        // the classpath is logged after the program arguments
+        try (Stream<String> lines =
+                dist.searchAllLogs(ENTRYPOINT_CLASSPATH_LOG_PATTERN, matcher -> matcher.group(0))) {
+            return lines.iterator().hasNext();
+        }
+    }
+
+    private static class ProgramArgumentsFilter implements Predicate<String> {
+
+        private boolean inProgramArguments = false;
+
+        @Override
+        public boolean test(String s) {
+            if (s.contains("Program Arguments:")) {
+                inProgramArguments = true;
+                return false;
+            }
+            if (s.contains("Classpath:")) {
+                inProgramArguments = false;
+            }
+            return inProgramArguments;
+        }
+    }
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/resources/log4j2-test.properties b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/resources/log4j2-test.properties
new file mode 100644
index 00000000000..835c2ec9a3d
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index bae7591f044..c7540c90841 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -668,7 +668,7 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro
     protected abstract ExecutionGraphInfoStore createSerializableExecutionGraphStore(
             Configuration configuration, ScheduledExecutor scheduledExecutor) throws IOException;
 
-    protected static EntrypointClusterConfiguration parseArguments(String[] args)
+    public static EntrypointClusterConfiguration parseArguments(String[] args)
             throws FlinkParseException {
         final CommandLineParser<EntrypointClusterConfiguration> clusterConfigurationParser =
                 new CommandLineParser<>(new EntrypointClusterConfigurationParserFactory());