You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iota.apache.org by to...@apache.org on 2017/01/31 22:29:10 UTC
[07/12] incubator-iota git commit: [IOTA-36] - First version of
global performers. In this version,
global performers can not connect to any other performer.
[IOTA-36] - First version of global performers. In this version, global performers can not connect to any other performer.
Project: http://git-wip-us.apache.org/repos/asf/incubator-iota/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-iota/commit/580e3010
Tree: http://git-wip-us.apache.org/repos/asf/incubator-iota/tree/580e3010
Diff: http://git-wip-us.apache.org/repos/asf/incubator-iota/diff/580e3010
Branch: refs/heads/master
Commit: 580e30103d72d2448426a120eb6a39b544661e68
Parents: 7aae205
Author: Barbara Gomes <ba...@gmail.com>
Authored: Mon Jan 30 16:03:39 2017 -0800
Committer: Barbara Gomes <ba...@gmail.com>
Committed: Mon Jan 30 16:03:39 2017 -0800
----------------------------------------------------------------------
.../resources/fey-json-schema-validator.json | 95 ++++++++++
.../scala/org/apache/iota/fey/Ensemble.scala | 25 ++-
.../scala/org/apache/iota/fey/FeyCore.scala | 34 ++--
.../org/apache/iota/fey/GlobalPerformer.scala | 186 +++++++++++++++++++
.../org/apache/iota/fey/Orchestration.scala | 53 +++++-
.../main/scala/org/apache/iota/fey/Utils.scala | 3 +
6 files changed, 372 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/580e3010/fey-core/src/main/resources/fey-json-schema-validator.json
----------------------------------------------------------------------
diff --git a/fey-core/src/main/resources/fey-json-schema-validator.json b/fey-core/src/main/resources/fey-json-schema-validator.json
index 79952dd..a86c09f 100644
--- a/fey-core/src/main/resources/fey-json-schema-validator.json
+++ b/fey-core/src/main/resources/fey-json-schema-validator.json
@@ -13,6 +13,101 @@
"name":{
"type":"string"
},
+ "global-performers":{
+ "type":"array",
+ "items": {
+ "guid": {
+ "type": "string"
+ },
+ "controlAware": {
+ "type": "boolean"
+ },
+ "dispatcher": {
+ "type": "string"
+ },
+ "autoScale": {
+ "type": "object",
+ "lowerBound": {
+ "type": "integer",
+ "minimum": 1
+ },
+ "upperBound": {
+ "type": "integer",
+ "minimum": 1
+ },
+ "backoffThreshold": {
+ "type": "number",
+ "minimum": 0.0
+ },
+ "roundRobin": {
+ "type": "boolean"
+ },
+ "required": [
+ "lowerBound",
+ "upperBound"
+ ]
+ },
+ "schedule": {
+ "type": "integer",
+ "minimum": 0
+ },
+ "backoff": {
+ "type": "integer",
+ "minimum": 0
+ },
+ "source": {
+ "type": "object",
+ "name": {
+ "type": "string",
+ "pattern": "\\w+(\\.jar)"
+ },
+ "location": {
+ "type": "object",
+ "url": {
+ "type": "string",
+ "pattern": "(?i)(^(http|https|file)):\/\/"
+ },
+ "credentials": {
+ "user": {
+ "type": "string"
+ },
+ "password": {
+ "type": "string"
+ },
+ "required": [
+ "user",
+ "password"
+ ]
+ },
+ "required": [
+ "url"
+ ]
+ },
+ "classPath": {
+ "type": "string",
+ "pattern": "\\w+"
+ },
+ "parameters": {
+ "patternProperties": {
+ ".*": {
+ "type": "string"
+ }
+ }
+ },
+ "required": [
+ "name",
+ "classPath",
+ "parameters"
+ ]
+ },
+ "required":[
+ "guid",
+ "schedule",
+ "backoff",
+ "source"
+ ]
+ }
+ },
"ensembles":{
"type":"array",
"items":{
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/580e3010/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
index 4db5f98..15b6ead 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
@@ -162,7 +162,16 @@ protected class Ensemble(val orchestrationID: String,
* @return (performerID, ActorRef of the performer)
*/
private def createFeyActor(performerID: String, connectionIDs: Array[String], tmpActors:HashMap[String, ActorRef]):(String, ActorRef) = {
- if(!tmpActors.contains(performerID)){
+ // Performer is a global performer and is already created
+ if(GlobalPerformer.activeGlobalPerformers.contains(orchestrationID)
+ && GlobalPerformer.activeGlobalPerformers.get(orchestrationID).get.contains(performerID)){
+ (performerID, GlobalPerformer.activeGlobalPerformers.get(orchestrationID).get.get(performerID).get)
+ }
+ // performer was already created
+ else if(tmpActors.contains(performerID)){
+ (performerID, tmpActors.get(performerID).get)
+ }
+ else{
val performerInfo = performers_metadata.get(performerID)
if (performerInfo.isDefined) {
val connections: Map[String, ActorRef] = connectionIDs.map(connID => {
@@ -178,12 +187,12 @@ protected class Ensemble(val orchestrationID: String,
val strategy =
if(performerInfo.get.isRoundRobin) {
- log.info(s"Using Round Robin for performer ${performerID}")
- RoundRobinPool(1, Some(resizer))
- } else {
- log.info(s"Using Smallest mailbox for performer ${performerID}")
- SmallestMailboxPool(1, Some(resizer))
- }
+ log.info(s"Using Round Robin for performer ${performerID}")
+ RoundRobinPool(1, Some(resizer))
+ } else {
+ log.info(s"Using Smallest mailbox for performer ${performerID}")
+ SmallestMailboxPool(1, Some(resizer))
+ }
actor = context.actorOf(strategy.props(actorProps), name = performerID)
@@ -197,8 +206,6 @@ protected class Ensemble(val orchestrationID: String,
}else{
throw new IllegalPerformerCreation(s"Performer $performerID is not defined in the JSON")
}
- }else{
- (performerID, tmpActors.get(performerID).get)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/580e3010/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala b/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
index 9c2b61d..99dc7c4 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
@@ -24,7 +24,7 @@ import akka.actor.{Actor, ActorLogging, ActorRef, OneForOneStrategy, PoisonPill,
import akka.routing.GetRoutees
import com.eclipsesource.schema._
import org.apache.iota.fey.JSON_PATH._
-import org.apache.iota.fey.Orchestration.{CREATE_ENSEMBLES, DELETE_ENSEMBLES, UPDATE_ENSEMBLES}
+import org.apache.iota.fey.Orchestration.{CREATE_ENSEMBLES, CREATE_GLOBAL_PERFORMERS_AND_ENSEMBLES, DELETE_ENSEMBLES, UPDATE_ENSEMBLES}
import org.apache.iota.fey.Utils._
import play.api.libs.json._
@@ -101,6 +101,7 @@ protected class FeyCore extends Actor with ActorLogging{
log.info(s"TERMINATED ${actorRef.path.name}")
FEY_CACHE.activeOrchestrations.remove(actorRef.path.name)
ORCHESTRATION_CACHE.orchestration_metadata.remove(actorRef.path.name)
+ ORCHESTRATION_CACHE.orchestration_globals.remove(actorRef.path.name)
if(!FEY_CACHE.orchestrationsAwaitingTermination.isEmpty) {
checkForOrchestrationWaitingForTermination(actorRef.path.name)
}
@@ -155,10 +156,12 @@ protected class FeyCore extends Actor with ActorLogging{
val orchestrationCommand = (orchestrationJSON \ COMMAND).as[String].toUpperCase()
val orchestrationTimestamp = (orchestrationJSON \ ORCHESTRATION_TIMESTAMP).as[String]
val ensembles = (orchestrationJSON \ ENSEMBLES).as[List[JsObject]]
+ val globalPerformers = (orchestrationJSON \ GLOBAL_PERFORMERS).asOpt[List[JsObject]]
+
orchestrationCommand match {
- case "RECREATE" => recreateOrchestration(ensembles, orchestrationID, orchestrationName, orchestrationTimestamp)
- case "CREATE" => createOrchestration(ensembles, orchestrationID, orchestrationName, orchestrationTimestamp)
- case "UPDATE" => updateOrchestration(ensembles, orchestrationID, orchestrationName, orchestrationTimestamp)
+ case "RECREATE" => recreateOrchestration(ensembles, orchestrationID, orchestrationName, orchestrationTimestamp, globalPerformers)
+ case "CREATE" => createOrchestration(ensembles, orchestrationID, orchestrationName, orchestrationTimestamp, globalPerformers)
+ case "UPDATE" => updateOrchestration(ensembles, orchestrationID, orchestrationName, orchestrationTimestamp, globalPerformers)
case "DELETE" => deleteOrchestration(orchestrationID,true)
case x => throw new CommandNotRecognized(s"Command: $x")
}
@@ -177,13 +180,13 @@ protected class FeyCore extends Actor with ActorLogging{
* @return
*/
private def recreateOrchestration(ensemblesSpecJson: List[JsObject], orchestrationID: String,
- orchestrationName: String, orchestrationTimestamp: String) = {
+ orchestrationName: String, orchestrationTimestamp: String, globalPerformers:Option[List[JsObject]]) = {
FEY_CACHE.activeOrchestrations.get(orchestrationID) match {
case Some(orchestration) =>
try{
// If timestamp is greater than the last timestamp
if(orchestration._1 != orchestrationTimestamp){
- val orchestrationInfo = new OrchestrationInformation(ensemblesSpecJson,orchestrationID,orchestrationName,orchestrationTimestamp)
+ val orchestrationInfo = new OrchestrationInformation(ensemblesSpecJson,orchestrationID,orchestrationName,orchestrationTimestamp, globalPerformers)
FEY_CACHE.orchestrationsAwaitingTermination.put(orchestrationID, orchestrationInfo)
deleteOrchestration(orchestrationID, true)
}else{
@@ -192,7 +195,7 @@ protected class FeyCore extends Actor with ActorLogging{
}catch{
case e: Exception =>
}
- case None => createOrchestration(ensemblesSpecJson,orchestrationID,orchestrationName,orchestrationTimestamp)
+ case None => createOrchestration(ensemblesSpecJson,orchestrationID,orchestrationName,orchestrationTimestamp, globalPerformers)
}
}
@@ -206,7 +209,7 @@ protected class FeyCore extends Actor with ActorLogging{
case Some(orchestrationAwaiting) =>
FEY_CACHE.orchestrationsAwaitingTermination.remove(terminatedOrchestrationName)
createOrchestration(orchestrationAwaiting.ensembleSpecJson, orchestrationAwaiting.orchestrationID,
- orchestrationAwaiting.orchestrationName, orchestrationAwaiting.orchestrationTimestamp)
+ orchestrationAwaiting.orchestrationName, orchestrationAwaiting.orchestrationTimestamp, orchestrationAwaiting.globalPerformers)
case None =>
}
}
@@ -222,7 +225,7 @@ protected class FeyCore extends Actor with ActorLogging{
* @param orchestrationTimestamp
*/
private def createOrchestration(ensemblesSpecJson: List[JsObject], orchestrationID: String,
- orchestrationName: String, orchestrationTimestamp: String) = {
+ orchestrationName: String, orchestrationTimestamp: String, globalPerformers:Option[List[JsObject]]) = {
try{
if(!FEY_CACHE.activeOrchestrations.contains(orchestrationID)) {
val orchestration = context.actorOf(
@@ -230,7 +233,13 @@ protected class FeyCore extends Actor with ActorLogging{
name = orchestrationID)
FEY_CACHE.activeOrchestrations.put(orchestrationID, (orchestrationTimestamp, orchestration))
context.watch(orchestration)
- orchestration ! CREATE_ENSEMBLES(ensemblesSpecJson)
+
+ if(globalPerformers.isDefined && globalPerformers.get.size > 0){
+ orchestration ! CREATE_GLOBAL_PERFORMERS_AND_ENSEMBLES(globalPerformers.get, ensemblesSpecJson)
+ }else {
+ orchestration ! CREATE_ENSEMBLES(ensemblesSpecJson)
+ }
+
}else{
log.error(s"Orchestration $orchestrationID is already defined in the network.")
}
@@ -270,8 +279,9 @@ protected class FeyCore extends Actor with ActorLogging{
}
}
+ // TODO: Check out how to manage global performers for updating
private def updateOrchestration(ensemblesSpecJson: List[JsObject], orchestrationID: String,
- orchestrationName: String, orchestrationTimestamp: String) = {
+ orchestrationName: String, orchestrationTimestamp: String, globalPerformers:Option[List[JsObject]]) = {
FEY_CACHE.activeOrchestrations.get(orchestrationID) match {
case None => log.warning(s"Orchestration not update. No active Orchestration $orchestrationID.")
case Some(orchestration) => {
@@ -368,4 +378,4 @@ private object FEY_CACHE{
}
sealed case class OrchestrationInformation(ensembleSpecJson: List[JsObject], orchestrationID: String,
- orchestrationName: String, orchestrationTimestamp: String)
+ orchestrationName: String, orchestrationTimestamp: String, globalPerformers:Option[List[JsObject]])
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/580e3010/fey-core/src/main/scala/org/apache/iota/fey/GlobalPerformer.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/GlobalPerformer.scala b/fey-core/src/main/scala/org/apache/iota/fey/GlobalPerformer.scala
new file mode 100644
index 0000000..e343f2e
--- /dev/null
+++ b/fey-core/src/main/scala/org/apache/iota/fey/GlobalPerformer.scala
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iota.fey
+
+import akka.actor.SupervisorStrategy.Restart
+import akka.actor.{Actor, ActorLogging, ActorRef, OneForOneStrategy, Props, Terminated}
+import akka.routing._
+import play.api.libs.json.JsObject
+
+import scala.collection.mutable.HashMap
+import scala.concurrent.duration._
+
+protected class GlobalPerformer(val orchestrationID: String,
+ val orchestrationName: String,
+ val globalPerformers: List[JsObject],
+ val ensemblesSpec : List[JsObject]) extends Actor with ActorLogging{
+
+ val monitoring_actor = FEY_MONITOR.actorRef
+ var global_metadata: Map[String, Performer] = Map.empty[String, Performer]
+ var global_performer: Map[String,ActorRef] = Map.empty[String,ActorRef]
+
+ override def receive: Receive = {
+
+ case GlobalPerformer.PRINT_GLOBAL =>
+ context.actorSelection(s"*") ! FeyGenericActor.PRINT_PATH
+
+ case Terminated(actor) =>
+ monitoring_actor ! Monitor.TERMINATE(actor.path.toString, Utils.getTimestamp)
+ log.error(s"DEAD Global Performers ${actor.path.name}")
+ context.children.foreach{ child =>
+ context.unwatch(child)
+ context.stop(child)
+ }
+ throw new RestartGlobalPerformers(s"DEAD Global Performer ${actor.path.name}")
+
+ case GetRoutees => //Discard
+
+ case x => log.warning(s"Message $x not treated by Global Performers")
+ }
+
+ /**
+ * If any of the global performer dies, it tries to restart it.
+ * If we could not be restarted, then the terminated message will be received
+ * and Global Performer is going to throw an Exception to its orchestration
+ * asking it to Restart all the entire orchestration. The restart will then stop all of its
+ * children when call the preStart.
+ */
+ override val supervisorStrategy =
+ OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 minute) {
+ case e: Exception => Restart
+ }
+
+ /**
+ * Uses the json spec to create the performers
+ */
+ override def preStart() : Unit = {
+
+ monitoring_actor ! Monitor.START(Utils.getTimestamp)
+
+ global_metadata = Ensemble.extractPerformers(globalPerformers)
+
+ createGlobalPerformers()
+
+ }
+
+ override def postStop() : Unit = {
+ monitoring_actor ! Monitor.STOP(Utils.getTimestamp)
+ }
+
+ override def postRestart(reason: Throwable): Unit = {
+ monitoring_actor ! Monitor.RESTART(reason, Utils.getTimestamp)
+ preStart()
+ }
+
+ private def createGlobalPerformers() = {
+ try {
+ global_metadata.foreach((global_performer) => {
+ createFeyActor(global_performer._1, global_performer._2)
+ })
+ context.parent ! Orchestration.CREATE_ENSEMBLES(ensemblesSpec)
+ } catch {
+ /* if the creation fails, it will stop the orchestration */
+ case e: Exception =>
+ log.error(e,"During Global Manager creation")
+ throw new RestartGlobalPerformers("Could not create global performer")
+ }
+ }
+
+ private def createFeyActor(performerID: String, performerInfo: Performer) = {
+ val actor: ActorRef = {
+ val actorProps = getPerformer(performerInfo)
+ if (performerInfo.autoScale) {
+
+ val resizer = DefaultResizer(lowerBound = performerInfo.lowerBound, upperBound = performerInfo.upperBound,
+ messagesPerResize = CONFIG.MESSAGES_PER_RESIZE, backoffThreshold = performerInfo.backoffThreshold, backoffRate = 0.1)
+
+ val strategy =
+ if (performerInfo.isRoundRobin) {
+ log.info(s"Using Round Robin for performer ${performerID}")
+ RoundRobinPool(1, Some(resizer))
+ } else {
+ log.info(s"Using Smallest mailbox for performer ${performerID}")
+ SmallestMailboxPool(1, Some(resizer))
+ }
+
+ context.actorOf(strategy.props(actorProps), name = performerID)
+ } else {
+ context.actorOf(actorProps, name = performerID)
+ }
+ }
+
+ context.watch(actor)
+ GlobalPerformer.activeGlobalPerformers.get(orchestrationID) match {
+ case Some(globals) => GlobalPerformer.activeGlobalPerformers.put(orchestrationID, (globals ++ Map(performerID -> actor)))
+ case None => GlobalPerformer.activeGlobalPerformers.put(orchestrationID, Map(performerID -> actor))
+ }
+ }
+
+ /**
+ * Creates actor props based on JSON configuration
+ *
+ * @param performerInfo Performer object
+ * @return Props of actor based on JSON config
+ */
+ private def getPerformer(performerInfo: Performer): Props = {
+
+ val clazz = loadClazzFromJar(performerInfo.classPath, s"${performerInfo.jarLocation}/${performerInfo.jarName}", performerInfo.jarName)
+
+ val dispatcher = if(performerInfo.dispatcher != "") s"fey-custom-dispatchers.${performerInfo.dispatcher}" else ""
+
+ val actorProps = Props(clazz,
+ performerInfo.parameters, performerInfo.backoff, Map.empty, performerInfo.schedule, orchestrationName, orchestrationID, performerInfo.autoScale)
+
+ // dispatcher has higher priority than controlAware. That means that if both are defined
+ // then the custom dispatcher will be used
+ if(dispatcher != ""){
+ log.info(s"Using dispatcher: $dispatcher")
+ actorProps.withDispatcher(dispatcher)
+ }
+ else if(performerInfo.controlAware){
+ actorProps.withDispatcher(CONFIG.CONTROL_AWARE_MAILBOX)
+ }else{
+ actorProps
+ }
+ }
+
+ /**
+ * Load a clazz instance of FeyGenericActor from a jar
+ *
+ * @param classPath class path
+ * @param jarLocation Full path where to load the jar from
+ * @return clazz instance of FeyGenericActor
+ */
+ private def loadClazzFromJar(classPath: String, jarLocation: String, jarName: String):Class[FeyGenericActor] = {
+ try {
+ Utils.loadActorClassFromJar(jarLocation,classPath,jarName)
+ }catch {
+ case e: Exception =>
+ log.error(e,s"Could not load class $classPath from jar $jarLocation. Please, check the Jar repository path as well the jar name")
+ throw e
+ }
+ }
+
+}
+
+object GlobalPerformer{
+
+ val activeGlobalPerformers:HashMap[String, Map[String, ActorRef]] = HashMap.empty[String, Map[String, ActorRef]]
+
+ case object PRINT_GLOBAL
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/580e3010/fey-core/src/main/scala/org/apache/iota/fey/Orchestration.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Orchestration.scala b/fey-core/src/main/scala/org/apache/iota/fey/Orchestration.scala
index 1e47a80..0b9152b 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Orchestration.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Orchestration.scala
@@ -44,6 +44,7 @@ protected class Orchestration(val name: String,
case CREATE_ENSEMBLES(ensemblesJsonSpec) => createEnsembles(ensemblesJsonSpec)
case DELETE_ENSEMBLES(ensemblesJsonSpec) => deleteEnsembles(ensemblesJsonSpec)
case UPDATE_ENSEMBLES(ensemblesJsonSpec) => updateEnsembles(ensemblesJsonSpec)
+ case CREATE_GLOBAL_PERFORMERS_AND_ENSEMBLES(globalSpec, ensembleSpec) => createGlobalsAndEnsembles(globalSpec, ensembleSpec)
case PRINT_PATH =>
log.info(s"** ${self.path} **")
@@ -64,6 +65,7 @@ protected class Orchestration(val name: String,
override val supervisorStrategy =
OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 minute) {
+ case e: RestartGlobalPerformers => akka.actor.SupervisorStrategy.Escalate
case e: RestartEnsemble => Restart
case e: Exception => Restart
}
@@ -103,9 +105,18 @@ protected class Orchestration(val name: String,
*/
private def replayOrchestrationState() = {
val ensemblesSpec = ORCHESTRATION_CACHE.orchestration_metadata.get(guid).get.map(_._2).toList
- ORCHESTRATION_CACHE.orchestration_metadata.remove(guid)
- ORCHESTRATION_CACHE.orchestration_name.remove(guid)
- self ! CREATE_ENSEMBLES(ensemblesSpec)
+ val global = ORCHESTRATION_CACHE.orchestration_globals.get(guid)
+ if(global.isDefined){
+ val globalSpec = global.get.map(_._2).toList
+ ORCHESTRATION_CACHE.orchestration_metadata.remove(guid)
+ ORCHESTRATION_CACHE.orchestration_name.remove(guid)
+ ORCHESTRATION_CACHE.orchestration_globals.remove(guid)
+ self ! CREATE_GLOBAL_PERFORMERS_AND_ENSEMBLES(globalSpec, ensemblesSpec)
+ }else{
+ ORCHESTRATION_CACHE.orchestration_metadata.remove(guid)
+ ORCHESTRATION_CACHE.orchestration_name.remove(guid)
+ self ! CREATE_ENSEMBLES(ensemblesSpec)
+ }
}
/**
@@ -168,6 +179,40 @@ protected class Orchestration(val name: String,
* @param ensemblesJsonSpec
* @return
*/
+ private def createGlobalsAndEnsembles(globalSpec: List[JsObject], ensemblesJsonSpec: List[JsObject]) = {
+ log.info(s"Creating global performers: ${globalSpec}")
+ try{
+ // Actor will send message to orchestration to create ensembles once global performers are created
+ val global_manager = context.actorOf(Props(classOf[GlobalPerformer], guid, name, globalSpec, ensemblesJsonSpec), name = "GLOBAL_MANAGER")
+ context.watch(global_manager)
+ }catch{
+ case e: Exception =>
+ log.error(s"Could not create Global Performers manager actor for orchestration $guid")
+ throw new RestartOrchestration(s"Could not create global actors")
+ }
+
+ //Fill orchestration_globals
+ ORCHESTRATION_CACHE.orchestration_globals.get(guid) match {
+ case None =>
+ ORCHESTRATION_CACHE.orchestration_globals.put(guid, (globalSpec.map(global => {
+ val guid = (global \ GUID).as[String]
+ (guid, global)
+ }).toMap))
+ case Some(cachedGlobals) =>
+ ORCHESTRATION_CACHE.orchestration_metadata.put(guid, cachedGlobals ++ (globalSpec.map(global => {
+ val guid = (global \ GUID).as[String]
+ (guid, global)
+ }).toMap))
+ }
+ }
+
+ /**
+ * Creates Ensembles from the json specification and make it
+ * a member of the orchestration Ensembles
+ *
+ * @param ensemblesJsonSpec
+ * @return
+ */
private def createEnsembles(ensemblesJsonSpec: List[JsObject]) = {
log.info(s"Creating Ensembles: ${ensemblesJsonSpec}")
val newEnsembles = ensemblesJsonSpec.map(ensembleSpec => {
@@ -236,6 +281,7 @@ protected object Orchestration{
case class CREATE_ENSEMBLES(ensemblesJsonSpec: List[JsObject])
case class DELETE_ENSEMBLES(ensemblesJsonSpec: List[JsObject])
case class UPDATE_ENSEMBLES(ensemblesJsonSpec: List[JsObject])
+ case class CREATE_GLOBAL_PERFORMERS_AND_ENSEMBLES(globalPerformersSpec: List[JsObject], ensemblesJsonSpec: List[JsObject])
case object PRINT_PATH
}
@@ -249,5 +295,6 @@ protected object ORCHESTRATION_CACHE{
* Value = Map[Ensemble GUID, JsObject of the ensemble]
*/
val orchestration_metadata: HashMap[String, Map[String,JsObject]] = HashMap.empty[String, Map[String,JsObject]]
+ val orchestration_globals : HashMap[String, Map[String,JsObject]] = HashMap.empty[String, Map[String,JsObject]]
val orchestration_name: HashMap[String, String] = HashMap.empty
}
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/580e3010/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
index fda1a30..3bf0eb8 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
@@ -201,6 +201,7 @@ object JSON_PATH{
val JAR_CRED_USER = "user"
val JAR_CRED_PASSWORD = "password"
val PERFORMER_DISPATCHER = "dispatcher"
+ val GLOBAL_PERFORMERS = "global-performers"
}
object CONFIG{
@@ -329,3 +330,5 @@ case class IllegalPerformerCreation(message:String) extends Exception(message)
case class NetworkNotDefined(message:String) extends Exception(message)
case class CommandNotRecognized(message:String) extends Exception(message)
case class RestartEnsemble(message:String) extends Exception(message)
+case class RestartGlobalPerformers(message: String) extends Exception(message)
+case class RestartOrchestration(message: String) extends Exception(message)
\ No newline at end of file