You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ji...@apache.org on 2022/05/26 05:14:13 UTC
[openwhisk] branch master updated: Add some testcases and missing ASF headers for new scheduler (#5243)
This is an automated email from the ASF dual-hosted git repository.
jiangpengcheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new c90e8ccbc Add some testcases and missing ASF headers for new scheduler (#5243)
c90e8ccbc is described below
commit c90e8ccbc6e49bebc4bf9d641bd8aee0085cc805
Author: jiangpengcheng <ji...@navercorp.com>
AuthorDate: Thu May 26 13:14:05 2022 +0800
Add some testcases and missing ASF headers for new scheduler (#5243)
* Add some testcases and missing ASF headers for new scheduler
* Reset stream after each test
* Wait more time
* Update msg start time to ensure it's not stale
* Make MemoryQueueFlowTests stable
* Ignore warmUp msg immediately
* Use retry to replace sleep
* Add missing config
* Fix configuration error
---
core/controller/src/main/resources/reference.conf | 2 +-
.../core/loadBalancer/FPCPoolBalancer.scala | 17 ++
.../scheduler/queue/SchedulingDecisionMaker.scala | 17 ++
.../core/controller/test/FPCEntitlementTests.scala | 61 ++++++
.../loadBalancer/test/FPCPoolBalancerTests.scala | 17 ++
.../container/test/ContainerManagerTests.scala | 198 ++++++++++++++++--
.../grpc/test/ActivationServiceImplTests.scala | 55 ++++-
.../queue/test/ContainerCounterTests.scala | 43 +++-
.../queue/test/MemoryQueueFlowTests.scala | 29 ++-
.../scheduler/queue/test/MemoryQueueTests.scala | 17 ++
.../queue/test/MemoryQueueTestsFixture.scala | 17 ++
.../scheduler/queue/test/QueueManagerTests.scala | 230 +++++++++++++++++++--
.../queue/test/SchedulingDecisionMakerTests.scala | 21 +-
13 files changed, 676 insertions(+), 48 deletions(-)
diff --git a/core/controller/src/main/resources/reference.conf b/core/controller/src/main/resources/reference.conf
index dfde945a0..1eb4eb580 100644
--- a/core/controller/src/main/resources/reference.conf
+++ b/core/controller/src/main/resources/reference.conf
@@ -31,7 +31,7 @@ whisk {
timeout-addon = 1m
fpc {
- use-perMin-throttles = false
+ use-per-min-throttles = false
}
}
controller {
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
index bd72c8cb6..576d3b3b1 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.openwhisk.core.loadBalancer
import java.nio.charset.StandardCharsets
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
index f36f1f82f..0bbc1d98b 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.openwhisk.core.scheduler.queue
import akka.actor.{Actor, ActorSystem, Props}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/FPCEntitlementTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/FPCEntitlementTests.scala
new file mode 100644
index 000000000..06724e5f0
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/FPCEntitlementTests.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.controller.test
+
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.controller.RejectRequest
+import org.apache.openwhisk.core.entitlement.{EntitlementProvider, FPCEntitlementProvider, Privilege, Resource}
+import org.apache.openwhisk.core.entitlement.Privilege.{ACTIVATE, DELETE, PUT, READ, REJECT}
+import org.apache.openwhisk.core.entity.{EntityName, EntityPath, FullyQualifiedEntityName}
+import org.apache.openwhisk.core.loadBalancer.LoadBalancer
+import org.junit.runner.RunWith
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+
+@RunWith(classOf[JUnitRunner])
+class FPCEntitlementProviderTests extends ControllerTestCommon with ScalaFutures with MockFactory {
+
+ implicit val transactionId = TransactionId.testing
+
+ it should "get throttle flag from loadBalancer" in {
+ val someUser = WhiskAuthHelpers.newIdentity()
+ val action = FullyQualifiedEntityName(EntityPath("testns"), EntityName("action"))
+ val loadBalancer = mock[LoadBalancer]
+ (loadBalancer
+ .checkThrottle(_: EntityPath, _: String))
+ .expects(someUser.namespace.name.toPath, action.fullPath.asString)
+ .returning(true)
+ val resources = Set(Resource(action.path, ACTIONS, Some(action.name.name)))
+
+ val entitlementProvider: EntitlementProvider = new FPCEntitlementProvider(whiskConfig, loadBalancer, instance)
+ entitlementProvider.checkThrottles(someUser, ACTIVATE, resources).failed.futureValue shouldBe a[RejectRequest]
+
+ Seq[Privilege](READ, PUT, DELETE, REJECT).foreach(OP => {
+ noException shouldBe thrownBy(entitlementProvider.checkThrottles(someUser, OP, resources).futureValue)
+ })
+
+ val action2 = FullyQualifiedEntityName(EntityPath("testns2"), EntityName("action2"))
+ val resources2 = Set(Resource(action2.path, ACTIONS, Some(action2.name.name)))
+ (loadBalancer
+ .checkThrottle(_: EntityPath, _: String))
+ .expects(someUser.namespace.name.toPath, action2.fullPath.asString)
+ .returning(false)
+ noException shouldBe thrownBy(entitlementProvider.checkThrottles(someUser, ACTIVATE, resources2).futureValue)
+ }
+}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/FPCPoolBalancerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/FPCPoolBalancerTests.scala
index 98342fc22..81aa708b4 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/FPCPoolBalancerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/FPCPoolBalancerTests.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.openwhisk.core.loadBalancer.test
import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, Props}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala
index 6c17346da..a00b632c8 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/ContainerManagerTests.scala
@@ -49,7 +49,7 @@ import org.apache.openwhisk.core.scheduler.message.{
SuccessfulCreationJob
}
import org.apache.openwhisk.core.scheduler.queue.{MemoryQueueKey, MemoryQueueValue, QueuePool}
-import org.apache.openwhisk.core.service.WatchEndpointInserted
+import org.apache.openwhisk.core.service.{WatchEndpointInserted, WatchEndpointRemoved}
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.junit.runner.RunWith
import org.scalamock.scalatest.MockFactory
@@ -77,6 +77,7 @@ class ContainerManagerTests
with StreamLogging {
val config = new WhiskConfig(ExecManifest.requiredProperties)
+ ExecManifest.initialize(config)
val testInvocationNamespace = "test-invocation-namespace"
val testNamespace = "test-namespace"
@@ -278,11 +279,16 @@ class ContainerManagerTests
)
expectGetInvokers(mockEtcd, invokers)
expectGetInvokers(mockEtcd, invokers)
- expectGetInvokers(mockEtcd, invokers) // this test case will run `getPrefix` twice, and another one for warmup
+ expectGetInvokers(mockEtcd, invokers)
+ expectGetInvokers(mockEtcd, invokers) // this test case will run `getPrefix` for 3 times, and another one for warmup
val mockJobManager = TestProbe()
val mockWatcher = TestProbe()
val receiver = TestProbe()
+ // ignore warmUp message
+ receiver.ignoreMsg {
+ case s: String => s.contains("warmUp")
+ }
val manager =
system.actorOf(ContainerManager
@@ -345,11 +351,6 @@ class ContainerManagerTests
// it should reuse 2 warmed containers
manager ! ContainerCreation(msgs, 128.MB, testInvocationNamespace)
- // ignore warmUp message
- receiver.ignoreMsg {
- case s: String => s.contains("warmUp")
- }
-
// msg1 will use warmed container on invoker0, msg2 use warmed container on invoker1, msg3 use the healthy invoker
receiver.expectMsg(s"invoker0-$msg1")
receiver.expectMsg(s"invoker1-$msg2")
@@ -371,10 +372,30 @@ class ContainerManagerTests
manager ! ContainerCreation(List(msg2), 128.MB, testInvocationNamespace)
receiver.expectMsg(s"invoker1-$msg2")
- // warmed container for action1 become warmed
- manager ! SuccessfulCreationJob(msg1.creationId, msg1.invocationNamespace, msg1.action, msg1.revision)
+ // warmed container for action1 become warmed when received FailedCreationJob
+ manager ! FailedCreationJob(
+ msg1.creationId,
+ msg1.invocationNamespace,
+ msg1.action,
+ msg1.revision,
+ NoAvailableResourceInvokersError,
+ "")
manager ! ContainerCreation(List(msg1), 128.MB, testInvocationNamespace)
receiver.expectMsg(s"invoker0-$msg1")
+
+ // warmed container for action1 become unwarmed
+ manager ! WatchEndpointRemoved(
+ ContainerKeys.warmedPrefix,
+ ContainerKeys.warmedContainers(
+ testInvocationNamespace,
+ testfqn,
+ testRevision,
+ InvokerInstanceId(0, userMemory = 0.bytes),
+ ContainerId("fake")),
+ "",
+ true)
+ manager ! ContainerCreation(List(msg1), 128.MB, testInvocationNamespace)
+ receiver.expectMsg(s"invoker2-$msg1")
}
it should "not try warmed containers if revision is unmatched" in {
@@ -392,6 +413,10 @@ class ContainerManagerTests
val mockJobManager = TestProbe()
val mockWatcher = TestProbe()
val receiver = TestProbe()
+ // ignore warmUp message
+ receiver.ignoreMsg {
+ case s: String => s.contains("warmUp")
+ }
val manager =
system.actorOf(ContainerManager
@@ -423,11 +448,6 @@ class ContainerManagerTests
// it should not reuse the warmed container
manager ! ContainerCreation(List(msg), 128.MB, testInvocationNamespace)
- // ignore warmUp message
- receiver.ignoreMsg {
- case s: String => s.contains("warmUp")
- }
-
// it should be scheduled to the sole health invoker: invoker2
receiver.expectMsg(s"invoker2-$msg")
@@ -686,6 +706,52 @@ class ContainerManagerTests
"No available invokers with resources List(fake)."))
}
+ it should "choose tagged invokers when no invokers available which has no tags first" in {
+ val msg =
+ ContainerCreationMessage(
+ TransactionId.testing,
+ testInvocationNamespace,
+ testfqn.resolve(EntityName("ns1")),
+ testRevision,
+ actionMetadata,
+ testsid,
+ schedulerHost,
+ rpcPort)
+ val msg2 =
+ ContainerCreationMessage(
+ TransactionId.testing,
+ testInvocationNamespace,
+ testfqn.resolve(EntityName("ns2")),
+ testRevision,
+ actionMetadata,
+ testsid,
+ schedulerHost,
+ rpcPort)
+
+ val probe = TestProbe()
+ QueuePool.put(
+ MemoryQueueKey(testInvocationNamespace, testfqn.toDocId.asDocInfo(testRevision)),
+ MemoryQueueValue(probe.ref, true))
+
+ val healthyInvokers: List[InvokerHealth] =
+ List(InvokerHealth(InvokerInstanceId(0, userMemory = 256.MB, tags = Seq("cpu", "memory")), Healthy))
+
+ // there is no available invokers which has no tags, it should choose tagged invokers for msg
+ // and for msg2, it should return no available invokers
+ val pairs =
+ ContainerManager.schedule(healthyInvokers, List(msg, msg2), msg.whiskActionMetaData.limits.memory.megabytes.MB)
+ pairs should contain theSameElementsAs List(ScheduledPair(msg, healthyInvokers(0).id))
+
+ probe.expectMsg(
+ FailedCreationJob(
+ msg2.creationId,
+ testInvocationNamespace,
+ msg2.action,
+ testRevision,
+ NoAvailableInvokersError,
+ "No available invokers."))
+ }
+
it should "respect the resource policy while use resource filter" in {
val msg1 =
ContainerCreationMessage(
@@ -730,6 +796,21 @@ class ContainerManagerTests
testsid,
schedulerHost,
rpcPort)
+ val msg4 =
+ ContainerCreationMessage(
+ TransactionId.testing,
+ testInvocationNamespace,
+ testfqn.resolve(EntityName("ns3")),
+ testRevision,
+ actionMetadata.copy(
+ limits = action.limits.copy(memory = MemoryLimit(512.MB)),
+ annotations =
+ Parameters(Annotations.InvokerResourcesAnnotationName, JsArray(JsString("non-exist"))) ++ Parameters(
+ Annotations.InvokerResourcesStrictPolicyAnnotationName,
+ JsBoolean(false))),
+ testsid,
+ schedulerHost,
+ rpcPort)
val probe = TestProbe()
QueuePool.put(
@@ -738,7 +819,7 @@ class ContainerManagerTests
val healthyInvokers: List[InvokerHealth] =
List(
InvokerHealth(InvokerInstanceId(0, userMemory = 256.MB, tags = Seq.empty[String]), Healthy),
- InvokerHealth(InvokerInstanceId(1, userMemory = 512.MB, tags = Seq("cpu", "memory")), Healthy))
+ InvokerHealth(InvokerInstanceId(1, userMemory = 256.MB, tags = Seq("cpu", "memory")), Healthy))
// while resourcesStrictPolicy is true, and there is no suitable invokers, return an error
val pairs =
@@ -760,12 +841,20 @@ class ContainerManagerTests
pairs2 should contain theSameElementsAs List(ScheduledPair(msg2, healthyInvokers(0).id))
// while resourcesStrictPolicy is false, and there is no suitable invokers, should choose no tagged invokers first,
- // if there is none, then choose other invokers, here is the invoker1
+ // if there is none, then choose invokers with other tags, if there is still none, return no available invokers
val pairs3 = ContainerManager.schedule(
healthyInvokers.takeRight(1),
- List(msg3),
+ List(msg3, msg4),
msg3.whiskActionMetaData.limits.memory.megabytes.MB)
pairs3 should contain theSameElementsAs List(ScheduledPair(msg3, healthyInvokers(1).id))
+ probe.expectMsg(
+ FailedCreationJob(
+ msg4.creationId,
+ testInvocationNamespace,
+ msg4.action,
+ testRevision,
+ NoAvailableInvokersError,
+ "No available invokers."))
}
it should "send FailedCreationJob to queue manager when no invokers are available" in {
@@ -991,6 +1080,81 @@ class ContainerManagerTests
result.take(m) shouldBe result.takeRight(b)
}
+
+ behavior of "warm up"
+
+ it should "warm up all invokers when start" in {
+ val mockEtcd = mock[EtcdClient]
+
+ val invokers: List[InvokerHealth] = List(
+ InvokerHealth(InvokerInstanceId(0, userMemory = testMemory, tags = Seq.empty[String]), Healthy),
+ InvokerHealth(InvokerInstanceId(1, userMemory = testMemory, tags = Seq.empty[String]), Healthy),
+ InvokerHealth(InvokerInstanceId(2, userMemory = testMemory, tags = Seq.empty[String]), Healthy),
+ )
+ expectGetInvokers(mockEtcd, invokers)
+
+ val mockJobManager = TestProbe()
+ val mockWatcher = TestProbe()
+ val receiver = TestProbe()
+
+ val manager =
+ system.actorOf(ContainerManager
+ .props(factory(mockJobManager), mockMessaging(Some(receiver.ref)), testsid, mockEtcd, config, mockWatcher.ref))
+
+ (0 to 2).foreach(i => {
+ receiver.expectMsgPF() {
+ case msg: String if msg.contains("warmUp") && msg.contains(s"invoker$i") => true
+ case msg => false
+ }
+ })
+ }
+
+ it should "warm up new invoker when new one is registered" in {
+ val mockEtcd = mock[EtcdClient]
+ expectGetInvokers(mockEtcd, List.empty)
+
+ val mockJobManager = TestProbe()
+ val mockWatcher = TestProbe()
+ val receiver = TestProbe()
+
+ val manager =
+ system.actorOf(ContainerManager
+ .props(factory(mockJobManager), mockMessaging(Some(receiver.ref)), testsid, mockEtcd, config, mockWatcher.ref))
+
+ manager ! WatchEndpointInserted(
+ InvokerKeys.prefix,
+ InvokerKeys.health(InvokerInstanceId(0, userMemory = 128.MB)),
+ "",
+ true)
+ receiver.expectMsgPF() {
+ case msg: String if msg.contains("warmUp") && msg.contains(s"invoker0") => true
+ case _ => false
+ }
+
+ // shouldn't warmup again
+ manager ! WatchEndpointInserted(
+ InvokerKeys.prefix,
+ InvokerKeys.health(InvokerInstanceId(0, userMemory = 128.MB)),
+ "",
+ true)
+ receiver.expectNoMessage()
+
+ // should warmup again since invoker0 is a new one
+ manager ! WatchEndpointRemoved(
+ InvokerKeys.prefix,
+ InvokerKeys.health(InvokerInstanceId(0, userMemory = 128.MB)),
+ "",
+ true)
+ manager ! WatchEndpointInserted(
+ InvokerKeys.prefix,
+ InvokerKeys.health(InvokerInstanceId(0, userMemory = 128.MB)),
+ "",
+ true)
+ receiver.expectMsgPF() {
+ case msg: String if msg.contains("warmUp") && msg.contains(s"invoker0") => true
+ case _ => false
+ }
+ }
}
@RunWith(classOf[JUnitRunner])
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/ActivationServiceImplTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/ActivationServiceImplTests.scala
index 7aa43b031..b2042f256 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/ActivationServiceImplTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/ActivationServiceImplTests.scala
@@ -33,7 +33,7 @@ import org.apache.openwhisk.core.scheduler.queue.{
NoMemoryQueue,
QueuePool
}
-import org.apache.openwhisk.grpc.{FetchRequest, FetchResponse}
+import org.apache.openwhisk.grpc.{FetchRequest, FetchResponse, RescheduleRequest, RescheduleResponse}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpecLike, Matchers}
@@ -64,7 +64,7 @@ class ActivationServiceImplTests
private def await[T](awaitable: Future[T], timeout: FiniteDuration = 10.seconds) = Await.result(awaitable, timeout)
- implicit val timeoutConfig = PatienceConfig(5.seconds)
+ implicit val timeoutConfig = PatienceConfig(10.seconds)
behavior of "ActivationService"
@@ -156,6 +156,28 @@ class ActivationServiceImplTests
expectNoMessage(200.millis)
}
+ it should "return NoActivationMessage if queue doesn't return response" in {
+
+ val activationServiceImpl = ActivationServiceImpl()
+
+ QueuePool.put(MemoryQueueKey(testEntityPath.asString, testDoc), MemoryQueueValue(testActor, true))
+
+ val tid = TransactionId(TransactionId.generateTid())
+ activationServiceImpl
+ .fetchActivation(
+ FetchRequest(
+ tid.serialize,
+ message.user.namespace.name.asString,
+ testFQN.serialize,
+ testDocRevision.serialize,
+ testContainerId,
+ false,
+ alive = true))
+ .futureValue shouldBe FetchResponse(ActivationResponse(Left(NoActivationMessage())).serialize)
+
+ expectMsg(GetActivation(tid, testFQN, testContainerId, false, None))
+ }
+
it should "return NoActivationMessage if it is a warm-up action" in {
val activationServiceImpl = ActivationServiceImpl()
@@ -219,4 +241,33 @@ class ActivationServiceImplTests
}
}
+ it should "reschedule msg if related queue exist" in {
+ QueuePool.put(MemoryQueueKey(testEntityPath.asString, testDoc), MemoryQueueValue(testActor, true))
+ val activationServiceImpl = ActivationServiceImpl()
+
+ activationServiceImpl
+ .rescheduleActivation(
+ RescheduleRequest(
+ message.user.namespace.name.asString,
+ testFQN.serialize,
+ testDocRevision.serialize,
+ message.serialize))
+ .futureValue shouldBe RescheduleResponse(true)
+
+ expectMsg(message)
+ }
+
+ it should "not reschedule msg if queue doesn't exist" in {
+ val activationServiceImpl = ActivationServiceImpl()
+
+ activationServiceImpl
+ .rescheduleActivation(
+ RescheduleRequest(
+ message.user.namespace.name.asString,
+ testFQN.serialize,
+ testDocRevision.serialize,
+ message.serialize))
+ .futureValue shouldBe RescheduleResponse()
+ }
+
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/ContainerCounterTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/ContainerCounterTests.scala
index e9e6694e0..7b6cbefbe 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/ContainerCounterTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/ContainerCounterTests.scala
@@ -42,6 +42,7 @@ import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys
import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.inProgressContainer
import org.apache.openwhisk.core.scheduler.queue.NamespaceContainerCount
import org.apache.openwhisk.core.service.{DeleteEvent, PutEvent, UnwatchEndpoint, WatchEndpoint, WatcherService}
+import org.apache.openwhisk.utils.retry
import org.junit.runner.RunWith
import org.scalamock.scalatest.MockFactory
import org.scalatest.concurrent.ScalaFutures
@@ -49,7 +50,7 @@ import org.scalatest.{FlatSpecLike, Matchers}
import org.scalatest.junit.JUnitRunner
import scala.concurrent.Future
-import scala.concurrent.duration.TimeUnit
+import scala.concurrent.duration._
@RunWith(classOf[JUnitRunner])
class ContainerCounterTests
@@ -247,11 +248,42 @@ class ContainerCounterTests
NamespaceContainerCount.instances.clear()
}
- class MockEtcdClient(client: Client, isLeader: Boolean, leaseNotFound: Boolean = false, failedCount: Int = 1)
+ it should "update the number of containers correctly when multiple entries are inserted into etcd" in {
+ val mockEtcdClient = new MockEtcdClient(client, true, failedCount = 1)
+ val watcher = system.actorOf(WatcherService.props(mockEtcdClient))
+
+ val ns = NamespaceContainerCount(namespace, mockEtcdClient, watcher)
+ retry(() => {
+ ns.inProgressContainerNumByNamespace shouldBe 0
+ ns.existingContainerNumByNamespace shouldBe 0
+ }, 10, Some(100.milliseconds))
+
+ val invoker = "invoker0"
+ (0 to 100).foreach(i => {
+ mockEtcdClient.publishEvents(
+ EventType.PUT,
+ inProgressContainer(namespace, fqn, revision, schedulerId, CreationId(s"testId$i")),
+ "test-value")
+ })
+ (0 to 100).foreach(i => {
+ mockEtcdClient.publishEvents(
+ EventType.PUT,
+ s"${ContainerKeys.existingContainers(namespace, fqn, DocRevision.empty)}/${invoker}/test-container$i",
+ "test-value")
+ })
+
+ retry(() => {
+ ns.inProgressContainerNumByNamespace shouldBe 101
+ ns.existingContainerNumByNamespace shouldBe 101
+ }, 50, Some(100.milliseconds))
+ }
+
+ class MockEtcdClient(client: Client, isLeader: Boolean, leaseNotFound: Boolean = false, failedCount: Int = 0)
extends EtcdClient(client)(ec) {
var count = 0
var storedValues = List.empty[(String, String, Long, Long)]
var dataMap = Map[String, String]()
+ var totalFailedCount = 0
override def putTxn[T](key: String, value: T, cmpVersion: Long, leaseId: Long): Future[TxnResponse] = {
if (isLeader) {
@@ -264,7 +296,12 @@ class ContainerCounterTests
* this method count the number of entries whose key starts with the given prefix
*/
override def getCount(prefixKey: String): Future[Long] = {
- Future.successful { dataMap.count(data => data._1.startsWith(prefixKey)) }
+ if (totalFailedCount < failedCount) {
+ totalFailedCount += 1
+ Future.failed(new Exception("error"))
+ } else {
+ Future.successful { dataMap.count(data => data._1.startsWith(prefixKey)) }
+ }
}
var watchCallbackMap = Map[String, WatchUpdate => Unit]()
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
index 735506d5b..1d523c20d 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.openwhisk.core.scheduler.queue.test
import akka.actor.ActorRef
@@ -10,7 +27,12 @@ import org.apache.openwhisk.core.connector.ContainerCreationMessage
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.etcd.EtcdClient
import org.apache.openwhisk.core.scheduler.grpc.ActivationResponse
-import org.apache.openwhisk.core.scheduler.message.{ContainerCreation, ContainerDeletion, FailedCreationJob, SuccessfulCreationJob}
+import org.apache.openwhisk.core.scheduler.message.{
+ ContainerCreation,
+ ContainerDeletion,
+ FailedCreationJob,
+ SuccessfulCreationJob
+}
import org.apache.openwhisk.core.scheduler.queue.MemoryQueue.checkToDropStaleActivation
import org.apache.openwhisk.core.scheduler.queue._
import org.apache.openwhisk.core.service._
@@ -47,6 +69,7 @@ class MemoryQueueFlowTests
override def afterEach(): Unit = {
super.afterEach()
logLines.foreach(println)
+ stream.reset()
}
behavior of "MemoryQueueFlow"
@@ -331,9 +354,7 @@ class MemoryQueueFlowTests
fsm.underlyingActor.queue.size shouldBe 0
}, 5.seconds)
- Thread.sleep(flushGrace.toMillis)
-
- parent.expectMsg(queueRemovedMsg)
+ parent.expectMsg(flushGrace * 2 + 5.seconds, queueRemovedMsg)
probe.expectMsg(Transition(fsm, Flushing, Removed))
fsm ! QueueRemovedCompleted
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
index 887e75a93..c75a400fc 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.openwhisk.core.scheduler.queue.test
import java.time.Instant
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala
index ceaf3dd7b..8c4ee849f 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.openwhisk.core.scheduler.queue.test
import java.time.Instant
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala
index 208702506..6ad1513f7 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala
@@ -19,23 +19,23 @@ package org.apache.openwhisk.core.scheduler.queue.test
import java.time.{Clock, Instant}
import java.util.concurrent.atomic.AtomicInteger
-
import akka.actor.{Actor, ActorIdentity, ActorRef, ActorRefFactory, ActorSystem, Identify, Props}
import akka.pattern.ask
import akka.testkit.{ImplicitSender, TestActor, TestActorRef, TestKit, TestProbe}
import akka.util.Timeout
-import com.ibm.etcd.api.RangeResponse
+import com.ibm.etcd.api.{KeyValue, RangeResponse}
import common.{LoggedFunction, StreamLogging}
import org.apache.openwhisk.common.{GracefulShutdown, TransactionId}
import org.apache.openwhisk.core.WarmUp.warmUpAction
import org.apache.openwhisk.core.ack.ActiveAck
import org.apache.openwhisk.core.connector.test.TestConnector
-import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, ActivationMessage}
+import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, ActivationMessage, StatusData, StatusQuery}
import org.apache.openwhisk.core.database.{ArtifactStore, DocumentRevisionMismatchException, UserContext}
import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, RuntimeManifest}
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.etcd.EtcdKV.QueueKeys
-import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdLeader}
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.etcd.EtcdType._
import org.apache.openwhisk.core.scheduler.grpc.test.CommonVariable
import org.apache.openwhisk.core.scheduler.grpc.{ActivationResponse, GetActivation}
import org.apache.openwhisk.core.scheduler.queue._
@@ -78,7 +78,7 @@ class QueueManagerTests
val testQueueCreationMessage =
CreateQueue(testInvocationNamespace, testFQN, testDocRevision, testActionMetaData)
- val schedulerEndpoint = SchedulerEndpoints("127.0.0.1", 2552, 8080)
+ val schedulerEndpoint = SchedulerEndpoints("127.0.0.1", 8080, 2552)
val mockConsumer = new TestConnector(s"scheduler${schedulerId.asString}", 4, true)
val messageTransId = TransactionId(TransactionId.testing.meta.id)
@@ -100,6 +100,12 @@ class QueueManagerTests
ControllerInstanceId("0"),
blocking = false,
content = None)
+ val statusData = StatusData(testInvocationNamespace, testFQN.asString, 0, "Running", "RunningData")
+
+ // update start time for activation to ensure it's not stale
+ def newActivation(start: Instant = Instant.now()): ActivationMessage = {
+ activationMessage.copy(transid = TransactionId(messageTransId.meta.copy(start = start)))
+ }
val activationResponse = ActivationResponse(Right(activationMessage))
@@ -125,7 +131,9 @@ class QueueManagerTests
system.actorOf(Props(new Actor() {
override def receive: Receive = {
case GetActivation(_, _, _, _, _, _) =>
- sender ! ActivationResponse(Right(activationMessage))
+ sender ! ActivationResponse(Right(newActivation()))
+ case StatusQuery =>
+ sender ! statusData
}
}))
@@ -220,6 +228,34 @@ class QueueManagerTests
true)
}
+ it should "response queue creation request when failed to do election" in {
+ val mockEtcdClient = mock[EtcdClient]
+ val dataManagementService = getTestDataManagementService(false)
+ val watcher = TestProbe()
+
+ val queueManager =
+ TestActorRef(
+ QueueManager
+ .props(
+ entityStore,
+ get,
+ mockEtcdClient,
+ schedulerEndpoint,
+ schedulerId,
+ dataManagementService.ref,
+ watcher.ref,
+ ack,
+ store,
+ childFactory,
+ mockConsumer))
+
+ watcher.expectMsg(watchEndpoint)
+ (queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
+ testInvocationNamespace,
+ testFQN,
+ true)
+ }
+
it should "not create a queue if there is already a queue for the given fqn" in {
val mockEtcdClient = mock[EtcdClient]
val dataManagementService = getTestDataManagementService()
@@ -298,12 +334,16 @@ class QueueManagerTests
probe.expectMsg(CreateQueueResponse(testInvocationNamespace, testFQN, true))
}
- private def getTestDataManagementService() = {
+ private def getTestDataManagementService(success: Boolean = true) = {
val dataManagementService = TestProbe()
dataManagementService.setAutoPilot((sender: ActorRef, msg: Any) =>
msg match {
case ElectLeader(key, value, _, _) =>
- sender ! ElectionResult(Right(EtcdLeader(key, value, 10)))
+ if (success) {
+ sender ! ElectionResult(Right(EtcdLeader(key, value, 10)))
+ } else {
+ sender ! ElectionResult(Left(EtcdFollower(key, value)))
+ }
TestActor.KeepRunning
case _ =>
@@ -312,6 +352,61 @@ class QueueManagerTests
dataManagementService
}
+ it should "forward msg to remote queue when queue exist on remote" in {
+ stream.reset()
+ val leaderKey = QueueKeys.queue(
+ activationMessage.user.namespace.name.asString,
+ activationMessage.action.copy(version = None),
+ true)
+ val mockEtcdClient = mock[EtcdClient]
+ (mockEtcdClient
+ .get(_: String))
+ .expects(*)
+ .returning(
+ Future.successful(
+ RangeResponse
+ .newBuilder()
+ .addKvs(KeyValue.newBuilder().setKey(leaderKey).setValue(schedulerEndpoint.serialize).build())
+ .build()))
+ .once()
+ val dataManagementService = getTestDataManagementService()
+ val watcher = TestProbe()
+
+ val probe = TestProbe()
+
+ val childFactory =
+ (_: ActorRefFactory, _: String, _: FullyQualifiedEntityName, _: DocRevision, _: WhiskActionMetaData) => probe.ref
+
+ val queueManager =
+ TestActorRef(
+ QueueManager
+ .props(
+ entityStore,
+ get,
+ mockEtcdClient,
+ schedulerEndpoint,
+ schedulerId,
+ dataManagementService.ref,
+ watcher.ref,
+ ack,
+ store,
+ childFactory,
+ mockConsumer))
+ watcher.expectMsg(watchEndpoint)
+
+ // got a message but no queue created on this scheduler
+ // it should try to got leader key from etcd and forward this msg to remote queue, here is `schedulerEndpoints`
+ queueManager ! newActivation()
+ stream.toString should include(s"send activation to remote queue, key: $leaderKey")
+ stream.toString should include(s"add a new actor selection to a map with key: $leaderKey")
+ stream.reset()
+
+ // got msg again, and it should get remote queue from memory instead of etcd
+ val msg2 = newActivation().copy(activationId = ActivationId.generate())
+ queueManager ! msg2
+ stream.toString shouldNot include(s"send activation to remote queue, key: $leaderKey")
+ }
+
it should "create a new MemoryQueue when the revision matches with the one in a datastore" in {
val mockEtcdClient = mock[EtcdClient]
val dataManagementService = getTestDataManagementService()
@@ -367,10 +462,14 @@ class QueueManagerTests
content = None)
queueManager ! activationMessage
- queueManager ! activationMessage.copy(activationId = ActivationId.generate()) // even send two requests, we should only create one queue
+ val msgs = (0 to 10).map(i => {
+ activationMessage.copy(activationId = ActivationId.generate())
+ })
+ msgs.foreach(msg => queueManager ! msg) // even send multiple requests, we should only create new queue for once
probe.expectMsg(StopSchedulingAsOutdated)
probe.expectMsg(VersionUpdated)
probe.expectMsg(activationMessage)
+ msgs.foreach(msg => probe.expectMsg(msg))
}
it should "create a new MemoryQueue correctly when the action is updated again during updating the queue" in {
@@ -453,6 +552,15 @@ class QueueManagerTests
it should "not skip outdated activation when the revision is older than the one in a datastore" in {
stream.reset()
val mockEtcdClient = mock[EtcdClient]
+ (mockEtcdClient
+ .get(_: String))
+ .expects(*)
+ .returning(
+ Future.successful(
+ RangeResponse
+ .newBuilder()
+ .addKvs(KeyValue.newBuilder().setKey("test").setValue(schedulerEndpoint.serialize).build())
+ .build()))
val dataManagementService = getTestDataManagementService()
val watcher = TestProbe()
@@ -491,7 +599,7 @@ class QueueManagerTests
true)
//the activationMessage's revision(1-test-revision) is older than current queue's revision(2-test-revision)
- queueManager ! activationMessage
+ queueManager ! newActivation()
stream.toString should include(s"it will be replaced with the latest revision and invoked")
}
@@ -522,7 +630,7 @@ class QueueManagerTests
mockConsumer,
QueueManagerConfig(maxRetriesToGetQueue = 2, maxSchedulingTime = 10 seconds)))
- queueManager ! activationMessage
+ queueManager ! newActivation()
Thread.sleep(100)
(mockEtcdClient.get _) verify (*) repeated (3)
}
@@ -554,11 +662,84 @@ class QueueManagerTests
mockConsumer,
QueueManagerConfig(maxRetriesToGetQueue = 2, maxSchedulingTime = 10 seconds)))
- queueManager ! activationMessage.copy(transid = TransactionId(messageTransId.meta.copy(start = Instant.now())))
+ queueManager ! newActivation()
Thread.sleep(100)
(mockEtcdClient.get _) verify (*) repeated (3)
}
+ it should "save queue endpoint in memory" in {
+ stream.reset()
+
+ val mockEtcdClient = stub[EtcdClient]
+ val dataManagementService = getTestDataManagementService()
+ dataManagementService.ignoreMsg {
+ case _: UpdateDataOnChange => true
+ }
+ val watcher = TestProbe()
+
+ val emptyResult = Future.successful(RangeResponse.newBuilder().build())
+ (mockEtcdClient.get _) when (*) returns (emptyResult)
+
+ val queueManager =
+ TestActorRef(
+ new QueueManager(
+ entityStore,
+ get,
+ mockEtcdClient,
+ schedulerEndpoint,
+ schedulerId,
+ dataManagementService.ref,
+ watcher.ref,
+ ack,
+ store,
+ childFactory,
+ mockConsumer,
+ QueueManagerConfig(maxRetriesToGetQueue = 2, maxSchedulingTime = 10 seconds)))
+
+ queueManager ! WatchEndpointInserted("queue", "queue/test-action/leader", schedulerEndpoint.serialize, true)
+ stream.toString should include(s"Endpoint inserted, key: queue/test-action/leader, endpoints: ${schedulerEndpoint}")
+ stream.reset()
+
+ queueManager ! WatchEndpointInserted("queue", "queue/test-action/leader", "host with wrong format", true)
+ stream.toString should include(s"Unexpected error")
+ stream.toString should include(s"when put leaderKey: queue/test-action/leader")
+ stream.reset()
+
+ queueManager ! WatchEndpointRemoved("queue", "queue/test-action/leader", schedulerEndpoint.serialize, true)
+ stream.toString should include(s"Endpoint removed for key: queue/test-action/leader")
+ }
+
+ it should "able to query queue status" in {
+ val mockEtcdClient = mock[EtcdClient]
+ val watcher = TestProbe()
+ val dataManagementService = getTestDataManagementService()
+ val queueManager =
+ TestActorRef(
+ QueueManager
+ .props(
+ entityStore,
+ get,
+ mockEtcdClient,
+ schedulerEndpoint,
+ schedulerId,
+ dataManagementService.ref,
+ watcher.ref,
+ ack,
+ store,
+ childFactory,
+ mockConsumer))
+
+ watcher.expectMsg(watchEndpoint)
+ (queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
+ testInvocationNamespace,
+ testFQN,
+ true)
+
+ (queueManager ? QueueSize).mapTo[Int].futureValue shouldBe 1
+
+ (queueManager ? StatusQuery).mapTo[Future[Iterable[StatusData]]].futureValue.futureValue shouldBe List(statusData)
+ }
+
it should "drop the activation message that has not been scheduled for a long time" in {
val mockEtcdClient = mock[EtcdClient]
val watcher = TestProbe()
@@ -577,7 +758,7 @@ class QueueManagerTests
}
val oldNow = Instant.now(Clock.systemUTC()).minusMillis(11000)
- val oldActivationMessage = activationMessage.copy(transid = TransactionId(messageTransId.meta.copy(start = oldNow)))
+ val oldActivationMessage = newActivation(oldNow)
val queueManager =
TestActorRef(
@@ -606,6 +787,15 @@ class QueueManagerTests
it should "not drop the unscheduled activation message that has been processed within the scheduling time limit." in {
val mockEtcdClient = mock[EtcdClient]
+ (mockEtcdClient
+ .get(_: String))
+ .expects(*)
+ .returning(
+ Future.successful(
+ RangeResponse
+ .newBuilder()
+ .addKvs(KeyValue.newBuilder().setKey("test").setValue(schedulerEndpoint.serialize).build())
+ .build()))
val watcher = TestProbe()
val probe = TestProbe()
val dataManagementService = getTestDataManagementService()
@@ -622,7 +812,7 @@ class QueueManagerTests
}
val oldNow = Instant.now(Clock.systemUTC()).minusMillis(9000)
- val oldActivationMessage = activationMessage.copy(transid = TransactionId(messageTransId.meta.copy(start = oldNow)))
+ val oldActivationMessage = newActivation(oldNow)
val queueManager =
TestActorRef(
@@ -692,7 +882,7 @@ class QueueManagerTests
true)
queueManager.tell(
- UpdateMemoryQueue(testFQN.toDocId.asDocInfo(testDocRevision), newFqn, activationMessage),
+ UpdateMemoryQueue(testFQN.toDocId.asDocInfo(testDocRevision), newFqn, newActivation()),
consumer.ref)
probe.expectMsg(activationMessage.activationId)
@@ -730,7 +920,7 @@ class QueueManagerTests
testFQN.toDocId.asDocInfo(testDocRevision),
Some(testLeaderKey))
- QueuePool.size shouldBe 0
+ (queueManager ? QueueSize).mapTo[Int].futureValue shouldBe 0
}
it should "put the queue back to pool if it receives a QueueReactive message" in {
@@ -759,18 +949,18 @@ class QueueManagerTests
testFQN,
true)
- QueuePool.size shouldBe 1
+ (queueManager ? QueueSize).mapTo[Int].futureValue shouldBe 1
queueManager ! QueueRemoved(
testInvocationNamespace,
testFQN.toDocId.asDocInfo(testDocRevision),
Some(testLeaderKey))
- QueuePool.size shouldBe 0
+ (queueManager ? QueueSize).mapTo[Int].futureValue shouldBe 0
queueManager ! QueueReactivated(testInvocationNamespace, testFQN, testFQN.toDocId.asDocInfo(testDocRevision))
- QueuePool.size shouldBe 1
+ (queueManager ? QueueSize).mapTo[Int].futureValue shouldBe 1
}
it should "put pool information to data management service" in {
@@ -899,7 +1089,7 @@ class QueueManagerTests
system.actorOf(Props(new Actor() {
override def receive: Receive = {
case GetActivation(_, _, _, _, _, _) =>
- sender ! ActivationResponse(Right(activationMessage))
+ sender ! ActivationResponse(Right(newActivation()))
case GracefulShutdown =>
probe.ref ! GracefulShutdown
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala
index 20771e6b3..83a4b6f8e 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala
@@ -1,15 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.openwhisk.core.scheduler.queue.test
import java.util.concurrent.atomic.AtomicInteger
-
import akka.actor.ActorSystem
import akka.testkit.{TestKit, TestProbe}
import common.StreamLogging
import org.apache.openwhisk.core.entity.{EntityName, EntityPath, FullyQualifiedEntityName, SemVer}
import org.apache.openwhisk.core.scheduler.queue._
+import org.junit.runner.RunWith
import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
import org.scalatest.{FlatSpecLike, Matchers}
+@RunWith(classOf[JUnitRunner])
class SchedulingDecisionMakerTests
extends TestKit(ActorSystem("SchedulingDecisionMakerTests"))
with FlatSpecLike