You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/08/25 18:48:55 UTC
[52/89] [abbrv] flink git commit: [FLINK-4454] always display JobManager address using LeaderRetrievalService
[FLINK-4454] always display JobManager address using LeaderRetrievalService
This closes #2406
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/72064558
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/72064558
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/72064558
Branch: refs/heads/flip-6
Commit: 720645587bc58a22db6a8d948f91384da2ecb7b7
Parents: 844c874
Author: Maximilian Michels <mx...@apache.org>
Authored: Mon Aug 22 18:11:45 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Aug 24 11:29:30 2016 +0200
----------------------------------------------------------------------
.../org/apache/flink/client/CliFrontend.java | 4 ++--
.../flink/client/program/ClusterClient.java | 23 +++++---------------
.../client/program/StandaloneClusterClient.java | 4 ++--
3 files changed, 10 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/72064558/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 15e1362..c90bc29 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -845,7 +845,7 @@ public class CliFrontend {
CustomCommandLine customCLI = getActiveCustomCommandLine(options.getCommandLine());
try {
ClusterClient client = customCLI.retrieveCluster(options.getCommandLine(), config);
- logAndSysout("Using address " + client.getJobManagerAddressFromConfig() + " to connect to JobManager.");
+ logAndSysout("Using address " + client.getJobManagerAddress() + " to connect to JobManager.");
return client;
} catch (Exception e) {
LOG.error("Couldn't retrieve {} cluster.", customCLI.getId(), e);
@@ -896,7 +896,7 @@ public class CliFrontend {
}
// Avoid resolving the JobManager Gateway here to prevent blocking until we invoke the user's program.
- final InetSocketAddress jobManagerAddress = client.getJobManagerAddressFromConfig();
+ final InetSocketAddress jobManagerAddress = client.getJobManagerAddress();
logAndSysout("Using address " + jobManagerAddress.getHostString() + ":" + jobManagerAddress.getPort() + " to connect to JobManager.");
logAndSysout("JobManager web interface address " + client.getWebInterfaceURL());
return client;
http://git-wip-us.apache.org/repos/asf/flink/blob/72064558/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 2e6a9cc..c3c666b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -27,7 +27,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
-import akka.actor.ActorRef;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
@@ -57,6 +56,7 @@ import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.net.ConnectionUtils;
+import org.apache.flink.runtime.util.LeaderConnectionInfo;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
@@ -232,27 +232,16 @@ public abstract class ClusterClient {
}
/**
- * Gets the current JobManager address from the Flink configuration (may change in case of a HA setup).
- * @return The address (host and port) of the leading JobManager when it was last retrieved (may be outdated)
- */
- public InetSocketAddress getJobManagerAddressFromConfig() {
- try {
- String hostName = flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
- int port = flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
- return new InetSocketAddress(hostName, port);
- } catch (Exception e) {
- throw new RuntimeException("Failed to retrieve JobManager address", e);
- }
- }
-
- /**
* Gets the current JobManager address (may change in case of a HA setup).
* @return The address (host and port) of the leading JobManager
*/
public InetSocketAddress getJobManagerAddress() {
try {
- final ActorRef jmActor = getJobManagerGateway().actor();
- return AkkaUtils.getInetSockeAddressFromAkkaURL(jmActor.path().toSerializationFormat());
+ LeaderConnectionInfo leaderConnectionInfo =
+ LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
+ LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig), timeout);
+
+ return AkkaUtils.getInetSockeAddressFromAkkaURL(leaderConnectionInfo.getAddress());
} catch (Exception e) {
throw new RuntimeException("Failed to retrieve JobManager address", e);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/72064558/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
index 2c6e101..d25c9d1 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
@@ -44,7 +44,7 @@ public class StandaloneClusterClient extends ClusterClient {
@Override
public String getWebInterfaceURL() {
- String host = this.getJobManagerAddressFromConfig().getHostString();
+ String host = this.getJobManagerAddress().getHostString();
int port = getFlinkConfiguration().getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
return "http://" + host + ":" + port;
@@ -75,7 +75,7 @@ public class StandaloneClusterClient extends ClusterClient {
@Override
public String getClusterIdentifier() {
// Avoid blocking here by getting the address from the config without resolving the address
- return "Standalone cluster with JobManager at " + this.getJobManagerAddressFromConfig();
+ return "Standalone cluster with JobManager at " + this.getJobManagerAddress();
}
@Override