You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2017/03/10 07:58:58 UTC
incubator-gearpump git commit: [GEARPUMP-285] Fix false alarm of shutting down executor time out
Repository: incubator-gearpump
Updated Branches:
refs/heads/master aebc09a8d -> ae55efbdd
[GEARPUMP-285] Fix false alarm of shutting down executor time out
Author: huafengw <fv...@gmail.com>
Closes #169 from huafengw/timeout.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/ae55efbd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/ae55efbd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/ae55efbd
Branch: refs/heads/master
Commit: ae55efbddaf176b87d20f26bc57fa17ff1cbe2a5
Parents: aebc09a
Author: huafengw <fv...@gmail.com>
Authored: Fri Mar 10 15:58:28 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Fri Mar 10 15:58:36 2017 +0800
----------------------------------------------------------------------
.../gearpump/cluster/AppDescription.scala | 24 ++++++--
.../gearpump/cluster/master/AppManager.scala | 59 +++++++++++---------
.../cluster/ApplicationStatusSpec.scala | 42 ++++++++++++++
.../master/ApplicationMetaDataSpec.scala | 37 ++++++++++++
.../cluster/master/ApplicationStateSpec.scala | 37 ------------
5 files changed, 131 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae55efbd/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala b/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala
index c31f01f..0c46aca 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala
@@ -144,15 +144,29 @@ case class ExecutorJVMConfig(
sealed abstract class ApplicationStatus(val status: String)
extends Serializable{
override def toString: String = status
+
+ def canTransitTo(newStatus: ApplicationStatus): Boolean
+
}
sealed abstract class ApplicationTerminalStatus(override val status: String)
- extends ApplicationStatus(status)
+ extends ApplicationStatus(status) {
+
+ override def canTransitTo(newStatus: ApplicationStatus): Boolean = false
+}
object ApplicationStatus {
- case object PENDING extends ApplicationStatus("pending")
+ case object PENDING extends ApplicationStatus("pending") {
+ override def canTransitTo(newStatus: ApplicationStatus): Boolean = {
+ !newStatus.equals(NONEXIST)
+ }
+ }
- case object ACTIVE extends ApplicationStatus("active")
+ case object ACTIVE extends ApplicationStatus("active") {
+ override def canTransitTo(newStatus: ApplicationStatus): Boolean = {
+ !newStatus.equals(NONEXIST) && !newStatus.equals(ACTIVE)
+ }
+ }
case object SUCCEEDED extends ApplicationTerminalStatus("succeeded")
@@ -160,5 +174,7 @@ object ApplicationStatus {
case object TERMINATED extends ApplicationTerminalStatus("terminated")
- case object NONEXIST extends ApplicationStatus("nonexist")
+ case object NONEXIST extends ApplicationStatus("nonexist") {
+ override def canTransitTo(newStatus: ApplicationStatus): Boolean = false
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae55efbd/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
index 049d11d..e41a2c5 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
@@ -231,35 +231,40 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch
timeStamp: TimeStamp, error: Throwable): Unit = {
applicationRegistry.get(appId) match {
case Some(appRuntimeInfo) =>
- var updatedStatus: ApplicationRuntimeInfo = null
- LOG.info(s"Application $appId change to ${newStatus.toString} at $timeStamp")
- newStatus match {
- case ApplicationStatus.ACTIVE =>
- updatedStatus = appRuntimeInfo.onAppMasterActivated(timeStamp)
- sender ! AppMasterActivated(appId)
- case succeeded@ApplicationStatus.SUCCEEDED =>
- killAppMaster(appId, appRuntimeInfo.worker)
- updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, succeeded)
- appResultListeners.getOrElse(appId, List.empty).foreach{ client =>
- client ! ApplicationSucceeded(appId)
- }
- case failed@ApplicationStatus.FAILED =>
- killAppMaster(appId, appRuntimeInfo.worker)
- updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, failed)
- appResultListeners.getOrElse(appId, List.empty).foreach{ client =>
- client ! ApplicationFailed(appId, error)
- }
- case terminated@ApplicationStatus.TERMINATED =>
- updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, terminated)
- case status =>
- LOG.error(s"App $appId should not change it's status to $status")
- }
+ if (appRuntimeInfo.status.canTransitTo(newStatus)) {
+ var updatedStatus: ApplicationRuntimeInfo = null
+ LOG.info(s"Application $appId change to ${newStatus.toString} at $timeStamp")
+ newStatus match {
+ case ApplicationStatus.ACTIVE =>
+ updatedStatus = appRuntimeInfo.onAppMasterActivated(timeStamp)
+ sender ! AppMasterActivated(appId)
+ case succeeded@ApplicationStatus.SUCCEEDED =>
+ killAppMaster(appId, appRuntimeInfo.worker)
+ updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, succeeded)
+ appResultListeners.getOrElse(appId, List.empty).foreach { client =>
+ client ! ApplicationSucceeded(appId)
+ }
+ case failed@ApplicationStatus.FAILED =>
+ killAppMaster(appId, appRuntimeInfo.worker)
+ updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, failed)
+ appResultListeners.getOrElse(appId, List.empty).foreach { client =>
+ client ! ApplicationFailed(appId, error)
+ }
+ case terminated@ApplicationStatus.TERMINATED =>
+ updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, terminated)
+ case status =>
+ LOG.error(s"App $appId should not change it's status to $status")
+ }
- if (newStatus.isInstanceOf[ApplicationTerminalStatus]) {
- kvService ! DeleteKVGroup(appId.toString)
+ if (newStatus.isInstanceOf[ApplicationTerminalStatus]) {
+ kvService ! DeleteKVGroup(appId.toString)
+ }
+ applicationRegistry += appId -> updatedStatus
+ kvService ! PutKV(MASTER_GROUP, MASTER_STATE, MasterState(nextAppId, applicationRegistry))
+ } else {
+ LOG.error(s"Application $appId tries to switch status ${appRuntimeInfo.status} " +
+ s"to $newStatus")
}
- applicationRegistry += appId -> updatedStatus
- kvService ! PutKV(MASTER_GROUP, MASTER_STATE, MasterState(nextAppId, applicationRegistry))
case None =>
LOG.error(s"Can not find application runtime info for appId $appId when it's " +
s"status changed to ${newStatus.toString}")
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae55efbd/core/src/test/scala/org/apache/gearpump/cluster/ApplicationStatusSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/ApplicationStatusSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/ApplicationStatusSpec.scala
new file mode 100644
index 0000000..743fe34
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/ApplicationStatusSpec.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster
+
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+
+class ApplicationStatusSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
+
+ "ApplicationStatus" should "check status transition properly" in {
+ val pending = ApplicationStatus.PENDING
+ assert(!pending.canTransitTo(ApplicationStatus.NONEXIST))
+ assert(pending.canTransitTo(ApplicationStatus.PENDING))
+ assert(pending.canTransitTo(ApplicationStatus.ACTIVE))
+ assert(pending.canTransitTo(ApplicationStatus.SUCCEEDED))
+
+ val active = ApplicationStatus.ACTIVE
+ assert(active.canTransitTo(ApplicationStatus.SUCCEEDED))
+ assert(active.canTransitTo(ApplicationStatus.PENDING))
+ assert(!active.canTransitTo(ApplicationStatus.ACTIVE))
+ assert(!active.canTransitTo(ApplicationStatus.NONEXIST))
+
+ val succeed = ApplicationStatus.SUCCEEDED
+ assert(!succeed.canTransitTo(ApplicationStatus.NONEXIST))
+ assert(!succeed.canTransitTo(ApplicationStatus.SUCCEEDED))
+ assert(!succeed.canTransitTo(ApplicationStatus.FAILED))
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae55efbd/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationMetaDataSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationMetaDataSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationMetaDataSpec.scala
new file mode 100644
index 0000000..664fc9c
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationMetaDataSpec.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.master
+
+import org.apache.gearpump.cluster.AppDescription
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+import org.apache.gearpump.cluster.appmaster.ApplicationMetaData
+
+class ApplicationMetaDataSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
+
+ "ApplicationMetaData" should "check equal with respect to only appId and attemptId" in {
+ val appDescription = AppDescription("app", "AppMaster", null)
+ val metaDataA = ApplicationMetaData(0, 0, appDescription, null, null)
+ val metaDataB = ApplicationMetaData(0, 0, appDescription, null, null)
+ val metaDataC = ApplicationMetaData(0, 1, appDescription, null, null)
+
+ assert(metaDataA == metaDataB)
+ assert(metaDataA.hashCode == metaDataB.hashCode)
+ assert(metaDataA != metaDataC)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae55efbd/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationStateSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationStateSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationStateSpec.scala
deleted file mode 100644
index 6593836..0000000
--- a/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationStateSpec.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.master
-
-import org.apache.gearpump.cluster.AppDescription
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-import org.apache.gearpump.cluster.appmaster.ApplicationMetaData
-
-class ApplicationStateSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
-
- "ApplicationState" should "check equal with respect to only appId and attemptId" in {
- val appDescription = AppDescription("app", "AppMaster", null)
- val stateA = ApplicationMetaData(0, 0, appDescription, null, null)
- val stateB = ApplicationMetaData(0, 0, appDescription, null, null)
- val stateC = ApplicationMetaData(0, 1, appDescription, null, null)
-
- assert(stateA == stateB)
- assert(stateA.hashCode == stateB.hashCode)
- assert(stateA != stateC)
- }
-}