You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iota.apache.org by to...@apache.org on 2016/09/28 01:19:16 UTC

[06/10] incubator-iota git commit: New Feature: User should be able to define a custom dispatcher for the performer

New Feature: User should be able to define a custom dispatcher for the performer

- Added "dispatcher" property to Performer JSON (Optional)
- Added "custom-dispatcher" configuration to Fey


Project: http://git-wip-us.apache.org/repos/asf/incubator-iota/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-iota/commit/2135e87e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-iota/tree/2135e87e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-iota/diff/2135e87e

Branch: refs/heads/master
Commit: 2135e87e0527c61eaafaba956d5b3b7c156d233c
Parents: fedb5e2
Author: Barbara Gomes <ba...@gmail.com>
Authored: Fri Jul 29 11:40:59 2016 -0700
Committer: Barbara Gomes <ba...@gmail.com>
Committed: Fri Jul 29 11:40:59 2016 -0700

----------------------------------------------------------------------
 fey-core/src/main/resources/application.conf    |  8 ++++++++
 .../resources/fey-json-schema-validator.json    |  3 +++
 .../scala/org/apache/iota/fey/Application.scala |  2 +-
 .../scala/org/apache/iota/fey/Ensemble.scala    | 20 ++++++++++++++++----
 .../main/scala/org/apache/iota/fey/Utils.scala  | 10 +++++++++-
 5 files changed, 37 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/2135e87e/fey-core/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/fey-core/src/main/resources/application.conf b/fey-core/src/main/resources/application.conf
index 3816151..a1193af 100644
--- a/fey-core/src/main/resources/application.conf
+++ b/fey-core/src/main/resources/application.conf
@@ -75,6 +75,14 @@ fey-global-configuration{
     messages-per-resize = 500
   }
 
+  // Holds all the Akka dispatchers configured by the user.
+  // These dispatchers will be given to Fey ActorSystem and will be available
+  // to be used by the performers. To do so, just specify the property "dispatcher"
+  // on the performer JSON, using the path of the dispatcher inside the custom-dispatchers block.
+  custom-dispatchers{
+    // No default custom implementations
+  }
+
 }
 
 // Fey akka configuration. Can not be overwritten by user

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/2135e87e/fey-core/src/main/resources/fey-json-schema-validator.json
----------------------------------------------------------------------
diff --git a/fey-core/src/main/resources/fey-json-schema-validator.json b/fey-core/src/main/resources/fey-json-schema-validator.json
index 5dbf58a..579fbca 100644
--- a/fey-core/src/main/resources/fey-json-schema-validator.json
+++ b/fey-core/src/main/resources/fey-json-schema-validator.json
@@ -32,6 +32,9 @@
           "controlAware":{
             "type":"boolean"
           },
+          "dispatcher":{
+            "type": "string"
+          },
           "autoScale":{
             "type":"integer",
             "minimum":0

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/2135e87e/fey-core/src/main/scala/org/apache/iota/fey/Application.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Application.scala b/fey-core/src/main/scala/org/apache/iota/fey/Application.scala
index 637fbf0..d62c418 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Application.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Application.scala
@@ -35,7 +35,7 @@ object Application extends App {
 }
 
 object FEY_SYSTEM{
-  implicit val system = ActorSystem("FEY-MANAGEMENT-SYSTEM")
+  implicit val system = ActorSystem("FEY-MANAGEMENT-SYSTEM", CONFIG.getDispatcherForAkka().withFallback(ConfigFactory.load()))
 }
 
 object SYSTEM_ACTORS{

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/2135e87e/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
index cc7640c..afa39f2 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
@@ -205,16 +205,22 @@ protected class Ensemble(val orchestrationID: String,
     val clazz = loadClazzFromJar(performerInfo.classPath, s"${performerInfo.jarLocation}/${performerInfo.jarName}", performerInfo.jarName)
 
     val autoScale = if(performerInfo.autoScale > 0) true else false
+    val dispatcher = if(performerInfo.dispatcher != "") s"fey-custom-dispatchers.${performerInfo.dispatcher}" else ""
 
     val actorProps = Props(clazz,
       performerInfo.parameters, performerInfo.backoff, connections, performerInfo.schedule, orchestrationName, orchestrationID, autoScale)
 
-    if(performerInfo.controlAware){
+    // dispatcher has higher priority than controlAware. That means that if both are defined
+    // then the custom dispatcher will be used
+    if(dispatcher != ""){
+      log.info(s"Using dispatcher: $dispatcher")
+      actorProps.withDispatcher(dispatcher)
+    }
+    else if(performerInfo.controlAware){
       actorProps.withDispatcher(CONFIG.CONTROL_AWARE_MAILBOX)
     }else{
       actorProps
     }
-
   }
 
   /**
@@ -282,7 +288,9 @@ object Ensemble {
       val params:Map[String,String] = getMapOfParams((performer \ SOURCE \ SOURCE_PARAMS).as[JsObject])
       val controlAware:Boolean = if (performer.keys.contains(CONTROL_AWARE)) (performer \ CONTROL_AWARE).as[Boolean] else false
       val location: String = if ( (performer \ SOURCE).as[JsObject].keys.contains(JAR_LOCATION) ) CONFIG.DYNAMIC_JAR_REPO else CONFIG.JAR_REPOSITORY
-      (id, new Performer(id, jarName, classPath,params,schedule.millisecond,backoff.millisecond, autoScale,controlAware, location))
+      val dispatcher: String = if (performer.keys.contains(PERFORMER_DISPATCHER)) (performer \ PERFORMER_DISPATCHER).as[String] else ""
+
+      (id, new Performer(id, jarName, classPath, params, schedule.millisecond, backoff.millisecond, autoScale,controlAware, location, dispatcher))
     }).toMap
   }
 
@@ -314,8 +322,12 @@ object Ensemble {
   * @param parameters performer params
   * @param schedule performer schedule interval
   * @param backoff performer backoff interval
+  * @param autoScale if actor was started as a router and can autoscale
+  * @param controlAware if the actor uses a controlAware mailbox
+  * @param jarLocation repository directory that holds the performer jar (the jar is loaded from jarLocation/jarName)
+  * @param dispatcher Akka dispatcher that the actor is using
   */
 case class Performer(uid: String, jarName: String,
                 classPath: String, parameters: Map[String,String],
                 schedule: FiniteDuration, backoff: FiniteDuration,
-                autoScale: Int, controlAware: Boolean, jarLocation: String)
+                autoScale: Int, controlAware: Boolean, jarLocation: String, dispatcher: String)

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/2135e87e/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
index bb0f4d5..8a0f36d 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
@@ -25,7 +25,7 @@ import ch.qos.logback.classic.{Level, Logger, LoggerContext}
 import ch.qos.logback.core.joran.spi.JoranException
 import ch.qos.logback.core.util.StatusPrinter
 import com.eclipsesource.schema.SchemaType
-import com.typesafe.config.ConfigFactory
+import com.typesafe.config.{Config, ConfigFactory, ConfigValue}
 import org.slf4j.LoggerFactory
 import play.api.libs.json.{JsObject, JsValue, Json}
 
@@ -195,6 +195,7 @@ object JSON_PATH{
   val JAR_CREDENTIALS_URL = "credentials"
   val JAR_CRED_USER = "user"
   val JAR_CRED_PASSWORD = "password"
+  val PERFORMER_DISPATCHER = "dispatcher"
 }
 
 object CONFIG{
@@ -215,6 +216,7 @@ object CONFIG{
   var MESSAGES_PER_RESIZE:Int = 500
   var DYNAMIC_JAR_REPO = ""
   var DYNAMIC_JAR_FORCE_PULL = false
+  var CUSTOM_DISPATCHERS: ConfigValue = null
 
   def loadUserConfiguration(path: String) : Unit = {
 
@@ -238,10 +240,16 @@ object CONFIG{
       MESSAGES_PER_RESIZE = app.getInt("auto-scale.messages-per-resize")
       DYNAMIC_JAR_REPO = app.getString("dynamic-jar-population.downloaded-repository")
       DYNAMIC_JAR_FORCE_PULL = app.getBoolean("dynamic-jar-population.force-pull")
+      CUSTOM_DISPATCHERS = app.getValue("custom-dispatchers")
 
     setLogbackConfiguration()
   }
 
+  def getDispatcherForAkka():Config = {
+    val config = ConfigFactory.parseString("")
+    config.withValue("fey-custom-dispatchers", CUSTOM_DISPATCHERS)
+  }
+
   /**
     * Resets logback context configuration and loads the new one
     */