You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/08/25 18:48:55 UTC

[52/89] [abbrv] flink git commit: [FLINK-4454] always display JobManager address using LeaderRetrievalService

[FLINK-4454] always display JobManager address using LeaderRetrievalService

This closes #2406


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/72064558
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/72064558
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/72064558

Branch: refs/heads/flip-6
Commit: 720645587bc58a22db6a8d948f91384da2ecb7b7
Parents: 844c874
Author: Maximilian Michels <mx...@apache.org>
Authored: Mon Aug 22 18:11:45 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Aug 24 11:29:30 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    |  4 ++--
 .../flink/client/program/ClusterClient.java     | 23 +++++---------------
 .../client/program/StandaloneClusterClient.java |  4 ++--
 3 files changed, 10 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/72064558/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 15e1362..c90bc29 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -845,7 +845,7 @@ public class CliFrontend {
 		CustomCommandLine customCLI = getActiveCustomCommandLine(options.getCommandLine());
 		try {
 			ClusterClient client = customCLI.retrieveCluster(options.getCommandLine(), config);
-			logAndSysout("Using address " + client.getJobManagerAddressFromConfig() + " to connect to JobManager.");
+			logAndSysout("Using address " + client.getJobManagerAddress() + " to connect to JobManager.");
 			return client;
 		} catch (Exception e) {
 			LOG.error("Couldn't retrieve {} cluster.", customCLI.getId(), e);
@@ -896,7 +896,7 @@ public class CliFrontend {
 		}
 
 		// Avoid resolving the JobManager Gateway here to prevent blocking until we invoke the user's program.
-		final InetSocketAddress jobManagerAddress = client.getJobManagerAddressFromConfig();
+		final InetSocketAddress jobManagerAddress = client.getJobManagerAddress();
 		logAndSysout("Using address " + jobManagerAddress.getHostString() + ":" + jobManagerAddress.getPort() + " to connect to JobManager.");
 		logAndSysout("JobManager web interface address " + client.getWebInterfaceURL());
 		return client;

http://git-wip-us.apache.org/repos/asf/flink/blob/72064558/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 2e6a9cc..c3c666b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -27,7 +27,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-import akka.actor.ActorRef;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
@@ -57,6 +56,7 @@ import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
 import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.net.ConnectionUtils;
+import org.apache.flink.runtime.util.LeaderConnectionInfo;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
@@ -232,27 +232,16 @@ public abstract class ClusterClient {
 	}
 
 	/**
-	 * Gets the current JobManager address from the Flink configuration (may change in case of a HA setup).
-	 * @return The address (host and port) of the leading JobManager when it was last retrieved (may be outdated)
-	 */
-	public InetSocketAddress getJobManagerAddressFromConfig() {
-		try {
-			String hostName = flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
-			int port = flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
-			return new InetSocketAddress(hostName, port);
-		} catch (Exception e) {
-			throw new RuntimeException("Failed to retrieve JobManager address", e);
-		}
-	}
-
-	/**
 	 * Gets the current JobManager address (may change in case of a HA setup).
 	 * @return The address (host and port) of the leading JobManager
 	 */
 	public InetSocketAddress getJobManagerAddress() {
 		try {
-			final ActorRef jmActor = getJobManagerGateway().actor();
-			return AkkaUtils.getInetSockeAddressFromAkkaURL(jmActor.path().toSerializationFormat());
+			LeaderConnectionInfo leaderConnectionInfo =
+				LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
+					LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig), timeout);
+
+			return AkkaUtils.getInetSockeAddressFromAkkaURL(leaderConnectionInfo.getAddress());
 		} catch (Exception e) {
 			throw new RuntimeException("Failed to retrieve JobManager address", e);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/72064558/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
index 2c6e101..d25c9d1 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
@@ -44,7 +44,7 @@ public class StandaloneClusterClient extends ClusterClient {
 
 	@Override
 	public String getWebInterfaceURL() {
-		String host = this.getJobManagerAddressFromConfig().getHostString();
+		String host = this.getJobManagerAddress().getHostString();
 		int port = getFlinkConfiguration().getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
 			ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
 		return "http://" +  host + ":" + port;
@@ -75,7 +75,7 @@ public class StandaloneClusterClient extends ClusterClient {
 	@Override
 	public String getClusterIdentifier() {
 		// Avoid blocking here by getting the address from the config without resolving the address
-		return "Standalone cluster with JobManager at " + this.getJobManagerAddressFromConfig();
+		return "Standalone cluster with JobManager at " + this.getJobManagerAddress();
 	}
 
 	@Override