You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/16 19:18:47 UTC

[06/11] flink git commit: [FLINK-9156][REST][CLI] Update --jobmanager option logic for REST client

[FLINK-9156][REST][CLI] Update --jobmanager option logic for REST client

This closes #5838.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/47909f46
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/47909f46
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/47909f46

Branch: refs/heads/release-1.5
Commit: 47909f466b9c9ee1f4caf94e9f6862a21b628817
Parents: 50504ce
Author: zentol <ch...@apache.org>
Authored: Wed Apr 11 12:48:51 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Apr 16 21:18:32 2018 +0200

----------------------------------------------------------------------
 .../apache/flink/client/cli/CliFrontend.java    |  3 ++
 .../client/program/rest/RestClusterClient.java  |  3 +-
 .../program/rest/RestClusterClientTest.java     | 35 ++++++++++++++++++++
 3 files changed, 40 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/47909f46/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index ce6556b..65f470b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -37,6 +37,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
@@ -1141,6 +1142,8 @@ public class CliFrontend {
 	public static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) {
 		config.setString(JobManagerOptions.ADDRESS, address.getHostString());
 		config.setInteger(JobManagerOptions.PORT, address.getPort());
+		config.setString(RestOptions.REST_ADDRESS, address.getHostString());
+		config.setInteger(RestOptions.REST_PORT, address.getPort());
 	}
 
 	public static List<CustomCommandLine<?>> loadCustomCommandLines(Configuration configuration, String configurationDirectory) {

http://git-wip-us.apache.org/repos/asf/flink/blob/47909f46/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index a6f676e..3d50e93 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -719,7 +719,8 @@ public class RestClusterClient<T> extends ClusterClient<T> implements NewCluster
 				.orElse(false);
 	}
 
-	private CompletableFuture<URL> getWebMonitorBaseUrl() {
+	@VisibleForTesting
+	CompletableFuture<URL> getWebMonitorBaseUrl() {
 		return FutureUtils.orTimeout(
 				webMonitorLeaderRetriever.getLeaderFuture(),
 				restClusterClientConfiguration.getAwaitLeaderTimeout(),

http://git-wip-us.apache.org/repos/asf/flink/blob/47909f46/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index e7f9bf9..e2daad6 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.cli.DefaultCLI;
+import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
 import org.apache.flink.client.deployment.StandaloneClusterId;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
@@ -100,6 +102,7 @@ import org.apache.flink.util.TestLogger;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
+import org.apache.commons.cli.CommandLine;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -111,6 +114,7 @@ import org.mockito.MockitoAnnotations;
 import javax.annotation.Nonnull;
 
 import java.io.IOException;
+import java.net.URL;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -677,6 +681,37 @@ public class RestClusterClientTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that a host:port passed via the {@code -m} command line option overrides the configured REST address and port.
+	 */
+	@Test
+	public void testRESTManualConfigurationOverride() throws Exception {
+		final String configuredHostname = "localhost";
+		final int configuredPort = 1234;
+		final Configuration configuration = new Configuration();
+
+		configuration.setString(JobManagerOptions.ADDRESS, configuredHostname);
+		configuration.setInteger(JobManagerOptions.PORT, configuredPort);
+		configuration.setString(RestOptions.REST_ADDRESS, configuredHostname);
+		configuration.setInteger(RestOptions.REST_PORT, configuredPort);
+
+		final DefaultCLI defaultCLI = new DefaultCLI(configuration);
+
+		final String manualHostname = "123.123.123.123";
+		final int manualPort = 4321;
+		final String[] args = {"-m", manualHostname + ':' + manualPort};
+
+		CommandLine commandLine = defaultCLI.parseCommandLineOptions(args, false);
+
+		final StandaloneClusterDescriptor clusterDescriptor = defaultCLI.createClusterDescriptor(commandLine);
+
+		final RestClusterClient<?> clusterClient = clusterDescriptor.retrieve(defaultCLI.getClusterId(commandLine));
+
+		URL webMonitorBaseUrl = clusterClient.getWebMonitorBaseUrl().get();
+		assertThat(webMonitorBaseUrl.getHost(), equalTo(manualHostname));
+		assertThat(webMonitorBaseUrl.getPort(), equalTo(manualPort));
+	}
+
 	private class TestAccumulatorHandler extends TestHandler<EmptyRequestBody, JobAccumulatorsInfo, JobAccumulatorsMessageParameters> {
 
 		public TestAccumulatorHandler() {