You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2017/03/10 07:58:58 UTC

incubator-gearpump git commit: [GEARPUMP-285] Fix false alarm of shutting down executor time out

Repository: incubator-gearpump
Updated Branches:
  refs/heads/master aebc09a8d -> ae55efbdd


[GEARPUMP-285] Fix false alarm of shutting down executor time out

Author: huafengw <fv...@gmail.com>

Closes #169 from huafengw/timeout.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/ae55efbd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/ae55efbd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/ae55efbd

Branch: refs/heads/master
Commit: ae55efbddaf176b87d20f26bc57fa17ff1cbe2a5
Parents: aebc09a
Author: huafengw <fv...@gmail.com>
Authored: Fri Mar 10 15:58:28 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Fri Mar 10 15:58:36 2017 +0800

----------------------------------------------------------------------
 .../gearpump/cluster/AppDescription.scala       | 24 ++++++--
 .../gearpump/cluster/master/AppManager.scala    | 59 +++++++++++---------
 .../cluster/ApplicationStatusSpec.scala         | 42 ++++++++++++++
 .../master/ApplicationMetaDataSpec.scala        | 37 ++++++++++++
 .../cluster/master/ApplicationStateSpec.scala   | 37 ------------
 5 files changed, 131 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae55efbd/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala b/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala
index c31f01f..0c46aca 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala
@@ -144,15 +144,29 @@ case class ExecutorJVMConfig(
 sealed abstract class ApplicationStatus(val status: String)
   extends Serializable{
   override def toString: String = status
+
+  def canTransitTo(newStatus: ApplicationStatus): Boolean
+
 }
 
 sealed abstract class ApplicationTerminalStatus(override val status: String)
-  extends ApplicationStatus(status)
+  extends ApplicationStatus(status) {
+
+  override def canTransitTo(newStatus: ApplicationStatus): Boolean = false
+}
 
 object ApplicationStatus {
-  case object PENDING extends ApplicationStatus("pending")
+  case object PENDING extends ApplicationStatus("pending") {
+    override def canTransitTo(newStatus: ApplicationStatus): Boolean = {
+      !newStatus.equals(NONEXIST)
+    }
+  }
 
-  case object ACTIVE extends ApplicationStatus("active")
+  case object ACTIVE extends ApplicationStatus("active") {
+    override def canTransitTo(newStatus: ApplicationStatus): Boolean = {
+      !newStatus.equals(NONEXIST) && !newStatus.equals(ACTIVE)
+    }
+  }
 
   case object SUCCEEDED extends ApplicationTerminalStatus("succeeded")
 
@@ -160,5 +174,7 @@ object ApplicationStatus {
 
   case object TERMINATED extends ApplicationTerminalStatus("terminated")
 
-  case object NONEXIST extends ApplicationStatus("nonexist")
+  case object NONEXIST extends ApplicationStatus("nonexist") {
+    override def canTransitTo(newStatus: ApplicationStatus): Boolean = false
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae55efbd/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
index 049d11d..e41a2c5 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
@@ -231,35 +231,40 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch
       timeStamp: TimeStamp, error: Throwable): Unit = {
     applicationRegistry.get(appId) match {
       case Some(appRuntimeInfo) =>
-        var updatedStatus: ApplicationRuntimeInfo = null
-        LOG.info(s"Application $appId change to ${newStatus.toString} at $timeStamp")
-        newStatus match {
-          case ApplicationStatus.ACTIVE =>
-            updatedStatus = appRuntimeInfo.onAppMasterActivated(timeStamp)
-            sender ! AppMasterActivated(appId)
-          case succeeded@ApplicationStatus.SUCCEEDED =>
-            killAppMaster(appId, appRuntimeInfo.worker)
-            updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, succeeded)
-            appResultListeners.getOrElse(appId, List.empty).foreach{ client =>
-              client ! ApplicationSucceeded(appId)
-            }
-          case failed@ApplicationStatus.FAILED =>
-            killAppMaster(appId, appRuntimeInfo.worker)
-            updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, failed)
-            appResultListeners.getOrElse(appId, List.empty).foreach{ client =>
-              client ! ApplicationFailed(appId, error)
-            }
-          case terminated@ApplicationStatus.TERMINATED =>
-            updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, terminated)
-          case status =>
-            LOG.error(s"App $appId should not change it's status to $status")
-        }
+        if (appRuntimeInfo.status.canTransitTo(newStatus)) {
+          var updatedStatus: ApplicationRuntimeInfo = null
+          LOG.info(s"Application $appId change to ${newStatus.toString} at $timeStamp")
+          newStatus match {
+            case ApplicationStatus.ACTIVE =>
+              updatedStatus = appRuntimeInfo.onAppMasterActivated(timeStamp)
+              sender ! AppMasterActivated(appId)
+            case succeeded@ApplicationStatus.SUCCEEDED =>
+              killAppMaster(appId, appRuntimeInfo.worker)
+              updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, succeeded)
+              appResultListeners.getOrElse(appId, List.empty).foreach { client =>
+                client ! ApplicationSucceeded(appId)
+              }
+            case failed@ApplicationStatus.FAILED =>
+              killAppMaster(appId, appRuntimeInfo.worker)
+              updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, failed)
+              appResultListeners.getOrElse(appId, List.empty).foreach { client =>
+                client ! ApplicationFailed(appId, error)
+              }
+            case terminated@ApplicationStatus.TERMINATED =>
+              updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, terminated)
+            case status =>
+              LOG.error(s"App $appId should not change it's status to $status")
+          }
 
-        if (newStatus.isInstanceOf[ApplicationTerminalStatus]) {
-          kvService ! DeleteKVGroup(appId.toString)
+          if (newStatus.isInstanceOf[ApplicationTerminalStatus]) {
+            kvService ! DeleteKVGroup(appId.toString)
+          }
+          applicationRegistry += appId -> updatedStatus
+          kvService ! PutKV(MASTER_GROUP, MASTER_STATE, MasterState(nextAppId, applicationRegistry))
+        } else {
+          LOG.error(s"Application $appId tries to switch status ${appRuntimeInfo.status} " +
+            s"to $newStatus")
         }
-        applicationRegistry += appId -> updatedStatus
-        kvService ! PutKV(MASTER_GROUP, MASTER_STATE, MasterState(nextAppId, applicationRegistry))
       case None =>
         LOG.error(s"Can not find application runtime info for appId $appId when it's " +
           s"status changed to ${newStatus.toString}")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae55efbd/core/src/test/scala/org/apache/gearpump/cluster/ApplicationStatusSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/ApplicationStatusSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/ApplicationStatusSpec.scala
new file mode 100644
index 0000000..743fe34
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/ApplicationStatusSpec.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster
+
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+
+class ApplicationStatusSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
+
+  "ApplicationStatus" should "check status transition properly" in {
+    val pending = ApplicationStatus.PENDING
+    assert(!pending.canTransitTo(ApplicationStatus.NONEXIST))
+    assert(pending.canTransitTo(ApplicationStatus.PENDING))
+    assert(pending.canTransitTo(ApplicationStatus.ACTIVE))
+    assert(pending.canTransitTo(ApplicationStatus.SUCCEEDED))
+
+    val active = ApplicationStatus.ACTIVE
+    assert(active.canTransitTo(ApplicationStatus.SUCCEEDED))
+    assert(active.canTransitTo(ApplicationStatus.PENDING))
+    assert(!active.canTransitTo(ApplicationStatus.ACTIVE))
+    assert(!active.canTransitTo(ApplicationStatus.NONEXIST))
+
+    val succeed = ApplicationStatus.SUCCEEDED
+    assert(!succeed.canTransitTo(ApplicationStatus.NONEXIST))
+    assert(!succeed.canTransitTo(ApplicationStatus.SUCCEEDED))
+    assert(!succeed.canTransitTo(ApplicationStatus.FAILED))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae55efbd/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationMetaDataSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationMetaDataSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationMetaDataSpec.scala
new file mode 100644
index 0000000..664fc9c
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationMetaDataSpec.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.master
+
+import org.apache.gearpump.cluster.AppDescription
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+import org.apache.gearpump.cluster.appmaster.ApplicationMetaData
+
+class ApplicationMetaDataSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
+
+  "ApplicationMetaData" should "check equal with respect to only appId and attemptId" in {
+    val appDescription = AppDescription("app", "AppMaster", null)
+    val metaDataA = ApplicationMetaData(0, 0, appDescription, null, null)
+    val metaDataB = ApplicationMetaData(0, 0, appDescription, null, null)
+    val metaDataC = ApplicationMetaData(0, 1, appDescription, null, null)
+
+    assert(metaDataA == metaDataB)
+    assert(metaDataA.hashCode == metaDataB.hashCode)
+    assert(metaDataA != metaDataC)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae55efbd/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationStateSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationStateSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationStateSpec.scala
deleted file mode 100644
index 6593836..0000000
--- a/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationStateSpec.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.master
-
-import org.apache.gearpump.cluster.AppDescription
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-import org.apache.gearpump.cluster.appmaster.ApplicationMetaData
-
-class ApplicationStateSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
-
-  "ApplicationState" should "check equal with respect to only appId and attemptId" in {
-    val appDescription = AppDescription("app", "AppMaster", null)
-    val stateA = ApplicationMetaData(0, 0, appDescription, null, null)
-    val stateB = ApplicationMetaData(0, 0, appDescription, null, null)
-    val stateC = ApplicationMetaData(0, 1, appDescription, null, null)
-
-    assert(stateA == stateB)
-    assert(stateA.hashCode == stateB.hashCode)
-    assert(stateA != stateC)
-  }
-}