You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iota.apache.org by to...@apache.org on 2016/09/28 01:19:16 UTC
[06/10] incubator-iota git commit: New Feature: User should be able
to define a custom dispatcher for the performer
New Feature: User should be able to define a custom dispatcher for the performer
- Added "dispatcher" property to Performer JSON (Optional)
- Added "custom-dispatchers" configuration to Fey
Project: http://git-wip-us.apache.org/repos/asf/incubator-iota/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-iota/commit/2135e87e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-iota/tree/2135e87e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-iota/diff/2135e87e
Branch: refs/heads/master
Commit: 2135e87e0527c61eaafaba956d5b3b7c156d233c
Parents: fedb5e2
Author: Barbara Gomes <ba...@gmail.com>
Authored: Fri Jul 29 11:40:59 2016 -0700
Committer: Barbara Gomes <ba...@gmail.com>
Committed: Fri Jul 29 11:40:59 2016 -0700
----------------------------------------------------------------------
fey-core/src/main/resources/application.conf | 8 ++++++++
.../resources/fey-json-schema-validator.json | 3 +++
.../scala/org/apache/iota/fey/Application.scala | 2 +-
.../scala/org/apache/iota/fey/Ensemble.scala | 20 ++++++++++++++++----
.../main/scala/org/apache/iota/fey/Utils.scala | 10 +++++++++-
5 files changed, 37 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/2135e87e/fey-core/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/fey-core/src/main/resources/application.conf b/fey-core/src/main/resources/application.conf
index 3816151..a1193af 100644
--- a/fey-core/src/main/resources/application.conf
+++ b/fey-core/src/main/resources/application.conf
@@ -75,6 +75,14 @@ fey-global-configuration{
messages-per-resize = 500
}
+ // Holds all the Akka dispatchers configured by the user.
+ // These dispatchers will be given to Fey ActorSystem and will be available
+ // to be used by the performers. To do so, just specify the property "dispatcher"
+ // on the performer JSON using the path of the dispatcher inside the custom-dispatchers block
+ custom-dispatchers{
+ // No default custom implementations
+ }
+
}
// Fey akka configuration. Can not be overwritten by user
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/2135e87e/fey-core/src/main/resources/fey-json-schema-validator.json
----------------------------------------------------------------------
diff --git a/fey-core/src/main/resources/fey-json-schema-validator.json b/fey-core/src/main/resources/fey-json-schema-validator.json
index 5dbf58a..579fbca 100644
--- a/fey-core/src/main/resources/fey-json-schema-validator.json
+++ b/fey-core/src/main/resources/fey-json-schema-validator.json
@@ -32,6 +32,9 @@
"controlAware":{
"type":"boolean"
},
+ "dispatcher":{
+ "type": "string"
+ },
"autoScale":{
"type":"integer",
"minimum":0
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/2135e87e/fey-core/src/main/scala/org/apache/iota/fey/Application.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Application.scala b/fey-core/src/main/scala/org/apache/iota/fey/Application.scala
index 637fbf0..d62c418 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Application.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Application.scala
@@ -35,7 +35,7 @@ object Application extends App {
}
object FEY_SYSTEM{
- implicit val system = ActorSystem("FEY-MANAGEMENT-SYSTEM")
+ implicit val system = ActorSystem("FEY-MANAGEMENT-SYSTEM", CONFIG.getDispatcherForAkka().withFallback(ConfigFactory.load()))
}
object SYSTEM_ACTORS{
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/2135e87e/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
index cc7640c..afa39f2 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
@@ -205,16 +205,22 @@ protected class Ensemble(val orchestrationID: String,
val clazz = loadClazzFromJar(performerInfo.classPath, s"${performerInfo.jarLocation}/${performerInfo.jarName}", performerInfo.jarName)
val autoScale = if(performerInfo.autoScale > 0) true else false
+ val dispatcher = if(performerInfo.dispatcher != "") s"fey-custom-dispatchers.${performerInfo.dispatcher}" else ""
val actorProps = Props(clazz,
performerInfo.parameters, performerInfo.backoff, connections, performerInfo.schedule, orchestrationName, orchestrationID, autoScale)
- if(performerInfo.controlAware){
+ // dispatcher has higher priority than controlAware. That means that if both are defined
+ // then the custom dispatcher will be used
+ if(dispatcher != ""){
+ log.info(s"Using dispatcher: $dispatcher")
+ actorProps.withDispatcher(dispatcher)
+ }
+ else if(performerInfo.controlAware){
actorProps.withDispatcher(CONFIG.CONTROL_AWARE_MAILBOX)
}else{
actorProps
}
-
}
/**
@@ -282,7 +288,9 @@ object Ensemble {
val params:Map[String,String] = getMapOfParams((performer \ SOURCE \ SOURCE_PARAMS).as[JsObject])
val controlAware:Boolean = if (performer.keys.contains(CONTROL_AWARE)) (performer \ CONTROL_AWARE).as[Boolean] else false
val location: String = if ( (performer \ SOURCE).as[JsObject].keys.contains(JAR_LOCATION) ) CONFIG.DYNAMIC_JAR_REPO else CONFIG.JAR_REPOSITORY
- (id, new Performer(id, jarName, classPath,params,schedule.millisecond,backoff.millisecond, autoScale,controlAware, location))
+ val dispatcher: String = if (performer.keys.contains(PERFORMER_DISPATCHER)) (performer \ PERFORMER_DISPATCHER).as[String] else ""
+
+ (id, new Performer(id, jarName, classPath, params, schedule.millisecond, backoff.millisecond, autoScale,controlAware, location, dispatcher))
}).toMap
}
@@ -314,8 +322,12 @@ object Ensemble {
* @param parameters performer params
* @param schedule performer schedule interval
* @param backoff performer backoff interval
+ * @param autoScale when greater than zero, the actor is started as a router and can autoscale
+ * @param controlAware if the actor uses a controlAware mailbox
+ * @param jarLocation directory (jar repository) from which the performer jar is loaded
+ * @param dispatcher Akka dispatcher that the actor is using
*/
case class Performer(uid: String, jarName: String,
classPath: String, parameters: Map[String,String],
schedule: FiniteDuration, backoff: FiniteDuration,
- autoScale: Int, controlAware: Boolean, jarLocation: String)
+ autoScale: Int, controlAware: Boolean, jarLocation: String, dispatcher: String)
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/2135e87e/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
index bb0f4d5..8a0f36d 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
@@ -25,7 +25,7 @@ import ch.qos.logback.classic.{Level, Logger, LoggerContext}
import ch.qos.logback.core.joran.spi.JoranException
import ch.qos.logback.core.util.StatusPrinter
import com.eclipsesource.schema.SchemaType
-import com.typesafe.config.ConfigFactory
+import com.typesafe.config.{Config, ConfigFactory, ConfigValue}
import org.slf4j.LoggerFactory
import play.api.libs.json.{JsObject, JsValue, Json}
@@ -195,6 +195,7 @@ object JSON_PATH{
val JAR_CREDENTIALS_URL = "credentials"
val JAR_CRED_USER = "user"
val JAR_CRED_PASSWORD = "password"
+ val PERFORMER_DISPATCHER = "dispatcher"
}
object CONFIG{
@@ -215,6 +216,7 @@ object CONFIG{
var MESSAGES_PER_RESIZE:Int = 500
var DYNAMIC_JAR_REPO = ""
var DYNAMIC_JAR_FORCE_PULL = false
+ var CUSTOM_DISPATCHERS: ConfigValue = null
def loadUserConfiguration(path: String) : Unit = {
@@ -238,10 +240,16 @@ object CONFIG{
MESSAGES_PER_RESIZE = app.getInt("auto-scale.messages-per-resize")
DYNAMIC_JAR_REPO = app.getString("dynamic-jar-population.downloaded-repository")
DYNAMIC_JAR_FORCE_PULL = app.getBoolean("dynamic-jar-population.force-pull")
+ CUSTOM_DISPATCHERS = app.getValue("custom-dispatchers")
setLogbackConfiguration()
}
+ def getDispatcherForAkka():Config = {
+ val config = ConfigFactory.parseString("")
+ config.withValue("fey-custom-dispatchers", CUSTOM_DISPATCHERS)
+ }
+
/**
* Resets logback context configuration and loads the new one
*/