You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iota.apache.org by to...@apache.org on 2016/07/18 20:43:35 UTC

[12/12] incubator-iota git commit: Test for Ensemble.scala

Test for Ensemble.scala


Project: http://git-wip-us.apache.org/repos/asf/incubator-iota/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-iota/commit/b7dd7961
Tree: http://git-wip-us.apache.org/repos/asf/incubator-iota/tree/b7dd7961
Diff: http://git-wip-us.apache.org/repos/asf/incubator-iota/diff/b7dd7961

Branch: refs/heads/master
Commit: b7dd7961ed6acc81838b4add05d774859460c341
Parents: 555550a
Author: Barbara Gomes <ba...@gmail.com>
Authored: Mon Jul 18 13:31:03 2016 -0700
Committer: Barbara Gomes <ba...@gmail.com>
Committed: Mon Jul 18 13:31:03 2016 -0700

----------------------------------------------------------------------
 .../org/apache/iota/fey/EnsembleSpec.scala      | 221 +++++++++++++++++++
 .../org/apache/iota/fey/Utils_JSONTest.scala    |  56 +++++
 2 files changed, 277 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/b7dd7961/fey-core/src/test/scala/org/apache/iota/fey/EnsembleSpec.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/test/scala/org/apache/iota/fey/EnsembleSpec.scala b/fey-core/src/test/scala/org/apache/iota/fey/EnsembleSpec.scala
new file mode 100644
index 0000000..3878dcc
--- /dev/null
+++ b/fey-core/src/test/scala/org/apache/iota/fey/EnsembleSpec.scala
@@ -0,0 +1,221 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iota.fey
+
+import akka.actor.{ActorRef, PoisonPill, Props}
+import akka.testkit.{EventFilter, TestActorRef, TestProbe}
+import play.api.libs.json.JsObject
+
+import scala.concurrent.duration.{DurationInt, FiniteDuration}
+import JSON_PATH._
+
+
+/**
+ * Behavioural spec for the [[Ensemble]] actor.
+ *
+ * The scenarios share actors and mutable references (`simplePerformerRef`,
+ * `advEnsembleRef`, `scheduleRef`, `paramsRef`) that are assigned inside earlier tests
+ * and read by later ones, so the tests depend on running in declaration order:
+ *
+ *  1. a one-performer ensemble is created, asked to stop its performers and then poisoned;
+ *  2. a two-performer ensemble with a schedule, parameters and one connection is created,
+ *     its scheduling/processing is observed, and one of its performers is poisoned.
+ */
+class EnsembleSpec extends BaseAkkaSpec{
+
+  val ensembleJson = getJSValueFromString(Utils_JSONTest.simple_ensemble_test_json)
+  val orchestrationID = "ORCH-TEST-ENSEMBLE"
+  val parent = TestProbe("ENSEMBLE")
+  val monitor = TestProbe()
+
+  // GUID and actor paths of the simple ensemble, extracted once instead of being
+  // re-parsed from the JSON in every test name and assertion.
+  private val ensembleGuid = (ensembleJson \ JSON_PATH.GUID).as[String]
+  private val ensemblePath = s"${parent.ref.path}/$ensembleGuid"
+  private val simplePerformerPath = s"$ensemblePath/TEST-0004"
+
+  // The ensemble is created as a child of `parent` and reports to the `monitor` probe
+  // instead of its default monitoring actor.
+  val ensembleRef = TestActorRef[Ensemble]( Props(new Ensemble(orchestrationID,"ORCH-NAME", ensembleJson.as[JsObject]){
+    override val monitoring_actor = monitor.ref
+  }), parent.ref, ensembleGuid)
+
+  val ensembleState = ensembleRef.underlyingActor
+
+  // Assigned by the "creation of Performer 'TEST-0004'" test and reused by later tests.
+  var simplePerformerRef: ActorRef = _
+
+  s"Creating a simple Ensemble $ensembleGuid" should {
+    s"result in creation of Ensemble actor '$ensemblePath'" in {
+      TestProbe().expectActor(ensemblePath)
+    }
+    "result in sending START to monitor actor" in {
+      monitor.expectMsgClass(classOf[Monitor.START])
+    }
+    "result in creation of Performer 'TEST-0004'" in{
+      simplePerformerRef = TestProbe().expectActor(simplePerformerPath)
+    }
+    "result in Empty state variable Ensemble.connectors" in {
+      ensembleState.connectors shouldBe empty
+    }
+    "result in one entry added to state variable Ensemble.performer" in{
+      ensembleState.performer should have size(1)
+      ensembleState.performer should contain key("TEST-0004")
+      ensembleState.performer("TEST-0004") should equal(simplePerformerRef)
+    }
+    "result in one right entry to state variable Ensemble.performers_metadata" in {
+      ensembleState.performers_metadata should have size(1)
+      val performers = (ensembleJson \ PERFORMERS).as[List[JsObject]]
+      val performerSpec = performers(0)
+      val performer = ensembleState.performers_metadata("TEST-0004")
+
+      // Every metadata field is compared against the JSON the ensemble was built from.
+      performer.controlAware should equal(false)
+      performer.jarName should equal((performerSpec \ SOURCE \ SOURCE_NAME).as[String])
+      performer.jarLocation should equal(CONFIG.JAR_REPOSITORY)
+      performer.autoScale should equal(0)
+      performer.backoff should equal((performerSpec \ BACKOFF).as[Int].millisecond)
+      performer.classPath should equal((performerSpec \ SOURCE \ SOURCE_CLASSPATH).as[String])
+      performer.uid should equal((performerSpec \ GUID).as[String])
+      performer.schedule should equal((performerSpec \ SCHEDULE).as[Int].millisecond)
+      performer.parameters shouldBe empty
+    }
+    "result in two paths added to IdentifyFeyActors.actorsPath" in{
+      globalIdentifierRef ! IdentifyFeyActors.IDENTIFY_TREE(parent.ref.path.toString)
+      // The result is read from IdentifyFeyActors.actorsPath rather than from a reply,
+      // so the test waits a fixed time after sending the request.
+      Thread.sleep(500)
+      IdentifyFeyActors.actorsPath should have size(2)
+      IdentifyFeyActors.actorsPath should contain(ensemblePath)
+      IdentifyFeyActors.actorsPath should contain(simplePerformerPath)
+    }
+  }
+
+  "Sending Ensemble.STOP_PERFORMERS to Ensemble" should {
+    "result in Terminate message of actor 'TEST-0004' and throw RestartEnsemble Exception" in {
+      EventFilter[RestartEnsemble](occurrences = 1) intercept {
+        ensembleRef ! Ensemble.STOP_PERFORMERS
+        TestProbe().verifyActorTermination(simplePerformerRef)
+      }
+    }
+    //s"result in Performer 'TEST-0004' restarted" in {
+    //  val newPerformer = TestProbe().expectActor(s"${parent.ref.path}/${(ensembleJson \ JSON_PATH.GUID).as[String]}/TEST-0004")
+    //  newPerformer.compareTo(simplePerformerRef) should be(0)
+    //}
+    "result in two paths added to IdentifyFeyActors.actorsPath" in{
+      globalIdentifierRef ! IdentifyFeyActors.IDENTIFY_TREE(parent.ref.path.toString)
+      // Fixed wait: actorsPath is inspected as shared state, not received as a message.
+      Thread.sleep(500)
+      IdentifyFeyActors.actorsPath should have size(2)
+      IdentifyFeyActors.actorsPath should contain(ensemblePath)
+      IdentifyFeyActors.actorsPath should contain(simplePerformerPath)
+    }
+  }
+
+  "Sending PoisonPill to Ensemble" should{
+    s"result in termination of actor '$ensembleGuid'" in {
+      ensembleRef ! PoisonPill
+      TestProbe().verifyActorTermination(ensembleRef)
+    }
+    "result in sending TERMINATE to monitor actor" in {
+      monitor.expectMsgClass(classOf[Monitor.TERMINATE])
+    }
+    "result in termination of ensemble and performer" in {
+      TestProbe().notExpectActor(ensemblePath)
+      TestProbe().notExpectActor(simplePerformerPath)
+    }
+    "result in empty IdentifyFeyActors.actorsPath" in{
+      globalIdentifierRef ! IdentifyFeyActors.IDENTIFY_TREE(parent.ref.path.toString)
+      // Fixed wait: actorsPath is inspected as shared state, not received as a message.
+      Thread.sleep(500)
+      IdentifyFeyActors.actorsPath shouldBe empty
+    }
+  }
+
+  val advEnsembleJson = getJSValueFromString(Utils_JSONTest.ensemble_test_json)
+
+  // GUID and actor paths of the detailed ensemble, extracted once for reuse below.
+  private val advEnsembleGuid = (advEnsembleJson \ JSON_PATH.GUID).as[String]
+  private val advEnsemblePath = s"${parent.ref.path}/$advEnsembleGuid"
+
+  var advEnsembleRef:TestActorRef[Ensemble] = _
+  // Placeholder value only; replaced with the detailed ensemble's state in the first test below.
+  var advEnsembleState: Ensemble = ensembleRef.underlyingActor
+  var paramsRef: ActorRef = _
+  var scheduleRef: ActorRef = _
+  val generalScheduleTB = TestProbe("GENERAL-SCHEDULE")
+  val schedulerScheduleTB = TestProbe("SCHEDULER-SCHEDULE")
+  val generalParamsTB = TestProbe("GENERAL-PARAMS")
+  val processParamsTB = TestProbe("PROCESS-PARAMS")
+
+  "creating more detailed Ensemble" should {
+    "result in creation of Ensemble actor " in {
+      advEnsembleRef = TestActorRef[Ensemble]( Props(new Ensemble(orchestrationID,"ORCH-NAME", advEnsembleJson.as[JsObject]){
+        override val monitoring_actor = monitor.ref
+      }), parent.ref, advEnsembleGuid)
+      advEnsembleState = advEnsembleRef.underlyingActor
+      TestProbe().expectActor(advEnsemblePath)
+    }
+    "result in creation of Performer 'PERFORMER-SCHEDULER'" in{
+      scheduleRef = TestProbe().expectActor(s"$advEnsemblePath/PERFORMER-SCHEDULER")
+    }
+    "result in creation of Performer 'PERFORMER-PARAMS'" in{
+      paramsRef = TestProbe().expectActor(s"$advEnsemblePath/PERFORMER-PARAMS")
+    }
+    "create connection PERFORMER-SCHEDULER -> PERFORMER-PARAMS" in {
+      advEnsembleState.connectors should have size(1)
+      advEnsembleState.connectors should contain key("PERFORMER-SCHEDULER")
+      advEnsembleState.connectors("PERFORMER-SCHEDULER") should equal(Array("PERFORMER-PARAMS"))
+    }
+    "create 'PERFORMER-SCHEDULER' with schedule time equal to 200ms" in{
+      advEnsembleState.performers_metadata("PERFORMER-SCHEDULER").schedule should equal(200.millisecond)
+    }
+    "create 'PERFORMER-SCHEDULER' with connection to 'PERFORMER-PARAMS'" in{
+      // NOTE(review): the meaning of each tuple slot is defined by the test performer
+      // (not visible here); the third probe is the one that receives the GET_CONNECTIONS reply.
+      scheduleRef ! ((schedulerScheduleTB.ref,system.deadLetters,generalScheduleTB.ref))
+      scheduleRef ! "GET_CONNECTIONS"
+      generalScheduleTB.expectMsg(Map("PERFORMER-PARAMS" -> paramsRef))
+    }
+    "create 'PERFORMER-PARAMS' with no connections" in{
+      paramsRef ! ((system.deadLetters,processParamsTB.ref, generalParamsTB.ref))
+      paramsRef ! "GET_CONNECTIONS"
+      generalParamsTB.expectMsg(Map.empty)
+    }
+    "create 'PERFORMER-PARAMS' with specified params" in{
+      val params = advEnsembleState.performers_metadata("PERFORMER-PARAMS").parameters
+      params should contain key("param-1")
+      params should contain key("param-2")
+      params("param-1") should equal("test")
+      params("param-2") should equal("test2")
+    }
+  }
+
+  "'PERFORMER-SCHEDULER'" should{
+    "produce 5 messages in 1 seconds" in{
+      schedulerScheduleTB.expectMsg("EXECUTE")
+      Thread.sleep(100)
+      schedulerScheduleTB.receiveN(5, 1.seconds)
+    }
+    "produce 10 messages in 2 seconds" in{
+      schedulerScheduleTB.expectMsg("EXECUTE")
+      Thread.sleep(100)
+      schedulerScheduleTB.receiveN(10, 2.seconds)
+    }
+  }
+
+  "'PERFORMER-PARAMS'" should{
+    "process 5 messages in 1 seconds" in{
+      // Waits for an EXECUTE on the scheduler probe first, then counts what the
+      // PERFORMER-PARAMS probe received.
+      schedulerScheduleTB.expectMsg("EXECUTE")
+      Thread.sleep(100)
+      processParamsTB.receiveN(5, 1.seconds)
+    }
+    // Title fixed from "produce ..." (copied from the scheduler section): this test
+    // counts messages received by the PERFORMER-PARAMS probe.
+    "process 10 messages in 2 seconds" in{
+      schedulerScheduleTB.expectMsg("EXECUTE")
+      Thread.sleep(100)
+      processParamsTB.receiveN(10, 2.seconds)
+    }
+  }
+
+  "Stopping any Performer that belongs to the Ensemble" should {
+    "force restart of entire Ensemble" in {
+      EventFilter[RestartEnsemble](occurrences = 1) intercept {
+        paramsRef ! PoisonPill
+        TestProbe().verifyActorTermination(paramsRef)
+      }
+    }
+    "result in sending STOP - RESTART to monitor actor" in {
+      monitor.expectMsgClass(classOf[Monitor.STOP])
+      monitor.expectMsgClass(classOf[Monitor.RESTART])
+    }
+    //"keep actorRef when restarted" in {
+    //  TestProbe().expectActor(s"${parent.ref.path}/${(advEnsembleJson \ JSON_PATH.GUID).as[String]}").compareTo(advEnsembleRef) should be(0)
+    //  TestProbe().expectActor(s"${parent.ref.path}/${(advEnsembleJson \ JSON_PATH.GUID).as[String]}/PERFORMER-SCHEDULER").compareTo(scheduleRef) should be(0)
+    //  TestProbe().expectActor(s"${parent.ref.path}/${(advEnsembleJson \ JSON_PATH.GUID).as[String]}/PERFORMER-PARAMS").compareTo(paramsRef) should be(0)
+    //}
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/b7dd7961/fey-core/src/test/scala/org/apache/iota/fey/Utils_JSONTest.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/test/scala/org/apache/iota/fey/Utils_JSONTest.scala b/fey-core/src/test/scala/org/apache/iota/fey/Utils_JSONTest.scala
index 5be25c4..4df7671 100644
--- a/fey-core/src/test/scala/org/apache/iota/fey/Utils_JSONTest.scala
+++ b/fey-core/src/test/scala/org/apache/iota/fey/Utils_JSONTest.scala
@@ -267,4 +267,60 @@ object Utils_JSONTest {
        ]
      }"""
 
+  /**
+   * Ensemble definition (command `UPDATE`) containing a single performer, `TEST-0004`,
+   * with schedule and backoff of 0, empty parameters and an empty connection list.
+   */
+  val simple_ensemble_test_json: String =
+    """
+     {
+       "guid":"MY-ENSEMBLE-0005",
+       "command": "UPDATE",
+       "performers":[
+         {
+           "guid": "TEST-0004",
+           "schedule": 0,
+           "backoff": 0,
+           "source": {
+             "name": "fey-test-actor.jar",
+             "classPath": "org.apache.iota.fey.TestActor",
+             "parameters": {}
+           }
+         }
+       ],
+       "connections":[]
+     }
+    """
+
+  /**
+   * Ensemble definition (command `NONE`) containing two performers:
+   * `PERFORMER-SCHEDULER` (schedule 200, no parameters) and `PERFORMER-PARAMS`
+   * (schedule 0, parameters `param-1` and `param-2`), plus one connection from
+   * `PERFORMER-SCHEDULER` to `PERFORMER-PARAMS`.
+   */
+  val ensemble_test_json: String =
+    """
+     {
+       "guid":"MY-ENSEMBLE-0005",
+       "command": "NONE",
+       "performers":[
+         {
+           "guid": "PERFORMER-SCHEDULER",
+           "schedule": 200,
+           "backoff": 0,
+           "source": {
+             "name": "fey-test-actor.jar",
+             "classPath": "org.apache.iota.fey.TestActor",
+             "parameters": {}
+           }
+         },
+          {
+           "guid": "PERFORMER-PARAMS",
+           "schedule": 0,
+           "backoff": 0,
+           "source": {
+             "name": "fey-test-actor.jar",
+             "classPath": "org.apache.iota.fey.TestActor",
+             "parameters": {
+               "param-1" : "test",
+               "param-2" : "test2"
+             }
+           }
+         }
+       ],
+       "connections":[
+        {"PERFORMER-SCHEDULER":["PERFORMER-PARAMS"]}
+       ]
+     }
+    """
 }