You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/03/18 17:54:38 UTC

[5/7] flink git commit: [FLINK-8843][REST] Decouple bind REST address from advertised address

[FLINK-8843][REST] Decouple bind REST address from advertised address

By default, bind the REST server to the wildcard address.
Rename RestServerEndpoint#getRestAddress to getRestBaseUrl.

This closes #5707.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/efd7336f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/efd7336f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/efd7336f

Branch: refs/heads/master
Commit: efd7336fa693a9f82b9ecfb5d81c0ef747ab7801
Parents: 0caff35
Author: gyao <ga...@data-artisans.com>
Authored: Thu Mar 15 22:04:58 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Mar 18 15:58:14 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/configuration/RestOptions.java | 15 +++++--
 .../runtime/entrypoint/ClusterEntrypoint.java   |  4 +-
 .../HighAvailabilityServicesUtils.java          |  6 ++-
 .../flink/runtime/minicluster/MiniCluster.java  |  4 +-
 .../minicluster/MiniClusterConfiguration.java   |  4 ++
 .../flink/runtime/rest/RestServerEndpoint.java  | 46 +++++++++++---------
 .../rest/RestServerEndpointConfiguration.java   | 32 ++++++++++----
 .../runtime/webmonitor/WebMonitorEndpoint.java  |  6 +--
 .../runtime/rest/RestServerEndpointITCase.java  |  3 +-
 9 files changed, 80 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/efd7336f/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
index 94d7977..e7421c4 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
@@ -29,12 +29,21 @@ import static org.apache.flink.configuration.ConfigOptions.key;
 public class RestOptions {
 
 	/**
-	 * The address that the server binds itself to / the client connects to.
+	 * The address that the server binds itself to.
+	 */
+	public static final ConfigOption<String> REST_BIND_ADDRESS =
+		key("rest.bind-address")
+			.noDefaultValue()
+			.withDescription("The address that the server binds itself.");
+
+	/**
+	 * The address that should be used by clients to connect to the server.
 	 */
 	public static final ConfigOption<String> REST_ADDRESS =
 		key("rest.address")
-			.defaultValue("localhost")
-			.withDescription("The address that the server binds itself to / the client connects to.");
+			.noDefaultValue()
+			.withDeprecatedKeys(JobManagerOptions.ADDRESS.key())
+			.withDescription("The address that should be used by clients to connect to the server.");
 
 	/**
 	 * The port that the server listens on / the client connects to.

http://git-wip-us.apache.org/repos/asf/flink/blob/efd7336f/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 676415b..63c8072 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -330,7 +330,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 				metricRegistry,
 				this,
 				clusterInformation,
-				webMonitorEndpoint.getRestAddress());
+				webMonitorEndpoint.getRestBaseUrl());
 
 			jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, rpcService.getAddress());
 
@@ -345,7 +345,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 				metricRegistry.getMetricQueryServicePath(),
 				archivedExecutionGraphStore,
 				this,
-				webMonitorEndpoint.getRestAddress());
+				webMonitorEndpoint.getRestBaseUrl());
 
 			LOG.debug("Starting ResourceManager.");
 			resourceManager.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/efd7336f/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
index 4f12f2b..f19a421 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -39,6 +39,8 @@ import org.apache.flink.util.ConfigurationException;
 
 import java.util.concurrent.Executor;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Utils class to instantiate {@link HighAvailabilityServices} implementations.
  */
@@ -97,7 +99,9 @@ public class HighAvailabilityServicesUtils {
 					addressResolution,
 					configuration);
 
-				final String address = configuration.getString(RestOptions.REST_ADDRESS);
+				final String address = checkNotNull(configuration.getString(RestOptions.REST_ADDRESS),
+					"%s must be set",
+					RestOptions.REST_ADDRESS.key());
 				final int port = configuration.getInteger(RestOptions.REST_PORT);
 				final boolean enableSSL = configuration.getBoolean(SecurityOptions.SSL_ENABLED);
 				final String protocol = enableSSL ? "https://" : "http://";

http://git-wip-us.apache.org/repos/asf/flink/blob/efd7336f/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 74aa388..dfe30af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -341,7 +341,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 
 				dispatcherRestEndpoint.start();
 
-				restAddressURI = new URI(dispatcherRestEndpoint.getRestAddress());
+				restAddressURI = new URI(dispatcherRestEndpoint.getRestBaseUrl());
 
 				// bring up the dispatcher that launches JobManagers when jobs submitted
 				LOG.info("Starting job dispatcher(s) for JobManger");
@@ -361,7 +361,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 					new MemoryArchivedExecutionGraphStore(),
 					Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
 					new ShutDownFatalErrorHandler(),
-					dispatcherRestEndpoint.getRestAddress());
+					dispatcherRestEndpoint.getRestBaseUrl());
 
 				dispatcher.start();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/efd7336f/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index 08af0c4..fe76694 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.util.Preconditions;
@@ -167,6 +168,9 @@ public class MiniClusterConfiguration {
 		public MiniClusterConfiguration build() {
 			final Configuration modifiedConfiguration = new Configuration(configuration);
 			modifiedConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
+			modifiedConfiguration.setString(
+				RestOptions.REST_ADDRESS,
+				modifiedConfiguration.getString(RestOptions.REST_ADDRESS, "localhost"));
 
 			return new MiniClusterConfiguration(
 				modifiedConfiguration,

http://git-wip-us.apache.org/repos/asf/flink/blob/efd7336f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index a3d4843..dfb01ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -73,8 +73,9 @@ public abstract class RestServerEndpoint {
 
 	private final Object lock = new Object();
 
-	private final String configuredAddress;
-	private final int configuredPort;
+	private final String restAddress;
+	private final String restBindAddress;
+	private final int restBindPort;
 	private final SSLEngine sslEngine;
 	private final Path uploadDir;
 	private final int maxContentLength;
@@ -84,14 +85,16 @@ public abstract class RestServerEndpoint {
 
 	private ServerBootstrap bootstrap;
 	private Channel serverChannel;
-	private String restAddress;
+	private String restBaseUrl;
 
 	private State state = State.CREATED;
 
 	public RestServerEndpoint(RestServerEndpointConfiguration configuration) throws IOException {
 		Preconditions.checkNotNull(configuration);
-		this.configuredAddress = configuration.getEndpointBindAddress();
-		this.configuredPort = configuration.getEndpointBindPort();
+
+		this.restAddress = configuration.getRestAddress();
+		this.restBindAddress = configuration.getRestBindAddress();
+		this.restBindPort = configuration.getRestBindPort();
 		this.sslEngine = configuration.getSslEngine();
 
 		this.uploadDir = configuration.getUploadDir();
@@ -101,8 +104,6 @@ public abstract class RestServerEndpoint {
 		this.responseHeaders = configuration.getResponseHeaders();
 
 		terminationFuture = new CompletableFuture<>();
-
-		this.restAddress = null;
 	}
 
 	/**
@@ -176,18 +177,23 @@ public abstract class RestServerEndpoint {
 				.childHandler(initializer);
 
 			final ChannelFuture channel;
-			if (configuredAddress == null) {
-				channel = bootstrap.bind(configuredPort);
+			if (restBindAddress == null) {
+				channel = bootstrap.bind(restBindPort);
 			} else {
-				channel = bootstrap.bind(configuredAddress, configuredPort);
+				channel = bootstrap.bind(restBindAddress, restBindPort);
 			}
 			serverChannel = channel.syncUninterruptibly().channel();
 
-			InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
-			String address = bindAddress.getAddress().getHostAddress();
-			int port = bindAddress.getPort();
+			final InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
+			final String advertisedAddress;
+			if (bindAddress.getAddress().isAnyLocalAddress()) {
+				advertisedAddress = this.restAddress;
+			} else {
+				advertisedAddress = bindAddress.getAddress().getHostAddress();
+			}
+			final int port = bindAddress.getPort();
 
-			log.info("Rest endpoint listening at {}:{}", address, port);
+			log.info("Rest endpoint listening at {}:{}", advertisedAddress, port);
 
 			final String protocol;
 
@@ -197,9 +203,9 @@ public abstract class RestServerEndpoint {
 				protocol = "http://";
 			}
 
-			restAddress = protocol + address + ':' + port;
+			restBaseUrl = protocol + advertisedAddress + ':' + port;
 
-			restAddressFuture.complete(restAddress);
+			restAddressFuture.complete(restBaseUrl);
 
 			state = State.RUNNING;
 
@@ -238,14 +244,14 @@ public abstract class RestServerEndpoint {
 	}
 
 	/**
-	 * Returns the address of the REST server endpoint.
+	 * Returns the base URL of the REST server endpoint.
 	 *
-	 * @return REST address of this endpoint
+	 * @return REST base URL of this endpoint
 	 */
-	public String getRestAddress() {
+	public String getRestBaseUrl() {
 		synchronized (lock) {
 			Preconditions.checkState(state != State.CREATED, "The RestServerEndpoint has not been started yet.");
-			return restAddress;
+			return restBaseUrl;
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/efd7336f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
index 35bd6ea..1fac08e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
@@ -45,6 +45,8 @@ import static java.util.Objects.requireNonNull;
  */
 public final class RestServerEndpointConfiguration {
 
+	private final String restAddress;
+
 	@Nullable
 	private final String restBindAddress;
 
@@ -60,6 +62,7 @@ public final class RestServerEndpointConfiguration {
 	private final Map<String, String> responseHeaders;
 
 	private RestServerEndpointConfiguration(
+			final String restAddress,
 			@Nullable String restBindAddress,
 			int restBindPort,
 			@Nullable SSLEngine sslEngine,
@@ -69,12 +72,20 @@ public final class RestServerEndpointConfiguration {
 		Preconditions.checkArgument(0 <= restBindPort && restBindPort < 65536, "The bing rest port " + restBindPort + " is out of range (0, 65536[");
 		Preconditions.checkArgument(maxContentLength > 0, "maxContentLength must be positive, was: %d", maxContentLength);
 
+		this.restAddress = requireNonNull(restAddress);
 		this.restBindAddress = restBindAddress;
 		this.restBindPort = restBindPort;
 		this.sslEngine = sslEngine;
 		this.uploadDir = requireNonNull(uploadDir);
 		this.maxContentLength = maxContentLength;
-		this.responseHeaders = requireNonNull(Collections.unmodifiableMap(responseHeaders));
+		this.responseHeaders = Collections.unmodifiableMap(requireNonNull(responseHeaders));
+	}
+
+	/**
+	 * @see RestOptions#REST_ADDRESS
+	 */
+	public String getRestAddress() {
+		return restAddress;
 	}
 
 	/**
@@ -82,7 +93,7 @@ public final class RestServerEndpointConfiguration {
 	 *
 	 * @return address that the REST server endpoint should bind itself to
 	 */
-	public String getEndpointBindAddress() {
+	public String getRestBindAddress() {
 		return restBindAddress;
 	}
 
@@ -91,7 +102,7 @@ public final class RestServerEndpointConfiguration {
 	 *
 	 * @return port that the REST server endpoint should listen on
 	 */
-	public int getEndpointBindPort() {
+	public int getRestBindPort() {
 		return restBindPort;
 	}
 
@@ -136,12 +147,16 @@ public final class RestServerEndpointConfiguration {
 	 */
 	public static RestServerEndpointConfiguration fromConfiguration(Configuration config) throws ConfigurationException {
 		Preconditions.checkNotNull(config);
-		String address = config.getString(RestOptions.REST_ADDRESS);
 
-		int port = config.getInteger(RestOptions.REST_PORT);
+		final String restAddress = Preconditions.checkNotNull(config.getString(RestOptions.REST_ADDRESS),
+			"%s must be set",
+			RestOptions.REST_ADDRESS.key());
+
+		final String restBindAddress = config.getString(RestOptions.REST_BIND_ADDRESS);
+		final int port = config.getInteger(RestOptions.REST_PORT);
 
 		SSLEngine sslEngine = null;
-		boolean enableSSL = config.getBoolean(SecurityOptions.SSL_ENABLED);
+		final boolean enableSSL = config.getBoolean(SecurityOptions.SSL_ENABLED);
 		if (enableSSL) {
 			try {
 				SSLContext sslContext = SSLUtils.createSSLServerContext(config);
@@ -159,14 +174,15 @@ public final class RestServerEndpointConfiguration {
 			config.getString(WebOptions.UPLOAD_DIR,	config.getString(WebOptions.TMP_DIR)),
 			"flink-web-upload-" + UUID.randomUUID());
 
-		int maxContentLength = config.getInteger(RestOptions.REST_SERVER_MAX_CONTENT_LENGTH);
+		final int maxContentLength = config.getInteger(RestOptions.REST_SERVER_MAX_CONTENT_LENGTH);
 
 		final Map<String, String> responseHeaders = Collections.singletonMap(
 			HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN,
 			config.getString(WebOptions.ACCESS_CONTROL_ALLOW_ORIGIN));
 
 		return new RestServerEndpointConfiguration(
-			address,
+			restAddress,
+			restBindAddress,
 			port,
 			sslEngine,
 			uploadDir,

http://git-wip-us.apache.org/repos/asf/flink/blob/efd7336f/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index dfb2fc8..50ad7eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -666,18 +666,18 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 
 	@Override
 	public void grantLeadership(final UUID leaderSessionID) {
-		log.info("{} was granted leadership with leaderSessionID={}", getRestAddress(), leaderSessionID);
+		log.info("{} was granted leadership with leaderSessionID={}", getRestBaseUrl(), leaderSessionID);
 		leaderElectionService.confirmLeaderSessionID(leaderSessionID);
 	}
 
 	@Override
 	public void revokeLeadership() {
-		log.info("{} lost leadership", getRestAddress());
+		log.info("{} lost leadership", getRestBaseUrl());
 	}
 
 	@Override
 	public String getAddress() {
-		return getRestAddress();
+		return getRestBaseUrl();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/efd7336f/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index 32f3ec8..784c141 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -115,6 +115,7 @@ public class RestServerEndpointITCase extends TestLogger {
 	public void setup() throws Exception {
 		Configuration config = new Configuration();
 		config.setInteger(RestOptions.REST_PORT, 0);
+		config.setString(RestOptions.REST_ADDRESS, "localhost");
 		config.setString(WebOptions.UPLOAD_DIR, temporaryFolder.newFolder().getCanonicalPath());
 		config.setInteger(RestOptions.REST_SERVER_MAX_CONTENT_LENGTH, TEST_REST_MAX_CONTENT_LENGTH);
 		config.setInteger(RestOptions.REST_CLIENT_MAX_CONTENT_LENGTH, TEST_REST_MAX_CONTENT_LENGTH);
@@ -335,7 +336,7 @@ public class RestServerEndpointITCase extends TestLogger {
 
 	private HttpURLConnection openHttpConnectionForUpload(final String boundary) throws IOException {
 		final HttpURLConnection connection =
-			(HttpURLConnection) new URL(serverEndpoint.getRestAddress() + "/upload").openConnection();
+			(HttpURLConnection) new URL(serverEndpoint.getRestBaseUrl() + "/upload").openConnection();
 		connection.setDoOutput(true);
 		connection.setRequestProperty("Content-Type", "multipart/form-data; boundary=" + boundary);
 		return connection;