You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/08/29 16:16:57 UTC
[3/4] flink git commit: [FLINK-4526][yarn] remove redundant proxy messages
[FLINK-4526][yarn] remove redundant proxy messages
This closes #2437
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/31c88a56
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/31c88a56
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/31c88a56
Branch: refs/heads/master
Commit: 31c88a564a8eda263aedada0da671357d1a6f524
Parents: ab1df63
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Aug 30 10:46:22 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Mon Aug 29 18:11:07 2016 +0200
----------------------------------------------------------------------
.../flink/client/program/ClusterClient.java | 3 +-
.../apache/flink/yarn/YarnClusterClient.java | 41 +++++++-------------
.../apache/flink/yarn/ApplicationClient.scala | 27 -------------
.../org/apache/flink/yarn/YarnMessages.scala | 7 ----
4 files changed, 15 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/31c88a56/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 768ab18..1ef8fe1 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -671,8 +671,7 @@ public abstract class ClusterClient {
* @throws Exception
*/
public ActorGateway getJobManagerGateway() throws Exception {
- LOG.info("Looking up JobManager");
-
+ LOG.debug("Looking up JobManager");
return LeaderRetrievalUtils.retrieveLeaderGateway(
LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig),
actorSystemLoader.get(),
http://git-wip-us.apache.org/repos/asf/flink/blob/31c88a56/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index 75bfeed..8b6cd9a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -19,8 +19,6 @@ package org.apache.flink.yarn;
import akka.actor.ActorRef;
-import static akka.pattern.Patterns.ask;
-
import akka.actor.Props;
import akka.pattern.Patterns;
import akka.util.Timeout;
@@ -30,8 +28,10 @@ import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.messages.GetClusterStatus;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
+import org.apache.flink.runtime.clusterframework.messages.ShutdownClusterAfterJob;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
@@ -48,9 +48,7 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.None$;
import scala.Option;
-import scala.Some;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
@@ -83,7 +81,6 @@ public class YarnClusterClient extends ClusterClient {
private final AbstractYarnClusterDescriptor clusterDescriptor;
private final LazApplicationClientLoader applicationClient;
private final FiniteDuration akkaDuration;
- private final Timeout akkaTimeout;
private final ApplicationReport appReport;
private final ApplicationId appId;
private final String trackingURL;
@@ -116,7 +113,6 @@ public class YarnClusterClient extends ClusterClient {
super(flinkConfig);
this.akkaDuration = AkkaUtils.getTimeout(flinkConfig);
- this.akkaTimeout = Timeout.durationToTimeout(akkaDuration);
this.clusterDescriptor = clusterDescriptor;
this.yarnClient = yarnClient;
this.hadoopConfig = yarnClient.getConfig();
@@ -175,12 +171,12 @@ public class YarnClusterClient extends ClusterClient {
*/
private void stopAfterJob(JobID jobID) {
Preconditions.checkNotNull(jobID, "The job id must not be null");
- Future<Object> messageReceived =
- ask(
- applicationClient.get(),
- new YarnMessages.LocalStopAMAfterJob(jobID), akkaTimeout);
try {
- Await.result(messageReceived, akkaDuration);
+ Future<Object> replyFuture =
+ getJobManagerGateway().ask(
+ new ShutdownClusterAfterJob(jobID),
+ akkaDuration);
+ Await.ready(replyFuture, akkaDuration);
} catch (Exception e) {
throw new RuntimeException("Unable to tell application master to stop once the specified job has been finised", e);
}
@@ -230,30 +226,21 @@ public class YarnClusterClient extends ClusterClient {
@Override
public GetClusterStatusResponse getClusterStatus() {
if(!isConnected) {
- throw new IllegalStateException("The cluster is not connected to the ApplicationMaster.");
+ throw new IllegalStateException("The cluster is not connected to the cluster.");
}
if(hasBeenShutdown()) {
- return null;
+ throw new IllegalStateException("The cluster has already been shutdown.");
}
- Future<Object> clusterStatusOption =
- ask(
- applicationClient.get(),
- YarnMessages.getLocalGetyarnClusterStatus(),
- akkaTimeout);
- Object clusterStatus;
try {
- clusterStatus = Await.result(clusterStatusOption, akkaDuration);
+ final Future<Object> clusterStatusOption =
+ getJobManagerGateway().ask(
+ GetClusterStatus.getInstance(),
+ akkaDuration);
+ return (GetClusterStatusResponse) Await.result(clusterStatusOption, akkaDuration);
} catch (Exception e) {
throw new RuntimeException("Unable to get ClusterClient status from Application Client", e);
}
- if(clusterStatus instanceof None$) {
- throw new RuntimeException("Unable to get ClusterClient status from Application Client");
- } else if(clusterStatus instanceof Some) {
- return (GetClusterStatusResponse) (((Some) clusterStatus).get());
- } else {
- throw new RuntimeException("Unexpected type: " + clusterStatus.getClass().getCanonicalName());
- }
}
public ApplicationStatus getApplicationStatus() {
http://git-wip-us.apache.org/repos/asf/flink/blob/31c88a56/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
index e701269..7442503 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
@@ -48,7 +48,6 @@ class ApplicationClient(
with LeaderSessionMessageFilter
with LogMessages
with LeaderRetrievalListener{
- import context._
val log = Logger(getClass)
@@ -60,7 +59,6 @@ class ApplicationClient(
var pollingTimer: Option[Cancellable] = None
var running = false
var messagesQueue : mutable.Queue[InfoMessage] = mutable.Queue[InfoMessage]()
- var latestClusterStatus : Option[GetClusterStatusResponse] = None
var stopMessageReceiver : Option[ActorRef] = None
var leaderSessionID: Option[UUID] = None
@@ -136,19 +134,8 @@ class ApplicationClient(
// The job manager acts as a proxy between the client and the resource managert
val jm = sender()
log.info(s"Successfully registered at the ResourceManager using JobManager $jm")
-
yarnJobManager = Some(jm)
- // schedule a periodic status report from the JobManager
- // request the number of task managers and slots from the job manager
- pollingTimer = Some(
- context.system.scheduler.schedule(
- INITIAL_POLLING_DELAY,
- WAIT_FOR_YARN_INTERVAL,
- yarnJobManager.get,
- decorateMessage(GetClusterStatus.getInstance()))
- )
-
case JobManagerLeaderAddress(jobManagerAkkaURL, newLeaderSessionID) =>
log.info(s"Received address of new leader $jobManagerAkkaURL with session ID" +
s" $newLeaderSessionID.")
@@ -192,20 +179,6 @@ class ApplicationClient(
}(context.dispatcher)
}
- // handle the responses from the PollYarnClusterStatus messages to the yarn job mgr
- case status: GetClusterStatusResponse =>
- latestClusterStatus = Some(status)
-
- // locally get cluster status
- case LocalGetYarnClusterStatus =>
- sender() ! decorateMessage(latestClusterStatus)
-
- // Forward message to Application Master
- case LocalStopAMAfterJob(jobID) =>
- yarnJobManager foreach {
- _ forward decorateMessage(new ShutdownClusterAfterJob(jobID))
- }
-
// ----------------- handle messages from the cluster -------------------
// receive remote messages
case msg: InfoMessage =>
http://git-wip-us.apache.org/repos/asf/flink/blob/31c88a56/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala
index da1917b..ada2631 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala
@@ -86,17 +86,10 @@ object YarnMessages {
case object HeartbeatWithYarn
case object CheckForUserCommand
- // tell the AM to monitor the job and stop once it has finished
- case class LocalStopAMAfterJob(jobId:JobID)
-
case object LocalGetYarnMessage // request new message
- case object LocalGetYarnClusterStatus // request the latest cluster status
def getLocalGetYarnMessage(): AnyRef = {
LocalGetYarnMessage
}
- def getLocalGetyarnClusterStatus(): AnyRef = {
- LocalGetYarnClusterStatus
- }
}