You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iota.apache.org by to...@apache.org on 2017/05/04 02:45:37 UTC
[1/2] incubator-iota git commit: [MIL5-175] - Avoiding concurrent
class load - using synchronized hash map
Repository: incubator-iota
Updated Branches:
refs/heads/master b84f86a64 -> cf172dc6e
[MIL5-175] - Avoiding concurrent class load - using synchronized hash map
Project: http://git-wip-us.apache.org/repos/asf/incubator-iota/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-iota/commit/3f80fdb9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-iota/tree/3f80fdb9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-iota/diff/3f80fdb9
Branch: refs/heads/master
Commit: 3f80fdb9d7baa72a722960a0d28e264ef1d1adb0
Parents: b84f86a
Author: Barbara Gomes <ba...@gmail.com>
Authored: Thu Apr 13 11:28:29 2017 -0700
Committer: Barbara Gomes <ba...@gmail.com>
Committed: Thu Apr 13 11:28:29 2017 -0700
----------------------------------------------------------------------
.../scala/org/apache/iota/fey/Ensemble.scala | 34 ++++++++++++--------
.../main/scala/org/apache/iota/fey/Utils.scala | 6 +++-
2 files changed, 26 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/3f80fdb9/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
index 3f74cec..04308c3 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
@@ -218,24 +218,32 @@ protected class Ensemble(val orchestrationID: String,
* @return Props of actor based on JSON config
*/
private def getPerformer(performerInfo: Performer, connections: Map[String, ActorRef]): Props = {
+ var clazz:Option[Class[FeyGenericActor]] = None
- val clazz = loadClazzFromJar(performerInfo.classPath, s"${performerInfo.jarLocation}/${performerInfo.jarName}", performerInfo.jarName)
+ Utils.loadedJars.synchronized {
+ clazz = Some(loadClazzFromJar(performerInfo.classPath, s"${performerInfo.jarLocation}/${performerInfo.jarName}", performerInfo.jarName))
+ }
- val dispatcher = if(performerInfo.dispatcher != "") s"fey-custom-dispatchers.${performerInfo.dispatcher}" else ""
+ if(clazz.isDefined) {
+ val dispatcher = if (performerInfo.dispatcher != "") s"fey-custom-dispatchers.${performerInfo.dispatcher}" else ""
- val actorProps = Props(clazz,
- performerInfo.parameters, performerInfo.backoff, connections, performerInfo.schedule, orchestrationName, orchestrationID, performerInfo.autoScale)
+ val actorProps = Props(clazz.get,
+ performerInfo.parameters, performerInfo.backoff, connections, performerInfo.schedule, orchestrationName, orchestrationID, performerInfo.autoScale)
- // dispatcher has higher priority than controlAware. That means that if both are defined
- // then the custom dispatcher will be used
- if(dispatcher != ""){
- log.info(s"Using dispatcher: $dispatcher")
- actorProps.withDispatcher(dispatcher)
- }
- else if(performerInfo.controlAware){
- actorProps.withDispatcher(CONFIG.CONTROL_AWARE_MAILBOX)
+ // dispatcher has higher priority than controlAware. That means that if both are defined
+ // then the custom dispatcher will be used
+ if (dispatcher != "") {
+ log.info(s"Using dispatcher: $dispatcher")
+ actorProps.withDispatcher(dispatcher)
+ }
+ else if (performerInfo.controlAware) {
+ actorProps.withDispatcher(CONFIG.CONTROL_AWARE_MAILBOX)
+ } else {
+ actorProps
+ }
}else{
- actorProps
+ log.error(s"Could not load class for performer ${performerInfo.uid}")
+ throw new ClassNotFoundException(s"${performerInfo.jarName} -- ${performerInfo.jarLocation}")
}
}
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/3f80fdb9/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
index 5577ea6..3bfde54 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
@@ -42,6 +42,7 @@ protected object Utils {
* Keeps the loaded clazz in memory
* JARNAME,[CLASSPATH, CLASS]
*/
+
val loadedJars: HashMap[String, (URLClassLoader, Map[String, Class[FeyGenericActor]])]
= HashMap.empty[String, (URLClassLoader, Map[String, Class[FeyGenericActor]])]
@@ -72,9 +73,11 @@ protected object Utils {
loadedJars.get(jarName) match {
case None =>
+ log.info(s"Loading Jar: $jarName")
val urls:Array[URL] = Array(new URL(s"jar:file:$path!/"))
- val cl: URLClassLoader = URLClassLoader.newInstance(urls)
+ val cl: URLClassLoader = URLClassLoader.newInstance(urls, getClass.getClassLoader)
val clazz = cl.loadClass(className)
+ log.info(s"Loading Class $className with path $path")
val feyClazz = clazz.asInstanceOf[Class[FeyGenericActor]]
log.info(s"$path -> $className")
loadedJars.put(jarName, (cl, Map(className -> feyClazz)))
@@ -83,6 +86,7 @@ protected object Utils {
case Some(loadedJar) =>
loadedJar._2.get(className) match {
case None =>
+ log.info(s"Loading Class $className with path $path")
val clazz = loadedJar._1.loadClass(className)
val feyClazz = clazz.asInstanceOf[Class[FeyGenericActor]]
loadedJars.put(jarName, (loadedJar._1, Map(className -> feyClazz) ++ loadedJar._2))
[2/2] incubator-iota git commit: Adding shared performer: Shared
performer should be defined on fey configuration file and can be used by any
orchestration
Posted by to...@apache.org.
Adding shared performers: shared performers are defined in a JSON file whose path is set by `shared-performers` in the Fey configuration file, and they can be used by any orchestration
Project: http://git-wip-us.apache.org/repos/asf/incubator-iota/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-iota/commit/cf172dc6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-iota/tree/cf172dc6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-iota/diff/cf172dc6
Branch: refs/heads/master
Commit: cf172dc6efb3e1e29cb6a68fa5a35a141b28cfee
Parents: 3f80fdb
Author: Barbara Gomes <ba...@gmail.com>
Authored: Tue Apr 25 13:56:02 2017 -0700
Committer: Barbara Gomes <ba...@gmail.com>
Committed: Tue Apr 25 13:56:02 2017 -0700
----------------------------------------------------------------------
fey-core/src/main/resources/application.conf | 3 +
.../resources/shared-json-schema-validator.json | 97 +++++++++
.../apache/iota/fey/CoreSharedPerformers.scala | 205 +++++++++++++++++++
.../scala/org/apache/iota/fey/Ensemble.scala | 22 +-
.../scala/org/apache/iota/fey/FeyCore.scala | 35 +++-
.../org/apache/iota/fey/GlobalPerformer.scala | 35 ++--
.../main/scala/org/apache/iota/fey/Utils.scala | 2 +
7 files changed, 370 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/cf172dc6/fey-core/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/fey-core/src/main/resources/application.conf b/fey-core/src/main/resources/application.conf
index 1d7e092..435804d 100644
--- a/fey-core/src/main/resources/application.conf
+++ b/fey-core/src/main/resources/application.conf
@@ -96,6 +96,9 @@ fey-global-configuration{
// Which port and host should the restapi bind to
port = 16666
urlPath = "0.0.0.0"
+
+ //Path for shared performers json file
+ shared-performers = ""
}
// Fey akka configuration. Can not be overwritten by user
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/cf172dc6/fey-core/src/main/resources/shared-json-schema-validator.json
----------------------------------------------------------------------
diff --git a/fey-core/src/main/resources/shared-json-schema-validator.json b/fey-core/src/main/resources/shared-json-schema-validator.json
new file mode 100644
index 0000000..2f5a376
--- /dev/null
+++ b/fey-core/src/main/resources/shared-json-schema-validator.json
@@ -0,0 +1,97 @@
+{
+ "shared-performers": {
+ "type": "array",
+ "items": {
+ "guid": {
+ "type": "string"
+ },
+ "controlAware": {
+ "type": "boolean"
+ },
+ "dispatcher": {
+ "type": "string"
+ },
+ "autoScale": {
+ "type": "object",
+ "lowerBound": {
+ "type": "integer",
+ "minimum": 1
+ },
+ "upperBound": {
+ "type": "integer",
+ "minimum": 1
+ },
+ "backoffThreshold": {
+ "type": "number",
+ "minimum": 0.0
+ },
+ "roundRobin": {
+ "type": "boolean"
+ },
+ "required": [
+ "lowerBound",
+ "upperBound"
+ ]
+ },
+ "schedule": {
+ "type": "integer",
+ "minimum": 0
+ },
+ "backoff": {
+ "type": "integer",
+ "minimum": 0
+ },
+ "source": {
+ "type": "object",
+ "name": {
+ "type": "string",
+ "pattern": "\\w+(\\.jar)"
+ },
+ "location": {
+ "type": "object",
+ "url": {
+ "type": "string",
+ "pattern": "(?i)(^(http|https|file)):\/\/"
+ },
+ "credentials": {
+ "user": {
+ "type": "string"
+ },
+ "password": {
+ "type": "string"
+ },
+ "required": [
+ "user",
+ "password"
+ ]
+ },
+ "required": [
+ "url"
+ ]
+ },
+ "classPath": {
+ "type": "string",
+ "pattern": "\\w+"
+ },
+ "parameters": {
+ "patternProperties": {
+ ".*": {
+ "type": "string"
+ }
+ }
+ },
+ "required": [
+ "name",
+ "classPath",
+ "parameters"
+ ]
+ },
+ "required": [
+ "guid",
+ "schedule",
+ "backoff",
+ "source"
+ ]
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/cf172dc6/fey-core/src/main/scala/org/apache/iota/fey/CoreSharedPerformers.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/CoreSharedPerformers.scala b/fey-core/src/main/scala/org/apache/iota/fey/CoreSharedPerformers.scala
new file mode 100644
index 0000000..57a3ceb
--- /dev/null
+++ b/fey-core/src/main/scala/org/apache/iota/fey/CoreSharedPerformers.scala
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iota.fey
+
+import akka.actor.SupervisorStrategy.Restart
+import akka.actor.{Actor, ActorLogging, ActorRef, OneForOneStrategy, Props, Terminated}
+import akka.routing._
+import play.api.libs.json.JsObject
+
+import scala.collection.mutable.HashMap
+import scala.concurrent.duration._
+
+protected class CoreSharedPerformers(val sharedJson: List[JsObject]) extends Actor with ActorLogging{
+
+ val monitoring_actor = FEY_MONITOR.actorRef
+ var shared_metadata: Map[String, Performer] = Map.empty[String, Performer]
+
+ override def receive: Receive = {
+
+ case CoreSharedPerformers.PRINT_GLOBAL =>
+ context.actorSelection(s"*") ! FeyGenericActor.PRINT_PATH
+
+ case CoreSharedPerformers.RESTART_SHARED(uuid) =>
+ restartShared(uuid)
+
+ case Terminated(actor) =>
+ monitoring_actor ! Monitor.TERMINATE(actor.path.toString, Utils.getTimestamp)
+ log.error(s"DEAD Shared performer: ${actor.path.name} . All the ensembles that uses this performer will be restarted")
+ self ! CoreSharedPerformers.RESTART_SHARED(actor.path.name)
+
+ case x => log.warning(s"Message $x not treated by Shared Performers")
+ }
+
+ /**
+ * If any of the shared performers fails, the supervisor tries to restart it
+ * (at most 3 retries within 1 minute). If it cannot be restarted, the
+ * Terminated message is received: the shared performer is then re-created
+ * from its metadata, and every ensemble registered as using it is sent
+ * FORCE_RESTART_ENSEMBLE, which makes that ensemble throw a RestartEnsemble
+ * exception.
+ */
+ override val supervisorStrategy =
+ OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 minute) {
+ case e: Exception => Restart
+ }
+
+ /**
+ * Uses the json spec to create the performers
+ */
+ override def preStart() : Unit = {
+ monitoring_actor ! Monitor.START(Utils.getTimestamp)
+
+ shared_metadata = Ensemble.extractPerformers(sharedJson)
+
+ createSharedPerformers()
+ }
+
+ override def postStop() : Unit = {
+ monitoring_actor ! Monitor.STOP(Utils.getTimestamp)
+ }
+
+ override def postRestart(reason: Throwable): Unit = {
+ monitoring_actor ! Monitor.RESTART(reason, Utils.getTimestamp)
+ preStart()
+ }
+
+ private def restartShared(uuid: String) = {
+ try {
+ log.warning(s"Restarting Shared Performer ${uuid}")
+ val metadata = shared_metadata.get(uuid)
+ if (metadata.isDefined) {
+ createFeyActor(uuid, metadata.get)
+ } else {
+ log.error(s"Could not restart shared $uuid because metadata is not configured")
+ }
+
+ CoreSharedPerformers.ensemblesUsingShared.keySet.foreach((ensAndShared) => {
+ if (ensAndShared._2 == uuid) {
+ val actor = CoreSharedPerformers.ensemblesUsingShared.get(ensAndShared).get
+ actor ! Ensemble.FORCE_RESTART_ENSEMBLE
+ }
+ })
+ }catch{
+ case e: Exception=>
+ log.error(e, s"Could not restart shared $uuid")
+ }
+ }
+
+ private def createSharedPerformers() = {
+ shared_metadata.foreach((shared) => {
+ try{
+ createFeyActor(shared._1, shared._2)
+ }catch{
+ case e: Exception =>
+ log.error(e, s"Could not created shared performer ${shared._1}")
+ }
+ })
+ }
+
+ private def createFeyActor(performerID: String, performerInfo: Performer) = {
+ val actor: ActorRef = {
+ val actorProps = getPerformer(performerInfo)
+ if (performerInfo.autoScale) {
+
+ val resizer = DefaultResizer(lowerBound = performerInfo.lowerBound, upperBound = performerInfo.upperBound,
+ messagesPerResize = CONFIG.MESSAGES_PER_RESIZE, backoffThreshold = performerInfo.backoffThreshold, backoffRate = 0.1)
+
+ val strategy =
+ if (performerInfo.isRoundRobin) {
+ log.info(s"Using Round Robin for performer ${performerID}")
+ RoundRobinPool(1, Some(resizer))
+ } else {
+ log.info(s"Using Smallest mailbox for performer ${performerID}")
+ SmallestMailboxPool(1, Some(resizer))
+ }
+
+ context.actorOf(strategy.props(actorProps), name = performerID)
+ } else {
+ context.actorOf(actorProps, name = performerID)
+ }
+ }
+
+ context.watch(actor)
+ CoreSharedPerformers.activeSharedPerformers.put(performerID, actor)
+ }
+
+ /**
+ * Creates actor props based on JSON configuration
+ *
+ * @param performerInfo Performer object
+ * @return Props of actor based on JSON config
+ */
+ private def getPerformer(performerInfo: Performer): Props = {
+
+ var clazz:Option[Class[FeyGenericActor]] = None
+
+ Utils.loadedJars.synchronized {
+ clazz = Some(loadClazzFromJar(performerInfo.classPath, s"${performerInfo.jarLocation}/${performerInfo.jarName}", performerInfo.jarName))
+ }
+
+ if(clazz.isDefined) {
+ val dispatcher = if (performerInfo.dispatcher != "") s"fey-custom-dispatchers.${performerInfo.dispatcher}" else ""
+
+ val actorProps = Props(clazz.get, performerInfo.parameters, performerInfo.backoff,
+ Map.empty, performerInfo.schedule, self.path.name, self.path.name, performerInfo.autoScale)
+
+ // dispatcher has higher priority than controlAware. That means that if both are defined
+ // then the custom dispatcher will be used
+ if (dispatcher != "") {
+ log.info(s"Using dispatcher: $dispatcher")
+ actorProps.withDispatcher(dispatcher)
+ }
+ else if (performerInfo.controlAware) {
+ actorProps.withDispatcher(CONFIG.CONTROL_AWARE_MAILBOX)
+ } else {
+ actorProps
+ }
+ }else{
+ log.error(s"Could not load class for performer ${performerInfo.uid}")
+ throw new ClassNotFoundException(s"${performerInfo.jarName} -- ${performerInfo.jarLocation}")
+ }
+ }
+
+ /**
+ * Load a clazz instance of FeyGenericActor from a jar
+ *
+ * @param classPath class path
+ * @param jarLocation Full path where to load the jar from
+ * @return clazz instance of FeyGenericActor
+ */
+ private def loadClazzFromJar(classPath: String, jarLocation: String, jarName: String):Class[FeyGenericActor] = {
+ try {
+ Utils.loadActorClassFromJar(jarLocation,classPath,jarName)
+ }catch {
+ case e: Exception =>
+ log.error(e,s"Could not load class $classPath from jar $jarLocation. Please, check the Jar repository path as well the jar name")
+ throw e
+ }
+ }
+}
+
+object CoreSharedPerformers{
+
+ // [sharedID, actorRef]
+ val activeSharedPerformers:HashMap[String,ActorRef] = HashMap.empty[String,ActorRef]
+ // [[ensembleID, sharedID] -> actorRef]
+ val ensemblesUsingShared:HashMap[Tuple2[String,String], ActorRef] = HashMap.empty[Tuple2[String,String], ActorRef]
+
+ case class RESTART_SHARED(shared_id: String)
+ case object PRINT_GLOBAL
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/cf172dc6/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
index 04308c3..e2944f0 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
@@ -41,6 +41,8 @@ protected class Ensemble(val orchestrationID: String,
case STOP_PERFORMERS => stopPerformers()
+ case FORCE_RESTART_ENSEMBLE => throw new RestartEnsemble(s"Forcing restart of ensemble. Reason: Shared performer DEAD")
+
case PRINT_ENSEMBLE =>
val ed = connectors.map(connector => {
s""" \t ${connector._1} : ${connector._2.mkString("[",",","]")}"""
@@ -198,14 +200,17 @@ protected class Ensemble(val orchestrationID: String,
context.watch(actor)
tmpActors.put(performerID, actor)
(performerID, actor)
- }else{
- // Performer is a global performer and is already created
- if(GlobalPerformer.activeGlobalPerformers.contains(orchestrationID)
- && GlobalPerformer.activeGlobalPerformers.get(orchestrationID).get.contains(performerID)){
- (performerID, GlobalPerformer.activeGlobalPerformers.get(orchestrationID).get.get(performerID).get)
- }else {
- throw new IllegalPerformerCreation(s"Performer $performerID is not defined in the JSON")
- }
+ }else if(GlobalPerformer.activeGlobalPerformers.contains(orchestrationID)
+ && GlobalPerformer.activeGlobalPerformers.get(orchestrationID).get.contains(performerID)){
+ // Performer is a global orchestration performer and is already created
+ (performerID, GlobalPerformer.activeGlobalPerformers.get(orchestrationID).get.get(performerID).get)
+ }else if(CoreSharedPerformers.activeSharedPerformers.contains(performerID)){
+ // Shared performers, level is FeyCore
+ CoreSharedPerformers.ensemblesUsingShared.put((self.path.toString, performerID), self)
+ (performerID, CoreSharedPerformers.activeSharedPerformers.get(performerID).get)
+ }
+ else {
+ throw new IllegalPerformerCreation(s"Performer $performerID is not defined in the JSON")
}
}
}
@@ -354,6 +359,7 @@ object Ensemble {
*/
case object STOP_PERFORMERS
case object PRINT_ENSEMBLE
+ case object FORCE_RESTART_ENSEMBLE
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/cf172dc6/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala b/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
index 14c4cc0..24a6d93 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
@@ -31,6 +31,7 @@ import play.api.libs.json._
import scala.collection.mutable.HashMap
import scala.concurrent.duration._
+import scala.io.Source
protected class FeyCore extends Actor with ActorLogging{
@@ -50,6 +51,7 @@ protected class FeyCore extends Actor with ActorLogging{
case START =>
val jsonReceiverActor: ActorRef = context.actorOf(Props[JsonReceiverActor], name = JSON_RECEIVER_NAME)
context.watch(jsonReceiverActor)
+ startSharedPerformers()
case ORCHESTRATION_RECEIVED(orchestrationJson, optionFile) =>
optionFile match {
@@ -70,13 +72,34 @@ protected class FeyCore extends Actor with ActorLogging{
case Terminated(actor) => processTerminatedMessage(actor)
- case GetRoutees => //Discard
-
case x =>
log.info(s"Received $x")
}
+ private def startSharedPerformers() = {
+ var jsonList:List[JsObject] = List.empty
+ if(CONFIG.SHARED_PERFORMER_JSON_PATH.isDefined){
+
+ //reading file
+ val sharedString = Source.fromFile(CONFIG.SHARED_PERFORMER_JSON_PATH.get).getLines().mkString("")
+ val sharedJson = Json.parse(sharedString)
+
+ val result = SchemaValidator.validate(FeyCore.sharedJsonSpec, sharedJson)
+ if (result.isError) {
+ log.error("Incorrect SHARED JSON schema \n" + result.asEither.left.get.toJson.as[List[JsObject]].map(error => {
+ val path = (error \ "instancePath").as[String]
+ val msg = (error \ "msgs").as[List[String]].mkString("\n\t")
+ s"$path \n\tErrors: $msg"
+ }).mkString("\n"))
+ } else {
+ jsonList = (sharedJson \ "shared-performers").as[List[JsObject]]
+ }
+ }
+ val sharedMain: ActorRef = context.actorOf(Props(classOf[CoreSharedPerformers], jsonList), name = SHARED_PERFORMERS_NAME)
+ context.watch(sharedMain)
+ }
+
private def orchestrationReceivedNoFile(json: JsValue) = {
val orchGUID = (json \ GUID).as[String]
log.info(s"Orchestration $orchGUID received")
@@ -355,13 +378,11 @@ protected object FeyCore{
final val JSON_RECEIVER_NAME: String = "JSON_RECEIVER"
final val IDENTIFIER_NAME: String = "FEY_IDENTIFIER"
+ final val SHARED_PERFORMERS_NAME: String = "FEY_SHARED_PERFORMERS"
- /**
- * Loads the specification for validating a Fey JSON
- */
- val jsonSchemaSpec: SchemaType = {
+ val sharedJsonSpec: SchemaType = {
Json.fromJson[SchemaType](Json.parse(scala.io.Source
- .fromInputStream(getClass.getResourceAsStream("/fey-json-schema-validator.json"))
+ .fromInputStream(getClass.getResourceAsStream("/shared-json-schema-validator.json"))
.getLines()
.mkString(""))).get
}
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/cf172dc6/fey-core/src/main/scala/org/apache/iota/fey/GlobalPerformer.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/GlobalPerformer.scala b/fey-core/src/main/scala/org/apache/iota/fey/GlobalPerformer.scala
index e343f2e..26a58c4 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/GlobalPerformer.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/GlobalPerformer.scala
@@ -32,7 +32,6 @@ protected class GlobalPerformer(val orchestrationID: String,
val monitoring_actor = FEY_MONITOR.actorRef
var global_metadata: Map[String, Performer] = Map.empty[String, Performer]
- var global_performer: Map[String,ActorRef] = Map.empty[String,ActorRef]
override def receive: Receive = {
@@ -138,24 +137,32 @@ protected class GlobalPerformer(val orchestrationID: String,
* @return Props of actor based on JSON config
*/
private def getPerformer(performerInfo: Performer): Props = {
+ var clazz:Option[Class[FeyGenericActor]] = None
- val clazz = loadClazzFromJar(performerInfo.classPath, s"${performerInfo.jarLocation}/${performerInfo.jarName}", performerInfo.jarName)
+ Utils.loadedJars.synchronized {
+ clazz = Some(loadClazzFromJar(performerInfo.classPath, s"${performerInfo.jarLocation}/${performerInfo.jarName}", performerInfo.jarName))
+ }
- val dispatcher = if(performerInfo.dispatcher != "") s"fey-custom-dispatchers.${performerInfo.dispatcher}" else ""
+ if(clazz.isDefined) {
+ val dispatcher = if (performerInfo.dispatcher != "") s"fey-custom-dispatchers.${performerInfo.dispatcher}" else ""
- val actorProps = Props(clazz,
- performerInfo.parameters, performerInfo.backoff, Map.empty, performerInfo.schedule, orchestrationName, orchestrationID, performerInfo.autoScale)
+ val actorProps = Props(clazz.get,
+ performerInfo.parameters, performerInfo.backoff, Map.empty, performerInfo.schedule, orchestrationName, orchestrationID, performerInfo.autoScale)
- // dispatcher has higher priority than controlAware. That means that if both are defined
- // then the custom dispatcher will be used
- if(dispatcher != ""){
- log.info(s"Using dispatcher: $dispatcher")
- actorProps.withDispatcher(dispatcher)
- }
- else if(performerInfo.controlAware){
- actorProps.withDispatcher(CONFIG.CONTROL_AWARE_MAILBOX)
+ // dispatcher has higher priority than controlAware. That means that if both are defined
+ // then the custom dispatcher will be used
+ if (dispatcher != "") {
+ log.info(s"Using dispatcher: $dispatcher")
+ actorProps.withDispatcher(dispatcher)
+ }
+ else if (performerInfo.controlAware) {
+ actorProps.withDispatcher(CONFIG.CONTROL_AWARE_MAILBOX)
+ } else {
+ actorProps
+ }
}else{
- actorProps
+ log.error(s"Could not load class for performer ${performerInfo.uid}")
+ throw new ClassNotFoundException(s"${performerInfo.jarName} -- ${performerInfo.jarLocation}")
}
}
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/cf172dc6/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
index 3bfde54..bedec84 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
@@ -238,6 +238,7 @@ object CONFIG{
var MONITORING_TYPE: String = "COMPLETE"
var PORT = DEFAULT_PORT
var URL_PATH = "localhost"
+ var SHARED_PERFORMER_JSON_PATH: Option[String] = None
def loadUserConfiguration(path: String) : Unit = {
val app = {
@@ -265,6 +266,7 @@ object CONFIG{
MONITORING_TYPE = app.getString("monitoring.type").toUpperCase()
PORT = app.getInt("port")
URL_PATH = app.getString("urlPath")
+ SHARED_PERFORMER_JSON_PATH = if(app.getString("shared-performers").isEmpty) None else Some(app.getString("shared-performers"))
setLogbackConfiguration()
}