You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/14 13:02:10 UTC
[flink] 07/09: [hotfix] Add --host and --executionMode config
option to ClusterEntrypoint
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9c4c138a2db7ab84733482d9ba9508b30056843d
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Sep 13 20:41:32 2018 +0200
[hotfix] Add --host and --executionMode config option to ClusterEntrypoint
This is necessary to support the command line syntax used by the multi master
standalone start-up scripts.
---
.../StandaloneJobClusterConfiguration.java | 5 +++--
...ndaloneJobClusterConfigurationParserFactory.java | 3 +++
.../flink/runtime/entrypoint/ClusterEntrypoint.java | 6 ++++++
.../entrypoint/EntrypointClusterConfiguration.java | 12 +++++++++++-
...EntrypointClusterConfigurationParserFactory.java | 6 ++++++
.../entrypoint/parser/CommandLineOptions.java | 21 +++++++++++++++++++++
.../flink/runtime/rest/RestServerEndpoint.java | 1 +
...ypointClusterConfigurationParserFactoryTest.java | 2 +-
8 files changed, 52 insertions(+), 4 deletions(-)
diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
index 326e924..d8ad5ab 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.entrypoint.EntrypointClusterConfiguration;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import java.util.Properties;
@@ -36,8 +37,8 @@ final class StandaloneJobClusterConfiguration extends EntrypointClusterConfigura
@Nonnull
private final SavepointRestoreSettings savepointRestoreSettings;
- public StandaloneJobClusterConfiguration(@Nonnull String configDir, @Nonnull Properties dynamicProperties, @Nonnull String[] args, int restPort, @Nonnull String jobClassName, @Nonnull SavepointRestoreSettings savepointRestoreSettings) {
- super(configDir, dynamicProperties, args, restPort);
+ public StandaloneJobClusterConfiguration(@Nonnull String configDir, @Nonnull Properties dynamicProperties, @Nonnull String[] args, @Nullable String hostname, int restPort, @Nonnull String jobClassName, @Nonnull SavepointRestoreSettings savepointRestoreSettings) {
+ super(configDir, dynamicProperties, args, hostname, restPort);
this.jobClassName = jobClassName;
this.savepointRestoreSettings = savepointRestoreSettings;
}
diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
index 3c65ba8..17217ef 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
@@ -32,6 +32,7 @@ import java.util.Properties;
import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.CONFIG_DIR_OPTION;
import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.DYNAMIC_PROPERTY_OPTION;
+import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.HOST_OPTION;
import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.REST_PORT_OPTION;
/**
@@ -67,6 +68,7 @@ public class StandaloneJobClusterConfigurationParserFactory implements ParserRes
final Properties dynamicProperties = commandLine.getOptionProperties(DYNAMIC_PROPERTY_OPTION.getOpt());
final String restPortString = commandLine.getOptionValue(REST_PORT_OPTION.getOpt(), "-1");
final int restPort = Integer.parseInt(restPortString);
+ final String hostname = commandLine.getOptionValue(HOST_OPTION.getOpt());
final String jobClassName = commandLine.getOptionValue(JOB_CLASS_NAME_OPTION.getOpt());
final SavepointRestoreSettings savepointRestoreSettings = CliFrontendParser.createSavepointRestoreSettings(commandLine);
@@ -74,6 +76,7 @@ public class StandaloneJobClusterConfigurationParserFactory implements ParserRes
configDir,
dynamicProperties,
commandLine.getArgs(),
+ hostname,
restPort,
jobClassName,
savepointRestoreSettings);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index ddd3751c..0fd4389 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -708,6 +708,12 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
configuration.setInteger(RestOptions.PORT, restPort);
}
+ final String hostname = entrypointClusterConfiguration.getHostname();
+
+ if (hostname != null) {
+ configuration.setString(JobManagerOptions.ADDRESS, hostname);
+ }
+
return configuration;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfiguration.java
index 75cad7a..3472f35 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfiguration.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.entrypoint;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import java.util.Properties;
@@ -27,14 +28,23 @@ import java.util.Properties;
*/
public class EntrypointClusterConfiguration extends ClusterConfiguration {
+ @Nullable
+ private final String hostname;
+
private final int restPort;
- public EntrypointClusterConfiguration(@Nonnull String configDir, @Nonnull Properties dynamicProperties, @Nonnull String[] args, int restPort) {
+ public EntrypointClusterConfiguration(@Nonnull String configDir, @Nonnull Properties dynamicProperties, @Nonnull String[] args, @Nullable String hostname, int restPort) {
super(configDir, dynamicProperties, args);
+ this.hostname = hostname;
this.restPort = restPort;
}
public int getRestPort() {
return restPort;
}
+
+ @Nullable
+ public String getHostname() {
+ return hostname;
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactory.java
index 7dfb784..52f59ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactory.java
@@ -29,6 +29,8 @@ import java.util.Properties;
import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.CONFIG_DIR_OPTION;
import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.DYNAMIC_PROPERTY_OPTION;
+import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.EXECUTION_MODE_OPTION;
+import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.HOST_OPTION;
import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.REST_PORT_OPTION;
/**
@@ -42,6 +44,8 @@ public class EntrypointClusterConfigurationParserFactory implements ParserResult
options.addOption(CONFIG_DIR_OPTION);
options.addOption(REST_PORT_OPTION);
options.addOption(DYNAMIC_PROPERTY_OPTION);
+ options.addOption(HOST_OPTION);
+ options.addOption(EXECUTION_MODE_OPTION);
return options;
}
@@ -52,11 +56,13 @@ public class EntrypointClusterConfigurationParserFactory implements ParserResult
final Properties dynamicProperties = commandLine.getOptionProperties(DYNAMIC_PROPERTY_OPTION.getOpt());
final String restPortStr = commandLine.getOptionValue(REST_PORT_OPTION.getOpt(), "-1");
final int restPort = Integer.parseInt(restPortStr);
+ final String hostname = commandLine.getOptionValue(HOST_OPTION.getOpt());
return new EntrypointClusterConfiguration(
configDir,
dynamicProperties,
commandLine.getArgs(),
+ hostname,
restPort);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/CommandLineOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/CommandLineOptions.java
index 23c9da2..443014b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/CommandLineOptions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/CommandLineOptions.java
@@ -48,5 +48,26 @@ public class CommandLineOptions {
.desc("use value for given property")
.build();
+ public static final Option HOST_OPTION = Option.builder("h")
+ .longOpt("host")
+ .required(false)
+ .hasArg(true)
+ .argName("hostname")
+ .desc("Hostname for the RPC service.")
+ .build();
+
+ /**
+ * @deprecated exists only for compatibility with legacy mode. Remove once legacy mode
+ * and execution mode option has been removed.
+ */
+ @Deprecated
+ public static final Option EXECUTION_MODE_OPTION = Option.builder("x")
+ .longOpt("executionMode")
+ .required(false)
+ .hasArg(true)
+ .argName("execution mode")
+ .desc("Deprecated option")
+ .build();
+
private CommandLineOptions() {}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index 28af072..38da82c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -178,6 +178,7 @@ public abstract class RestServerEndpoint implements AutoCloseableAsync {
.channel(NioServerSocketChannel.class)
.childHandler(initializer);
+ log.debug("Binding rest endpoint to {}:{}.", restBindAddress, restBindPort);
final ChannelFuture channel;
if (restBindAddress == null) {
channel = bootstrap.bind(restBindPort);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactoryTest.java
index da63b7f..906e9d5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactoryTest.java
@@ -46,7 +46,7 @@ public class EntrypointClusterConfigurationParserFactoryTest extends TestLogger
final String value = "value";
final String arg1 = "arg1";
final String arg2 = "arg2";
- final String[] args = {"--configDir", configDir, "-r", String.valueOf(restPort), String.format("-D%s=%s", key, value), arg1, arg2};
+ final String[] args = {"--configDir", configDir, "--executionMode", "cluster", "--host", "localhost", "-r", String.valueOf(restPort), String.format("-D%s=%s", key, value), arg1, arg2};
final EntrypointClusterConfiguration clusterConfiguration = commandLineParser.parse(args);