You are viewing a plain-text version of this content. The hyperlink to its canonical version was not preserved in this plain-text conversion.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/08/29 16:16:57 UTC

[3/4] flink git commit: [FLINK-4526][yarn] remove redundant proxy messages

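[FLINK-4526][yarn] remove redundant proxy messages

YarnClusterClient no longer routes cluster-status and stop-after-job
requests through the ApplicationClient actor. Instead, it asks the
JobManager directly with GetClusterStatus and ShutdownClusterAfterJob.
As a result, the following are removed from ApplicationClient:
- the periodic GetClusterStatus polling and its cached latest status;
- the LocalGetYarnClusterStatus and LocalStopAMAfterJob proxy messages.
In addition, getClusterStatus() now throws an IllegalStateException,
instead of returning null, once the cluster has been shut down.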

This closes #2437


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/31c88a56
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/31c88a56
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/31c88a56

Branch: refs/heads/master
Commit: 31c88a564a8eda263aedada0da671357d1a6f524
Parents: ab1df63
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Aug 30 10:46:22 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Mon Aug 29 18:11:07 2016 +0200

----------------------------------------------------------------------
 .../flink/client/program/ClusterClient.java     |  3 +-
 .../apache/flink/yarn/YarnClusterClient.java    | 41 +++++++-------------
 .../apache/flink/yarn/ApplicationClient.scala   | 27 -------------
 .../org/apache/flink/yarn/YarnMessages.scala    |  7 ----
 4 files changed, 15 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/31c88a56/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 768ab18..1ef8fe1 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -671,8 +671,7 @@ public abstract class ClusterClient {
 	 * @throws Exception
 	 */
 	public ActorGateway getJobManagerGateway() throws Exception {
-		LOG.info("Looking up JobManager");
-
+		LOG.debug("Looking up JobManager");
 		return LeaderRetrievalUtils.retrieveLeaderGateway(
 			LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig),
 			actorSystemLoader.get(),

http://git-wip-us.apache.org/repos/asf/flink/blob/31c88a56/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index 75bfeed..8b6cd9a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -19,8 +19,6 @@ package org.apache.flink.yarn;
 
 import akka.actor.ActorRef;
 
-import static akka.pattern.Patterns.ask;
-
 import akka.actor.Props;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
@@ -30,8 +28,10 @@ import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.messages.GetClusterStatus;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
+import org.apache.flink.runtime.clusterframework.messages.ShutdownClusterAfterJob;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
@@ -48,9 +48,7 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.None$;
 import scala.Option;
-import scala.Some;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -83,7 +81,6 @@ public class YarnClusterClient extends ClusterClient {
 	private final AbstractYarnClusterDescriptor clusterDescriptor;
 	private final LazApplicationClientLoader applicationClient;
 	private final FiniteDuration akkaDuration;
-	private final Timeout akkaTimeout;
 	private final ApplicationReport appReport;
 	private final ApplicationId appId;
 	private final String trackingURL;
@@ -116,7 +113,6 @@ public class YarnClusterClient extends ClusterClient {
 		super(flinkConfig);
 
 		this.akkaDuration = AkkaUtils.getTimeout(flinkConfig);
-		this.akkaTimeout = Timeout.durationToTimeout(akkaDuration);
 		this.clusterDescriptor = clusterDescriptor;
 		this.yarnClient = yarnClient;
 		this.hadoopConfig = yarnClient.getConfig();
@@ -175,12 +171,12 @@ public class YarnClusterClient extends ClusterClient {
 	 */
 	private void stopAfterJob(JobID jobID) {
 		Preconditions.checkNotNull(jobID, "The job id must not be null");
-		Future<Object> messageReceived =
-			ask(
-				applicationClient.get(),
-				new YarnMessages.LocalStopAMAfterJob(jobID), akkaTimeout);
 		try {
-			Await.result(messageReceived, akkaDuration);
+			Future<Object> replyFuture =
+				getJobManagerGateway().ask(
+					new ShutdownClusterAfterJob(jobID),
+					akkaDuration);
+			Await.ready(replyFuture, akkaDuration);
 		} catch (Exception e) {
 			throw new RuntimeException("Unable to tell application master to stop once the specified job has been finised", e);
 		}
@@ -230,30 +226,21 @@ public class YarnClusterClient extends ClusterClient {
 	@Override
 	public GetClusterStatusResponse getClusterStatus() {
 		if(!isConnected) {
-			throw new IllegalStateException("The cluster is not connected to the ApplicationMaster.");
+			throw new IllegalStateException("The cluster is not connected to the cluster.");
 		}
 		if(hasBeenShutdown()) {
-			return null;
+			throw new IllegalStateException("The cluster has already been shutdown.");
 		}
 
-		Future<Object> clusterStatusOption =
-			ask(
-				applicationClient.get(),
-				YarnMessages.getLocalGetyarnClusterStatus(),
-				akkaTimeout);
-		Object clusterStatus;
 		try {
-			clusterStatus = Await.result(clusterStatusOption, akkaDuration);
+			final Future<Object> clusterStatusOption =
+				getJobManagerGateway().ask(
+					GetClusterStatus.getInstance(),
+					akkaDuration);
+			return (GetClusterStatusResponse) Await.result(clusterStatusOption, akkaDuration);
 		} catch (Exception e) {
 			throw new RuntimeException("Unable to get ClusterClient status from Application Client", e);
 		}
-		if(clusterStatus instanceof None$) {
-			throw new RuntimeException("Unable to get ClusterClient status from Application Client");
-		} else if(clusterStatus instanceof Some) {
-			return (GetClusterStatusResponse) (((Some) clusterStatus).get());
-		} else {
-			throw new RuntimeException("Unexpected type: " + clusterStatus.getClass().getCanonicalName());
-		}
 	}
 
 	public ApplicationStatus getApplicationStatus() {

http://git-wip-us.apache.org/repos/asf/flink/blob/31c88a56/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
index e701269..7442503 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
@@ -48,7 +48,6 @@ class ApplicationClient(
   with LeaderSessionMessageFilter
   with LogMessages
   with LeaderRetrievalListener{
-  import context._
 
   val log = Logger(getClass)
 
@@ -60,7 +59,6 @@ class ApplicationClient(
   var pollingTimer: Option[Cancellable] = None
   var running = false
   var messagesQueue : mutable.Queue[InfoMessage] = mutable.Queue[InfoMessage]()
-  var latestClusterStatus : Option[GetClusterStatusResponse] = None
   var stopMessageReceiver : Option[ActorRef] = None
 
   var leaderSessionID: Option[UUID] = None
@@ -136,19 +134,8 @@ class ApplicationClient(
       // The job manager acts as a proxy between the client and the resource managert
       val jm = sender()
       log.info(s"Successfully registered at the ResourceManager using JobManager $jm")
-
       yarnJobManager = Some(jm)
 
-      // schedule a periodic status report from the JobManager
-      // request the number of task managers and slots from the job manager
-      pollingTimer = Some(
-        context.system.scheduler.schedule(
-          INITIAL_POLLING_DELAY,
-          WAIT_FOR_YARN_INTERVAL,
-          yarnJobManager.get,
-          decorateMessage(GetClusterStatus.getInstance()))
-      )
-
     case JobManagerLeaderAddress(jobManagerAkkaURL, newLeaderSessionID) =>
       log.info(s"Received address of new leader $jobManagerAkkaURL with session ID" +
         s" $newLeaderSessionID.")
@@ -192,20 +179,6 @@ class ApplicationClient(
           }(context.dispatcher)
       }
 
-    // handle the responses from the PollYarnClusterStatus messages to the yarn job mgr
-    case status: GetClusterStatusResponse =>
-      latestClusterStatus = Some(status)
-
-    // locally get cluster status
-    case LocalGetYarnClusterStatus =>
-      sender() ! decorateMessage(latestClusterStatus)
-
-    // Forward message to Application Master
-    case LocalStopAMAfterJob(jobID) =>
-      yarnJobManager foreach {
-        _ forward decorateMessage(new ShutdownClusterAfterJob(jobID))
-      }
-
     // -----------------  handle messages from the cluster -------------------
     // receive remote messages
     case msg: InfoMessage =>

http://git-wip-us.apache.org/repos/asf/flink/blob/31c88a56/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala
index da1917b..ada2631 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala
@@ -86,17 +86,10 @@ object YarnMessages {
   case object HeartbeatWithYarn
   case object CheckForUserCommand
 
-  // tell the AM to monitor the job and stop once it has finished
-  case class LocalStopAMAfterJob(jobId:JobID)
-
   case object LocalGetYarnMessage // request new message
-  case object LocalGetYarnClusterStatus // request the latest cluster status
 
   def getLocalGetYarnMessage(): AnyRef = {
     LocalGetYarnMessage
   }
 
-  def getLocalGetyarnClusterStatus(): AnyRef = {
-    LocalGetYarnClusterStatus
-  }
 }