You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/14 13:02:10 UTC

[flink] 07/09: [hotfix] Add --host and --executionMode config option to ClusterEntrypoint

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9c4c138a2db7ab84733482d9ba9508b30056843d
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Sep 13 20:41:32 2018 +0200

    [hotfix] Add --host and --executionMode config option to ClusterEntrypoint
    
    This is necessary to support the command line syntax used by the multi master
    standalone start-up scripts.
---
 .../StandaloneJobClusterConfiguration.java          |  5 +++--
 ...ndaloneJobClusterConfigurationParserFactory.java |  3 +++
 .../flink/runtime/entrypoint/ClusterEntrypoint.java |  6 ++++++
 .../entrypoint/EntrypointClusterConfiguration.java  | 12 +++++++++++-
 ...EntrypointClusterConfigurationParserFactory.java |  6 ++++++
 .../entrypoint/parser/CommandLineOptions.java       | 21 +++++++++++++++++++++
 .../flink/runtime/rest/RestServerEndpoint.java      |  1 +
 ...ypointClusterConfigurationParserFactoryTest.java |  2 +-
 8 files changed, 52 insertions(+), 4 deletions(-)

diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
index 326e924..d8ad5ab 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.entrypoint.EntrypointClusterConfiguration;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.util.Properties;
 
@@ -36,8 +37,8 @@ final class StandaloneJobClusterConfiguration extends EntrypointClusterConfigura
 	@Nonnull
 	private final SavepointRestoreSettings savepointRestoreSettings;
 
-	public StandaloneJobClusterConfiguration(@Nonnull String configDir, @Nonnull Properties dynamicProperties, @Nonnull String[] args, int restPort, @Nonnull String jobClassName, @Nonnull SavepointRestoreSettings savepointRestoreSettings) {
-		super(configDir, dynamicProperties, args, restPort);
+	public StandaloneJobClusterConfiguration(@Nonnull String configDir, @Nonnull Properties dynamicProperties, @Nonnull String[] args, @Nullable String hostname, int restPort, @Nonnull String jobClassName, @Nonnull SavepointRestoreSettings savepointRestoreSettings) {
+		super(configDir, dynamicProperties, args, hostname, restPort);
 		this.jobClassName = jobClassName;
 		this.savepointRestoreSettings = savepointRestoreSettings;
 	}
diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
index 3c65ba8..17217ef 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
@@ -32,6 +32,7 @@ import java.util.Properties;
 
 import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.CONFIG_DIR_OPTION;
 import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.DYNAMIC_PROPERTY_OPTION;
+import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.HOST_OPTION;
 import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.REST_PORT_OPTION;
 
 /**
@@ -67,6 +68,7 @@ public class StandaloneJobClusterConfigurationParserFactory implements ParserRes
 		final Properties dynamicProperties = commandLine.getOptionProperties(DYNAMIC_PROPERTY_OPTION.getOpt());
 		final String restPortString = commandLine.getOptionValue(REST_PORT_OPTION.getOpt(), "-1");
 		final int restPort = Integer.parseInt(restPortString);
+		final String hostname = commandLine.getOptionValue(HOST_OPTION.getOpt());
 		final String jobClassName = commandLine.getOptionValue(JOB_CLASS_NAME_OPTION.getOpt());
 		final SavepointRestoreSettings savepointRestoreSettings = CliFrontendParser.createSavepointRestoreSettings(commandLine);
 
@@ -74,6 +76,7 @@ public class StandaloneJobClusterConfigurationParserFactory implements ParserRes
 			configDir,
 			dynamicProperties,
 			commandLine.getArgs(),
+			hostname,
 			restPort,
 			jobClassName,
 			savepointRestoreSettings);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index ddd3751c..0fd4389 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -708,6 +708,12 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 			configuration.setInteger(RestOptions.PORT, restPort);
 		}
 
+		final String hostname = entrypointClusterConfiguration.getHostname();
+
+		if (hostname != null) {
+			configuration.setString(JobManagerOptions.ADDRESS, hostname);
+		}
+
 		return configuration;
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfiguration.java
index 75cad7a..3472f35 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfiguration.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.entrypoint;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.util.Properties;
 
@@ -27,14 +28,23 @@ import java.util.Properties;
  */
 public class EntrypointClusterConfiguration extends ClusterConfiguration {
 
+	@Nullable
+	private final String hostname;
+
 	private final int restPort;
 
-	public EntrypointClusterConfiguration(@Nonnull String configDir, @Nonnull Properties dynamicProperties, @Nonnull String[] args, int restPort) {
+	public EntrypointClusterConfiguration(@Nonnull String configDir, @Nonnull Properties dynamicProperties, @Nonnull String[] args, @Nullable String hostname, int restPort) {
 		super(configDir, dynamicProperties, args);
+		this.hostname = hostname;
 		this.restPort = restPort;
 	}
 
 	public int getRestPort() {
 		return restPort;
 	}
+
+	@Nullable
+	public String getHostname() {
+		return hostname;
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactory.java
index 7dfb784..52f59ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactory.java
@@ -29,6 +29,8 @@ import java.util.Properties;
 
 import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.CONFIG_DIR_OPTION;
 import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.DYNAMIC_PROPERTY_OPTION;
+import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.EXECUTION_MODE_OPTION;
+import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.HOST_OPTION;
 import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.REST_PORT_OPTION;
 
 /**
@@ -42,6 +44,8 @@ public class EntrypointClusterConfigurationParserFactory implements ParserResult
 		options.addOption(CONFIG_DIR_OPTION);
 		options.addOption(REST_PORT_OPTION);
 		options.addOption(DYNAMIC_PROPERTY_OPTION);
+		options.addOption(HOST_OPTION);
+		options.addOption(EXECUTION_MODE_OPTION);
 
 		return options;
 	}
@@ -52,11 +56,13 @@ public class EntrypointClusterConfigurationParserFactory implements ParserResult
 		final Properties dynamicProperties = commandLine.getOptionProperties(DYNAMIC_PROPERTY_OPTION.getOpt());
 		final String restPortStr = commandLine.getOptionValue(REST_PORT_OPTION.getOpt(), "-1");
 		final int restPort = Integer.parseInt(restPortStr);
+		final String hostname = commandLine.getOptionValue(HOST_OPTION.getOpt());
 
 		return new EntrypointClusterConfiguration(
 			configDir,
 			dynamicProperties,
 			commandLine.getArgs(),
+			hostname,
 			restPort);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/CommandLineOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/CommandLineOptions.java
index 23c9da2..443014b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/CommandLineOptions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/CommandLineOptions.java
@@ -48,5 +48,26 @@ public class CommandLineOptions {
 		.desc("use value for given property")
 		.build();
 
+	public static final Option HOST_OPTION = Option.builder("h")
+		.longOpt("host")
+		.required(false)
+		.hasArg(true)
+		.argName("hostname")
+		.desc("Hostname for the RPC service.")
+		.build();
+
+	/**
+	 * @deprecated exists only for compatibility with legacy mode. Remove once legacy mode
+	 * and execution mode option has been removed.
+	 */
+	@Deprecated
+	public static final Option EXECUTION_MODE_OPTION = Option.builder("x")
+		.longOpt("executionMode")
+		.required(false)
+		.hasArg(true)
+		.argName("execution mode")
+		.desc("Deprecated option")
+		.build();
+
 	private CommandLineOptions() {}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index 28af072..38da82c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -178,6 +178,7 @@ public abstract class RestServerEndpoint implements AutoCloseableAsync {
 				.channel(NioServerSocketChannel.class)
 				.childHandler(initializer);
 
+			log.debug("Binding rest endpoint to {}:{}.", restBindAddress, restBindPort);
 			final ChannelFuture channel;
 			if (restBindAddress == null) {
 				channel = bootstrap.bind(restBindPort);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactoryTest.java
index da63b7f..906e9d5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactoryTest.java
@@ -46,7 +46,7 @@ public class EntrypointClusterConfigurationParserFactoryTest extends TestLogger
 		final String value = "value";
 		final String arg1 = "arg1";
 		final String arg2 = "arg2";
-		final String[] args = {"--configDir", configDir, "-r", String.valueOf(restPort), String.format("-D%s=%s", key, value), arg1, arg2};
+		final String[] args = {"--configDir", configDir, "--executionMode", "cluster", "--host", "localhost",  "-r", String.valueOf(restPort), String.format("-D%s=%s", key, value), arg1, arg2};
 
 		final EntrypointClusterConfiguration clusterConfiguration = commandLineParser.parse(args);