You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ty...@apache.org on 2019/07/23 16:22:20 UTC

[incubator-openwhisk] branch master updated: Disable MesosContainerFactory from subscribing after close (#4541)

This is an automated email from the ASF dual-hosted git repository.

tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d02280  Disable MesosContainerFactory from subscribing after close (#4541)
5d02280 is described below

commit 5d022809ac1a84945ab16e0b5f70252b8786fd54
Author: Chetan Mehrotra <ch...@apache.org>
AuthorDate: Tue Jul 23 21:52:09 2019 +0530

    Disable MesosContainerFactory from subscribing after close (#4541)
---
 .../core/mesos/MesosContainerFactory.scala         |  9 ++-
 .../mesos/test/MesosContainerFactoryTest.scala     | 84 ++++++++++++----------
 2 files changed, 56 insertions(+), 37 deletions(-)

diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosContainerFactory.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosContainerFactory.scala
index 5aeb576..11573bb 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosContainerFactory.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosContainerFactory.scala
@@ -103,6 +103,9 @@ class MesosContainerFactory(config: WhiskConfig,
   /** Inits Mesos framework. */
   val mesosClientActor = clientFactory(as, mesosConfig)
 
+  @volatile
+  private var closed: Boolean = false
+
   subscribe()
 
   /** Subscribes Mesos actor to mesos event stream; retry on timeout (which should be unusual). */
@@ -115,7 +118,7 @@ class MesosContainerFactory(config: WhiskConfig,
       .recoverWith {
         case e =>
           logging.error(this, s"subscribe failed... $e}")
-          subscribe()
+          if (closed) Future.successful(()) else subscribe()
       }
   }
 
@@ -188,6 +191,10 @@ class MesosContainerFactory(config: WhiskConfig,
           logging.error(this, s"Mesos framework teardown failed : $t}")
       }
   }
+
+  def close(): Unit = {
+    closed = true
+  }
 }
 object MesosContainerFactory {
   private def createClient(actorSystem: ActorSystem, mesosConfig: MesosConfig): ActorRef =
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
index ef0121e..d980fb7 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
@@ -57,6 +57,9 @@ import org.apache.openwhisk.core.entity.size._
 import org.apache.openwhisk.core.mesos.MesosConfig
 import org.apache.openwhisk.core.mesos.MesosContainerFactory
 import org.apache.openwhisk.core.mesos.MesosTimeoutConfig
+import org.apache.openwhisk.utils.retry
+
+import scala.collection.JavaConverters._
 
 @RunWith(classOf[JUnitRunner])
 class MesosContainerFactoryTest
@@ -93,12 +96,24 @@ class MesosContainerFactoryTest
       Seq.empty,
       Map("extra1" -> Set("e1", "e2"), "extra2" -> Set("e3", "e4")))
 
+  private var factory: MesosContainerFactory = _
   override def beforeEach() = {
     stream.reset()
   }
 
+  override protected def afterEach(): Unit = {
+    super.afterEach()
+    Option(factory).foreach(_.close())
+  }
+
   override def afterAll(): Unit = {
     TestKit.shutdownActorSystem(system, verifySystemShutdown = true)
+    retry({
+      val threadNames = Thread.getAllStackTraces.asScala.keySet.map(_.getName)
+      withClue(s"Threads related to  MesosActorSystem found to be active $threadNames") {
+        assert(!threadNames.exists(_.startsWith("MesosActorSystem")))
+      }
+    }, 10, Some(1.second))
     super.afterAll()
   }
 
@@ -111,7 +126,7 @@ class MesosContainerFactoryTest
 
   it should "send Subscribe on init" in {
     val wskConfig = new WhiskConfig(Map.empty)
-    new MesosContainerFactory(
+    factory = new MesosContainerFactory(
       wskConfig,
       system,
       logging,
@@ -138,16 +153,15 @@ class MesosContainerFactoryTest
       2,
       timeouts)
 
-    val factory =
-      new MesosContainerFactory(
-        wskConfig,
-        system,
-        logging,
-        Map("--arg1" -> Set("v1", "v2"), "--arg2" -> Set("v3", "v4"), "other" -> Set("v5", "v6")),
-        containerArgsConfig,
-        mesosConfig = mesosConfig,
-        clientFactory = (_, _) => testActor,
-        taskIdGenerator = testTaskId _)
+    factory = new MesosContainerFactory(
+      wskConfig,
+      system,
+      logging,
+      Map("--arg1" -> Set("v1", "v2"), "--arg2" -> Set("v3", "v4"), "other" -> Set("v5", "v6")),
+      containerArgsConfig,
+      mesosConfig = mesosConfig,
+      clientFactory = (_, _) => testActor,
+      taskIdGenerator = testTaskId _)
 
     expectMsg(Subscribe)
     factory.createContainer(
@@ -182,16 +196,15 @@ class MesosContainerFactoryTest
 
   it should "send DeleteTask on destroy" in {
     val probe = TestProbe()
-    val factory =
-      new MesosContainerFactory(
-        wskConfig,
-        system,
-        logging,
-        Map("--arg1" -> Set("v1", "v2"), "--arg2" -> Set("v3", "v4"), "other" -> Set("v5", "v6")),
-        containerArgsConfig,
-        mesosConfig = mesosConfig,
-        clientFactory = (system, mesosConfig) => probe.testActor,
-        taskIdGenerator = testTaskId _)
+    factory = new MesosContainerFactory(
+      wskConfig,
+      system,
+      logging,
+      Map("--arg1" -> Set("v1", "v2"), "--arg2" -> Set("v3", "v4"), "other" -> Set("v5", "v6")),
+      containerArgsConfig,
+      mesosConfig = mesosConfig,
+      clientFactory = (system, mesosConfig) => probe.testActor,
+      taskIdGenerator = testTaskId _)
 
     probe.expectMsg(Subscribe)
     //emulate successful subscribe
@@ -251,21 +264,20 @@ class MesosContainerFactoryTest
 
   it should "return static message for logs" in {
     val probe = TestProbe()
-    val factory =
-      new MesosContainerFactory(
-        wskConfig,
-        system,
-        logging,
-        Map("--arg1" -> Set("v1", "v2"), "--arg2" -> Set("v3", "v4"), "other" -> Set("v5", "v6")),
-        new ContainerArgsConfig(
-          "bridge",
-          Seq.empty,
-          Seq.empty,
-          Seq.empty,
-          Map("extra1" -> Set("e1", "e2"), "extra2" -> Set("e3", "e4"))),
-        mesosConfig = mesosConfig,
-        clientFactory = (system, mesosConfig) => probe.testActor,
-        taskIdGenerator = testTaskId _)
+    factory = new MesosContainerFactory(
+      wskConfig,
+      system,
+      logging,
+      Map("--arg1" -> Set("v1", "v2"), "--arg2" -> Set("v3", "v4"), "other" -> Set("v5", "v6")),
+      new ContainerArgsConfig(
+        "bridge",
+        Seq.empty,
+        Seq.empty,
+        Seq.empty,
+        Map("extra1" -> Set("e1", "e2"), "extra2" -> Set("e3", "e4"))),
+      mesosConfig = mesosConfig,
+      clientFactory = (system, mesosConfig) => probe.testActor,
+      taskIdGenerator = testTaskId _)
 
     probe.expectMsg(Subscribe)
     //emulate successful subscribe