You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/16 19:18:47 UTC
[06/11] flink git commit: [FLINK-9156][REST][CLI] Update --jobmanager
option logic for REST client
[FLINK-9156][REST][CLI] Update --jobmanager option logic for REST client
This closes #5838.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/47909f46
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/47909f46
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/47909f46
Branch: refs/heads/release-1.5
Commit: 47909f466b9c9ee1f4caf94e9f6862a21b628817
Parents: 50504ce
Author: zentol <ch...@apache.org>
Authored: Wed Apr 11 12:48:51 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Apr 16 21:18:32 2018 +0200
----------------------------------------------------------------------
.../apache/flink/client/cli/CliFrontend.java | 3 ++
.../client/program/rest/RestClusterClient.java | 3 +-
.../program/rest/RestClusterClientTest.java | 35 ++++++++++++++++++++
3 files changed, 40 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/47909f46/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index ce6556b..65f470b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -37,6 +37,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
@@ -1141,6 +1142,8 @@ public class CliFrontend {
public static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) { // copies the host string and port of 'address' into 'config'
config.setString(JobManagerOptions.ADDRESS, address.getHostString());
config.setInteger(JobManagerOptions.PORT, address.getPort());
+ config.setString(RestOptions.REST_ADDRESS, address.getHostString()); // added by this commit: the REST address key receives the same host as JobManagerOptions.ADDRESS
+ config.setInteger(RestOptions.REST_PORT, address.getPort()); // added by this commit: the REST port key receives the same port as JobManagerOptions.PORT
}
public static List<CustomCommandLine<?>> loadCustomCommandLines(Configuration configuration, String configurationDirectory) {
http://git-wip-us.apache.org/repos/asf/flink/blob/47909f46/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index a6f676e..3d50e93 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -719,7 +719,8 @@ public class RestClusterClient<T> extends ClusterClient<T> implements NewCluster
.orElse(false);
}
- private CompletableFuture<URL> getWebMonitorBaseUrl() {
+ @VisibleForTesting
+ CompletableFuture<URL> getWebMonitorBaseUrl() {
return FutureUtils.orTimeout(
webMonitorLeaderRetriever.getLeaderFuture(),
restClusterClientConfiguration.getAwaitLeaderTimeout(),
http://git-wip-us.apache.org/repos/asf/flink/blob/47909f46/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index e7f9bf9..e2daad6 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.cli.DefaultCLI;
+import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
@@ -100,6 +102,7 @@ import org.apache.flink.util.TestLogger;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.commons.cli.CommandLine;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -111,6 +114,7 @@ import org.mockito.MockitoAnnotations;
import javax.annotation.Nonnull;
import java.io.IOException;
+import java.net.URL;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
@@ -677,6 +681,37 @@ public class RestClusterClientTest extends TestLogger {
}
}
+ /**
+ * Tests that a host:port given via the {@code -m} command line option overrides the address and port set in the configuration.
+ */
+ @Test
+ public void testRESTManualConfigurationOverride() throws Exception {
+ final String configuredHostname = "localhost";
+ final int configuredPort = 1234;
+ final Configuration configuration = new Configuration();
+
+ configuration.setString(JobManagerOptions.ADDRESS, configuredHostname); // baseline values; the assertions below expect these NOT to be used
+ configuration.setInteger(JobManagerOptions.PORT, configuredPort);
+ configuration.setString(RestOptions.REST_ADDRESS, configuredHostname);
+ configuration.setInteger(RestOptions.REST_PORT, configuredPort);
+
+ final DefaultCLI defaultCLI = new DefaultCLI(configuration);
+
+ final String manualHostname = "123.123.123.123";
+ final int manualPort = 4321;
+ final String[] args = {"-m", manualHostname + ':' + manualPort}; // "-m host:port", deliberately different from the configured host and port
+
+ CommandLine commandLine = defaultCLI.parseCommandLineOptions(args, false);
+
+ final StandaloneClusterDescriptor clusterDescriptor = defaultCLI.createClusterDescriptor(commandLine);
+
+ final RestClusterClient<?> clusterClient = clusterDescriptor.retrieve(defaultCLI.getClusterId(commandLine)); // NOTE(review): the client is never shut down in this test - confirm whether it holds resources that need releasing
+
+ URL webMonitorBaseUrl = clusterClient.getWebMonitorBaseUrl().get(); // get() waits for the future returned by getWebMonitorBaseUrl() to complete
+ assertThat(webMonitorBaseUrl.getHost(), equalTo(manualHostname)); // the resolved URL must carry the command line host ...
+ assertThat(webMonitorBaseUrl.getPort(), equalTo(manualPort)); // ... and the command line port
+ }
+
private class TestAccumulatorHandler extends TestHandler<EmptyRequestBody, JobAccumulatorsInfo, JobAccumulatorsMessageParameters> {
public TestAccumulatorHandler() {