You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ji...@apache.org on 2022/05/26 05:14:13 UTC

[openwhisk] branch master updated: Add some testcases and missing ASF headers for new scheduler (#5243)

This is an automated email from the ASF dual-hosted git repository.

jiangpengcheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new c90e8ccbc Add some testcases and missing ASF headers for new scheduler  (#5243)
c90e8ccbc is described below

commit c90e8ccbc6e49bebc4bf9d641bd8aee0085cc805
Author: jiangpengcheng <ji...@navercorp.com>
AuthorDate: Thu May 26 13:14:05 2022 +0800

    Add some testcases and missing ASF headers for new scheduler  (#5243)
    
    * Add some testcases and missing ASF headers for new scheduler
    
    * Reset stream after each test
    
    * Wait more time
    
    * Update msg start time to ensure it's not stale
    
    * Make MemoryQueueFlowTests stable
    
    * Ignore warmUp msg immediately
    
    * Use retry to replace sleep
    
    * Add missing config
    
    * Fix configuration error
---
 core/controller/src/main/resources/reference.conf  |   2 +-
 .../core/loadBalancer/FPCPoolBalancer.scala        |  17 ++
 .../scheduler/queue/SchedulingDecisionMaker.scala  |  17 ++
 .../core/controller/test/FPCEntitlementTests.scala |  61 ++++++
 .../loadBalancer/test/FPCPoolBalancerTests.scala   |  17 ++
 .../container/test/ContainerManagerTests.scala     | 198 ++++++++++++++++--
 .../grpc/test/ActivationServiceImplTests.scala     |  55 ++++-
 .../queue/test/ContainerCounterTests.scala         |  43 +++-
 .../queue/test/MemoryQueueFlowTests.scala          |  29 ++-
 .../scheduler/queue/test/MemoryQueueTests.scala    |  17 ++
 .../queue/test/MemoryQueueTestsFixture.scala       |  17 ++
 .../scheduler/queue/test/QueueManagerTests.scala   | 230 +++++++++++++++++++--
 .../queue/test/SchedulingDecisionMakerTests.scala  |  21 +-
 13 files changed, 676 insertions(+), 48 deletions(-)

diff --git a/core/controller/src/main/resources/reference.conf b/core/controller/src/main/resources/reference.conf
index dfde945a0..1eb4eb580 100644
--- a/core/controller/src/main/resources/reference.conf
+++ b/core/controller/src/main/resources/reference.conf
@@ -31,7 +31,7 @@ whisk {
     timeout-addon = 1m
 
     fpc {
-      use-perMin-throttles = false
+      use-per-min-throttles = false
     }
   }
   controller {
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
index bd72c8cb6..576d3b3b1 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.openwhisk.core.loadBalancer
 
 import java.nio.charset.StandardCharsets
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
index f36f1f82f..0bbc1d98b 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.openwhisk.core.scheduler.queue
 
 import akka.actor.{Actor, ActorSystem, Props}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/FPCEntitlementTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/FPCEntitlementTests.scala
new file mode 100644
index 000000000..06724e5f0
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/FPCEntitlementTests.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.controller.test
+
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.controller.RejectRequest
+import org.apache.openwhisk.core.entitlement.{EntitlementProvider, FPCEntitlementProvider, Privilege, Resource}
+import org.apache.openwhisk.core.entitlement.Privilege.{ACTIVATE, DELETE, PUT, READ, REJECT}
+import org.apache.openwhisk.core.entity.{EntityName, EntityPath, FullyQualifiedEntityName}
+import org.apache.openwhisk.core.loadBalancer.LoadBalancer
+import org.junit.runner.RunWith
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+
+@RunWith(classOf[JUnitRunner])
+class FPCEntitlementProviderTests extends ControllerTestCommon with ScalaFutures with MockFactory {
+
+  implicit val transactionId = TransactionId.testing
+
+  it should "get throttle flag from loadBalancer" in {
+    val someUser = WhiskAuthHelpers.newIdentity()
+    val action = FullyQualifiedEntityName(EntityPath("testns"), EntityName("action"))
+    val loadBalancer = mock[LoadBalancer]
+    (loadBalancer
+      .checkThrottle(_: EntityPath, _: String))
+      .expects(someUser.namespace.name.toPath, action.fullPath.asString)
+      .returning(true)
+    val resources = Set(Resource(action.path, ACTIONS, Some(action.name.name)))
+
+    val entitlementProvider: EntitlementProvider = new FPCEntitlementProvider(whiskConfig, loadBalancer, instance)
+    entitlementProvider.checkThrottles(someUser, ACTIVATE, resources).failed.futureValue shouldBe a[RejectRequest]
+
+    Seq[Privilege](READ, PUT, DELETE, REJECT).foreach(OP => {
+      noException shouldBe thrownBy(entitlementProvider.checkThrottles(someUser, OP, resources).futureValue)
+    })
+
+    val action2 = FullyQualifiedEntityName(EntityPath("testns2"), EntityName("action2"))
+    val resources2 = Set(Resource(action2.path, ACTIONS, Some(action2.name.name)))
+    (loadBalancer
+      .checkThrottle(_: EntityPath, _: String))
+      .expects(someUser.namespace.name.toPath, action2.fullPath.asString)
+      .returning(false)
+    noException shouldBe thrownBy(entitlementProvider.checkThrottles(someUser, ACTIVATE, resources2).futureValue)
+  }
+}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/FPCPoolBalancerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/FPCPoolBalancerTests.scala
index 98342fc22..81aa708b4 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/FPCPoolBalancerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/FPCPoolBalancerTests.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.openwhisk.core.loadBalancer.test
 
 import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, Props}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala
index 6c17346da..a00b632c8 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala
@@ -49,7 +49,7 @@ import org.apache.openwhisk.core.scheduler.message.{
   SuccessfulCreationJob
 }
 import org.apache.openwhisk.core.scheduler.queue.{MemoryQueueKey, MemoryQueueValue, QueuePool}
-import org.apache.openwhisk.core.service.WatchEndpointInserted
+import org.apache.openwhisk.core.service.{WatchEndpointInserted, WatchEndpointRemoved}
 import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
 import org.junit.runner.RunWith
 import org.scalamock.scalatest.MockFactory
@@ -77,6 +77,7 @@ class ContainerManagerTests
     with StreamLogging {
 
   val config = new WhiskConfig(ExecManifest.requiredProperties)
+  ExecManifest.initialize(config)
 
   val testInvocationNamespace = "test-invocation-namespace"
   val testNamespace = "test-namespace"
@@ -278,11 +279,16 @@ class ContainerManagerTests
     )
     expectGetInvokers(mockEtcd, invokers)
     expectGetInvokers(mockEtcd, invokers)
-    expectGetInvokers(mockEtcd, invokers) // this test case will run `getPrefix` twice, and another one for warmup
+    expectGetInvokers(mockEtcd, invokers)
+    expectGetInvokers(mockEtcd, invokers) // this test case runs `getPrefix` 3 times, plus one more for warmup
 
     val mockJobManager = TestProbe()
     val mockWatcher = TestProbe()
     val receiver = TestProbe()
+    // ignore warmUp message
+    receiver.ignoreMsg {
+      case s: String => s.contains("warmUp")
+    }
 
     val manager =
       system.actorOf(ContainerManager
@@ -345,11 +351,6 @@ class ContainerManagerTests
     // it should reuse 2 warmed containers
     manager ! ContainerCreation(msgs, 128.MB, testInvocationNamespace)
 
-    // ignore warmUp message
-    receiver.ignoreMsg {
-      case s: String => s.contains("warmUp")
-    }
-
     // msg1 will use warmed container on invoker0, msg2 use warmed container on invoker1, msg3 use the healthy invoker
     receiver.expectMsg(s"invoker0-$msg1")
     receiver.expectMsg(s"invoker1-$msg2")
@@ -371,10 +372,30 @@ class ContainerManagerTests
     manager ! ContainerCreation(List(msg2), 128.MB, testInvocationNamespace)
     receiver.expectMsg(s"invoker1-$msg2")
 
-    // warmed container for action1 become warmed
-    manager ! SuccessfulCreationJob(msg1.creationId, msg1.invocationNamespace, msg1.action, msg1.revision)
+    // the warmed container for action1 becomes warmed when a FailedCreationJob is received
+    manager ! FailedCreationJob(
+      msg1.creationId,
+      msg1.invocationNamespace,
+      msg1.action,
+      msg1.revision,
+      NoAvailableResourceInvokersError,
+      "")
     manager ! ContainerCreation(List(msg1), 128.MB, testInvocationNamespace)
     receiver.expectMsg(s"invoker0-$msg1")
+
+    // the warmed container for action1 becomes unwarmed
+    manager ! WatchEndpointRemoved(
+      ContainerKeys.warmedPrefix,
+      ContainerKeys.warmedContainers(
+        testInvocationNamespace,
+        testfqn,
+        testRevision,
+        InvokerInstanceId(0, userMemory = 0.bytes),
+        ContainerId("fake")),
+      "",
+      true)
+    manager ! ContainerCreation(List(msg1), 128.MB, testInvocationNamespace)
+    receiver.expectMsg(s"invoker2-$msg1")
   }
 
   it should "not try warmed containers if revision is unmatched" in {
@@ -392,6 +413,10 @@ class ContainerManagerTests
     val mockJobManager = TestProbe()
     val mockWatcher = TestProbe()
     val receiver = TestProbe()
+    // ignore warmUp message
+    receiver.ignoreMsg {
+      case s: String => s.contains("warmUp")
+    }
 
     val manager =
       system.actorOf(ContainerManager
@@ -423,11 +448,6 @@ class ContainerManagerTests
     // it should not reuse the warmed container
     manager ! ContainerCreation(List(msg), 128.MB, testInvocationNamespace)
 
-    // ignore warmUp message
-    receiver.ignoreMsg {
-      case s: String => s.contains("warmUp")
-    }
-
     // it should be scheduled to the sole health invoker: invoker2
     receiver.expectMsg(s"invoker2-$msg")
 
@@ -686,6 +706,52 @@ class ContainerManagerTests
         "No available invokers with resources List(fake)."))
   }
 
+  it should "choose tagged invokers when no invokers available which has no tags first" in {
+    val msg =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        testfqn.resolve(EntityName("ns1")),
+        testRevision,
+        actionMetadata,
+        testsid,
+        schedulerHost,
+        rpcPort)
+    val msg2 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        testfqn.resolve(EntityName("ns2")),
+        testRevision,
+        actionMetadata,
+        testsid,
+        schedulerHost,
+        rpcPort)
+
+    val probe = TestProbe()
+    QueuePool.put(
+      MemoryQueueKey(testInvocationNamespace, testfqn.toDocId.asDocInfo(testRevision)),
+      MemoryQueueValue(probe.ref, true))
+
+    val healthyInvokers: List[InvokerHealth] =
+      List(InvokerHealth(InvokerInstanceId(0, userMemory = 256.MB, tags = Seq("cpu", "memory")), Healthy))
+
+    // there are no available invokers without tags, so it should choose a tagged invoker for msg,
+    // and for msg2 it should report that no invokers are available
+    val pairs =
+      ContainerManager.schedule(healthyInvokers, List(msg, msg2), msg.whiskActionMetaData.limits.memory.megabytes.MB)
+    pairs should contain theSameElementsAs List(ScheduledPair(msg, healthyInvokers(0).id))
+
+    probe.expectMsg(
+      FailedCreationJob(
+        msg2.creationId,
+        testInvocationNamespace,
+        msg2.action,
+        testRevision,
+        NoAvailableInvokersError,
+        "No available invokers."))
+  }
+
   it should "respect the resource policy while use resource filter" in {
     val msg1 =
       ContainerCreationMessage(
@@ -730,6 +796,21 @@ class ContainerManagerTests
         testsid,
         schedulerHost,
         rpcPort)
+    val msg4 =
+      ContainerCreationMessage(
+        TransactionId.testing,
+        testInvocationNamespace,
+        testfqn.resolve(EntityName("ns3")),
+        testRevision,
+        actionMetadata.copy(
+          limits = action.limits.copy(memory = MemoryLimit(512.MB)),
+          annotations =
+            Parameters(Annotations.InvokerResourcesAnnotationName, JsArray(JsString("non-exist"))) ++ Parameters(
+              Annotations.InvokerResourcesStrictPolicyAnnotationName,
+              JsBoolean(false))),
+        testsid,
+        schedulerHost,
+        rpcPort)
 
     val probe = TestProbe()
     QueuePool.put(
@@ -738,7 +819,7 @@ class ContainerManagerTests
     val healthyInvokers: List[InvokerHealth] =
       List(
         InvokerHealth(InvokerInstanceId(0, userMemory = 256.MB, tags = Seq.empty[String]), Healthy),
-        InvokerHealth(InvokerInstanceId(1, userMemory = 512.MB, tags = Seq("cpu", "memory")), Healthy))
+        InvokerHealth(InvokerInstanceId(1, userMemory = 256.MB, tags = Seq("cpu", "memory")), Healthy))
 
     // while resourcesStrictPolicy is true, and there is no suitable invokers, return an error
     val pairs =
@@ -760,12 +841,20 @@ class ContainerManagerTests
     pairs2 should contain theSameElementsAs List(ScheduledPair(msg2, healthyInvokers(0).id))
 
     // while resourcesStrictPolicy is false, and there is no suitable invokers, should choose no tagged invokers first,
-    // if there is none, then choose other invokers, here is the invoker1
+    // if there are none, choose invokers with other tags; if there are still none, report that no invokers are available
     val pairs3 = ContainerManager.schedule(
       healthyInvokers.takeRight(1),
-      List(msg3),
+      List(msg3, msg4),
       msg3.whiskActionMetaData.limits.memory.megabytes.MB)
     pairs3 should contain theSameElementsAs List(ScheduledPair(msg3, healthyInvokers(1).id))
+    probe.expectMsg(
+      FailedCreationJob(
+        msg4.creationId,
+        testInvocationNamespace,
+        msg4.action,
+        testRevision,
+        NoAvailableInvokersError,
+        "No available invokers."))
   }
 
   it should "send FailedCreationJob to queue manager when no invokers are available" in {
@@ -991,6 +1080,81 @@ class ContainerManagerTests
 
     result.take(m) shouldBe result.takeRight(b)
   }
+
+  behavior of "warm up"
+
+  it should "warm up all invokers when start" in {
+    val mockEtcd = mock[EtcdClient]
+
+    val invokers: List[InvokerHealth] = List(
+      InvokerHealth(InvokerInstanceId(0, userMemory = testMemory, tags = Seq.empty[String]), Healthy),
+      InvokerHealth(InvokerInstanceId(1, userMemory = testMemory, tags = Seq.empty[String]), Healthy),
+      InvokerHealth(InvokerInstanceId(2, userMemory = testMemory, tags = Seq.empty[String]), Healthy),
+    )
+    expectGetInvokers(mockEtcd, invokers)
+
+    val mockJobManager = TestProbe()
+    val mockWatcher = TestProbe()
+    val receiver = TestProbe()
+
+    val manager =
+      system.actorOf(ContainerManager
+        .props(factory(mockJobManager), mockMessaging(Some(receiver.ref)), testsid, mockEtcd, config, mockWatcher.ref))
+
+    (0 to 2).foreach(i => {
+      receiver.expectMsgPF() {
+        case msg: String if msg.contains("warmUp") && msg.contains(s"invoker$i") => true
+        case msg                                                                 => false
+      }
+    })
+  }
+
+  it should "warm up new invoker when new one is registered" in {
+    val mockEtcd = mock[EtcdClient]
+    expectGetInvokers(mockEtcd, List.empty)
+
+    val mockJobManager = TestProbe()
+    val mockWatcher = TestProbe()
+    val receiver = TestProbe()
+
+    val manager =
+      system.actorOf(ContainerManager
+        .props(factory(mockJobManager), mockMessaging(Some(receiver.ref)), testsid, mockEtcd, config, mockWatcher.ref))
+
+    manager ! WatchEndpointInserted(
+      InvokerKeys.prefix,
+      InvokerKeys.health(InvokerInstanceId(0, userMemory = 128.MB)),
+      "",
+      true)
+    receiver.expectMsgPF() {
+      case msg: String if msg.contains("warmUp") && msg.contains(s"invoker0") => true
+      case _                                                                  => false
+    }
+
+    // shouldn't warmup again
+    manager ! WatchEndpointInserted(
+      InvokerKeys.prefix,
+      InvokerKeys.health(InvokerInstanceId(0, userMemory = 128.MB)),
+      "",
+      true)
+    receiver.expectNoMessage()
+
+    // should warmup again since invoker0 is a new one
+    manager ! WatchEndpointRemoved(
+      InvokerKeys.prefix,
+      InvokerKeys.health(InvokerInstanceId(0, userMemory = 128.MB)),
+      "",
+      true)
+    manager ! WatchEndpointInserted(
+      InvokerKeys.prefix,
+      InvokerKeys.health(InvokerInstanceId(0, userMemory = 128.MB)),
+      "",
+      true)
+    receiver.expectMsgPF() {
+      case msg: String if msg.contains("warmUp") && msg.contains(s"invoker0") => true
+      case _                                                                  => false
+    }
+  }
 }
 
 @RunWith(classOf[JUnitRunner])
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/ActivationServiceImplTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/ActivationServiceImplTests.scala
index 7aa43b031..b2042f256 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/ActivationServiceImplTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/ActivationServiceImplTests.scala
@@ -33,7 +33,7 @@ import org.apache.openwhisk.core.scheduler.queue.{
   NoMemoryQueue,
   QueuePool
 }
-import org.apache.openwhisk.grpc.{FetchRequest, FetchResponse}
+import org.apache.openwhisk.grpc.{FetchRequest, FetchResponse, RescheduleRequest, RescheduleResponse}
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpecLike, Matchers}
@@ -64,7 +64,7 @@ class ActivationServiceImplTests
 
   private def await[T](awaitable: Future[T], timeout: FiniteDuration = 10.seconds) = Await.result(awaitable, timeout)
 
-  implicit val timeoutConfig = PatienceConfig(5.seconds)
+  implicit val timeoutConfig = PatienceConfig(10.seconds)
 
   behavior of "ActivationService"
 
@@ -156,6 +156,28 @@ class ActivationServiceImplTests
     expectNoMessage(200.millis)
   }
 
+  it should "return NoActivationMessage if queue doesn't return response" in {
+
+    val activationServiceImpl = ActivationServiceImpl()
+
+    QueuePool.put(MemoryQueueKey(testEntityPath.asString, testDoc), MemoryQueueValue(testActor, true))
+
+    val tid = TransactionId(TransactionId.generateTid())
+    activationServiceImpl
+      .fetchActivation(
+        FetchRequest(
+          tid.serialize,
+          message.user.namespace.name.asString,
+          testFQN.serialize,
+          testDocRevision.serialize,
+          testContainerId,
+          false,
+          alive = true))
+      .futureValue shouldBe FetchResponse(ActivationResponse(Left(NoActivationMessage())).serialize)
+
+    expectMsg(GetActivation(tid, testFQN, testContainerId, false, None))
+  }
+
   it should "return NoActivationMessage if it is a warm-up action" in {
 
     val activationServiceImpl = ActivationServiceImpl()
@@ -219,4 +241,33 @@ class ActivationServiceImplTests
     }
   }
 
+  it should "reschedule msg if related queue exist" in {
+    QueuePool.put(MemoryQueueKey(testEntityPath.asString, testDoc), MemoryQueueValue(testActor, true))
+    val activationServiceImpl = ActivationServiceImpl()
+
+    activationServiceImpl
+      .rescheduleActivation(
+        RescheduleRequest(
+          message.user.namespace.name.asString,
+          testFQN.serialize,
+          testDocRevision.serialize,
+          message.serialize))
+      .futureValue shouldBe RescheduleResponse(true)
+
+    expectMsg(message)
+  }
+
+  it should "not reschedule msg if queue doesn't exist" in {
+    val activationServiceImpl = ActivationServiceImpl()
+
+    activationServiceImpl
+      .rescheduleActivation(
+        RescheduleRequest(
+          message.user.namespace.name.asString,
+          testFQN.serialize,
+          testDocRevision.serialize,
+          message.serialize))
+      .futureValue shouldBe RescheduleResponse()
+  }
+
 }
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/ContainerCounterTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/ContainerCounterTests.scala
index e9e6694e0..7b6cbefbe 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/ContainerCounterTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/ContainerCounterTests.scala
@@ -42,6 +42,7 @@ import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys
 import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.inProgressContainer
 import org.apache.openwhisk.core.scheduler.queue.NamespaceContainerCount
 import org.apache.openwhisk.core.service.{DeleteEvent, PutEvent, UnwatchEndpoint, WatchEndpoint, WatcherService}
+import org.apache.openwhisk.utils.retry
 import org.junit.runner.RunWith
 import org.scalamock.scalatest.MockFactory
 import org.scalatest.concurrent.ScalaFutures
@@ -49,7 +50,7 @@ import org.scalatest.{FlatSpecLike, Matchers}
 import org.scalatest.junit.JUnitRunner
 
 import scala.concurrent.Future
-import scala.concurrent.duration.TimeUnit
+import scala.concurrent.duration._
 
 @RunWith(classOf[JUnitRunner])
 class ContainerCounterTests
@@ -247,11 +248,42 @@ class ContainerCounterTests
     NamespaceContainerCount.instances.clear()
   }
 
-  class MockEtcdClient(client: Client, isLeader: Boolean, leaseNotFound: Boolean = false, failedCount: Int = 1)
+  it should "update the number of containers correctly when multiple entries are inserted into etcd" in {
+    val mockEtcdClient = new MockEtcdClient(client, true, failedCount = 1)
+    val watcher = system.actorOf(WatcherService.props(mockEtcdClient))
+
+    val ns = NamespaceContainerCount(namespace, mockEtcdClient, watcher)
+    retry(() => {
+      ns.inProgressContainerNumByNamespace shouldBe 0
+      ns.existingContainerNumByNamespace shouldBe 0
+    }, 10, Some(100.milliseconds))
+
+    val invoker = "invoker0"
+    (0 to 100).foreach(i => {
+      mockEtcdClient.publishEvents(
+        EventType.PUT,
+        inProgressContainer(namespace, fqn, revision, schedulerId, CreationId(s"testId$i")),
+        "test-value")
+    })
+    (0 to 100).foreach(i => {
+      mockEtcdClient.publishEvents(
+        EventType.PUT,
+        s"${ContainerKeys.existingContainers(namespace, fqn, DocRevision.empty)}/${invoker}/test-container$i",
+        "test-value")
+    })
+
+    retry(() => {
+      ns.inProgressContainerNumByNamespace shouldBe 101
+      ns.existingContainerNumByNamespace shouldBe 101
+    }, 50, Some(100.milliseconds))
+  }
+
+  class MockEtcdClient(client: Client, isLeader: Boolean, leaseNotFound: Boolean = false, failedCount: Int = 0)
       extends EtcdClient(client)(ec) {
     var count = 0
     var storedValues = List.empty[(String, String, Long, Long)]
     var dataMap = Map[String, String]()
+    var totalFailedCount = 0
 
     override def putTxn[T](key: String, value: T, cmpVersion: Long, leaseId: Long): Future[TxnResponse] = {
       if (isLeader) {
@@ -264,7 +296,12 @@ class ContainerCounterTests
      * this method count the number of entries whose key starts with the given prefix
      */
     override def getCount(prefixKey: String): Future[Long] = {
-      Future.successful { dataMap.count(data => data._1.startsWith(prefixKey)) }
+      if (totalFailedCount < failedCount) {
+        totalFailedCount += 1
+        Future.failed(new Exception("error"))
+      } else {
+        Future.successful { dataMap.count(data => data._1.startsWith(prefixKey)) }
+      }
     }
 
     var watchCallbackMap = Map[String, WatchUpdate => Unit]()
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
index 735506d5b..1d523c20d 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.openwhisk.core.scheduler.queue.test
 
 import akka.actor.ActorRef
@@ -10,7 +27,12 @@ import org.apache.openwhisk.core.connector.ContainerCreationMessage
 import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.core.etcd.EtcdClient
 import org.apache.openwhisk.core.scheduler.grpc.ActivationResponse
-import org.apache.openwhisk.core.scheduler.message.{ContainerCreation, ContainerDeletion, FailedCreationJob, SuccessfulCreationJob}
+import org.apache.openwhisk.core.scheduler.message.{
+  ContainerCreation,
+  ContainerDeletion,
+  FailedCreationJob,
+  SuccessfulCreationJob
+}
 import org.apache.openwhisk.core.scheduler.queue.MemoryQueue.checkToDropStaleActivation
 import org.apache.openwhisk.core.scheduler.queue._
 import org.apache.openwhisk.core.service._
@@ -47,6 +69,7 @@ class MemoryQueueFlowTests
   override def afterEach(): Unit = {
     super.afterEach()
     logLines.foreach(println)
+    stream.reset()
   }
 
   behavior of "MemoryQueueFlow"
@@ -331,9 +354,7 @@ class MemoryQueueFlowTests
       fsm.underlyingActor.queue.size shouldBe 0
     }, 5.seconds)
 
-    Thread.sleep(flushGrace.toMillis)
-
-    parent.expectMsg(queueRemovedMsg)
+    parent.expectMsg(flushGrace * 2 + 5.seconds, queueRemovedMsg)
     probe.expectMsg(Transition(fsm, Flushing, Removed))
 
     fsm ! QueueRemovedCompleted
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
index 887e75a93..c75a400fc 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.openwhisk.core.scheduler.queue.test
 
 import java.time.Instant
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala
index ceaf3dd7b..8c4ee849f 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.openwhisk.core.scheduler.queue.test
 
 import java.time.Instant
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala
index 208702506..6ad1513f7 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala
@@ -19,23 +19,23 @@ package org.apache.openwhisk.core.scheduler.queue.test
 
 import java.time.{Clock, Instant}
 import java.util.concurrent.atomic.AtomicInteger
-
 import akka.actor.{Actor, ActorIdentity, ActorRef, ActorRefFactory, ActorSystem, Identify, Props}
 import akka.pattern.ask
 import akka.testkit.{ImplicitSender, TestActor, TestActorRef, TestKit, TestProbe}
 import akka.util.Timeout
-import com.ibm.etcd.api.RangeResponse
+import com.ibm.etcd.api.{KeyValue, RangeResponse}
 import common.{LoggedFunction, StreamLogging}
 import org.apache.openwhisk.common.{GracefulShutdown, TransactionId}
 import org.apache.openwhisk.core.WarmUp.warmUpAction
 import org.apache.openwhisk.core.ack.ActiveAck
 import org.apache.openwhisk.core.connector.test.TestConnector
-import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, ActivationMessage}
+import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, ActivationMessage, StatusData, StatusQuery}
 import org.apache.openwhisk.core.database.{ArtifactStore, DocumentRevisionMismatchException, UserContext}
 import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, RuntimeManifest}
 import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.core.etcd.EtcdKV.QueueKeys
-import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdLeader}
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.etcd.EtcdType._
 import org.apache.openwhisk.core.scheduler.grpc.test.CommonVariable
 import org.apache.openwhisk.core.scheduler.grpc.{ActivationResponse, GetActivation}
 import org.apache.openwhisk.core.scheduler.queue._
@@ -78,7 +78,7 @@ class QueueManagerTests
   val testQueueCreationMessage =
     CreateQueue(testInvocationNamespace, testFQN, testDocRevision, testActionMetaData)
 
-  val schedulerEndpoint = SchedulerEndpoints("127.0.0.1", 2552, 8080)
+  val schedulerEndpoint = SchedulerEndpoints("127.0.0.1", 8080, 2552)
   val mockConsumer = new TestConnector(s"scheduler${schedulerId.asString}", 4, true)
 
   val messageTransId = TransactionId(TransactionId.testing.meta.id)
@@ -100,6 +100,12 @@ class QueueManagerTests
     ControllerInstanceId("0"),
     blocking = false,
     content = None)
+  val statusData = StatusData(testInvocationNamespace, testFQN.asString, 0, "Running", "RunningData")
+
+  // update start time for activation to ensure it's not stale
+  def newActivation(start: Instant = Instant.now()): ActivationMessage = {
+    activationMessage.copy(transid = TransactionId(messageTransId.meta.copy(start = start)))
+  }
 
   val activationResponse = ActivationResponse(Right(activationMessage))
 
@@ -125,7 +131,9 @@ class QueueManagerTests
       system.actorOf(Props(new Actor() {
         override def receive: Receive = {
           case GetActivation(_, _, _, _, _, _) =>
-            sender ! ActivationResponse(Right(activationMessage))
+            sender ! ActivationResponse(Right(newActivation()))
+          case StatusQuery =>
+            sender ! statusData
         }
       }))
 
@@ -220,6 +228,34 @@ class QueueManagerTests
       true)
   }
 
+  it should "response queue creation request when failed to do election" in {
+    val mockEtcdClient = mock[EtcdClient]
+    val dataManagementService = getTestDataManagementService(false)
+    val watcher = TestProbe()
+
+    val queueManager =
+      TestActorRef(
+        QueueManager
+          .props(
+            entityStore,
+            get,
+            mockEtcdClient,
+            schedulerEndpoint,
+            schedulerId,
+            dataManagementService.ref,
+            watcher.ref,
+            ack,
+            store,
+            childFactory,
+            mockConsumer))
+
+    watcher.expectMsg(watchEndpoint)
+    (queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
+      testInvocationNamespace,
+      testFQN,
+      true)
+  }
+
   it should "not create a queue if there is already a queue for the given fqn" in {
     val mockEtcdClient = mock[EtcdClient]
     val dataManagementService = getTestDataManagementService()
@@ -298,12 +334,16 @@ class QueueManagerTests
     probe.expectMsg(CreateQueueResponse(testInvocationNamespace, testFQN, true))
   }
 
-  private def getTestDataManagementService() = {
+  private def getTestDataManagementService(success: Boolean = true) = {
     val dataManagementService = TestProbe()
     dataManagementService.setAutoPilot((sender: ActorRef, msg: Any) =>
       msg match {
         case ElectLeader(key, value, _, _) =>
-          sender ! ElectionResult(Right(EtcdLeader(key, value, 10)))
+          if (success) {
+            sender ! ElectionResult(Right(EtcdLeader(key, value, 10)))
+          } else {
+            sender ! ElectionResult(Left(EtcdFollower(key, value)))
+          }
           TestActor.KeepRunning
 
         case _ =>
@@ -312,6 +352,61 @@ class QueueManagerTests
     dataManagementService
   }
 
+  it should "forward msg to remote queue when queue exist on remote" in {
+    stream.reset()
+    val leaderKey = QueueKeys.queue(
+      activationMessage.user.namespace.name.asString,
+      activationMessage.action.copy(version = None),
+      true)
+    val mockEtcdClient = mock[EtcdClient]
+    (mockEtcdClient
+      .get(_: String))
+      .expects(*)
+      .returning(
+        Future.successful(
+          RangeResponse
+            .newBuilder()
+            .addKvs(KeyValue.newBuilder().setKey(leaderKey).setValue(schedulerEndpoint.serialize).build())
+            .build()))
+      .once()
+    val dataManagementService = getTestDataManagementService()
+    val watcher = TestProbe()
+
+    val probe = TestProbe()
+
+    val childFactory =
+      (_: ActorRefFactory, _: String, _: FullyQualifiedEntityName, _: DocRevision, _: WhiskActionMetaData) => probe.ref
+
+    val queueManager =
+      TestActorRef(
+        QueueManager
+          .props(
+            entityStore,
+            get,
+            mockEtcdClient,
+            schedulerEndpoint,
+            schedulerId,
+            dataManagementService.ref,
+            watcher.ref,
+            ack,
+            store,
+            childFactory,
+            mockConsumer))
+    watcher.expectMsg(watchEndpoint)
+
+    // a message arrives, but no queue has been created on this scheduler
+    // it should get the leader key from etcd and forward this msg to the remote queue (here `schedulerEndpoint`)
+    queueManager ! newActivation()
+    stream.toString should include(s"send activation to remote queue, key: $leaderKey")
+    stream.toString should include(s"add a new actor selection to a map with key: $leaderKey")
+    stream.reset()
+
+    // when another msg arrives, it should get the remote queue from memory instead of etcd
+    val msg2 = newActivation().copy(activationId = ActivationId.generate())
+    queueManager ! msg2
+    stream.toString shouldNot include(s"send activation to remote queue, key: $leaderKey")
+  }
+
   it should "create a new MemoryQueue when the revision matches with the one in a datastore" in {
     val mockEtcdClient = mock[EtcdClient]
     val dataManagementService = getTestDataManagementService()
@@ -367,10 +462,14 @@ class QueueManagerTests
       content = None)
 
     queueManager ! activationMessage
-    queueManager ! activationMessage.copy(activationId = ActivationId.generate()) // even send two requests, we should only create one queue
+    val msgs = (0 to 10).map(i => {
+      activationMessage.copy(activationId = ActivationId.generate())
+    })
+    msgs.foreach(msg => queueManager ! msg) // even with multiple requests, only one new queue should be created
     probe.expectMsg(StopSchedulingAsOutdated)
     probe.expectMsg(VersionUpdated)
     probe.expectMsg(activationMessage)
+    msgs.foreach(msg => probe.expectMsg(msg))
   }
 
   it should "create a new MemoryQueue correctly when the action is updated again during updating the queue" in {
@@ -453,6 +552,15 @@ class QueueManagerTests
   it should "not skip outdated activation when the revision is older than the one in a datastore" in {
     stream.reset()
     val mockEtcdClient = mock[EtcdClient]
+    (mockEtcdClient
+      .get(_: String))
+      .expects(*)
+      .returning(
+        Future.successful(
+          RangeResponse
+            .newBuilder()
+            .addKvs(KeyValue.newBuilder().setKey("test").setValue(schedulerEndpoint.serialize).build())
+            .build()))
     val dataManagementService = getTestDataManagementService()
     val watcher = TestProbe()
 
@@ -491,7 +599,7 @@ class QueueManagerTests
       true)
 
     //the activationMessage's revision(1-test-revision) is older than current queue's revision(2-test-revision)
-    queueManager ! activationMessage
+    queueManager ! newActivation()
 
     stream.toString should include(s"it will be replaced with the latest revision and invoked")
   }
@@ -522,7 +630,7 @@ class QueueManagerTests
           mockConsumer,
           QueueManagerConfig(maxRetriesToGetQueue = 2, maxSchedulingTime = 10 seconds)))
 
-    queueManager ! activationMessage
+    queueManager ! newActivation()
     Thread.sleep(100)
     (mockEtcdClient.get _) verify (*) repeated (3)
   }
@@ -554,11 +662,84 @@ class QueueManagerTests
           mockConsumer,
           QueueManagerConfig(maxRetriesToGetQueue = 2, maxSchedulingTime = 10 seconds)))
 
-    queueManager ! activationMessage.copy(transid = TransactionId(messageTransId.meta.copy(start = Instant.now())))
+    queueManager ! newActivation()
     Thread.sleep(100)
     (mockEtcdClient.get _) verify (*) repeated (3)
   }
 
+  it should "save queue endpoint in memory" in {
+    stream.reset()
+
+    val mockEtcdClient = stub[EtcdClient]
+    val dataManagementService = getTestDataManagementService()
+    dataManagementService.ignoreMsg {
+      case _: UpdateDataOnChange => true
+    }
+    val watcher = TestProbe()
+
+    val emptyResult = Future.successful(RangeResponse.newBuilder().build())
+    (mockEtcdClient.get _) when (*) returns (emptyResult)
+
+    val queueManager =
+      TestActorRef(
+        new QueueManager(
+          entityStore,
+          get,
+          mockEtcdClient,
+          schedulerEndpoint,
+          schedulerId,
+          dataManagementService.ref,
+          watcher.ref,
+          ack,
+          store,
+          childFactory,
+          mockConsumer,
+          QueueManagerConfig(maxRetriesToGetQueue = 2, maxSchedulingTime = 10 seconds)))
+
+    queueManager ! WatchEndpointInserted("queue", "queue/test-action/leader", schedulerEndpoint.serialize, true)
+    stream.toString should include(s"Endpoint inserted, key: queue/test-action/leader, endpoints: ${schedulerEndpoint}")
+    stream.reset()
+
+    queueManager ! WatchEndpointInserted("queue", "queue/test-action/leader", "host with wrong format", true)
+    stream.toString should include(s"Unexpected error")
+    stream.toString should include(s"when put leaderKey: queue/test-action/leader")
+    stream.reset()
+
+    queueManager ! WatchEndpointRemoved("queue", "queue/test-action/leader", schedulerEndpoint.serialize, true)
+    stream.toString should include(s"Endpoint removed for key: queue/test-action/leader")
+  }
+
+  it should "able to query queue status" in {
+    val mockEtcdClient = mock[EtcdClient]
+    val watcher = TestProbe()
+    val dataManagementService = getTestDataManagementService()
+    val queueManager =
+      TestActorRef(
+        QueueManager
+          .props(
+            entityStore,
+            get,
+            mockEtcdClient,
+            schedulerEndpoint,
+            schedulerId,
+            dataManagementService.ref,
+            watcher.ref,
+            ack,
+            store,
+            childFactory,
+            mockConsumer))
+
+    watcher.expectMsg(watchEndpoint)
+    (queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
+      testInvocationNamespace,
+      testFQN,
+      true)
+
+    (queueManager ? QueueSize).mapTo[Int].futureValue shouldBe 1
+
+    (queueManager ? StatusQuery).mapTo[Future[Iterable[StatusData]]].futureValue.futureValue shouldBe List(statusData)
+  }
+
   it should "drop the activation message that has not been scheduled for a long time" in {
     val mockEtcdClient = mock[EtcdClient]
     val watcher = TestProbe()
@@ -577,7 +758,7 @@ class QueueManagerTests
     }
 
     val oldNow = Instant.now(Clock.systemUTC()).minusMillis(11000)
-    val oldActivationMessage = activationMessage.copy(transid = TransactionId(messageTransId.meta.copy(start = oldNow)))
+    val oldActivationMessage = newActivation(oldNow)
 
     val queueManager =
       TestActorRef(
@@ -606,6 +787,15 @@ class QueueManagerTests
 
   it should "not drop the unscheduled activation message that has been processed within the scheduling time limit." in {
     val mockEtcdClient = mock[EtcdClient]
+    (mockEtcdClient
+      .get(_: String))
+      .expects(*)
+      .returning(
+        Future.successful(
+          RangeResponse
+            .newBuilder()
+            .addKvs(KeyValue.newBuilder().setKey("test").setValue(schedulerEndpoint.serialize).build())
+            .build()))
     val watcher = TestProbe()
     val probe = TestProbe()
     val dataManagementService = getTestDataManagementService()
@@ -622,7 +812,7 @@ class QueueManagerTests
     }
 
     val oldNow = Instant.now(Clock.systemUTC()).minusMillis(9000)
-    val oldActivationMessage = activationMessage.copy(transid = TransactionId(messageTransId.meta.copy(start = oldNow)))
+    val oldActivationMessage = newActivation(oldNow)
 
     val queueManager =
       TestActorRef(
@@ -692,7 +882,7 @@ class QueueManagerTests
       true)
 
     queueManager.tell(
-      UpdateMemoryQueue(testFQN.toDocId.asDocInfo(testDocRevision), newFqn, activationMessage),
+      UpdateMemoryQueue(testFQN.toDocId.asDocInfo(testDocRevision), newFqn, newActivation()),
       consumer.ref)
 
     probe.expectMsg(activationMessage.activationId)
@@ -730,7 +920,7 @@ class QueueManagerTests
       testFQN.toDocId.asDocInfo(testDocRevision),
       Some(testLeaderKey))
 
-    QueuePool.size shouldBe 0
+    (queueManager ? QueueSize).mapTo[Int].futureValue shouldBe 0
   }
 
   it should "put the queue back to pool if it receives a QueueReactive message" in {
@@ -759,18 +949,18 @@ class QueueManagerTests
       testFQN,
       true)
 
-    QueuePool.size shouldBe 1
+    (queueManager ? QueueSize).mapTo[Int].futureValue shouldBe 1
 
     queueManager ! QueueRemoved(
       testInvocationNamespace,
       testFQN.toDocId.asDocInfo(testDocRevision),
       Some(testLeaderKey))
 
-    QueuePool.size shouldBe 0
+    (queueManager ? QueueSize).mapTo[Int].futureValue shouldBe 0
 
     queueManager ! QueueReactivated(testInvocationNamespace, testFQN, testFQN.toDocId.asDocInfo(testDocRevision))
 
-    QueuePool.size shouldBe 1
+    (queueManager ? QueueSize).mapTo[Int].futureValue shouldBe 1
   }
 
   it should "put pool information to data management service" in {
@@ -899,7 +1089,7 @@ class QueueManagerTests
         system.actorOf(Props(new Actor() {
           override def receive: Receive = {
             case GetActivation(_, _, _, _, _, _) =>
-              sender ! ActivationResponse(Right(activationMessage))
+              sender ! ActivationResponse(Right(newActivation()))
 
             case GracefulShutdown =>
               probe.ref ! GracefulShutdown
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala
index 20771e6b3..83a4b6f8e 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala
@@ -1,15 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.openwhisk.core.scheduler.queue.test
 
 import java.util.concurrent.atomic.AtomicInteger
-
 import akka.actor.ActorSystem
 import akka.testkit.{TestKit, TestProbe}
 import common.StreamLogging
 import org.apache.openwhisk.core.entity.{EntityName, EntityPath, FullyQualifiedEntityName, SemVer}
 import org.apache.openwhisk.core.scheduler.queue._
+import org.junit.runner.RunWith
 import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
 import org.scalatest.{FlatSpecLike, Matchers}
 
+@RunWith(classOf[JUnitRunner])
 class SchedulingDecisionMakerTests
     extends TestKit(ActorSystem("SchedulingDecisionMakerTests"))
     with FlatSpecLike