You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/14 13:15:00 UTC
[flink] 07/09: [hotfix] Add --host config option to
ClusterEntrypoint
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 31782136ea2876518519d1fbe8a56dbd07aac328
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Sep 13 20:41:32 2018 +0200
[hotfix] Add --host config option to ClusterEntrypoint
This is necessary to support the command line syntax used by the multi master
standalone start-up scripts.
---
.../flink/runtime/entrypoint/ClusterConfiguration.java | 13 ++++++++++++-
.../apache/flink/runtime/entrypoint/ClusterEntrypoint.java | 11 ++++++++++-
.../org/apache/flink/runtime/rest/RestServerEndpoint.java | 1 +
3 files changed, 23 insertions(+), 2 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterConfiguration.java
index 7f8b509..d53647d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterConfiguration.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.entrypoint;
import org.apache.flink.util.Preconditions;
+import javax.annotation.Nullable;
+
/**
* Configuration class which contains the parsed command line arguments for
* the {@link ClusterEntrypoint}.
@@ -29,8 +31,12 @@ public class ClusterConfiguration {
private final int restPort;
- public ClusterConfiguration(String configDir, int restPort) {
+ @Nullable
+ private final String hostname;
+
+ public ClusterConfiguration(String configDir, @Nullable String hostname, int restPort) {
this.configDir = Preconditions.checkNotNull(configDir);
+ this.hostname = hostname;
this.restPort = restPort;
}
@@ -38,6 +44,11 @@ public class ClusterConfiguration {
return configDir;
}
+ @Nullable
+ public String getHostname() {
+ return hostname;
+ }
+
public int getRestPort() {
return restPort;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index a267abb..c32be81 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -702,7 +702,10 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
restPort = -1;
}
- return new ClusterConfiguration(configDir, restPort);
+ final String hostKey = "host";
+ final String hostname = parameterTool.get(hostKey);
+
+ return new ClusterConfiguration(configDir, hostname, restPort);
}
protected static Configuration loadConfiguration(ClusterConfiguration clusterConfiguration) {
@@ -714,6 +717,12 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
configuration.setInteger(RestOptions.PORT, restPort);
}
+ final String hostname = clusterConfiguration.getHostname();
+
+ if (hostname != null) {
+ configuration.setString(JobManagerOptions.ADDRESS, hostname);
+ }
+
return configuration;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index 01d1043..3e85271 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -180,6 +180,7 @@ public abstract class RestServerEndpoint implements AutoCloseableAsync {
.channel(NioServerSocketChannel.class)
.childHandler(initializer);
+ log.debug("Binding rest endpoint to {}:{}.", restBindAddress, restBindPort);
final ChannelFuture channel;
if (restBindAddress == null) {
channel = bootstrap.bind(restBindPort);