You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iota.apache.org by to...@apache.org on 2016/07/08 01:05:30 UTC

[1/2] incubator-iota git commit: [IOTA-24] Offer control-aware mailbox to Performers

Repository: incubator-iota
Updated Branches:
  refs/heads/master df7428188 -> 40f6adb25


[IOTA-24] Offer control-aware mailbox to Performers

Adding JSON configuration so the user can ask Fey to start the Performer using a control-aware mailbox


Project: http://git-wip-us.apache.org/repos/asf/incubator-iota/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-iota/commit/9126d531
Tree: http://git-wip-us.apache.org/repos/asf/incubator-iota/tree/9126d531
Diff: http://git-wip-us.apache.org/repos/asf/incubator-iota/diff/9126d531

Branch: refs/heads/master
Commit: 9126d5318c5b862e9b0cefd9d84c101706e2ad01
Parents: df74281
Author: Barbara Gomes <ba...@gmail.com>
Authored: Wed Jul 6 14:43:01 2016 -0700
Committer: Barbara Gomes <ba...@gmail.com>
Committed: Wed Jul 6 14:43:01 2016 -0700

----------------------------------------------------------------------
 fey-core/src/main/resources/application.conf    |  6 +++
 .../resources/fey-json-schema-validator.json    |  3 ++
 .../scala/org/apache/iota/fey/Ensemble.scala    | 41 +++++++++++++++-----
 .../main/scala/org/apache/iota/fey/Utils.scala  |  2 +
 4 files changed, 42 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/9126d531/fey-core/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/fey-core/src/main/resources/application.conf b/fey-core/src/main/resources/application.conf
index 28992f4..26fa9a9 100644
--- a/fey-core/src/main/resources/application.conf
+++ b/fey-core/src/main/resources/application.conf
@@ -63,4 +63,10 @@ akka {
   loggers = ["akka.event.slf4j.Slf4jLogger"]
   loglevel = "DEBUG"
   logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
+
+  fey-dispatchers {
+    control-aware-dispatcher {
+      mailbox-type = "akka.dispatch.UnboundedControlAwareMailbox"
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/9126d531/fey-core/src/main/resources/fey-json-schema-validator.json
----------------------------------------------------------------------
diff --git a/fey-core/src/main/resources/fey-json-schema-validator.json b/fey-core/src/main/resources/fey-json-schema-validator.json
index 83dca5e..8f12b24 100644
--- a/fey-core/src/main/resources/fey-json-schema-validator.json
+++ b/fey-core/src/main/resources/fey-json-schema-validator.json
@@ -29,6 +29,9 @@
           "guid":{
             "type":"string"
           },
+          "controlAware":{
+            "type":"boolean"
+          },
           "autoScale":{
             "type":"integer",
             "minimum":0

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/9126d531/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
index 87fbe7f..73d1b59 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
@@ -158,20 +158,17 @@ protected class Ensemble(val orchestrationID: String,
           createFeyActor(connID, connectors.getOrElse(connID,Array.empty),tmpActors)
         }).toMap
 
-        val clazz = loadClazzFromJar(performerInfo.classPath, performerInfo.jarName)
 
         var actor:ActorRef = null
+        val actorProps = getPerformer(performerInfo, connections)
         if(performerInfo.autoScale > 0) {
           val resizer = DefaultResizer(lowerBound = 1, upperBound = performerInfo.autoScale, messagesPerResize = 200, backoffThreshold = 0.4)
           val smallestMailBox = SmallestMailboxPool(1, Some(resizer))
-          actor = context.actorOf(
-            smallestMailBox.props(Props(clazz,
-              performerInfo.parameters, performerInfo.backoff, connections, performerInfo.schedule, orchestrationName, orchestrationID, true)),
-            name = performerID)
+
+          actor = context.actorOf(smallestMailBox.props(actorProps), name = performerID)
+
         }else{
-          actor = context.actorOf(Props(clazz, performerInfo.parameters,
-            performerInfo.backoff, connections, performerInfo.schedule, orchestrationName, orchestrationID, false),
-            name = performerID)
+          actor = context.actorOf(actorProps, name = performerID)
         }
 
         context.watch(actor)
@@ -186,6 +183,29 @@ protected class Ensemble(val orchestrationID: String,
   }
 
   /**
+    * Creates actor props based on JSON configuration
+    * @param performerInfo Performer object
+    * @param connections connections
+    * @return Props of actor based on JSON config
+    */
+  private def getPerformer(performerInfo: Performer, connections: Map[String, ActorRef]): Props = {
+
+    val clazz = loadClazzFromJar(performerInfo.classPath, performerInfo.jarName)
+
+    val autoScale = if(performerInfo.autoScale > 0) true else false
+
+    val actorProps = Props(clazz,
+      performerInfo.parameters, performerInfo.backoff, connections, performerInfo.schedule, orchestrationName, orchestrationID, autoScale)
+
+    if(performerInfo.controlAware){
+      actorProps.withDispatcher(CONFIG.CONTROL_AWARE_MAILBOX)
+    }else{
+      actorProps
+    }
+
+  }
+
+  /**
     * Load a clazz instance of FeyGenericActor from a jar
     *
     * @param classPath class path
@@ -248,7 +268,8 @@ object Ensemble {
       val jarName: String = (performer \ SOURCE \ SOURCE_NAME).as[String]
       val classPath: String = (performer \ SOURCE \ SOURCE_CLASSPATH).as[String]
       val params:Map[String,String] = getMapOfParams((performer \ SOURCE \ SOURCE_PARAMS).as[JsObject])
-      (id, new Performer(id, jarName, classPath,params,schedule.millisecond,backoff.millisecond, autoScale))
+      val controlAware:Boolean = if (performer.keys.contains(CONTROL_AWARE)) (performer \ CONTROL_AWARE).as[Boolean] else false
+      (id, new Performer(id, jarName, classPath,params,schedule.millisecond,backoff.millisecond, autoScale,controlAware))
     }).toMap
   }
 
@@ -284,4 +305,4 @@ object Ensemble {
 case class Performer(uid: String, jarName: String,
                 classPath: String, parameters: Map[String,String],
                 schedule: FiniteDuration, backoff: FiniteDuration,
-                autoScale: Int)
+                autoScale: Int, controlAware: Boolean)

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/9126d531/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
index dab47bd..bdc420c 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
@@ -175,6 +175,7 @@ object JSON_PATH{
   val ORCHESTRATION_NAME = "name"
   val ORCHESTRATION_TIMESTAMP = "timestamp"
   val PERFORMER_AUTO_SCALE = "autoScale"
+  val CONTROL_AWARE = "controlAware"
 }
 
 object CONFIG{
@@ -183,6 +184,7 @@ object CONFIG{
 
   val FILE_APPENDER = "FEY-FILE"
   val CONSOLE_APPENDER = "FEY-CONSOLE"
+  val CONTROL_AWARE_MAILBOX = "akka.fey-dispatchers.control-aware-dispatcher"
 
   var CHECKPOINT_DIR = ""
   var JSON_REPOSITORY = ""


[2/2] incubator-iota git commit: [IOTA-24] Offer control-aware mailbox to Performers

Posted by to...@apache.org.
[IOTA-24] Offer control-aware mailbox to Performers

Updating README.md to reflect the new configuration


Project: http://git-wip-us.apache.org/repos/asf/incubator-iota/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-iota/commit/40f6adb2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-iota/tree/40f6adb2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-iota/diff/40f6adb2

Branch: refs/heads/master
Commit: 40f6adb253701ee910067d45559e1f283b7e5309
Parents: 9126d53
Author: Barbara Gomes <ba...@gmail.com>
Authored: Wed Jul 6 15:08:03 2016 -0700
Committer: Barbara Gomes <ba...@gmail.com>
Committed: Wed Jul 6 15:08:03 2016 -0700

----------------------------------------------------------------------
 fey-core/README.md | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/40f6adb2/fey-core/README.md
----------------------------------------------------------------------
diff --git a/fey-core/README.md b/fey-core/README.md
index 2be8565..d61ab0d 100644
--- a/fey-core/README.md
+++ b/fey-core/README.md
@@ -152,7 +152,8 @@ For Fey, each Performer represents a Generic Actor which should have the followi
 | Property                    | Type                 | Description   |
 | :---------------------- | :------------------- | :------------ |
 | **guid** | String | Must be a unique ID inside the Ensemble |
-| **loadBalance** | Integer | `Optional` property. Tells if the actor should be a load balanced actor. If zero or not specified, the actor will be started without the load balancing property. If greater than zero, the actor will be started using load balancing and the max number of replicated actors is the specified number. It means that if the value is 10, then the actor will be a load balanced actor and it can scale up to 10 replicas. |
+| **controlAware** | Boolean | `Optional` property. Tells if the actor should use a Control aware Mailbox, so the Control messages have higher priority over the others. If not specified then the actor will use the Default mailbox.
+| **autoScale** | Integer | `Optional` property. Tells if the actor should be a load balanced actor. If zero or not specified, the actor will be started without the load balancing property. If greater than zero, the actor will be started using load balancing and the max number of replicated actors is the specified number. It means that if the value is 10, then the actor will be a load balanced actor and it can scale up to 10 replicas. |
 | **schedule** | Integer | Defines the time interval in `Milliseconds` for the actor [scheduler](#markdown-header-scheduler). If zero, no scheduler will be started. |
 | **backoff** | Integer | Defines the time window in `Milliseconds` that the actor should backoff after receiving a PROCESS message. (See [Handling Backoff](#markdown-header-handling-backoff) for more details.) |
 | **source** | Object | Defines the needed information used by Fey to load the GenericActor. See [Source](#markdown-header-source-property) for details.