You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2018/06/09 13:43:57 UTC
incubator-gearpump git commit: [GEARPUMP-379] Send terminated status to client when an application is terminated
Repository: incubator-gearpump
Updated Branches:
refs/heads/master 504bcf39c -> 2334c19ee
[GEARPUMP-379] Send terminated status to client when an application is terminated
Author: manuzhang <ow...@gmail.com>
Closes #249 from manuzhang/gearpump_379.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/2334c19e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/2334c19e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/2334c19e
Branch: refs/heads/master
Commit: 2334c19ee4309e8c3e4814297ef466b0b73758a3
Parents: 504bcf3
Author: manuzhang <ow...@gmail.com>
Authored: Sat Jun 9 21:42:29 2018 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Sat Jun 9 21:42:41 2018 +0800
----------------------------------------------------------------------
.../apache/gearpump/cluster/ClusterMessage.scala | 6 +++++-
.../cluster/client/RunningApplication.scala | 16 +++++++++++-----
.../gearpump/cluster/master/AppManager.scala | 17 +++++++++++------
3 files changed, 27 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2334c19e/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala b/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala
index 8a067b5..eb7755c 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala
@@ -159,11 +159,15 @@ object MasterToClient {
/** Return the last error of this streaming application job */
case class LastFailure(time: MilliSeconds, error: String)
- sealed trait ApplicationResult
+ sealed trait ApplicationResult {
+ def appId: Int
+ }
case class ApplicationSucceeded(appId: Int) extends ApplicationResult
case class ApplicationFailed(appId: Int, error: Throwable) extends ApplicationResult
+
+ case class ApplicationTerminated(appId: Int) extends ApplicationResult
}
object AppMasterToMaster {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2334c19e/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala
index 103df01..7c9db9a 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala
@@ -56,11 +56,17 @@ class RunningApplication(val appId: Int, master: ActorRef, timeout: Timeout) {
def waitUntilFinish(duration: Duration): Unit = {
val result = ActorUtil.askActor[ApplicationResult](master,
RegisterAppResultListener(appId), new Timeout(duration.getSeconds, TimeUnit.SECONDS))
- result match {
- case failed: ApplicationFailed =>
- throw failed.error
- case _ =>
- LOG.info(s"Application $appId succeeded")
+ if (result.appId == appId) {
+ result match {
+ case failed: ApplicationFailed =>
+ throw failed.error
+ case _: ApplicationSucceeded =>
+ LOG.info(s"Application $appId succeeded")
+ case _: ApplicationTerminated =>
+ LOG.info(s"Application $appId terminated")
+ }
+ } else {
+ LOG.warn(s"Received unexpected result $result for application $appId")
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2334c19e/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
index 0a42e3a..0a32d9d 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
@@ -241,17 +241,14 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch
case succeeded@ApplicationStatus.SUCCEEDED =>
killAppMaster(appId, appRuntimeInfo.worker)
updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, succeeded)
- appResultListeners.getOrElse(appId, List.empty).foreach { client =>
- client ! ApplicationSucceeded(appId)
- }
+ sendAppResultToListeners(appId, ApplicationSucceeded(appId))
case failed@ApplicationStatus.FAILED =>
killAppMaster(appId, appRuntimeInfo.worker)
updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, failed)
- appResultListeners.getOrElse(appId, List.empty).foreach { client =>
- client ! ApplicationFailed(appId, error)
- }
+ sendAppResultToListeners(appId, ApplicationFailed(appId, error))
case terminated@ApplicationStatus.TERMINATED =>
updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, terminated)
+ sendAppResultToListeners(appId, ApplicationTerminated(appId))
case status =>
LOG.error(s"App $appId should not change it's status to $status")
}
@@ -271,6 +268,14 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch
}
}
+ private def sendAppResultToListeners(appId: Int, result: ApplicationResult): Unit = {
+ appResultListeners.get(appId).foreach {
+ _.foreach { client =>
+ client ! result
+ }
+ }
+ }
+
def appDataStoreService: Receive = {
case SaveAppData(appId, key, value) =>
val client = sender()