You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/14 13:15:00 UTC

[flink] 07/09: [hotfix] Add --host config option to ClusterEntrypoint

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 31782136ea2876518519d1fbe8a56dbd07aac328
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Sep 13 20:41:32 2018 +0200

    [hotfix] Add --host config option to ClusterEntrypoint
    
    This is necessary to support the command-line syntax used by the multi-master
    standalone start-up scripts.
---
 .../flink/runtime/entrypoint/ClusterConfiguration.java      | 13 ++++++++++++-
 .../apache/flink/runtime/entrypoint/ClusterEntrypoint.java  | 11 ++++++++++-
 .../org/apache/flink/runtime/rest/RestServerEndpoint.java   |  1 +
 3 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterConfiguration.java
index 7f8b509..d53647d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterConfiguration.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.entrypoint;
 
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
 /**
  * Configuration class which contains the parsed command line arguments for
  * the {@link ClusterEntrypoint}.
@@ -29,8 +31,12 @@ public class ClusterConfiguration {
 
 	private final int restPort;
 
-	public ClusterConfiguration(String configDir, int restPort) {
+	@Nullable
+	private final String hostname;
+
+	public ClusterConfiguration(String configDir, @Nullable String hostname, int restPort) {
 		this.configDir = Preconditions.checkNotNull(configDir);
+		this.hostname = hostname;
 		this.restPort = restPort;
 	}
 
@@ -38,6 +44,11 @@ public class ClusterConfiguration {
 		return configDir;
 	}
 
+	@Nullable
+	public String getHostname() {
+		return hostname;
+	}
+
 	public int getRestPort() {
 		return restPort;
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index a267abb..c32be81 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -702,7 +702,10 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 			restPort = -1;
 		}
 
-		return new ClusterConfiguration(configDir, restPort);
+		final String hostKey = "host";
+		final String hostname = parameterTool.get(hostKey);
+
+		return new ClusterConfiguration(configDir, hostname, restPort);
 	}
 
 	protected static Configuration loadConfiguration(ClusterConfiguration clusterConfiguration) {
@@ -714,6 +717,12 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 			configuration.setInteger(RestOptions.PORT, restPort);
 		}
 
+		final String hostname = clusterConfiguration.getHostname();
+
+		if (hostname != null) {
+			configuration.setString(JobManagerOptions.ADDRESS, hostname);
+		}
+
 		return configuration;
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index 01d1043..3e85271 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -180,6 +180,7 @@ public abstract class RestServerEndpoint implements AutoCloseableAsync {
 				.channel(NioServerSocketChannel.class)
 				.childHandler(initializer);
 
+			log.debug("Binding rest endpoint to {}:{}.", restBindAddress, restBindPort);
 			final ChannelFuture channel;
 			if (restBindAddress == null) {
 				channel = bootstrap.bind(restBindPort);