You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/03/18 17:54:38 UTC
[5/7] flink git commit: [FLINK-8843][REST] Decouple bind REST address
from advertised address
[FLINK-8843][REST] Decouple bind REST address from advertised address
By default bind REST server on wildcard address.
Rename RestServerEndpoint#getRestAddress to getRestBaseUrl.
This closes #5707.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/efd7336f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/efd7336f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/efd7336f
Branch: refs/heads/master
Commit: efd7336fa693a9f82b9ecfb5d81c0ef747ab7801
Parents: 0caff35
Author: gyao <ga...@data-artisans.com>
Authored: Thu Mar 15 22:04:58 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Mar 18 15:58:14 2018 +0100
----------------------------------------------------------------------
.../apache/flink/configuration/RestOptions.java | 15 +++++--
.../runtime/entrypoint/ClusterEntrypoint.java | 4 +-
.../HighAvailabilityServicesUtils.java | 6 ++-
.../flink/runtime/minicluster/MiniCluster.java | 4 +-
.../minicluster/MiniClusterConfiguration.java | 4 ++
.../flink/runtime/rest/RestServerEndpoint.java | 46 +++++++++++---------
.../rest/RestServerEndpointConfiguration.java | 32 ++++++++++----
.../runtime/webmonitor/WebMonitorEndpoint.java | 6 +--
.../runtime/rest/RestServerEndpointITCase.java | 3 +-
9 files changed, 80 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/efd7336f/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
index 94d7977..e7421c4 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
@@ -29,12 +29,21 @@ import static org.apache.flink.configuration.ConfigOptions.key;
public class RestOptions {
/**
- * The address that the server binds itself to / the client connects to.
+ * The address that the server binds itself to.
+ */
+ public static final ConfigOption<String> REST_BIND_ADDRESS =
+ key("rest.bind-address")
+ .noDefaultValue()
+ .withDescription("The address that the server binds itself.");
+
+ /**
+ * The address that should be used by clients to connect to the server.
*/
public static final ConfigOption<String> REST_ADDRESS =
key("rest.address")
- .defaultValue("localhost")
- .withDescription("The address that the server binds itself to / the client connects to.");
+ .noDefaultValue()
+ .withDeprecatedKeys(JobManagerOptions.ADDRESS.key())
+ .withDescription("The address that should be used by clients to connect to the server.");
/**
* The port that the server listens on / the client connects to.
http://git-wip-us.apache.org/repos/asf/flink/blob/efd7336f/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 676415b..63c8072 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -330,7 +330,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
metricRegistry,
this,
clusterInformation,
- webMonitorEndpoint.getRestAddress());
+ webMonitorEndpoint.getRestBaseUrl());
jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, rpcService.getAddress());
@@ -345,7 +345,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
metricRegistry.getMetricQueryServicePath(),
archivedExecutionGraphStore,
this,
- webMonitorEndpoint.getRestAddress());
+ webMonitorEndpoint.getRestBaseUrl());
LOG.debug("Starting ResourceManager.");
resourceManager.start();
http://git-wip-us.apache.org/repos/asf/flink/blob/efd7336f/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
index 4f12f2b..f19a421 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -39,6 +39,8 @@ import org.apache.flink.util.ConfigurationException;
import java.util.concurrent.Executor;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* Utils class to instantiate {@link HighAvailabilityServices} implementations.
*/
@@ -97,7 +99,9 @@ public class HighAvailabilityServicesUtils {
addressResolution,
configuration);
- final String address = configuration.getString(RestOptions.REST_ADDRESS);
+ final String address = checkNotNull(configuration.getString(RestOptions.REST_ADDRESS),
+ "%s must be set",
+ RestOptions.REST_ADDRESS.key());
final int port = configuration.getInteger(RestOptions.REST_PORT);
final boolean enableSSL = configuration.getBoolean(SecurityOptions.SSL_ENABLED);
final String protocol = enableSSL ? "https://" : "http://";
http://git-wip-us.apache.org/repos/asf/flink/blob/efd7336f/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 74aa388..dfe30af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -341,7 +341,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
dispatcherRestEndpoint.start();
- restAddressURI = new URI(dispatcherRestEndpoint.getRestAddress());
+ restAddressURI = new URI(dispatcherRestEndpoint.getRestBaseUrl());
// bring up the dispatcher that launches JobManagers when jobs submitted
LOG.info("Starting job dispatcher(s) for JobManger");
@@ -361,7 +361,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
new MemoryArchivedExecutionGraphStore(),
Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
new ShutDownFatalErrorHandler(),
- dispatcherRestEndpoint.getRestAddress());
+ dispatcherRestEndpoint.getRestBaseUrl());
dispatcher.start();
http://git-wip-us.apache.org/repos/asf/flink/blob/efd7336f/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index 08af0c4..fe76694 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.util.Preconditions;
@@ -167,6 +168,9 @@ public class MiniClusterConfiguration {
public MiniClusterConfiguration build() {
final Configuration modifiedConfiguration = new Configuration(configuration);
modifiedConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
+ modifiedConfiguration.setString(
+ RestOptions.REST_ADDRESS,
+ modifiedConfiguration.getString(RestOptions.REST_ADDRESS, "localhost"));
return new MiniClusterConfiguration(
modifiedConfiguration,
http://git-wip-us.apache.org/repos/asf/flink/blob/efd7336f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index a3d4843..dfb01ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -73,8 +73,9 @@ public abstract class RestServerEndpoint {
private final Object lock = new Object();
- private final String configuredAddress;
- private final int configuredPort;
+ private final String restAddress;
+ private final String restBindAddress;
+ private final int restBindPort;
private final SSLEngine sslEngine;
private final Path uploadDir;
private final int maxContentLength;
@@ -84,14 +85,16 @@ public abstract class RestServerEndpoint {
private ServerBootstrap bootstrap;
private Channel serverChannel;
- private String restAddress;
+ private String restBaseUrl;
private State state = State.CREATED;
public RestServerEndpoint(RestServerEndpointConfiguration configuration) throws IOException {
Preconditions.checkNotNull(configuration);
- this.configuredAddress = configuration.getEndpointBindAddress();
- this.configuredPort = configuration.getEndpointBindPort();
+
+ this.restAddress = configuration.getRestAddress();
+ this.restBindAddress = configuration.getRestBindAddress();
+ this.restBindPort = configuration.getRestBindPort();
this.sslEngine = configuration.getSslEngine();
this.uploadDir = configuration.getUploadDir();
@@ -101,8 +104,6 @@ public abstract class RestServerEndpoint {
this.responseHeaders = configuration.getResponseHeaders();
terminationFuture = new CompletableFuture<>();
-
- this.restAddress = null;
}
/**
@@ -176,18 +177,23 @@ public abstract class RestServerEndpoint {
.childHandler(initializer);
final ChannelFuture channel;
- if (configuredAddress == null) {
- channel = bootstrap.bind(configuredPort);
+ if (restBindAddress == null) {
+ channel = bootstrap.bind(restBindPort);
} else {
- channel = bootstrap.bind(configuredAddress, configuredPort);
+ channel = bootstrap.bind(restBindAddress, restBindPort);
}
serverChannel = channel.syncUninterruptibly().channel();
- InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
- String address = bindAddress.getAddress().getHostAddress();
- int port = bindAddress.getPort();
+ final InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
+ final String advertisedAddress;
+ if (bindAddress.getAddress().isAnyLocalAddress()) {
+ advertisedAddress = this.restAddress;
+ } else {
+ advertisedAddress = bindAddress.getAddress().getHostAddress();
+ }
+ final int port = bindAddress.getPort();
- log.info("Rest endpoint listening at {}:{}", address, port);
+ log.info("Rest endpoint listening at {}:{}", advertisedAddress, port);
final String protocol;
@@ -197,9 +203,9 @@ public abstract class RestServerEndpoint {
protocol = "http://";
}
- restAddress = protocol + address + ':' + port;
+ restBaseUrl = protocol + advertisedAddress + ':' + port;
- restAddressFuture.complete(restAddress);
+ restAddressFuture.complete(restBaseUrl);
state = State.RUNNING;
@@ -238,14 +244,14 @@ public abstract class RestServerEndpoint {
}
/**
- * Returns the address of the REST server endpoint.
+ * Returns the base URL of the REST server endpoint.
*
- * @return REST address of this endpoint
+ * @return REST base URL of this endpoint
*/
- public String getRestAddress() {
+ public String getRestBaseUrl() {
synchronized (lock) {
Preconditions.checkState(state != State.CREATED, "The RestServerEndpoint has not been started yet.");
- return restAddress;
+ return restBaseUrl;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/efd7336f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
index 35bd6ea..1fac08e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
@@ -45,6 +45,8 @@ import static java.util.Objects.requireNonNull;
*/
public final class RestServerEndpointConfiguration {
+ private final String restAddress;
+
@Nullable
private final String restBindAddress;
@@ -60,6 +62,7 @@ public final class RestServerEndpointConfiguration {
private final Map<String, String> responseHeaders;
private RestServerEndpointConfiguration(
+ final String restAddress,
@Nullable String restBindAddress,
int restBindPort,
@Nullable SSLEngine sslEngine,
@@ -69,12 +72,20 @@ public final class RestServerEndpointConfiguration {
Preconditions.checkArgument(0 <= restBindPort && restBindPort < 65536, "The bing rest port " + restBindPort + " is out of range (0, 65536[");
Preconditions.checkArgument(maxContentLength > 0, "maxContentLength must be positive, was: %d", maxContentLength);
+ this.restAddress = requireNonNull(restAddress);
this.restBindAddress = restBindAddress;
this.restBindPort = restBindPort;
this.sslEngine = sslEngine;
this.uploadDir = requireNonNull(uploadDir);
this.maxContentLength = maxContentLength;
- this.responseHeaders = requireNonNull(Collections.unmodifiableMap(responseHeaders));
+ this.responseHeaders = Collections.unmodifiableMap(requireNonNull(responseHeaders));
+ }
+
+ /**
+ * @see RestOptions#REST_ADDRESS
+ */
+ public String getRestAddress() {
+ return restAddress;
}
/**
@@ -82,7 +93,7 @@ public final class RestServerEndpointConfiguration {
*
* @return address that the REST server endpoint should bind itself to
*/
- public String getEndpointBindAddress() {
+ public String getRestBindAddress() {
return restBindAddress;
}
@@ -91,7 +102,7 @@ public final class RestServerEndpointConfiguration {
*
* @return port that the REST server endpoint should listen on
*/
- public int getEndpointBindPort() {
+ public int getRestBindPort() {
return restBindPort;
}
@@ -136,12 +147,16 @@ public final class RestServerEndpointConfiguration {
*/
public static RestServerEndpointConfiguration fromConfiguration(Configuration config) throws ConfigurationException {
Preconditions.checkNotNull(config);
- String address = config.getString(RestOptions.REST_ADDRESS);
- int port = config.getInteger(RestOptions.REST_PORT);
+ final String restAddress = Preconditions.checkNotNull(config.getString(RestOptions.REST_ADDRESS),
+ "%s must be set",
+ RestOptions.REST_ADDRESS.key());
+
+ final String restBindAddress = config.getString(RestOptions.REST_BIND_ADDRESS);
+ final int port = config.getInteger(RestOptions.REST_PORT);
SSLEngine sslEngine = null;
- boolean enableSSL = config.getBoolean(SecurityOptions.SSL_ENABLED);
+ final boolean enableSSL = config.getBoolean(SecurityOptions.SSL_ENABLED);
if (enableSSL) {
try {
SSLContext sslContext = SSLUtils.createSSLServerContext(config);
@@ -159,14 +174,15 @@ public final class RestServerEndpointConfiguration {
config.getString(WebOptions.UPLOAD_DIR, config.getString(WebOptions.TMP_DIR)),
"flink-web-upload-" + UUID.randomUUID());
- int maxContentLength = config.getInteger(RestOptions.REST_SERVER_MAX_CONTENT_LENGTH);
+ final int maxContentLength = config.getInteger(RestOptions.REST_SERVER_MAX_CONTENT_LENGTH);
final Map<String, String> responseHeaders = Collections.singletonMap(
HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN,
config.getString(WebOptions.ACCESS_CONTROL_ALLOW_ORIGIN));
return new RestServerEndpointConfiguration(
- address,
+ restAddress,
+ restBindAddress,
port,
sslEngine,
uploadDir,
http://git-wip-us.apache.org/repos/asf/flink/blob/efd7336f/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index dfb2fc8..50ad7eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -666,18 +666,18 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
@Override
public void grantLeadership(final UUID leaderSessionID) {
- log.info("{} was granted leadership with leaderSessionID={}", getRestAddress(), leaderSessionID);
+ log.info("{} was granted leadership with leaderSessionID={}", getRestBaseUrl(), leaderSessionID);
leaderElectionService.confirmLeaderSessionID(leaderSessionID);
}
@Override
public void revokeLeadership() {
- log.info("{} lost leadership", getRestAddress());
+ log.info("{} lost leadership", getRestBaseUrl());
}
@Override
public String getAddress() {
- return getRestAddress();
+ return getRestBaseUrl();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/efd7336f/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index 32f3ec8..784c141 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -115,6 +115,7 @@ public class RestServerEndpointITCase extends TestLogger {
public void setup() throws Exception {
Configuration config = new Configuration();
config.setInteger(RestOptions.REST_PORT, 0);
+ config.setString(RestOptions.REST_ADDRESS, "localhost");
config.setString(WebOptions.UPLOAD_DIR, temporaryFolder.newFolder().getCanonicalPath());
config.setInteger(RestOptions.REST_SERVER_MAX_CONTENT_LENGTH, TEST_REST_MAX_CONTENT_LENGTH);
config.setInteger(RestOptions.REST_CLIENT_MAX_CONTENT_LENGTH, TEST_REST_MAX_CONTENT_LENGTH);
@@ -335,7 +336,7 @@ public class RestServerEndpointITCase extends TestLogger {
private HttpURLConnection openHttpConnectionForUpload(final String boundary) throws IOException {
final HttpURLConnection connection =
- (HttpURLConnection) new URL(serverEndpoint.getRestAddress() + "/upload").openConnection();
+ (HttpURLConnection) new URL(serverEndpoint.getRestBaseUrl() + "/upload").openConnection();
connection.setDoOutput(true);
connection.setRequestProperty("Content-Type", "multipart/form-data; boundary=" + boundary);
return connection;