You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ty...@apache.org on 2019/07/23 16:22:20 UTC
[incubator-openwhisk] branch master updated: Disable MesosContainerFactory from subscribing after close (#4541)
This is an automated email from the ASF dual-hosted git repository.
tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 5d02280 Disable MesosContainerFactory from subscribing after close (#4541)
5d02280 is described below
commit 5d022809ac1a84945ab16e0b5f70252b8786fd54
Author: Chetan Mehrotra <ch...@apache.org>
AuthorDate: Tue Jul 23 21:52:09 2019 +0530
Disable MesosContainerFactory from subscribing after close (#4541)
---
.../core/mesos/MesosContainerFactory.scala | 9 ++-
.../mesos/test/MesosContainerFactoryTest.scala | 84 ++++++++++++----------
2 files changed, 56 insertions(+), 37 deletions(-)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosContainerFactory.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosContainerFactory.scala
index 5aeb576..11573bb 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosContainerFactory.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosContainerFactory.scala
@@ -103,6 +103,9 @@ class MesosContainerFactory(config: WhiskConfig,
/** Inits Mesos framework. */
val mesosClientActor = clientFactory(as, mesosConfig)
+ @volatile
+ private var closed: Boolean = false
+
subscribe()
/** Subscribes Mesos actor to mesos event stream; retry on timeout (which should be unusual). */
@@ -115,7 +118,7 @@ class MesosContainerFactory(config: WhiskConfig,
.recoverWith {
case e =>
logging.error(this, s"subscribe failed... $e}")
- subscribe()
+ if (closed) Future.successful(()) else subscribe()
}
}
@@ -188,6 +191,10 @@ class MesosContainerFactory(config: WhiskConfig,
logging.error(this, s"Mesos framework teardown failed : $t}")
}
}
+
+ def close(): Unit = {
+ closed = true
+ }
}
object MesosContainerFactory {
private def createClient(actorSystem: ActorSystem, mesosConfig: MesosConfig): ActorRef =
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
index ef0121e..d980fb7 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
@@ -57,6 +57,9 @@ import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.mesos.MesosConfig
import org.apache.openwhisk.core.mesos.MesosContainerFactory
import org.apache.openwhisk.core.mesos.MesosTimeoutConfig
+import org.apache.openwhisk.utils.retry
+
+import scala.collection.JavaConverters._
@RunWith(classOf[JUnitRunner])
class MesosContainerFactoryTest
@@ -93,12 +96,24 @@ class MesosContainerFactoryTest
Seq.empty,
Map("extra1" -> Set("e1", "e2"), "extra2" -> Set("e3", "e4")))
+ private var factory: MesosContainerFactory = _
override def beforeEach() = {
stream.reset()
}
+ override protected def afterEach(): Unit = {
+ super.afterEach()
+ Option(factory).foreach(_.close())
+ }
+
override def afterAll(): Unit = {
TestKit.shutdownActorSystem(system, verifySystemShutdown = true)
+ retry({
+ val threadNames = Thread.getAllStackTraces.asScala.keySet.map(_.getName)
+ withClue(s"Threads related to MesosActorSystem found to be active $threadNames") {
+ assert(!threadNames.exists(_.startsWith("MesosActorSystem")))
+ }
+ }, 10, Some(1.second))
super.afterAll()
}
@@ -111,7 +126,7 @@ class MesosContainerFactoryTest
it should "send Subscribe on init" in {
val wskConfig = new WhiskConfig(Map.empty)
- new MesosContainerFactory(
+ factory = new MesosContainerFactory(
wskConfig,
system,
logging,
@@ -138,16 +153,15 @@ class MesosContainerFactoryTest
2,
timeouts)
- val factory =
- new MesosContainerFactory(
- wskConfig,
- system,
- logging,
- Map("--arg1" -> Set("v1", "v2"), "--arg2" -> Set("v3", "v4"), "other" -> Set("v5", "v6")),
- containerArgsConfig,
- mesosConfig = mesosConfig,
- clientFactory = (_, _) => testActor,
- taskIdGenerator = testTaskId _)
+ factory = new MesosContainerFactory(
+ wskConfig,
+ system,
+ logging,
+ Map("--arg1" -> Set("v1", "v2"), "--arg2" -> Set("v3", "v4"), "other" -> Set("v5", "v6")),
+ containerArgsConfig,
+ mesosConfig = mesosConfig,
+ clientFactory = (_, _) => testActor,
+ taskIdGenerator = testTaskId _)
expectMsg(Subscribe)
factory.createContainer(
@@ -182,16 +196,15 @@ class MesosContainerFactoryTest
it should "send DeleteTask on destroy" in {
val probe = TestProbe()
- val factory =
- new MesosContainerFactory(
- wskConfig,
- system,
- logging,
- Map("--arg1" -> Set("v1", "v2"), "--arg2" -> Set("v3", "v4"), "other" -> Set("v5", "v6")),
- containerArgsConfig,
- mesosConfig = mesosConfig,
- clientFactory = (system, mesosConfig) => probe.testActor,
- taskIdGenerator = testTaskId _)
+ factory = new MesosContainerFactory(
+ wskConfig,
+ system,
+ logging,
+ Map("--arg1" -> Set("v1", "v2"), "--arg2" -> Set("v3", "v4"), "other" -> Set("v5", "v6")),
+ containerArgsConfig,
+ mesosConfig = mesosConfig,
+ clientFactory = (system, mesosConfig) => probe.testActor,
+ taskIdGenerator = testTaskId _)
probe.expectMsg(Subscribe)
//emulate successful subscribe
@@ -251,21 +264,20 @@ class MesosContainerFactoryTest
it should "return static message for logs" in {
val probe = TestProbe()
- val factory =
- new MesosContainerFactory(
- wskConfig,
- system,
- logging,
- Map("--arg1" -> Set("v1", "v2"), "--arg2" -> Set("v3", "v4"), "other" -> Set("v5", "v6")),
- new ContainerArgsConfig(
- "bridge",
- Seq.empty,
- Seq.empty,
- Seq.empty,
- Map("extra1" -> Set("e1", "e2"), "extra2" -> Set("e3", "e4"))),
- mesosConfig = mesosConfig,
- clientFactory = (system, mesosConfig) => probe.testActor,
- taskIdGenerator = testTaskId _)
+ factory = new MesosContainerFactory(
+ wskConfig,
+ system,
+ logging,
+ Map("--arg1" -> Set("v1", "v2"), "--arg2" -> Set("v3", "v4"), "other" -> Set("v5", "v6")),
+ new ContainerArgsConfig(
+ "bridge",
+ Seq.empty,
+ Seq.empty,
+ Seq.empty,
+ Map("extra1" -> Set("e1", "e2"), "extra2" -> Set("e3", "e4"))),
+ mesosConfig = mesosConfig,
+ clientFactory = (system, mesosConfig) => probe.testActor,
+ taskIdGenerator = testTaskId _)
probe.expectMsg(Subscribe)
//emulate successful subscribe