You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2018/06/09 13:43:57 UTC

incubator-gearpump git commit: [GEARPUMP-379] Send terminated status to client when an application i…

Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 504bcf39c -> 2334c19ee


[GEARPUMP-379] Send terminated status to client when an application i…

…s terminated

Author: manuzhang <ow...@gmail.com>

Closes #249 from manuzhang/gearpump_379.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/2334c19e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/2334c19e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/2334c19e

Branch: refs/heads/master
Commit: 2334c19ee4309e8c3e4814297ef466b0b73758a3
Parents: 504bcf3
Author: manuzhang <ow...@gmail.com>
Authored: Sat Jun 9 21:42:29 2018 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Sat Jun 9 21:42:41 2018 +0800

----------------------------------------------------------------------
 .../apache/gearpump/cluster/ClusterMessage.scala   |  6 +++++-
 .../cluster/client/RunningApplication.scala        | 16 +++++++++++-----
 .../gearpump/cluster/master/AppManager.scala       | 17 +++++++++++------
 3 files changed, 27 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2334c19e/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala b/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala
index 8a067b5..eb7755c 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala
@@ -159,11 +159,15 @@ object MasterToClient {
   /** Return the last error of this streaming application job */
   case class LastFailure(time: MilliSeconds, error: String)
 
-  sealed trait ApplicationResult
+  sealed trait ApplicationResult {
+    def appId: Int
+  }
 
   case class ApplicationSucceeded(appId: Int) extends ApplicationResult
 
   case class ApplicationFailed(appId: Int, error: Throwable) extends ApplicationResult
+
+  case class ApplicationTerminated(appId: Int) extends ApplicationResult
 }
 
 object AppMasterToMaster {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2334c19e/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala
index 103df01..7c9db9a 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala
@@ -56,11 +56,17 @@ class RunningApplication(val appId: Int, master: ActorRef, timeout: Timeout) {
   def waitUntilFinish(duration: Duration): Unit = {
     val result = ActorUtil.askActor[ApplicationResult](master,
       RegisterAppResultListener(appId), new Timeout(duration.getSeconds, TimeUnit.SECONDS))
-    result match {
-      case failed: ApplicationFailed =>
-        throw failed.error
-      case _ =>
-        LOG.info(s"Application $appId succeeded")
+    if (result.appId == appId) {
+      result match {
+        case failed: ApplicationFailed =>
+          throw failed.error
+        case _: ApplicationSucceeded =>
+          LOG.info(s"Application $appId succeeded")
+        case _: ApplicationTerminated =>
+          LOG.info(s"Application $appId terminated")
+      }
+    } else {
+      LOG.warn(s"Received unexpected result $result for application $appId")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2334c19e/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
index 0a42e3a..0a32d9d 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
@@ -241,17 +241,14 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch
             case succeeded@ApplicationStatus.SUCCEEDED =>
               killAppMaster(appId, appRuntimeInfo.worker)
               updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, succeeded)
-              appResultListeners.getOrElse(appId, List.empty).foreach { client =>
-                client ! ApplicationSucceeded(appId)
-              }
+              sendAppResultToListeners(appId, ApplicationSucceeded(appId))
             case failed@ApplicationStatus.FAILED =>
               killAppMaster(appId, appRuntimeInfo.worker)
               updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, failed)
-              appResultListeners.getOrElse(appId, List.empty).foreach { client =>
-                client ! ApplicationFailed(appId, error)
-              }
+              sendAppResultToListeners(appId, ApplicationFailed(appId, error))
             case terminated@ApplicationStatus.TERMINATED =>
               updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, terminated)
+              sendAppResultToListeners(appId, ApplicationTerminated(appId))
             case status =>
               LOG.error(s"App $appId should not change it's status to $status")
           }
@@ -271,6 +268,14 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch
     }
   }
 
+  private def sendAppResultToListeners(appId: Int, result: ApplicationResult): Unit = {
+    appResultListeners.get(appId).foreach {
+      _.foreach { client =>
+        client ! result
+      }
+    }
+  }
+
   def appDataStoreService: Receive = {
     case SaveAppData(appId, key, value) =>
       val client = sender()