Posted to commits@iota.apache.org by to...@apache.org on 2016/07/01 21:55:37 UTC

[2/2] incubator-iota git commit: Implementing first version of Fey - [IOTA-2] [IOTA-3] [IOTA-4] [IOTA-5] [IOTA-6] [IOTA-7] [IOTA-8] [IOTA-9] [IOTA-10] [IOTA-11] [IOTA-12] [IOTA-13] [IOTA-14] [IOTA-15]

Implementing first version of Fey - [IOTA-2] [IOTA-3] [IOTA-4] [IOTA-5] [IOTA-6] [IOTA-7] [IOTA-8] [IOTA-9] [IOTA-10] [IOTA-11] [IOTA-12] [IOTA-13] [IOTA-14] [IOTA-15]

Signed-off-by: Tony Faustini <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-iota/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-iota/commit/10042354
Tree: http://git-wip-us.apache.org/repos/asf/incubator-iota/tree/10042354
Diff: http://git-wip-us.apache.org/repos/asf/incubator-iota/diff/10042354

Branch: refs/heads/master
Commit: 10042354c912edd529c7ab7bcf479c2850451174
Parents: 407f0d5
Author: Barbara Gomes <ba...@gmail.com>
Authored: Fri Jul 1 14:52:17 2016 -0700
Committer: Tony Faustini <to...@apache.org>
Committed: Fri Jul 1 14:55:17 2016 -0700

----------------------------------------------------------------------
 fey-core/README.md                              | 426 +++++++++++++++++++
 fey-core/images/Fey-Actor-Tree.png              | Bin 0 -> 201734 bytes
 fey-core/src/main/resources/application.conf    |  52 +++
 fey-core/src/main/resources/d3Tree.html         | 198 +++++++++
 .../resources/fey-json-schema-validator.json    | 102 +++++
 fey-core/src/main/resources/logback.xml         | 103 +++++
 .../scala/org/apache/iota/fey/Application.scala |  42 ++
 .../scala/org/apache/iota/fey/DirWatcher.scala  | 103 +++++
 .../scala/org/apache/iota/fey/Ensemble.scala    | 287 +++++++++++++
 .../scala/org/apache/iota/fey/FeyCore.scala     | 417 ++++++++++++++++++
 .../org/apache/iota/fey/FeyGenericActor.scala   | 222 ++++++++++
 .../org/apache/iota/fey/IdentifyFeyActors.scala |  90 ++++
 .../scala/org/apache/iota/fey/MyService.scala   |  59 +++
 .../org/apache/iota/fey/Orchestration.scala     | 242 +++++++++++
 .../scala/org/apache/iota/fey/TrieNode.scala    |  65 +++
 .../main/scala/org/apache/iota/fey/Utils.scala  | 267 ++++++++++++
 16 files changed, 2675 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/10042354/fey-core/README.md
----------------------------------------------------------------------
diff --git a/fey-core/README.md b/fey-core/README.md
new file mode 100644
index 0000000..ce019c8
--- /dev/null
+++ b/fey-core/README.md
@@ -0,0 +1,426 @@
+# Fey Engine
+
+Fey is an [Akka](http://akka.io/)-based framework that facilitates the definition of Fey actors, each of which implements a Fey component.
+Fey actors extend a generic Fey actor, _FeyGenericActor_, and override generic functions such as `onStart` and `execute` to define an autonomous computation.
+Each Fey actor should be provided through a _.jar_, which the Fey engine loads to access the actor's functionality.
+
+The actors defined by a set of _.jar_ files are referenced via a JSON configuration file that we call an **Orchestration**.
+The Orchestration defines which actors to use, which building properties of each actor to set, which unique parameters to pass into each actor, and what message-passing relationships
+the actors have with each other. In essence, an Orchestration defines what will be computed by the Fey engine.
+The _.jar_ files and the Orchestration files are stored in locations that the Fey engine learns about through its configuration when it starts up. The Fey engine manages
+the lifecycle of the actors, called **Performers**, defined by the Orchestrations. This includes the creation, execution, deletion, scalability and fault tolerance of the component actors.
+An Orchestration also defines any time-based scheduler to be implemented by each actor.
+Note that each actor can specify, in the Orchestration file, that it wants Fey to automatically scale its instances up or down
+based on the actor's incoming message load.
+
+In summary, the Fey engine manages Orchestrations of asynchronous networks of continuously operating actors that interact with each other through message passing.
+If you have a pre-defined set of _.jar_ files, then programming the Fey engine is simply a matter of defining Orchestrations via JSON files.
+Orchestration files can be added, updated and deleted, and the Fey engine will manage the Orchestration life cycle.
+
+The Fey engine can be used at different levels of compute infrastructure. For example, it can run on low-cost devices such as a _RaspberryPi_, on a standalone server or in a Mesos cluster.
+Remote invocation of actors makes it possible for Orchestrations running in one location to interact with Orchestrations running in another, so that services not available locally can be accessed.
+
+## Prerequisites
+
+1. Java **SDK** (Software Development Kit) 1.8+
+2. Scala 2.11
+3. **SBT** 0.13.11 (<http://www.scala-sbt.org/>) 
+
+## Architecture
+
+Fey is composed of Akka actors. Each of them is described below:
+
+1. **Fey-Core**: Main actor of the system. It is the highest ancestor of all other actors and is created right after Fey starts.
+2. **Directory-Watcher**: Actor responsible for monitoring the JSON directory and notifying Fey-Core when a new file event happens.
+3. **Fey-Identifier**: Actor responsible for sending the `Identity` message to each one of the active actors. _Routees_ are not affected by this actor.
+4. **Orchestration-ID**: A new instance of Orchestration is created every time we process a JSON with a new Orchestration GUID. It is responsible for managing the Ensembles.
+5. **Ensemble-ID**: A new instance of Ensemble is created under the corresponding Orchestration every time the JSON specifies a new Ensemble GUID.
+6. **Performer-ID**: A new instance is created for each Performer inside the Ensemble. Each Performer belongs to an Ensemble and also has a GUID.
+
+The figure below shows the actor hierarchy for Fey. In this example, Fey has only two Orchestrations running, each Orchestration has two Ensembles, and each Ensemble has its own Performers.
+
+![Fey actor hierarchy](./images/Fey-Actor-Tree.png)
+
+Each actor path follows the following pattern:
+
+* **Orchestration** - `/FEY-CORE/ORCHESTRATION-GUID/`
+* **Ensemble** - `/FEY-CORE/ORCHESTRATION-GUID/ENSEMBLE-GUID/`
+* **Performer** - `/FEY-CORE/ORCHESTRATION-GUID/ENSEMBLE-GUID/PERFORMER-GUID`
+
+`Orchestrations watch their Ensembles and Ensembles watch their Performers`
+
+When started, Fey processes all the JSON files in the checkpoint directory (if enabled), then all the JSON files in the JSON repository directory, and then starts monitoring for new file events.
+
+### Active Actors
+ 
+Fey offers a REST-API end point that displays all active actors.
+This end point shows only 4 levels of the hierarchy, which means that it goes no deeper than the Performers level.
+It relies on the _Fey-Identifier_ actor, which sends an `Identity` message to the actors. As a result, an actor that takes too long to answer with its identity may not appear in the JSON response.
+
+The REST-API binds to `localhost` on port `16666`, and the end point should be called like this:
+ 
+ ``` http://localhost:16666/fey/activeactors ```
+ 
+ Below is a sample JSON response:
+ 
+```json
+{ 
+     "total": 6,
+     "name": "FeyCore",
+     "children": [
+         {
+             "name": "MYORCHESTRATION01",
+             "children": [
+                 {
+                     "name": "ENSEMBLE01",
+                     "children": [
+                       {
+                          "name": "ZMQ",
+                          "children": []
+                       },
+                       {
+                          "name": "REDIS",
+                          "children": []
+                       }
+                     ]
+                 }
+             ]
+         },
+         {
+             "name": "DIR_WATCHER",
+             "children": []
+         },
+         {
+             "name": "FEY_IDENTIFIER",
+             "children": []
+         }
+     ]
+ }
+```
+
+
+## Running Fey
+
+You can specify one argument in order to run Fey:
+
+1. **Fey config file**: Path to the configuration file to be used by Fey. It is an _optional_ argument; if it is not defined, the default configuration is used.
+
+You just need to execute the Fey _.jar_:
+	
+```java -jar fey.jar FEY_CONFIG_FILE```
+	
+Fey will process all the JSON files in the JSON directory and then start monitoring for file events.
+**Note**: If checkpoint is enabled, all files in the checkpoint directory will be processed before the ones in the JSON directory.
+
+### Fey Configuration
+
+The Fey configuration file may contain any of the following properties:
+
+| Property                    | Type                 | Description   | Default|
+| :---------------------- | :------------------- | :------------ | :------------ |
+| **enable-checkpoint** | Boolean | Keeps track of the latest Orchestrations running so that, if Fey stops, it can restart from the checkpoint. When enabled, Fey adds `.processed` to all processed files and starts all Orchestrations found in the checkpoint directory. When disabled, no checkpoint is created. | true |
+| **checkpoint-directory** | String | Path where Fey should keep the checkpoint files | /tmp/fey/checkpoint |
+| **json-repository** | String | Path that will be monitored by Fey in order to get the JSON files | ~/feyJSONRepo |
+| **json-extension** | String | Extension of the files that should be processed by Fey | .json |
+| **jar-repository** | String | Path where Fey should look for the GenericActor jars to be loaded from | ~/feyJarRepo |
+| **log-level** | String | Log level for Fey | DEBUG|
+| **log-appender** | String | Selects the log appender based on the user configuration. Accepts one of three options: FILE, STDOUT, or FILE_STDOUT | STDOUT |
+
+### Fey Logging
+
+Fey uses _logback.xml_ to configure its logs. By default, Fey saves its logs to a log file located at `${HOME}/.fey/logs/`.
+Fey uses a Rolling File Appender in which each log file has a maximum size of one megabyte (1MB), and it keeps at most 30 log files.
+
+Right now the log level is `INFO` for the entire system, and the only configuration offered by Fey is the log level of Akka actors.
+
+## JSON Configuration
+
+Fey gets its instructions to start the generic actors through a well-defined JSON schema. Each JSON file specifies an Orchestration, which in turn defines the Ensembles and their Performers.
+
+The Orchestration specification is defined at the root of the JSON, and requires the following JSON properties:
+
+| Property                    | Type                 | Description   |
+| :---------------------- | :------------------- | :------------ |
+| **guid** | String | Globally unique Orchestration identifier. If the value of this property changes, Fey will consider it a new Orchestration.|
+| **timestamp** | String | Orchestration timestamp. Holds the timestamp for the last action in the Orchestration.|
+| **name** |String | Orchestration name.|
+| **command** | String| See details in [Orchestration Commands](#markdown-header-orchestration-commands). | 
+| **ensembles** | Array[Object] | Holds all the Ensembles that this Orchestration will be running. See [Ensemble](#markdown-header-ensemble) for more details.|
+
+### Ensemble
+An Orchestration can have one or more Ensembles. Each Ensemble must define the following properties:
+
+| Property                    | Type                 | Description   |
+| :---------------------- | :------------------- | :------------ |
+| **guid** | String | Must be unique inside the Orchestration |
+| **command** | String | See details in [Ensemble Commands](#markdown-header-ensemble-commands). |
+| **performers** | Array[Object] | See details in [Ensemble Performers](#markdown-header-ensemble-performers). |
+| **connections** | Array[Object] | See details in [Ensemble Connections](#markdown-header-ensemble-connections). |
+
+### Orchestration Commands
+Fey drives the Orchestration based on a set of 4 commands:
+
+1. `CREATE`: Fey will check if there is an Orchestration with the same _guid_ running. If there isn't, Fey will create the Orchestration, its Ensembles and their Performers, and start them. If there is already an Orchestration running, Fey will log a WARN, and nothing will be created or updated.
+2. `UPDATE`: Fey will check if there is already an Orchestration with the same _guid_ running. If there is, Fey will check the command for each one of the Ensembles and execute the respective action. If there isn't, Fey will log a WARN, and nothing will happen. See [Ensemble Commands](#markdown-header-ensemble-commands) for a list of available commands.
+3. `DELETE`: If there is an Orchestration with the same _guid_ running, Fey will stop all of the actors and delete the Orchestration.
+4. `RECREATE`: The recreate command does not care whether the Orchestration exists or not; it will always try to delete the Orchestration and then create a new one based on the JSON specification.
+
+### Ensemble Commands
+
+If the Orchestration command is `UPDATE`, Fey will check if the Orchestration is running and then for each one of the Ensembles it will check the specific command:
+
+1. `CREATE`: Creates a new Ensemble if there isn't one running yet.
+2. `UPDATE`: Deletes the Ensemble and starts a new one using the new configuration.
+3. `DELETE`: Deletes the Ensemble.
+4. `NONE`: The Ensemble will not change.
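+
+As an illustration of how the two command levels combine, the sketch below re-submits the Orchestration from the [Sample JSON](#markdown-header-sample-json) section with the Orchestration command set to `UPDATE` and the Ensemble command set to `UPDATE`, so that Fey would delete `ENSEMBLE01` and start it again with the new configuration. The only intended change is the `schedule` of the `REDIS` Performer (from `1000` to `5000`); the new `timestamp` value is arbitrary and chosen only for illustration.
+
+```json
+{
+  "guid":"MYORCHESTRATION01",
+  "command":"UPDATE",
+  "timestamp":"213263914536",
+  "name":"ORCHESTRATION SAMPLE JSON CONFIG",
+  "ensembles":[
+    {
+      "guid":"ENSEMBLE01",
+      "command":"UPDATE",
+      "performers":[
+        {
+          "guid":"REDIS",
+          "schedule":5000,
+          "backoff":0,
+          "source":{
+            "name":"fey-redis.jar",
+            "classPath":"com.fey.RedisConnector",
+            "parameters":{
+              "server":"localhost",
+              "keys":"{\"keys\":[\"key1\",\"key2\"]}",
+              "port":"1234"
+            }
+          }
+        },
+        {
+          "guid":"ZMQ",
+          "schedule":0,
+          "loadBalance":3,
+          "backoff":0,
+          "source":{
+            "name":"fey-zmq.jar",
+            "classPath":"com.fey.ZMQPublisher",
+            "parameters":{
+              "server":"localhost",
+              "topic":"mytopic",
+              "port":"1235"
+            }
+          }
+        }
+      ],
+      "connections":[
+        {
+          "REDIS":[
+            "ZMQ"
+          ]
+        }
+      ]
+    }
+  ]
+}
+```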
+
+### Ensemble Performers
+
+For Fey, each Performer represents a Generic Actor which should have the following properties:
+
+| Property                    | Type                 | Description   |
+| :---------------------- | :------------------- | :------------ |
+| **guid** | String | Must be a unique ID inside the Ensemble |
+| **loadBalance** | Integer | `Optional` property. Tells if the actor should be a load balanced actor. If zero or not specified, the actor will be started without the load balancing property. If greater than zero, the actor will be started using load balancing and the max number of replicated actors is the specified number. It means that if the value is 10, then the actor will be a load balanced actor and it can scale up to 10 replicas. |
+| **schedule** | Integer | Defines the time interval in `Milliseconds` for the actor [scheduler](#markdown-header-scheduler). If zero, no scheduler will be started. |
+| **backoff** | Integer | Defines the time window in `Milliseconds` that the actor should backoff after receiving a PROCESS message. (See [Handling Backoff](#markdown-header-handling-backoff) for more details.) |
+| **source** | Object | Defines the needed information used by Fey to load the GenericActor. See [Source](#markdown-header-source-property) for details. |
+
+#### Source Property
+
+The source property of a Performer holds the necessary information for loading the actor from a _.jar_. Each Performer has only one source property, and it should contain the following information:
+
+| Property                    | Type                 | Description   |
+| :---------------------- | :------------------- | :------------ |
+| **name** | String | Jar name that contains the Generic Actor. This jar must be present in the specified [jar repo](#markdown-header-running-Fey). The jar name is not case sensitive. |
+| **classPath** | String | Class path for the GenericActor class inside the _.jar_. It should include the package as well.|
+| **parameters** | Object | Contains any additional information that will be used by the GenericActor. It will be passed to the actor as a `HashMap[String,String]` in which the key is the property name, and the value is the property value. It can contain as many properties as you want to.|
+
+### Ensemble Connections
+The Connections property of an Ensemble defines the connection between the Performers. See [connectTo](#markdown-header-constructor) constructor parameter for more details about how this information is used.
+
+An object inside the Connections property obeys the following pattern:
+
+1. Property name: GUID of the Performer that will be connected to the Performers listed in the property value.
+2. Property value: Array of Performer IDs that will be connected to the property name.
+
+`The Performer IDs must be defined in the performers property of the Ensemble in order to be used inside the Connections`
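+
+For instance, an Ensemble whose `REDIS` Performer is connected to two other Performers could declare the fragment below. `LOGGER` is a hypothetical Performer GUID used only for illustration; it would have to appear in the Ensemble's `performers` array alongside `REDIS` and `ZMQ`.
+
+```json
+{
+  "connections":[
+    {
+      "REDIS":[
+        "ZMQ",
+        "LOGGER"
+      ]
+    }
+  ]
+}
+```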
+
+### Sample JSON
+
+This JSON specifies an Orchestration that has only one Ensemble. The Ensemble defines the Redis and ZMQ Performers, each loaded from its own _.jar_, and then connects Redis to ZMQ.
+
+```json
+{  
+  "guid":"MYORCHESTRATION01",
+  "command":"CREATE",
+  "timestamp":"213263914535",
+  "name":"ORCHESTRATION SAMPLE JSON CONFIG",
+  "ensembles":[  
+    {  
+      "guid":"ENSEMBLE01",
+      "command":"NONE",
+      "performers":[  
+        {  
+          "guid":"REDIS",
+          "schedule":1000,
+          "backoff":0,
+          "source":{  
+            "name":"fey-redis.jar",
+            "classPath":"com.fey.RedisConnector",
+            "parameters":{  
+              "server":"localhost",
+              "keys":"{\"keys\":[\"key1\",\"key2\"]}",
+              "port":"1234"
+            }
+          }
+        },
+        {  
+          "guid":"ZMQ",
+          "schedule":0,
+          "loadBalance":3,
+          "backoff":0,
+          "source":{  
+            "name":"fey-zmq.jar",
+            "classPath":"com.fey.ZMQPublisher",
+            "parameters":{  
+              "server":"localhost",
+              "topic":"mytopic",
+              "port":"1235"
+            }
+          }
+        }
+      ],
+      "connections":[  
+        {  
+          "REDIS":[  
+            "ZMQ"
+          ]
+        }
+      ]
+    }
+  ]
+}
+```
+## Fey Checkpoint
+
+Fey will keep track of the latest version of each Orchestration running. This can be disabled through the configuration file.
+When checkpoint is enabled, Fey will process the files in the JSON directory and add one of the following extensions to them:
+
+1. `.processed`: Means that the file had the correct JSON schema and was able to be processed by Fey.
+2. `.failed`: Means that something was wrong in the JSON and it could not be parsed by Fey.
+
+All running Orchestrations will be stored so that they can be used when the Fey system restarts, which means that Fey will load all Orchestrations in the checkpoint directory before the ones in the JSON directory.
+
+If checkpoint is not enabled no extension will be added to the JSON files and no checkpoint will be kept.
+
+## Writing Fey Generic Actor
+
+All actors (called **Performers** in Fey) loaded by Fey `must be` an extension of the `FeyGenericActor` abstract class. In order to create your own _Fey Generic Actor_ you will need to add Fey as a `provided` library to your project and create a new class that extends _FeyGenericActor_.
+
+### Constructor
+
+The Generic Actor offers the following constructor parameters:
+
+| Name                    | Type                 | Description   |
+| :---------------------- | :------------------- | :------------ |
+| params                  | Map[String,String]   | Holds all the extra configuration that is going to be used by the actor. For example: ("port" -> "1234") |
+| backoff                 | FiniteDuration       | Time interval to back off after processing a PROCESS message. Will always be greater than or equal to zero. |
+| connectTo               | Map[String,ActorRef] | Holds all the actors that your actor is supposed to propagate a message to. Maps the actor's ID to the ActorRef. |
+| schedulerTimeInterval   | FiniteDuration       | Time interval for the scheduler. If it is zero, then no scheduler will be started for the actor. |
+| orchestrationName       | String               | Name of the Orchestration to which the actor belongs. |
+| orchestrationID         | String               | ID of the Orchestration to which the actor belongs. |
+| autoScale               | Boolean              | True means that this actor will be replicated in order to obtain scalability, so you should be aware that `onStart` will be called for each replica. |
+
+When starting a new **Performer**, Fey will give to the actor all the configuration specified in the Orchestration's JSON.
+
+`The GenericActor must override all the constructor parameters in order and cannot define any extra ones.`
+
+The reason for these restrictions is that Fey loads your actor from the _.jar_ and generates the actor reference by passing in the list of constructor parameters in order. If these restrictions are not obeyed, Fey will throw an _IllegalArgumentException_ during the creation of the actor because it cannot find a matching constructor on the class.
+
+```scala
+class MyGenericActor(override val params: Map[String, String] = Map.empty,
+               override val backoff: FiniteDuration = 1.minutes,
+               override val connectTo: Map[String, ActorRef] = Map.empty,
+               override val schedulerTimeInterval: FiniteDuration = 30.seconds,
+               override val orchestrationName: String = "",
+               override val orchestrationID: String = "",
+               override val autoScale: Boolean = false) extends FeyGenericActor {}
+```
+
+### Life-cycle Actor Hooks
+
+Fey's generic actor declares the life-cycle actor hooks `preStart`, `postStop`, `preRestart` and `postRestart` as final overrides. However, it lets the user execute additional commands by overriding the following methods:
+
+* **onStart**: Will be called as part of the actor's life-cycle `preStart`, right after the decision on whether to start a scheduler is made (see **[Scheduler](#markdown-header-scheduler)** for more details). Be careful when using this method for a load balanced actor, since it will be called for every routee. So, if you are binding to some port inside this method, for example, be aware that the other routees may not be able to bind to the same port again. (See **[Load Balancing](#markdown-header-load-balancing)** for more details)
+* **onStop**: Will be called as part of the actor's life-cycle `postStop`, after stopping the scheduler. Normally, this method is used to "clean-up" the actor, like closing connections.
+* **onRestart**: Will be called as part of the actor's life-cycle `postRestart`. When the actor is restarted, all of its children will be stopped and `postStop` will be called, followed by `preStart`.
+
+### Messages
+
+Fey's generic actor declares a final override of the actor's `receive` method. However, it gives you a complementary receive that is called when a message cannot be handled by the generic receiver. The generic actor handles the following messages:
+
+1. **PRINT-PATH**: logs the actor's path
+2. **STOP**: Stops the actor
+3. **PROCESS(message: T)**: Generic typed message that should be used for the communication between generic actors. On receipt, the actor checks whether the backoff is enabled and, if it is not, calls the user-overridable _processMessage_ method. (See **[processing messages](#markdown-header-processing-messages)** for more details).
+4. **EXCEPTION(reason: Throwable)**: Throws the _Throwable_ so the actor's parent can handle it.
+
+All other messages that are not handled by the default _receive_ will be passed to the user-overridable **customReceive: Receive** method.
+
+`Keep in mind that even if your actor can handle a set of messages different from the default ones, the main communication between generic actors should happen through the PROCESS message`
+
+### Propagating Messages
+
+Fey works with the concept that an actor communicates with the actors it is connected to by sending _PROCESS_ messages. With that in mind, the generic actor offers a final generic typed method (`propagateMessage(message:T)`) that sends a _PROCESS_ message to each one of the actors in the **[connectTo](#markdown-header-constructor)** parameter.
+
+`If you don't want to propagate the message to all of the actors it is connected to, you should implement a different propagate method.`
+
+### Processing Messages
+
+The `PROCESS[T](message: T)` is the global message to be used when communicating to other Fey actors. The actor can receive any type of message through it. 
+
+After receiving the `PROCESS` message, the actor checks whether the [backoff](#markdown-header-handling-backoff) is enabled. If it is enabled, nothing happens and the message is not processed. If it is not enabled, the actor calls the user-overridable `processMessage[T](message: T, sender: ActorRef)` method.
+
+The default implementation of `processMessage[T](message: T, sender: ActorRef)` logs the message being processed, calls the [propagate](#markdown-header-propagating-messages) method and then starts the backoff by calling `startBackoff` method (see [Handling Backoff](#markdown-header-handling-backoff)).
+
+You can override this method to handle only the types of message that you are expecting and to execute some action when a message is received. In the example below, the actor only handles PROCESS messages of type `Int` or `String`, and starts the backoff if the message is of type `Int`.
+
+```scala
+override def processMessage[T](message:T, sender: ActorRef): Unit = {
+ message match {
+   case msg:String =>
+     log.info(s"Processing String $msg")
+   case msg:Int =>
+     log.info(s"Processing Int $msg -> ${msg+1}")
+     startBackoff()
+   case x => log.info("Type not handled")
+ }
+}
+```
+
+#### Handling Backoff
+
+Many use cases require the actor to stop processing messages for a time interval after some specific action has happened. The generic actor offers a built-in backoff that is used only by the `PROCESS` message.
+
+Every time you need the actor to back off after an action, you should call the `startBackoff` method.
+The `startBackoff` method uses the constructor parameter **[backoff](#markdown-header-constructor)** and sets an internal state of the actor called `endBackoff` to the time at which the actor should start processing messages again.
+The `endBackoff` internal state is checked every time the actor gets a **[PROCESS](#markdown-header-processing-messages)** message.
+
+`Note: Be careful when calling startBackoff. Make sure it will only be affected by the flow around the PROCESS message`
+
+
+### Scheduler
+
+The generic actor is able to start and control one scheduler. The scheduler is started through the `preStart` life-cycle hook, which checks whether the constructor parameter **[schedulerTimeInterval](#markdown-header-constructor)** is different from zero and, if so, starts a `system.scheduler` that executes every **[schedulerTimeInterval](#markdown-header-constructor)**. If the parameter is zero, no scheduler will be started.
+
+Once started, the scheduler will call the user-overridable `execute()` method every schedulerTimeInterval. When the actor dies or gets restarted, the scheduler will be cancelled and then started again (in case of restart).
+
+`Note: You can start as many schedulers as you want inside your generic actor, but only the one started by Fey during the creation of the actor will be monitored, i.e., will be stopped and started as necessary.`
+
+### Load Balancing
+
+Fey implements [Akka](http://akka.io/) load balancing functionality using [Router Actors](http://doc.akka.io/docs/akka/snapshot/scala/routing.html) with the **SmallestMailboxPool** strategy and with **DefaultResizer**. When starting the actor, Fey looks at the JSON configuration and checks whether the Performer should be a load balanced Performer. For more information about Routers, please visit Akka's webpage.
+ 
+### Example of a Fey Generic Actor
+
+```scala
+class MyGenericActor(override val params: Map[String, String] = Map.empty,
+               override val backoff: FiniteDuration = 1.minutes,
+               override val connectTo: Map[String, ActorRef] = Map.empty,
+               override val schedulerTimeInterval: FiniteDuration = 30.seconds,
+               override val orchestrationName: String = "",
+               override val orchestrationID: String = "",
+               override val autoScale: Boolean = false) extends FeyGenericActor {
+
+  override def onStart() = {
+    log.info(s"STARTING ${self.path.name}")
+    val ZMQcontext = ZMQ.context(1)
+    val publisher = ZMQcontext.socket(ZMQ.PUB)
+    publisher.bind(s"tcp://localhost:${params.get("port").get}")
+  }
+
+  var count = 0
+  
+  override def execute() = {
+    count +=1
+    propagateMessage(count)
+  }
+
+   override def processMessage[T](message:T, sender: ActorRef): Unit = {
+     message match {
+       case msg:String =>
+         log.info(s"Processing String $msg")
+       case msg:Int =>
+         log.info(s"Processing Int $msg -> ${msg+1}")
+         startBackoff()
+       case x => log.info("Type not handled")
+     }
+   }
+
+
+  override def customReceive(): Receive ={
+    case x => log.warning(s"Untreated message $x")
+  }
+
+}
+```
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/10042354/fey-core/images/Fey-Actor-Tree.png
----------------------------------------------------------------------
diff --git a/fey-core/images/Fey-Actor-Tree.png b/fey-core/images/Fey-Actor-Tree.png
new file mode 100644
index 0000000..b12e664
Binary files /dev/null and b/fey-core/images/Fey-Actor-Tree.png differ

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/10042354/fey-core/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/fey-core/src/main/resources/application.conf b/fey-core/src/main/resources/application.conf
new file mode 100644
index 0000000..c96ef5d
--- /dev/null
+++ b/fey-core/src/main/resources/application.conf
@@ -0,0 +1,52 @@
+//
+//Licensed to the Apache Software Foundation (ASF) under one or more
+//contributor license agreements.  See the NOTICE file distributed with
+//this work for additional information regarding copyright ownership.
+//The ASF licenses this file to You under the Apache License, Version 2.0
+//(the "License"); you may not use this file except in compliance with
+//the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+//Unless required by applicable law or agreed to in writing, software
+//distributed under the License is distributed on an "AS IS" BASIS,
+//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//See the License for the specific language governing permissions and
+//limitations under the License.
+//
+
+fey-global-configuration{
+
+  // If checkpoint is not enabled, Fey won't check out the last state
+  // and will not be able to recover properly in case of a failure (the JSON files
+  // will also not be updated to add the extension .processed).
+  // Disabling the checkpoint is recommended during a development phase
+  // or when the Orchestration will not change frequently
+  enable-checkpoint = true
+
+  // Directory where fey is going to store its checkpoint
+  checkpoint-directory = "/tmp/fey/checkpoint"
+
+  // Directory where the JSON will be sitting
+  json-repository = ${HOME}"/feyJSONRepo"
+
+  // Defines each file extension to look for inside the JSON Repo
+  json-extension = ".json"
+
+  // Defines the repo for the jars
+  jar-repository = ${HOME}"/feyJarRepo"
+
+  // Fey Log Level
+  log-level = "DEBUG"
+
+  // Defines where the log will be appended
+  // Options: FILE or STDOUT or FILE_STDOUT
+  log_appender = "STDOUT"
+
+}
+
+akka {
+  loggers = ["akka.event.slf4j.Slf4jLogger"]
+  loglevel = "DEBUG"
+  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/10042354/fey-core/src/main/resources/d3Tree.html
----------------------------------------------------------------------
diff --git a/fey-core/src/main/resources/d3Tree.html b/fey-core/src/main/resources/d3Tree.html
new file mode 100644
index 0000000..8455e1c
--- /dev/null
+++ b/fey-core/src/main/resources/d3Tree.html
@@ -0,0 +1,198 @@
+<!--
+Copyright 2010-2016 Mike Bostock
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without modification,
+are permitted provided that the following conditions are met:
+
+* Redistributions of source code must retain the above copyright notice, this
+  list of conditions and the following disclaimer.
+
+* Redistributions in binary form must reproduce the above copyright notice,
+  this list of conditions and the following disclaimer in the documentation
+  and/or other materials provided with the distribution.
+
+* Neither the name of the author nor the names of contributors may be used to
+  endorse or promote products derived from this software without specific prior
+  written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
+ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+http://bl.ocks.org/mbostock/4339083
+-->
+<!DOCTYPE html>
+<meta charset="utf-8">
+<style>
+
+.node {
+  cursor: pointer;
+}
+
+.node circle {
+  fill: #fff;
+  stroke: steelblue;
+  stroke-width: 1.5px;
+}
+
+.node text {
+  font: 10px sans-serif;
+}
+
+.link {
+  fill: none;
+  stroke: #ccc;
+  stroke-width: 1.5px;
+}
+
+</style>
+<body>
+<script src="http://d3js.org/d3.v3.min.js"></script>
+<script>
+
+var margin = {top: 20, right: 100, bottom: 20, left: 200},
+    width = 1500 - margin.right - margin.left,
+    height = 800 - margin.top - margin.bottom;
+
+var i = 0,
+    duration = 750,
+    root;
+
+var tree = d3.layout.tree()
+    .size([height, width]);
+
+var diagonal = d3.svg.diagonal()
+    .projection(function(d) { return [d.y, d.x]; });
+
+var svg = d3.select("body").append("svg")
+    .attr("width", width + margin.right + margin.left)
+    .attr("height", height + margin.top + margin.bottom)
+  .append("g")
+    .attr("transform", "translate(" + margin.left + "," + margin.top + ")");
+
+var treeData = [ $MYJSONHIERARCHY ];
+
+root = treeData[0];
+root.x0 = height / 2;
+root.y0 = 0;
+
+function collapse(d) {
+  if (d.children) {
+    d._children = d.children;
+    d._children.forEach(collapse);
+    d.children = null;
+  }
+}
+
+update(root);
+
+d3.select(self.frameElement).style("height", "800px");
+
+function update(source) {
+
+  // Compute the new tree layout.
+  var nodes = tree.nodes(root).reverse(),
+      links = tree.links(nodes);
+
+  // Normalize for fixed-depth.
+  nodes.forEach(function(d) { d.y = d.depth * 180; });
+
+  // Update the nodes…
+  var node = svg.selectAll("g.node")
+      .data(nodes, function(d) { return d.id || (d.id = ++i); });
+
+  // Enter any new nodes at the parent's previous position.
+  var nodeEnter = node.enter().append("g")
+      .attr("class", "node")
+      .attr("transform", function(d) { return "translate(" + source.y0 + "," + source.x0 + ")"; })
+      .on("click", click);
+
+  nodeEnter.append("circle")
+      .attr("r", 1e-6)
+      .style("fill", function(d) { return d._children ? "lightsteelblue" : "#fff"; });
+
+  nodeEnter.append("text")
+      .attr("x", function(d) { return d.children || d._children ? -10 : 10; })
+      .attr("dy", ".35em")
+      .attr("text-anchor", function(d) { return d.children || d._children ? "end" : "start"; })
+      .text(function(d) { return d.name; })
+      .style("fill-opacity", 1e-6);
+
+  // Transition nodes to their new position.
+  var nodeUpdate = node.transition()
+      .duration(duration)
+      .attr("transform", function(d) { return "translate(" + d.y + "," + d.x + ")"; });
+
+  nodeUpdate.select("circle")
+      .attr("r", 4.5)
+      .style("fill", function(d) { return d._children ? "lightsteelblue" : "#fff"; });
+
+  nodeUpdate.select("text")
+      .style("fill-opacity", 1);
+
+  // Transition exiting nodes to the parent's new position.
+  var nodeExit = node.exit().transition()
+      .duration(duration)
+      .attr("transform", function(d) { return "translate(" + source.y + "," + source.x + ")"; })
+      .remove();
+
+  nodeExit.select("circle")
+      .attr("r", 1e-6);
+
+  nodeExit.select("text")
+      .style("fill-opacity", 1e-6);
+
+  // Update the links
+  var link = svg.selectAll("path.link")
+      .data(links, function(d) { return d.target.id; });
+
+  // Enter any new links at the parent's previous position.
+  link.enter().insert("path", "g")
+      .attr("class", "link")
+      .attr("d", function(d) {
+        var o = {x: source.x0, y: source.y0};
+        return diagonal({source: o, target: o});
+      });
+
+  // Transition links to their new position.
+  link.transition()
+      .duration(duration)
+      .attr("d", diagonal);
+
+  // Transition exiting nodes to the parent's new position.
+  link.exit().transition()
+      .duration(duration)
+      .attr("d", function(d) {
+        var o = {x: source.x, y: source.y};
+        return diagonal({source: o, target: o});
+      })
+      .remove();
+
+  // Stash the old positions for transition.
+  nodes.forEach(function(d) {
+    d.x0 = d.x;
+    d.y0 = d.y;
+  });
+}
+
+// Toggle children on click.
+function click(d) {
+  if (d.children) {
+    d._children = d.children;
+    d.children = null;
+  } else {
+    d.children = d._children;
+    d._children = null;
+  }
+  update(d);
+}
+
+</script>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/10042354/fey-core/src/main/resources/fey-json-schema-validator.json
----------------------------------------------------------------------
diff --git a/fey-core/src/main/resources/fey-json-schema-validator.json b/fey-core/src/main/resources/fey-json-schema-validator.json
new file mode 100644
index 0000000..83dca5e
--- /dev/null
+++ b/fey-core/src/main/resources/fey-json-schema-validator.json
@@ -0,0 +1,102 @@
+{
+  "guid":{
+    "type":"string"
+  },
+  "command":{
+    "type":"string",
+    "pattern":"(?i)(RECREATE|CREATE|UPDATE|DELETE)"
+  },
+  "timestamp":{
+    "type":"string",
+    "pattern":"\\d+"
+  },
+  "name":{
+    "type":"string"
+  },
+  "ensembles":{
+    "type":"array",
+    "items":{
+      "guid":{
+        "type":"string"
+      },
+      "command":{
+        "type":"string",
+        "pattern":"(?i)(NONE|CREATE|UPDATE|DELETE)"
+      },
+      "performers":{
+        "type":"array",
+        "items":{
+          "guid":{
+            "type":"string"
+          },
+          "autoScale":{
+            "type":"integer",
+            "minimum":0
+          },
+          "schedule":{
+            "type":"integer",
+            "minimum":0
+          },
+          "backoff":{
+            "type":"integer",
+            "minimum":0
+          },
+          "source":{
+            "name":{
+              "type":"string",
+              "pattern":"\\w+(\\.jar)"
+            },
+            "classPath":{
+              "type":"string",
+              "pattern":"\\w+"
+            },
+            "parameters":{
+              "patternProperties":{
+                ".*":{
+                  "type":"string"
+                }
+              }
+            },
+            "required":[
+              "name",
+              "classPath",
+              "parameters"
+            ]
+          },
+          "required":[
+            "guid",
+            "schedule",
+            "backoff",
+            "source"
+          ]
+        }
+      },
+      "connections":{
+        "type":"array",
+        "items":{
+          "patternProperties":{
+            ".*":{
+              "type":"array",
+              "items":{
+                "type":"string"
+              }
+            }
+          }
+        }
+      },
+      "required":[
+        "guid",
+        "command",
+        "performers",
+        "connections"
+      ]
+    }
+  },
+  "required":[
+    "guid",
+    "command",
+    "timestamp",
+    "name",
+    "ensembles"
+  ]
+}
\ No newline at end of file
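A note on the "pattern" entries in the schema above: under the JSON Schema convention, a pattern is searched for anywhere in the value and is not implicitly anchored. Assuming the validator used by Fey follows that convention (not confirmed by this commit), values such as "xCREATEx" or "my-lib.jar" would pass the checks. The sketch below only illustrates the difference between search and whole-string matching with java.util.regex; the object and method names are hypothetical and are not part of Fey.

```scala
import java.util.regex.Pattern

// Hypothetical helper, not part of Fey: compares search vs. whole-string matching.
object SchemaPatternSketch {
  // Patterns copied from fey-json-schema-validator.json
  val orchestrationCommand: Pattern = Pattern.compile("(?i)(RECREATE|CREATE|UPDATE|DELETE)")
  val jarName: Pattern = Pattern.compile("\\w+(\\.jar)")

  // Search semantics: true if the pattern occurs anywhere in the value
  def containsMatch(p: Pattern, value: String): Boolean = p.matcher(value).find()

  // Whole-string semantics: true only if the entire value matches
  def matchesWhole(p: Pattern, value: String): Boolean = p.matcher(value).matches()
}
```

If whole-string matching is the intent, anchoring the schema patterns with ^ and $ is one way to get it.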

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/10042354/fey-core/src/main/resources/logback.xml
----------------------------------------------------------------------
diff --git a/fey-core/src/main/resources/logback.xml b/fey-core/src/main/resources/logback.xml
new file mode 100644
index 0000000..bace975
--- /dev/null
+++ b/fey-core/src/main/resources/logback.xml
@@ -0,0 +1,103 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+
+<!--Licensed to the Apache Software Foundation (ASF) under one or more -->
+<!--contributor license agreements.  See the NOTICE file distributed with -->
+<!--this work for additional information regarding copyright ownership. -->
+<!--The ASF licenses this file to You under the Apache License, Version 2.0 -->
+<!--(the "License"); you may not use this file except in compliance with -->
+<!--the License.  You may obtain a copy of the License at -->
+<!-- -->
+<!--   http://www.apache.org/licenses/LICENSE-2.0 -->
+<!-- -->
+<!--Unless required by applicable law or agreed to in writing, software -->
+<!--distributed under the License is distributed on an "AS IS" BASIS, -->
+<!--WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -->
+<!--See the License for the specific language governing permissions and -->
+<!--limitations under the License. -->
+
+<configuration>
+
+    <appender name="FEY-CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+        <target>System.out</target>
+        <encoder>
+            <charset>UTF-8</charset>
+            <pattern>[%p] [%d{yy/MM/dd HH:mm:ss}] %c [%X{akkaSource}]: %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <!-- Fey core -->
+    <appender name="FEY-FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <append>true</append>
+        <file>${HOME}/.fey/logs/fey_core.log</file>
+        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+            <!-- daily rollover. Make sure the path matches the one in the file element or else
+             the rollover logs are placed in the working directory. -->
+            <fileNamePattern>${HOME}/.fey/logs/fey_core-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
+            <maxFileSize>1MB</maxFileSize>
+            <maxHistory>30</maxHistory>
+            <totalSizeCap>1GB</totalSizeCap>
+        </rollingPolicy>
+
+       <!-- <filter class="ch.qos.logback.core.filter.EvaluatorFilter">
+            <evaluator>
+                <expression>
+                    if(logger.startsWith("com.apache.iota.fey.FeyCore"))
+                        return true;
+                    return false;
+                </expression>
+            </evaluator>
+            <OnMismatch>DENY</OnMismatch>
+            <OnMatch>NEUTRAL</OnMatch>
+        </filter> -->
+
+        <encoder>
+            <charset>UTF-8</charset>
+            <pattern>[%p] [%d{yy/MM/dd HH:mm:ss}] %c [%X{akkaSource}]: %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <!-- Orchestrations core -->
+    <!-- <appender name="FILE-ORHC" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <append>true</append>
+        <file>logs/fey_orchestrations.log</file>
+        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+            <fileNamePattern>logs/fey_orchestrations-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
+            <maxFileSize>100KB</maxFileSize>
+            <maxHistory>60</maxHistory>
+            <totalSizeCap>1GB</totalSizeCap>
+        </rollingPolicy>
+
+        <filter class="ch.qos.logback.core.filter.EvaluatorFilter">
+            <evaluator>
+                <expression>
+                    if(logger.startsWith("com.apache.iota.fey.Orchestration"))
+                    return true;
+                    return false;
+                </expression>
+            </evaluator>
+            <OnMismatch>DENY</OnMismatch>
+            <OnMatch>NEUTRAL</OnMatch>
+        </filter>
+
+        <encoder>
+            <charset>UTF-8</charset>
+            <pattern>[%p] [%d{yy/MM/dd HH:mm:ss}] [%X{akkaSource}]: %msg%n</pattern>
+        </encoder>
+    </appender> -->
+
+    <!-- Logging asynchronously -->
+    <!-- <appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
+        <queueSize>1000</queueSize>
+        <appender-ref ref="FILE" />
+    </appender> -->
+
+    <!--
+    <logger name="com.apache.iota.fey" level="INFO" appender="FILE-FEY-CORE"/>
+    <logger name="com.apache.iota.fey.Orchestration" level="INFO" appender="FILE-ORHC"/> -->
+
+    <root level="INFO">
+        <appender-ref ref="FEY-FILE"/>
+        <appender-ref ref="FEY-CONSOLE"/>
+    </root>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/10042354/fey-core/src/main/scala/org/apache/iota/fey/Application.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Application.scala b/fey-core/src/main/scala/org/apache/iota/fey/Application.scala
new file mode 100644
index 0000000..e9d5073
--- /dev/null
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Application.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iota.fey
+
+import akka.actor.{ActorSystem, Props}
+import akka.io.IO
+import akka.pattern.ask
+import akka.util.Timeout
+import com.typesafe.config.ConfigFactory
+import spray.can.Http
+
+import scala.concurrent.duration._
+
+object Application extends App {
+
+  CONFIG.loadUserConfiguration(if(args.length == 1) args(0) else "")
+
+  implicit val system = ActorSystem("FEY-MANAGEMENT-SYSTEM")
+
+  val fey = system.actorOf(FeyCore.props, name = "FEY-CORE")
+
+  val service = system.actorOf(Props[MyServiceActor], name = "FEY_REST_API")
+
+  implicit val timeout = Timeout(800.seconds)
+  IO(Http) ? Http.Bind(service, interface = "0.0.0.0", port = 16666)
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/10042354/fey-core/src/main/scala/org/apache/iota/fey/DirWatcher.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/DirWatcher.scala b/fey-core/src/main/scala/org/apache/iota/fey/DirWatcher.scala
new file mode 100644
index 0000000..6a8c3c1
--- /dev/null
+++ b/fey-core/src/main/scala/org/apache/iota/fey/DirWatcher.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iota.fey
+
+import java.io.File
+import java.nio.file.StandardWatchEventKinds._
+import java.nio.file.{FileSystems, Path}
+
+import akka.actor.{Actor, ActorLogging, ActorRef, Props}
+
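+/**
+  * Runnable that blocks on a WatchService and forwards file events to an actor
+  * @param watcherActor actor that receives a FILE_EVENT for each created or modified file
+  */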
+class WatchDirectoryTask(watcherActor: ActorRef) extends Runnable {
+
+  private val watchService = FileSystems.getDefault.newWatchService()
+
+  def watch(path: Path) = path.register(watchService, ENTRY_CREATE, ENTRY_MODIFY)
+
+  override def run() {
+    try {
+      while (!Thread.currentThread().isInterrupted) {
+        val key = watchService.take()
+        val eventsIterator = key.pollEvents().iterator()
+        while(eventsIterator.hasNext) {
+          val event = eventsIterator.next()
+          val relativePath = event.context().asInstanceOf[Path]
+          val path = key.watchable().asInstanceOf[Path].resolve(relativePath)
+          event.kind() match {
+            case ENTRY_CREATE =>
+              watcherActor ! DirectoryWatcherActor.FILE_EVENT(path.toFile, "CREATED")
+            case ENTRY_MODIFY =>
+              watcherActor ! DirectoryWatcherActor.FILE_EVENT(path.toFile, "UPDATED")
+            case x => println("UNDEFINED MESSAGE")
+          }
+        }
+        key.reset()
+      }
+    } catch {
+      case e: InterruptedException =>
+        throw e
+    } finally {
+      watchService.close()
+    }
+  }
+}
+
+class DirectoryWatcherActor(val fileExtension: String) extends Actor with ActorLogging {
+
+  import DirectoryWatcherActor._
+
+  val watchFileTask = new WatchDirectoryTask(self)
+  var watchThread = new Thread(watchFileTask, "WatchService")
+
+  override def preStart() {
+    watchThread.setDaemon(true)
+    watchThread.start()
+  }
+
+  override def postStop() {
+    watchThread.interrupt()
+  }
+
+  override def receive: Receive = {
+    case MONITOR(path) =>
+      log.info(s"Start monitoring ${path.getFileName}")
+      watchFileTask.watch(path)
+    case FILE_EVENT(file, eventType) if file.getAbsolutePath.endsWith(fileExtension) =>
+      log.info(s"$eventType = ${file.getAbsolutePath}")
+      context.parent ! FeyCore.NEW_FILE_ACTION(file)
+  }
+}
+
+object DirectoryWatcherActor {
+
+  /**
+    * Start monitoring directory
+    *
+    * @param path directory path
+    */
+  case class MONITOR(path: Path)
+  case class FILE_EVENT(file: File, event: String)
+
+  def props(fileExtension: String): Props = {
+    Props(new DirectoryWatcherActor(fileExtension))
+  }
+}
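The watcher above combines two steps: WatchDirectoryTask maps the event kind to a label, and DirectoryWatcherActor only reacts when the resolved path ends with the configured extension. A minimal actor-free sketch of that combined logic follows, using only java.nio. FileEvent and toFileEvent are hypothetical stand-ins for FILE_EVENT and the receive guard, not code from this commit.

```scala
import java.nio.file.{Path, WatchEvent}
import java.nio.file.StandardWatchEventKinds.{ENTRY_CREATE, ENTRY_MODIFY}

// Hypothetical sketch, not part of Fey.
object WatchEventSketch {
  // Stand-in for DirectoryWatcherActor.FILE_EVENT
  final case class FileEvent(path: Path, event: String)

  // Resolves the event path against the watched directory, labels the event kind,
  // and drops anything whose path does not end with the given extension.
  def toFileEvent(dir: Path, relative: Path, kind: WatchEvent.Kind[_], extension: String): Option[FileEvent] = {
    val path = dir.resolve(relative)
    val label = kind match {
      case ENTRY_CREATE => Some("CREATED")
      case ENTRY_MODIFY => Some("UPDATED")
      case _            => None // other kinds are ignored in this sketch
    }
    label.filter(_ => path.toString.endsWith(extension)).map(l => FileEvent(path, l))
  }
}
```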

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/10042354/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
new file mode 100644
index 0000000..87fbe7f
--- /dev/null
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iota.fey
+
+import akka.actor.SupervisorStrategy.Restart
+import akka.actor.{Actor, ActorLogging, ActorRef, OneForOneStrategy, PoisonPill, Props, Terminated}
+import akka.routing.{DefaultResizer, SmallestMailboxPool}
+import org.apache.iota.fey.JSON_PATH._
+import play.api.libs.json.JsObject
+
+import scala.collection.mutable.HashMap
+import scala.concurrent.duration._
+
+protected class Ensemble(val orchestrationID: String,
+                         val orchestrationName: String,
+                         val ensembleSpec: JsObject) extends Actor with ActorLogging{
+
+  import Ensemble._
+
+  var performers_metadata: Map[String, Performer] = Map.empty[String, Performer]
+  var connectors: Map[String,Array[String]] = Map.empty[String,Array[String]]
+  var performer: Map[String,ActorRef] = Map.empty[String,ActorRef]
+
+  override def receive: Receive = {
+
+    case STOP_PERFORMERS => stopPerformers()
+
+    case PRINT_ENSEMBLE =>
+      val ed = connectors.map(connector => {
+        s""" \t ${connector._1} : ${connector._2.mkString("[",",","]")}"""
+      }).mkString("\n")
+      val nd = performers_metadata.map(performer_metadata => s"${performer_metadata._1}").mkString("[",",","]")
+      val actors = performer.map(actor => actor._2.path.name).mkString("["," | ","]")
+      log.info(s"Edges: \n$ed \nNodes: \n\t$nd \nPerformers \n\t$actors")
+      context.actorSelection(s"*") ! FeyGenericActor.PRINT_PATH
+
+    case Terminated(actor) =>
+      log.error(s"DEAD Performer ${actor.path.name}")
+      context.children.foreach{ child =>
+        context.unwatch(child)
+        context.stop(child)
+      }
+      throw new RestartEnsemble(s"DEAD Performer ${actor.path.name}")
+
+    case x => log.warning(s"Message $x not treated by Ensemble")
+  }
+
+  /**
+    * If any of the performers dies, the Ensemble tries to restart it.
+    * If it cannot be restarted, the Terminated message is received
+    * and the Ensemble throws an exception to its Orchestration,
+    * asking it to restart the entire Ensemble. The restart then stops all of the
+    * Ensemble's children and calls preStart again.
+    */
+  override val supervisorStrategy =
+    OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 minute) {
+      case e: Exception => Restart
+    }
+
+  /**
+    * Uses the json spec to create the performers
+    */
+  override def preStart() = {
+
+    val connectors_js = (ensembleSpec \ CONNECTIONS).as[List[JsObject]]
+    val performers_js = (ensembleSpec \ PERFORMERS).as[List[JsObject]]
+
+    performers_metadata = extractPerformers(performers_js)
+    connectors = extractConnections(connectors_js)
+
+    performer = createPerformers()
+
+    val ed = connectors.map(connector => {
+      s""" \t ${connector._1} : ${connector._2.mkString("[",",","]")}"""
+    }).mkString("\n")
+    val nd = performers_metadata.map(performer => s"${performer._1}").mkString("[",",","]")
+    val actors = performer.map(actor => actor._2.path.name).mkString("["," | ","]")
+    log.info(s"Edges: \n$ed \nNodes: \n\t$nd \nPerformers \n\t$actors")
+
+  }
+
+  override def postRestart(reason: Throwable): Unit = {
+    preStart()
+  }
+
+
+  def getActorsName(): Set[String] = {
+    performer.map(performer => performer._2.path.name).toSet
+  }
+
+  private def createPerformers(): Map[String,ActorRef] = {
+    val tmpActors: HashMap[String, ActorRef] = HashMap.empty
+    connectors.foreach {
+      case (performerID: String, connectionIDs: Array[String]) =>
+        try {
+          createFeyActor(performerID, connectionIDs, tmpActors)
+        } catch {
+          /* if the creation fails, it will stop all the actors in the Ensemble */
+          case e: Exception =>
+            log.error(e,"During Performer creation")
+            throw new RestartEnsemble("Not able to create the Performers.")
+        }
+    }
+
+    //Treat performers that are not part of the connectors
+    noConnectionsPerformers(tmpActors)
+
+    tmpActors.toMap
+  }
+
+  /**
+    * Creates the actors that are not part of any connection
+    *
+    * @param tmpActors auxiliary map of the ActorRefs created so far
+    */
+  private def noConnectionsPerformers(tmpActors: HashMap[String, ActorRef]) = {
+    performers_metadata.filter(performer_metadata => !tmpActors.contains(performer_metadata._1)).
+      foreach(performer_metadata => {
+        try {
+          createFeyActor(performer_metadata._1, Array.empty, tmpActors)
+        } catch {
+          /* if the creation fails, it will stop all the actors in the Ensemble */
+          case e: Exception =>
+            log.error(e, "Problems while creating Performer")
+            throw new RestartEnsemble("Not able to create the Performer.")
+        }
+      })
+  }
+
+  /**
+    * Create an actorOf for the performer in the Ensemble
+    *
+    * @param performerID performer uid
+    * @param connectionIDs performer connections (connectors)
+    * @param tmpActors auxiliary map of ActorRefs
+    * @return (performerID, ActorRef of the performer)
+    */
+  private def createFeyActor(performerID: String, connectionIDs: Array[String], tmpActors:HashMap[String, ActorRef]):(String, ActorRef) = {
+    if(!tmpActors.contains(performerID)){
+      val performerInfo = performers_metadata.getOrElse(performerID, null)
+      if(performerInfo != null){
+        val connections: Map[String, ActorRef] = connectionIDs.map(connID => {
+          createFeyActor(connID, connectors.getOrElse(connID,Array.empty),tmpActors)
+        }).toMap
+
+        val clazz = loadClazzFromJar(performerInfo.classPath, performerInfo.jarName)
+
+        var actor:ActorRef = null
+        if(performerInfo.autoScale > 0) {
+          val resizer = DefaultResizer(lowerBound = 1, upperBound = performerInfo.autoScale, messagesPerResize = 200, backoffThreshold = 0.4)
+          val smallestMailBox = SmallestMailboxPool(1, Some(resizer))
+          actor = context.actorOf(
+            smallestMailBox.props(Props(clazz,
+              performerInfo.parameters, performerInfo.backoff, connections, performerInfo.schedule, orchestrationName, orchestrationID, true)),
+            name = performerID)
+        }else{
+          actor = context.actorOf(Props(clazz, performerInfo.parameters,
+            performerInfo.backoff, connections, performerInfo.schedule, orchestrationName, orchestrationID, false),
+            name = performerID)
+        }
+
+        context.watch(actor)
+        tmpActors.put(performerID, actor)
+        (performerID, actor)
+      }else{
+        throw new IllegalPerformerCreation(s"Performer $performerID is not defined in the JSON")
+      }
+    }else{
+      (performerID, tmpActors.get(performerID).get)
+    }
+  }
+
+  /**
+    * Load a clazz instance of FeyGenericActor from a jar
+    *
+    * @param classPath class path
+    * @param jarName jar name
+    * @return clazz instance of FeyGenericActor
+    */
+  private def loadClazzFromJar(classPath: String, jarName: String):Class[FeyGenericActor] = {
+    try {
+      Utils.loadActorClassFromJar(s"${CONFIG.JAR_REPOSITORY}/$jarName",classPath)
+    }catch {
+      case e: Exception =>
+        log.error(e,s"Could not load class $classPath from jar $jarName. Please check the Jar repository path as well as the jar name")
+        throw e
+    }
+  }
+
+  override def toString: String = {
+    val ed = connectors.map(connector => {
+      s""" \t ${connector._1} : ${connector._2.mkString("[",",","]")}"""
+    }).mkString("\n")
+    val nd = performers_metadata.map(performer => s"${performer._1}").mkString("[",",","]")
+    val actors = performer.map(actor => actor._2.path.name).mkString("["," | ","]")
+    s"Edges: \n$ed \nNodes: \n\t$nd \nPerformer \n\t$actors"
+  }
+
+  def stopPerformers() = {
+    performer.foreach(actor => actor._2 ! PoisonPill)
+  }
+
+}
+
+object Ensemble {
+
+  /**
+    * Extract a map from the connectors json
+    *
+    * @param connectors connectors json
+    * @return map of connectors
+    */
+  def extractConnections(connectors: List[JsObject]): Map[String,Array[String]] = {
+    connectors.map(connector => {
+      connector.keys.map(key => {
+        (key, (connector \ key).as[List[String]].toArray)
+      }).toMap
+    }).flatten.toMap
+  }
+
+  /**
+    * Extracts a map from the performers json
+    *
+    * @param performers performers json
+    * @return Map of performers
+    */
+  def extractPerformers(performers: List[JsObject]): Map[String, Performer] = {
+    performers.map(performer => {
+      val id: String= (performer \ GUID).as[String]
+      val schedule: Int = (performer \ SCHEDULE).as[Int]
+      val backoff: Int = (performer \ BACKOFF).as[Int]
+      val autoScale: Int = if (performer.keys.contains(PERFORMER_AUTO_SCALE)) (performer \ PERFORMER_AUTO_SCALE).as[Int] else 0
+      val jarName: String = (performer \ SOURCE \ SOURCE_NAME).as[String]
+      val classPath: String = (performer \ SOURCE \ SOURCE_CLASSPATH).as[String]
+      val params:Map[String,String] = getMapOfParams((performer \ SOURCE \ SOURCE_PARAMS).as[JsObject])
+      (id, new Performer(id, jarName, classPath,params,schedule.millisecond,backoff.millisecond, autoScale))
+    }).toMap
+  }
+
+  /**
+    * returns a map of the params
+    *
+    * @param params params json
+    * @return map of params where key is the json key and value is the json value for the key
+    */
+  private def getMapOfParams(params: JsObject):Map[String,String] = {
+    params.keys.map(key => {
+      (key.toString, (params \ key).as[String])
+    }).toMap
+  }
+
+  /**
+    * Message that asks the Ensemble to stop all of its Performers
+    */
+  case object STOP_PERFORMERS
+  case object PRINT_ENSEMBLE
+}
+
+/**
+  * Holds the performer information
+  * @param uid performer uid
+  * @param jarName performer jar name
+  * @param classPath performer class path
+  * @param parameters performer params
+  * @param schedule performer schedule interval
+  * @param backoff performer backoff interval
+  * @param autoScale upper bound of the performer's router pool; 0 disables auto scaling
+  */
+case class Performer(uid: String, jarName: String,
+                classPath: String, parameters: Map[String,String],
+                schedule: FiniteDuration, backoff: FiniteDuration,
+                autoScale: Int)
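The creation logic in createPerformers and createFeyActor above amounts to a dependency-first traversal: a performer's connections are created before the performer itself, and the auxiliary map keeps any performer from being created twice. The sketch below reproduces only that ordering on plain collections, without Akka. creationOrder is a hypothetical name, and require stands in for the IllegalPerformerCreation thrown by the original.

```scala
import scala.collection.mutable

// Hypothetical sketch, not part of Fey: shows the order in which performers get created.
object CreationOrderSketch {
  // connectors: performer id -> ids of the performers it connects to
  def creationOrder(performers: Set[String], connectors: Map[String, Seq[String]]): List[String] = {
    val created = mutable.LinkedHashSet.empty[String]

    def create(id: String): Unit =
      if (!created.contains(id)) {
        require(performers.contains(id), s"Performer $id is not defined")
        connectors.getOrElse(id, Seq.empty).foreach(create) // connections are created first
        created += id                                       // then the performer itself
      }

    connectors.keys.foreach(create)                         // performers that appear in a connection
    performers.filterNot(created.contains).foreach(create)  // performers without connections
    created.toList
  }
}
```

Like the original, the sketch records a performer only after its connections have been created, so it assumes the connection graph contains no cycles.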

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/10042354/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala b/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
new file mode 100644
index 0000000..c01ba3c
--- /dev/null
+++ b/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iota.fey
+
+import java.nio.file.Paths
+import java.io.File
+
+import scala.concurrent.duration._
+import akka.actor.{Actor, ActorLogging, ActorRef, OneForOneStrategy, PoisonPill, Props, Terminated}
+import Utils._
+import akka.actor.SupervisorStrategy._
+import play.api.libs.json._
+import JSON_PATH._
+import org.apache.iota.fey.Orchestration.{CREATE_ENSEMBLES, DELETE_ENSEMBLES, UPDATE_ENSEMBLES}
+import com.eclipsesource.schema._
+
+import scala.collection.mutable.HashMap
+
+protected class FeyCore extends Actor with ActorLogging{
+
+  import FeyCore._
+  import CONFIG._
+
+  var watcherActor: ActorRef = null
+  var identifier: ActorRef = null
+
+  override def receive: Receive = {
+
+    case JSON_TREE =>
+      printActiveActors()
+
+    case START =>
+      createIdentifierActor()
+      processInitialFiles(JSON_REPOSITORY)
+      self ! WATCH_DIR(JSON_REPOSITORY)
+
+    case NEW_FILE_ACTION(file) =>
+      log.info(s"NEW FILE ${file.getAbsolutePath}")
+      try{
+        processJson(file)
+        renameProcessedFile(file, "processed")
+      }catch {
+        case e: Exception =>
+          renameProcessedFile(file, "failed")
+          log.error(e, s"JSON not processed ${file.getAbsolutePath}")
+      }
+
+    case WATCH_DIR(path) =>
+      if(watcherActor == null) {
+        watcherActor = context.actorOf(DirectoryWatcherActor.props(JSON_EXTENSION), name = WATCHER_NAME)
+        context.watch(watcherActor)
+
+      }
+      watcherActor ! DirectoryWatcherActor.MONITOR(Paths.get(path))
+
+    case STOP_EMPTY_ORCHESTRATION(orchID) =>
+      log.warning(s"Deleting Empty Orchestration $orchID")
+      deleteOrchestration(orchID)
+
+    case Terminated(actor) =>
+      actor.path.name match {
+        case IDENTIFIER_NAME =>
+          createIdentifierActor()
+        case WATCHER_NAME =>
+          watcherActor = null
+          self ! WATCH_DIR(JSON_REPOSITORY)
+        case guid: String =>
+          log.info(s"TERMINATED ${guid}")
+          FEY_CACHE.activeOrchestrations.remove(guid)
+          if(!FEY_CACHE.orchestrationsAwaitingTermination.isEmpty) {
+            checkForOrchestrationWaitingForTermination(guid)
+          }
+      }
+
+    case x =>
+      log.info(s"Received $x")
+
+  }
+
+  /**
+    * Clean up Fey Cache
+    */
+  override def postStop() = {
+    FEY_CACHE.activeOrchestrations.clear()
+    FEY_CACHE.orchestrationsAwaitingTermination.clear()
+    ORCHESTRATION_CACHE.orchestration_metadata.clear()
+  }
+
+  override def preStart() = {
+    log.info("Starting Fey Core")
+    if (CHEKPOINT_ENABLED) {
+      processInitialFiles(CHECKPOINT_DIR, true)
+    }
+    self ! START
+  }
+
+  override val supervisorStrategy =
+    OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 minute) {
+      case _: Exception =>
+        if(sender() == watcherActor) Stop
+        else Restart
+    }
+
+  /**
+    * Creates the actor that identifies and logs all the actors in the Fey core tree
+    */
+  private def createIdentifierActor() = {
+    identifier = context.watch(context.actorOf(Props(classOf[IdentifyFeyActors]), name = IDENTIFIER_NAME))
+  }
+
+  /**
+    * Process all the files that are already in the dir
+    * before starting watching for new files
+    */
+  private def processInitialFiles(directory: String, delete: Boolean = false) = {
+    getFilesInDirectory(directory)
+      .filter(file => file.getName.endsWith(JSON_EXTENSION))
+      .foreach(file => {
+        try {
+          processJson(file)
+          if(delete){
+            file.delete()
+          }else {
+            renameProcessedFile(file, "processed")
+          }
+        } catch {
+          case e: Exception =>
+            renameProcessedFile(file, "failed")
+            log.error(e, s"JSON not processed ${file.getAbsolutePath}")
+        }
+      })
+  }
+
+  /**
+    * Processes the JSON in the file.
+    * Processing the JSON is an all-or-nothing operation:
+    * the network will only be established if the entire JSON can be processed.
+    * Throws IllegalArgumentException if the JSON cannot be parsed and CommandNotRecognized if the command is unknown.
+    * JSON commands:
+    *   CREATE: tells Fey that there is no previous orchestration active for this JSON.
+    *           Fey will create the orchestration and all the Ensembles in the JSON.
+    *           Logs an error and creates nothing if an orchestration is already active for the JSON.
+    *   UPDATE: tells Fey that there is an orchestration loaded for the JSON.
+    *           Fey will check the command for each of the Ensembles and execute the corresponding action.
+    *           See updateOrchestration.
+    *   DELETE: tells Fey to delete the active orchestration for the JSON.
+    *   RECREATE: tells Fey that there might be an active orchestration. If that is the case, Fey deletes the
+    *             orchestration and recreates it; otherwise, it simply creates it.
+    *
+    * @param file JSON file to be processed
+    */
+  private def processJson(file: File): Unit ={
+    log.info(s"File: ${file.getAbsolutePath}")
+    loadJsonFromFile(file) match {
+      case Some(json) =>
+        if(validJSONSchema(json)) {
+          val orchestrationName = (json \ ORCHESTRATION_NAME).as[String]
+          val orchestrationID = (json \ GUID).as[String]
+          val orchestrationCommand = (json \ COMMAND).as[String].toUpperCase()
+          val orchestrationTimestamp = (json \ ORCHESTRATION_TIMESTAMP).as[String]
+          val ensembles = (json \ ENSEMBLES).as[List[JsObject]]
+          orchestrationCommand match {
+            case "RECREATE" => recreateOrchestration(ensembles, orchestrationID, orchestrationName, orchestrationTimestamp)
+            case "CREATE" => createOrchestration(ensembles, orchestrationID, orchestrationName, orchestrationTimestamp)
+            case "UPDATE" => updateOrchestration(ensembles, orchestrationID, orchestrationName, orchestrationTimestamp)
+            case "DELETE" => deleteOrchestration(orchestrationID)
+            case x => throw new CommandNotRecognized(s"Command: $x")
+          }
+        }
+      case None =>
+        throw new IllegalArgumentException(s"Could not parse the JSON in the file ${file.getAbsolutePath}")
+    }
+  }
+
+  def validJSONSchema(json: JsValue):Boolean = {
+    try {
+      val result = SchemaValidator.validate(jsonSchemaSpec, json)
+      if (result.isError) {
+        log.error("Incorrect JSON schema")
+        log.error(result.asEither.left.get.toJson.as[List[JsObject]].map(error => {
+          val path = (error \ "instancePath").as[String]
+          val msg = (error \ "msgs").as[List[String]].mkString("\n\t")
+          s"$path \n\tErrors: $msg"
+        }).mkString("\n"))
+        false
+      } else {
+        true
+      }
+    }catch{
+      case e: Exception =>
+        log.error(e,"Error while validating JSON")
+        false
+    }
+  }
+
+  /**
+    * If there is no previous orchestration: creates a new orchestration.
+    * If there is a previous orchestration: checks if the timestamp differs from the last processed timestamp.
+    *                      If it differs, caches the orchestration information to be used after the
+    *                      current orchestration terminates, and deletes the current orchestration.
+    *
+    * @param ensemblesSpecJson list of Ensemble specifications from the JSON
+    * @param orchestrationID orchestration GUID
+    * @param orchestrationName orchestration name
+    * @param orchestrationTimestamp orchestration timestamp from the JSON
+    * @return
+    */
+  private def recreateOrchestration(ensemblesSpecJson: List[JsObject], orchestrationID: String,
+                              orchestrationName: String, orchestrationTimestamp: String) = {
+    FEY_CACHE.activeOrchestrations.get(orchestrationID) match {
+      case Some(orchestration) =>
+        try{
+          // If the timestamp differs from the last processed timestamp
+          if(orchestration._1 != orchestrationTimestamp){
+            val orchestrationInfo = new OrchestrationInformation(ensemblesSpecJson,orchestrationID,orchestrationName,orchestrationTimestamp)
+            FEY_CACHE.orchestrationsAwaitingTermination.put(orchestrationID, orchestrationInfo)
+            deleteOrchestration(orchestrationID)
+          }else{
+            log.warning(s"Orchestration ${orchestrationID} not recreated. Timestamp did not change.")
+          }
+        }catch{
+          case e: Exception => log.error(e, s"Could not recreate Orchestration $orchestrationID")
+        }
+      case None => createOrchestration(ensemblesSpecJson,orchestrationID,orchestrationName,orchestrationTimestamp)
+    }
+  }
+
+  /**
+    * Checks if there is an orchestration waiting for the termination of this ID
+    *
+    * @param terminatedOrchestrationName ID (GUID) of the orchestration that has just terminated
+    */
+  private def checkForOrchestrationWaitingForTermination(terminatedOrchestrationName: String) = {
+    FEY_CACHE.orchestrationsAwaitingTermination.get(terminatedOrchestrationName) match {
+      case Some(orchestrationAwaiting) =>
+        FEY_CACHE.orchestrationsAwaitingTermination.remove(terminatedOrchestrationName)
+        createOrchestration(orchestrationAwaiting.ensembleSpecJson, orchestrationAwaiting.orchestrationID,
+          orchestrationAwaiting.orchestrationName, orchestrationAwaiting.orchestrationTimestamp)
+      case None =>
+    }
+  }
+
+  /**
+    * Creates an Orchestration according to the JSON spec.
+    * If any exception happens during the creation, the orchestration actor will be killed
+    * and, as a consequence, so will all of its children.
+    *
+    * @param ensemblesSpecJson
+    * @param orchestrationID
+    * @param orchestrationName
+    * @param orchestrationTimestamp
+    */
+  private def createOrchestration(ensemblesSpecJson: List[JsObject], orchestrationID: String,
+                            orchestrationName: String, orchestrationTimestamp: String) = {
+    try{
+      if(!FEY_CACHE.activeOrchestrations.contains(orchestrationID)) {
+        val orchestration = context.actorOf(
+          Props(classOf[Orchestration], orchestrationName, orchestrationID, orchestrationTimestamp),
+          name = orchestrationID)
+        FEY_CACHE.activeOrchestrations.put(orchestrationID, (orchestrationTimestamp, orchestration))
+        context.watch(orchestration)
+        orchestration ! CREATE_ENSEMBLES(ensemblesSpecJson)
+      }else{
+        log.error(s"Orchestration $orchestrationID is already defined in the network.")
+      }
+    }catch{
+      case e: Exception =>
+        FEY_CACHE.activeOrchestrations.get(orchestrationID) match{
+          case Some(orchestration) =>
+            context.unwatch(orchestration._2)
+            orchestration._2 ! PoisonPill
+            FEY_CACHE.activeOrchestrations.remove(orchestrationID)
+          case None => context.actorSelection(orchestrationID) ! PoisonPill
+        }
+        log.error(e, s"Could not create Orchestration $orchestrationID")
+    }
+  }
+
+  /**
+    * Stops the orchestration and removes it from the list of active orchestrations
+    *
+    * @param orchestrationID
+    * @return
+    */
+  private def deleteOrchestration(orchestrationID: String) = {
+    try{
+      FEY_CACHE.activeOrchestrations.get(orchestrationID) match {
+        case Some(orchestration) =>
+          orchestration._2 ! PoisonPill
+          FEY_CACHE.activeOrchestrations.remove(orchestrationID)
+          updateOrchestrationState(orchestrationID,true)
+        case None =>
+          log.warning(s"No active Orchestration $orchestrationID to be deleted")
+      }
+    }catch{
+      case e: Exception => log.error(e, s"Could not delete Orchestration $orchestrationID")
+    }
+  }
+
+  private def updateOrchestration(ensemblesSpecJson: List[JsObject], orchestrationID: String,
+                            orchestrationName: String, orchestrationTimestamp: String) = {
+    FEY_CACHE.activeOrchestrations.get(orchestrationID) match {
+      case None => log.warning(s"Orchestration not updated. No active Orchestration $orchestrationID.")
+      case Some(orchestration) => {
+        ensemblesSpecJson.foreach(ensemble => {
+          (ensemble \ COMMAND).as[String].toUpperCase() match {
+            case "CREATE" => orchestration._2 ! CREATE_ENSEMBLES(List(ensemble))
+            case "DELETE" => orchestration._2 ! DELETE_ENSEMBLES(List(ensemble))
+            case "UPDATE" => orchestration._2 ! UPDATE_ENSEMBLES(List(ensemble))
+            case "NONE" =>
+            case x => log.warning(s"Command $x not recognized")
+          }
+        })
+      }
+    }
+  }
+
+  def printStatus() = {
+    printActiveOrchestrations
+    printWaitingTermination
+    printActiveActors
+  }
+
+  def printWaitingTermination() = {
+    val ids = FEY_CACHE.orchestrationsAwaitingTermination.map(orchestration => {
+      orchestration._1
+    }).mkString("[",",","]")
+    log.info(s"\n === Waiting: $ids")
+  }
+
+  def printActiveOrchestrations() = {
+    val ids = FEY_CACHE.activeOrchestrations.map(orchestration => {
+      orchestration._1
+    }).mkString("[",",","]")
+    log.info(s"\n === Active: $ids")
+  }
+
+  def printActiveActors() = {
+    identifier ! IdentifyFeyActors.IDENTIFY_TREE(self.path.toString)
+  }
+
+}
+
+protected object FeyCore{
+
+  case object JSON_TREE
+
+  /**
+    * Send this message to start the directory watcher
+    *
+    * @param path directory to be watched
+    */
+  sealed case class WATCH_DIR(path: String)
+
+  /**
+    * Message that configures FeyCore after it is created. FeyCore sends it to itself in preStart.
+    */
+  case object START
+
+  /**
+    * Used by the DirectoryWatcher to notify fey when a new file was added
+    *
+    * @param file java.io.File
+    */
+  case class NEW_FILE_ACTION(file: File)
+
+  case class STOP_EMPTY_ORCHESTRATION(orchID: String)
+
+  def props: Props = {
+    Props(new FeyCore)
+  }
+
+  final val WATCHER_NAME: String = "DIR_WATCHER"
+  final val IDENTIFIER_NAME: String = "FEY_IDENTIFIER"
+
+  /**
+    * Loads the specification for validating a Fey JSON
+    */
+  val jsonSchemaSpec: SchemaType = {
+    Json.fromJson[SchemaType](Json.parse(scala.io.Source
+      .fromInputStream(getClass.getResourceAsStream("/fey-json-schema-validator.json"))
+      .getLines()
+      .mkString(""))).get
+  }
+}
+
+private object FEY_CACHE{
+  /**
+    * Keeps track of all active orchestrations
+    * [OrchestrationID, (Orchestration Timestamp, Orchestration ActorRef)]
+    */
+  val activeOrchestrations:HashMap[String, (String, ActorRef)] = HashMap.empty[String, (String, ActorRef)]
+
+  /**
+    * Keeps a list of the orchestrations that are waiting for termination so they can be restarted
+    * Used mainly inside the Terminated handler
+    */
+  val orchestrationsAwaitingTermination:HashMap[String,OrchestrationInformation] = HashMap.empty[String,OrchestrationInformation]
+}
+
+sealed case class OrchestrationInformation(ensembleSpecJson: List[JsObject], orchestrationID: String,
+                                     orchestrationName: String, orchestrationTimestamp: String)
\ No newline at end of file

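The RECREATE path in FeyCore.scala above is asynchronous: the new specification is parked in orchestrationsAwaitingTermination, the old orchestration is stopped, and the replacement is only created once the Terminated message arrives. The following is a simplified, actor-free sketch of that bookkeeping. It is an illustration under assumptions, not the commit's code: the `RecreateSketch` object and its method names are hypothetical, and each cache entry holds only a timestamp instead of an ActorRef or a full OrchestrationInformation.

```scala
import scala.collection.mutable

object RecreateSketch {
  // Simplified stand-ins for FEY_CACHE: orchestration ID -> timestamp.
  val active = mutable.HashMap.empty[String, String]
  val awaitingTermination = mutable.HashMap.empty[String, String]

  // CREATE: only registers the orchestration if none is active for this ID.
  def create(id: String, timestamp: String): Unit =
    if (!active.contains(id)) active.put(id, timestamp)

  // RECREATE: when an orchestration with a different timestamp is active,
  // park the new spec and remove the old one; otherwise create directly.
  def recreate(id: String, timestamp: String): Unit =
    active.get(id) match {
      case Some(current) if current != timestamp =>
        awaitingTermination.put(id, timestamp)
        active.remove(id) // in Fey this step also sends PoisonPill to the actor
      case Some(_) => () // same timestamp: nothing to do
      case None => create(id, timestamp)
    }

  // Stand-in for the Terminated(actor) handler: creates the parked spec, if any.
  def terminated(id: String): Unit = {
    active.remove(id)
    awaitingTermination.remove(id).foreach(ts => create(id, ts))
  }
}
```

Calling `recreate("orch-1", "t1")`, then `recreate("orch-1", "t2")`, then `terminated("orch-1")` leaves "orch-1" active with timestamp "t2" and nothing awaiting termination.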
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/10042354/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala b/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
new file mode 100644
index 0000000..c2344d8
--- /dev/null
+++ b/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iota.fey
+
+import akka.actor.{Actor, ActorLogging, ActorRef, Cancellable}
+import scala.concurrent.duration._
+import scala.concurrent.ExecutionContext.Implicits.global
+
+/**
+  * Defines the generic actor for Fey integration
+  * @param params Map of key-value pairs that will be used to configure the actor
+  * @param backoff Backoff interval
+  * @param connectTo Map of ActorRefs that this actor is connected to
+  *                  (If this actor is A and it is connected to B and C, the network is A -> B,C)
+  * @param schedulerTimeInterval When this value is different from zero,
+  *                              a scheduler will be started that calls the execute method
+  * @param orchestrationName name of the orchestration this actor belongs to
+  * @param orchestrationID ID of the orchestration this actor belongs to
+  */
+abstract class FeyGenericActor(val params: Map[String,String] = Map.empty,
+                               val backoff: FiniteDuration = 1.minutes,
+                               val connectTo: Map[String,ActorRef] = Map.empty,
+                               val schedulerTimeInterval: FiniteDuration = 2.seconds,
+                               val orchestrationName: String = "",
+                               val orchestrationID: String = "",
+                               val autoScale: Boolean = false)
+
+  extends Actor with ActorLogging{
+
+  import FeyGenericActor._
+
+  /**
+    * Keeps reference to the cancellable
+    */
+  @volatile private var scheduler: Cancellable = null
+  @volatile private var endBackoff: Long = 0
+
+  override final def receive: Receive = {
+
+    case PRINT_PATH =>
+      log.info(s"** ${self.path} **")
+
+    case STOP =>
+      context.stop(self)
+
+    case PROCESS(message) =>
+      if(System.nanoTime() >= endBackoff) {
+        processMessage(message, sender())
+      }
+    // Rethrows an exception caught in the scheduled task so that the supervisor strategy applies
+    case EXCEPTION(reason) => throw reason
+    // Messages not handled above are passed on to customReceive
+    case x => customReceive(x)
+  }
+
+  override final def preRestart(reason: Throwable, message: Option[Any]): Unit = {
+    context.children foreach { child =>
+      context.unwatch(child)
+      context.stop(child)
+    }
+    postStop()
+  }
+
+  override final def preStart() = {
+    onStart()
+    startScheduler()
+  }
+
+  override final def postStop() = {
+    log.info(s"STOPPED actor ${self.path.name}")
+    stopScheduler()
+    onStop()
+  }
+
+  override final def postRestart(reason: Throwable) = {
+    log.info(s"RESTARTED Actor ${self.path.name}")
+    preStart()
+    onRestart(reason)
+  }
+
+  def onRestart(reason: Throwable) = {
+    log.info("RESTARTED method")
+  }
+
+  /**
+    * Stops the scheduler
+    */
+  private final def stopScheduler() = {
+    if (scheduler != null) {
+      scheduler.cancel()
+      scheduler = null
+    }
+  }
+  /**
+    * Enables the backoff.
+    * Actor will drop the PROCESS messages that are sent during the backoff period time.
+    */
+  final def startBackoff() = {
+    this.endBackoff = System.nanoTime() + this.backoff.toNanos
+  }
+
+  /**
+    * Starts the scheduled task after 1 second.
+    * The time interval to be used is the one passed to the constructor
+    */
+  private final def startScheduler() = {
+    if(scheduler == null && schedulerTimeInterval.toNanos != 0){
+      scheduler = context.system.scheduler.schedule(1.seconds, schedulerTimeInterval){
+        try{
+          execute()
+        }catch{
+          case e: Exception => self ! EXCEPTION(e)
+        }
+      }
+    }
+  }
+
+  /**
+    * Called by the scheduler.
+    */
+  def execute() = {
+    log.info(s"Executing action in ${self.path.name}")
+  }
+
+  /**
+    * Called every time the actor receives the PROCESS message outside the backoff period.
+    * The default implementation propagates the message to the connected actors and starts the backoff
+    * @param message message to be processed
+    * @param sender actor that sent the PROCESS message
+    * @tparam T Any
+    */
+  def processMessage[T](message: T, sender: ActorRef): Unit = {
+    log.info(s"Processing message ${message.toString}")
+    propagateMessage(s"PROPAGATING FROM ${self.path.name} - Message: ${message.toString}")
+    startBackoff()
+  }
+
+  /**
+    * This method should be called to propagate the message
+    * to the actors that are linked.
+    * Call this method at the end of the processMessage method
+    *
+    * @param message message to be propagated
+    * @tparam T Any
+    */
+  final def propagateMessage[T](message: T) = {
+    connectTo.foreach(linkedActor => {
+      linkedActor._2 ! PROCESS(message)
+    })
+  }
+
+  /**
+    * Method called from preStart when the actor starts, before the scheduler is started.
+    * All the necessary configurations will be ready to be used.
+    */
+  def onStart(): Unit = {
+    log.info(s"Actor ${self.path.name} started.")
+  }
+
+  /**
+    * Method called after actor has been stopped.
+    * Any scheduler that might have been running will already have been canceled.
+    */
+  def onStop(): Unit = {
+    log.info(s"Actor ${self.path.name} stopped.")
+  }
+
+  /**
+    * Any message that was not processed by the default actor receiver
+    * will be passed to this method.
+    *
+    * @return
+    */
+  def customReceive: Receive
+}
+
+object FeyGenericActor {
+
+  /**
+    * Stops the actor
+   */
+  case object STOP
+
+  /**
+    * Default message that triggers an action when it is received.
+    *
+    * @param message message to be processed
+    * @tparam T Any
+    */
+  case class PROCESS[T](message: T)
+
+  /**
+    * Message sent to the actor when an exception happens in the scheduler
+    *
+    * @param reason exception thrown by the scheduled task
+    */
+  case class EXCEPTION(reason: Throwable)
+
+  /**
+    * Prints the path of the actor
+    */
+  case object PRINT_PATH
+
+}
+
+
+

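The backoff in FeyGenericActor.scala above works by recording a deadline in startBackoff and dropping every PROCESS message that arrives before it. The following is a small actor-free sketch of that gate, under stated assumptions: `BackoffGate` and its injectable `now` clock are hypothetical names introduced here so the behaviour can be checked without sleeping. Unlike the commit, the sketch compares timestamps by subtraction, which is what the System.nanoTime documentation recommends.

```scala
import scala.concurrent.duration._

object BackoffSketch {
  // Hypothetical class, not part of Fey. `now` defaults to System.nanoTime
  // but can be replaced with a fake clock.
  final class BackoffGate(backoff: FiniteDuration,
                          now: () => Long = () => System.nanoTime()) {
    // Deadline before which messages are dropped; initially "already passed".
    private var endBackoff: Long = now()

    // Same idea as startBackoff() in the diff: push the deadline forward.
    def startBackoff(): Unit = endBackoff = now() + backoff.toNanos

    // Runs the handler and returns true outside the backoff period;
    // drops the message and returns false during it.
    def process(message: Any)(handler: Any => Unit): Boolean =
      if (now() - endBackoff >= 0) { handler(message); true } else false
  }
}
```

With a real clock, `new BackoffSketch.BackoffGate(1.minute)` would accept the first message, and after `startBackoff()` would drop further messages for one minute.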
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/10042354/fey-core/src/main/scala/org/apache/iota/fey/IdentifyFeyActors.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/IdentifyFeyActors.scala b/fey-core/src/main/scala/org/apache/iota/fey/IdentifyFeyActors.scala
new file mode 100644
index 0000000..1dbb5e6
--- /dev/null
+++ b/fey-core/src/main/scala/org/apache/iota/fey/IdentifyFeyActors.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iota.fey
+
+import akka.actor.{Actor, ActorIdentity, ActorLogging, ActorPath, Identify}
+import play.api.libs.json._
+
+import scala.collection.mutable.HashSet
+
+protected class IdentifyFeyActors extends Actor with ActorLogging {
+
+  import IdentifyFeyActors._
+
+  def receive = {
+    case IDENTIFY_TREE(startPath) =>
+      log.info("Current Actors in system:")
+      actorsPath = HashSet.empty
+      rootPath = startPath
+      log.info(startPath)
+      self ! ActorPath.fromString(startPath)
+
+    case path: ActorPath =>
+      context.actorSelection(path / "*") ! Identify(())
+
+    case ActorIdentity(_, Some(ref)) =>
+      actorsPath.add(ref.path.toString)
+      log.info(ref.path.toString)
+      self ! ref.path
+
+    case _ =>
+  }
+}
+
+protected object IdentifyFeyActors{
+
+  /**
+    * Fires the Identify message to all of the actors under startPath
+    *
+    * @param startPath path of the actor at the root of the tree to be identified
+    */
+  case class IDENTIFY_TREE(startPath: String)
+
+  /**
+    * Path of the root that fired the identity event
+    */
+  private var rootPath: String = ""
+
+  /**
+    * Set of the paths of all the actors that responded to the Identify message
+    */
+  var actorsPath:HashSet[String] = HashSet.empty
+
+  /**
+    * Generates a Tree in JSON format.
+    * Uses a Trie as data structure
+    * @return string JSON
+    */
+  def generateTreeJson(): String = {
+    val trie = new Trie()
+    actorsPath.map(_.replace("user/","")).foreach(trie.appendPath(_))
+
+    Json.stringify(trie.print)
+  }
+
+  //Static HTML content from d3
+  val html = scala.io.Source.fromInputStream(getClass.getResourceAsStream("/d3Tree.html"), "UTF-8")
+    .getLines()
+    .mkString("\n")
+
+  def getHTMLTree(json: String): String = {
+   html.replace("$MYJSONHIERARCHY", json)
+  }
+
+}
+
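generateTreeJson in IdentifyFeyActors.scala above turns a flat set of actor paths into a hierarchy by appending each path to a trie. Fey's Trie class is not shown in this part of the diff, so the following is only a minimal sketch of the idea with a hypothetical `Node` type. It assumes the paths are already relative to the root, that names need no JSON escaping, and that a `name`/`children` layout is acceptable as output; the layout the commit actually emits is not confirmed here.

```scala
import scala.collection.mutable

object PathTreeSketch {
  // Hypothetical minimal trie node: one node per path segment.
  final class Node(val name: String) {
    val children = mutable.LinkedHashMap.empty[String, Node]
  }

  // Inserts every path segment by segment, reusing nodes for shared prefixes.
  def build(rootName: String, paths: Iterable[String]): Node = {
    val root = new Node(rootName)
    paths.foreach { path =>
      var node = root
      path.split('/').filter(_.nonEmpty).foreach { segment =>
        node = node.children.getOrElseUpdate(segment, new Node(segment))
      }
    }
    root
  }

  // Renders the tree as nested {"name": ..., "children": [...]} objects.
  def render(node: Node): String = {
    val kids = node.children.values.map(render).mkString("[", ",", "]")
    s"""{"name":"${node.name}","children":$kids}"""
  }
}
```

For example, the paths "orch-1/ensemble-1/performer-1" and "orch-1/ensemble-1/performer-2" share the "orch-1" and "ensemble-1" nodes, so the resulting tree has a single branch that splits only at the performer level.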