You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/09/29 13:00:58 UTC
[flink] branch release-1.15 updated: [FLINK-28733][scripts] jobmanager.sh supports dynamic parameters
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new dbfdebaa1c0 [FLINK-28733][scripts] jobmanager.sh supports dynamic parameters
dbfdebaa1c0 is described below
commit dbfdebaa1c0eac5346871e023a02662b8301c295
Author: pvary <pe...@gmail.com>
AuthorDate: Thu Sep 29 15:00:50 2022 +0200
[FLINK-28733][scripts] jobmanager.sh supports dynamic parameters
---
flink-dist/src/main/flink-bin/bin/jobmanager.sh | 22 ++-
.../flink/tests/util/flink/FlinkDistribution.java | 36 ++++-
.../apache/flink/dist/DynamicParameterITCase.java | 175 +++++++++++++++++++++
.../src/test/resources/log4j2-test.properties | 28 ++++
.../runtime/entrypoint/ClusterEntrypoint.java | 2 +-
5 files changed, 249 insertions(+), 14 deletions(-)
diff --git a/flink-dist/src/main/flink-bin/bin/jobmanager.sh b/flink-dist/src/main/flink-bin/bin/jobmanager.sh
index 35fbe2c37ff..76b4423af7a 100755
--- a/flink-dist/src/main/flink-bin/bin/jobmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/jobmanager.sh
@@ -18,11 +18,23 @@
################################################################################
# Start/stop a Flink JobManager.
-USAGE="Usage: jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all"
+USAGE="Usage: jobmanager.sh ((start|start-foreground) [host] [webui-port] [args])|stop|stop-all"
STARTSTOP=$1
-HOST=$2 # optional when starting multiple instances
-WEBUIPORT=$3 # optional when starting multiple instances
+
+if [ -z $2 ] || [[ $2 == "-D" ]]; then
+ # start [-D ...]
+ args=("${@:2}")
+elif [ -z $3 ] || [[ $3 == "-D" ]]; then
+ # start <host> [-D ...]
+ HOST=$2
+ args=("${@:3}")
+else
+ # start <host> <port> [-D ...]
+ HOST=$2
+ WEBUIPORT=$3
+ args=("${@:4}")
+fi
if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
echo $USAGE
@@ -41,7 +53,7 @@ if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}"
parseJmArgsAndExportLogs "${ARGS[@]}"
- args=("--configDir" "${FLINK_CONF_DIR}" "--executionMode" "cluster")
+ args=("--configDir" "${FLINK_CONF_DIR}" "--executionMode" "cluster" "${args[@]}")
if [ ! -z $HOST ]; then
args+=("--host")
args+=("${HOST}")
@@ -53,7 +65,7 @@ if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
fi
if [ ! -z "${DYNAMIC_PARAMETERS}" ]; then
- args+=(${DYNAMIC_PARAMETERS[@]})
+ args=(${DYNAMIC_PARAMETERS[@]} "${args[@]}")
fi
fi
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
index 0988e437c0a..f8b6b94e899 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
@@ -48,6 +48,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@@ -62,7 +63,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
/** A wrapper around a Flink distribution. */
-final class FlinkDistribution {
+public final class FlinkDistribution {
private static final Logger LOG = LoggerFactory.getLogger(FlinkDistribution.class);
@@ -79,7 +80,7 @@ final class FlinkDistribution {
private final Configuration defaultConfig;
- FlinkDistribution(Path distributionDir) {
+ public FlinkDistribution(Path distributionDir) {
bin = distributionDir.resolve("bin");
opt = distributionDir.resolve("opt");
lib = distributionDir.resolve("lib");
@@ -94,14 +95,33 @@ final class FlinkDistribution {
public void startJobManager() throws IOException {
LOG.info("Starting Flink JobManager.");
- AutoClosableProcess.runBlocking(
- bin.resolve("jobmanager.sh").toAbsolutePath().toString(), "start");
+ internalCallJobManagerScript("start");
+ }
+
+ public void callJobManagerScript(String... args) throws IOException {
+ LOG.info("Calling Flink JobManager script with {}.", Arrays.toString(args));
+ internalCallJobManagerScript(args);
+ }
+
+ private void internalCallJobManagerScript(String... args) throws IOException {
+ List<String> arguments = new ArrayList<>();
+ arguments.add(bin.resolve("jobmanager.sh").toAbsolutePath().toString());
+ arguments.addAll(Arrays.asList(args));
+ AutoClosableProcess.create(arguments.toArray(new String[0]))
+ // ignore the variable, we assume we log to the distribution directory
+ // and we copy the logs over in case of failure
+ .setEnv(env -> env.remove("FLINK_LOG_DIR"))
+ .runBlocking();
}
public void startTaskManager() throws IOException {
LOG.info("Starting Flink TaskManager.");
- AutoClosableProcess.runBlocking(
- bin.resolve("taskmanager.sh").toAbsolutePath().toString(), "start");
+ AutoClosableProcess.create(
+ bin.resolve("taskmanager.sh").toAbsolutePath().toString(), "start")
+ // ignore the variable, we assume we log to the distribution directory
+ // and we copy the logs over in case of failure
+ .setEnv(env -> env.remove("FLINK_LOG_DIR"))
+ .runBlocking();
}
public void setRootLogLevel(Level logLevel) throws IOException {
@@ -309,9 +329,9 @@ final class FlinkDistribution {
Files.write(conf.resolve("workers"), taskExecutorHosts);
}
- public Stream<String> searchAllLogs(Pattern pattern, Function<Matcher, String> matchProcessor)
+ public <T> Stream<T> searchAllLogs(Pattern pattern, Function<Matcher, T> matchProcessor)
throws IOException {
- final List<String> matches = new ArrayList<>(2);
+ final List<T> matches = new ArrayList<>(2);
try (Stream<Path> logFilesStream = Files.list(log)) {
final Iterator<Path> logFiles = logFilesStream.iterator();
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/dist/DynamicParameterITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/dist/DynamicParameterITCase.java
new file mode 100644
index 00000000000..2dcd7b5d5f4
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/dist/DynamicParameterITCase.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dist;
+
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
+import org.apache.flink.runtime.entrypoint.EntrypointClusterConfiguration;
+import org.apache.flink.runtime.entrypoint.FlinkParseException;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.FlinkDistribution;
+import org.apache.flink.tests.util.util.FileUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class DynamicParameterITCase {
+
+ private static final Pattern ENTRYPOINT_LOG_PATTERN =
+ Pattern.compile(".*ClusterEntrypoint +\\[] - +(.*)");
+ private static final Pattern ENTRYPOINT_CLASSPATH_LOG_PATTERN =
+ Pattern.compile(".*ClusterEntrypoint +\\[] - +Classpath:.*");
+
+ private static final String HOST = "localhost";
+ private static final int PORT = 8081;
+
+ private static final String DYNAMIC_KEY = "hello";
+ private static final String DYNAMIC_VALUE = "world";
+ private static final String DYNAMIC_PROPERTY = DYNAMIC_KEY + "=" + DYNAMIC_VALUE;
+
+ private static final Path originalDist = FileUtils.findFlinkDist();
+
+ private FlinkDistribution dist;
+
+ @BeforeEach
+ private void setup(@TempDir Path tmp) throws IOException {
+ TestUtils.copyDirectory(originalDist, tmp);
+ dist = new FlinkDistribution(tmp);
+ }
+
+ @AfterEach
+ private void cleanup() throws IOException {
+ if (dist != null) {
+ dist.stopFlinkCluster();
+ }
+ }
+
+ @Test
+ void testWithoutAnyParameter() throws Exception {
+ assertParameterPassing(dist, false, false, false);
+ }
+
+ @Test
+ void testWithHost() throws Exception {
+ assertParameterPassing(dist, true, false, false);
+ }
+
+ @Test
+ void testWithHostAndPort() throws Exception {
+ assertParameterPassing(dist, true, true, false);
+ }
+
+ @Test
+ void testWithDynamicParameter() throws Exception {
+ assertParameterPassing(dist, false, false, true);
+ }
+
+ @Test
+ void testWithDynamicParameterAndHost() throws Exception {
+ assertParameterPassing(dist, true, false, true);
+ }
+
+ @Test
+ void testWithDynamicParameterAndHostAndPort() throws Exception {
+ assertParameterPassing(dist, true, true, true);
+ }
+
+ private static void assertParameterPassing(
+ FlinkDistribution dist, boolean withHost, boolean withPort, boolean withDynamicProperty)
+ throws Exception {
+
+ final List<String> args = new ArrayList<>();
+ args.add("start");
+
+ if (withHost) {
+ args.add(HOST);
+ }
+ if (withPort) {
+ Preconditions.checkState(withHost, "port may only be supplied with a host");
+ args.add(String.valueOf(PORT));
+ }
+ if (withDynamicProperty) {
+ args.add("-D");
+ args.add(DYNAMIC_PROPERTY);
+ }
+
+ dist.callJobManagerScript(args.toArray(new String[0]));
+
+ while (!allProgramArgumentsLogged(dist)) {
+ Thread.sleep(500);
+ }
+
+ try (Stream<String> lines =
+ dist.searchAllLogs(ENTRYPOINT_LOG_PATTERN, matcher -> matcher.group(1))) {
+
+ final EntrypointClusterConfiguration entrypointConfig =
+ ClusterEntrypoint.parseArguments(
+ lines.filter(new ProgramArgumentsFilter()).toArray(String[]::new));
+
+ assertThat(entrypointConfig.getHostname()).isEqualTo(withHost ? HOST : null);
+ assertThat(entrypointConfig.getRestPort()).isEqualTo(withPort ? PORT : -1);
+
+ if (withDynamicProperty) {
+ assertThat(entrypointConfig.getDynamicProperties())
+ .containsEntry(DYNAMIC_KEY, DYNAMIC_VALUE);
+ } else {
+ assertThat(entrypointConfig.getDynamicProperties())
+ .doesNotContainEntry(DYNAMIC_KEY, DYNAMIC_VALUE);
+ }
+ } catch (FlinkParseException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static boolean allProgramArgumentsLogged(FlinkDistribution dist) throws IOException {
+ // the classpath is logged after the program arguments
+ try (Stream<String> lines =
+ dist.searchAllLogs(ENTRYPOINT_CLASSPATH_LOG_PATTERN, matcher -> matcher.group(0))) {
+ return lines.iterator().hasNext();
+ }
+ }
+
+ private static class ProgramArgumentsFilter implements Predicate<String> {
+
+ private boolean inProgramArguments = false;
+
+ @Override
+ public boolean test(String s) {
+ if (s.contains("Program Arguments:")) {
+ inProgramArguments = true;
+ return false;
+ }
+ if (s.contains("Classpath:")) {
+ inProgramArguments = false;
+ }
+ return inProgramArguments;
+ }
+ }
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/resources/log4j2-test.properties b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/resources/log4j2-test.properties
new file mode 100644
index 00000000000..835c2ec9a3d
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index bae7591f044..c7540c90841 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -668,7 +668,7 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro
protected abstract ExecutionGraphInfoStore createSerializableExecutionGraphStore(
Configuration configuration, ScheduledExecutor scheduledExecutor) throws IOException;
- protected static EntrypointClusterConfiguration parseArguments(String[] args)
+ public static EntrypointClusterConfiguration parseArguments(String[] args)
throws FlinkParseException {
final CommandLineParser<EntrypointClusterConfiguration> clusterConfigurationParser =
new CommandLineParser<>(new EntrypointClusterConfigurationParserFactory());