You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iota.apache.org by to...@apache.org on 2016/07/14 19:54:33 UTC

[4/9] incubator-iota git commit: Dynamically download jars

Dynamically download jars

- Restructure how to receive a JSON
- Json analysis will be done when a new JSON is received, before Fey actually process it

Affected:

- Fey checkpoint
- Fey Json receiver
- Fey Json process


Project: http://git-wip-us.apache.org/repos/asf/incubator-iota/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-iota/commit/6fbba635
Tree: http://git-wip-us.apache.org/repos/asf/incubator-iota/tree/6fbba635
Diff: http://git-wip-us.apache.org/repos/asf/incubator-iota/diff/6fbba635

Branch: refs/heads/master
Commit: 6fbba6351663c3a65d69eaae1251226cbcbc7b2f
Parents: a2cc5b6
Author: Barbara Gomes <ba...@gmail.com>
Authored: Wed Jul 13 18:06:27 2016 -0700
Committer: Barbara Gomes <ba...@gmail.com>
Committed: Wed Jul 13 18:06:27 2016 -0700

----------------------------------------------------------------------
 .../apache/iota/fey/CheckpointProcessor.scala   |  81 ++++++++
 .../scala/org/apache/iota/fey/DirWatcher.scala  | 110 -----------
 .../scala/org/apache/iota/fey/Ensemble.scala    |  15 +-
 .../scala/org/apache/iota/fey/FeyCore.scala     | 164 ++++------------
 .../org/apache/iota/fey/JsonReceiver.scala      | 190 +++++++++++++++++++
 .../org/apache/iota/fey/JsonReceiverActor.scala |  87 +++++++++
 .../main/scala/org/apache/iota/fey/Utils.scala  |   9 +-
 .../apache/iota/fey/WatchServiceReceiver.scala  | 103 ++++++++++
 project/Build.scala                             |   3 +-
 project/Dependencies.scala                      |   4 +
 10 files changed, 521 insertions(+), 245 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/6fbba635/fey-core/src/main/scala/org/apache/iota/fey/CheckpointProcessor.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/CheckpointProcessor.scala b/fey-core/src/main/scala/org/apache/iota/fey/CheckpointProcessor.scala
new file mode 100644
index 0000000..987d9e6
--- /dev/null
+++ b/fey-core/src/main/scala/org/apache/iota/fey/CheckpointProcessor.scala
@@ -0,0 +1,81 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iota.fey
+
+import java.io.File
+
+import akka.actor.ActorRef
+import org.apache.iota.fey.JsonReceiverActor.JSON_RECEIVED
+import play.api.libs.json.{JsValue, Json}
+
+import scala.io.Source
+
+/**
+  * Altough checkpoint processor is not a receiver, it will use the same principle
+  * as a receiver.
+  * It will run just once, when the application starts.
+  * @param receiverActor
+  */
+class CheckpointProcessor(receiverActor: ActorRef) extends JsonReceiver{
+
+  override def run(): Unit = {
+    processCheckpointFiles()
+  }
+
+  def getJsonObject(params: String): Option[JsValue] = {
+    try{
+      val stringJson = Source.fromFile(params).getLines.mkString
+      Option(Json.parse(stringJson))
+    }catch{
+      case e: Exception =>
+        log.error("Could not parse JSON", e)
+        None
+    }
+  }
+
+  private def processJson(path: String, file: File) = {
+    try{
+      getJsonObject(path) match {
+        case Some(orchestrationJSON) =>
+          val valid = validJson(orchestrationJSON)
+          if(valid && (orchestrationJSON \ JSON_PATH.COMMAND).as[String].toUpperCase != "DELETE"){
+            checkForLocation(orchestrationJSON)
+          }
+          if(valid) {
+            receiverActor ! JSON_RECEIVED(orchestrationJSON, file)
+          }else{
+            log.warn(s"File $path not processed. Incorrect JSON schema")
+          }
+          file.delete()
+        case None =>
+      }
+    } catch {
+      case e: Exception =>
+        log.error(s"File $path will not be processed", e)
+    }
+  }
+
+  private def processCheckpointFiles() = {
+    Utils.getFilesInDirectory(CONFIG.CHECKPOINT_DIR)
+      .filter(file => file.getName.endsWith(CONFIG.JSON_EXTENSION))
+      .foreach(file => {
+        processJson(file.getAbsolutePath, file)
+      })
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/6fbba635/fey-core/src/main/scala/org/apache/iota/fey/DirWatcher.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/DirWatcher.scala b/fey-core/src/main/scala/org/apache/iota/fey/DirWatcher.scala
deleted file mode 100644
index cf2b685..0000000
--- a/fey-core/src/main/scala/org/apache/iota/fey/DirWatcher.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.iota.fey
-
-import java.io.File
-import java.nio.file.StandardWatchEventKinds._
-import java.nio.file.{FileSystems, Path}
-
-import akka.actor.{Actor, ActorLogging, ActorRef, Props}
-
-/**
-  *
-  * @param watcherActor
-  */
-class WatchDirectoryTask(watcherActor: ActorRef) extends Runnable {
-
-  private val watchService = FileSystems.getDefault.newWatchService()
-
-  def watch(path: Path) : Unit = path.register(watchService, ENTRY_CREATE, ENTRY_MODIFY)
-
-  override def run() {
-    try {
-      while (!Thread.currentThread().isInterrupted) {
-        val key = watchService.take()
-        val eventsIterator = key.pollEvents().iterator()
-        while(eventsIterator.hasNext) {
-          val event = eventsIterator.next()
-          val relativePath = event.context().asInstanceOf[Path]
-          val path = key.watchable().asInstanceOf[Path].resolve(relativePath)
-          event.kind() match {
-            case ENTRY_CREATE =>
-              watcherActor ! DirectoryWatcherActor.FILE_EVENT(path.toFile, "CREATED")
-            case ENTRY_MODIFY =>
-              watcherActor ! DirectoryWatcherActor.FILE_EVENT(path.toFile, "UPDATED")
-            case x => println("UNDEFINED MESSAGE")
-          }
-        }
-        key.reset()
-      }
-    } catch {
-      case e: InterruptedException =>
-        throw e
-    } finally {
-      watchService.close()
-    }
-  }
-}
-
-class DirectoryWatcherActor(val fileExtension: String) extends Actor with ActorLogging {
-
-  import DirectoryWatcherActor._
-
-  val watchFileTask = new WatchDirectoryTask(self)
-  var watchThread = new Thread(watchFileTask, "WatchService")
-
-  override def preStart() {
-    SYSTEM_ACTORS.monitoring  ! Monitor.START(Utils.getTimestamp)
-    watchThread.setDaemon(true)
-    watchThread.start()
-  }
-
-  override def postStop() {
-    SYSTEM_ACTORS.monitoring  ! Monitor.STOP(Utils.getTimestamp)
-    watchThread.interrupt()
-  }
-
-  override def postRestart(reason: Throwable): Unit = {
-    SYSTEM_ACTORS.monitoring  ! Monitor.RESTART(reason, Utils.getTimestamp)
-    preStart()
-  }
-
-  override def receive: Receive = {
-    case MONITOR(path) =>
-      log.info(s"Start monitoring ${path.getFileName}")
-      watchFileTask.watch(path)
-    case FILE_EVENT(file, eventType) if file.getAbsolutePath.endsWith(fileExtension) =>
-      log.info(s"$eventType = ${file.getAbsolutePath}")
-      context.parent ! FeyCore.NEW_FILE_ACTION(file)
-  }
-}
-
-object DirectoryWatcherActor {
-
-  /**
-    * Start monitoring directory
-    *
-    * @param path directory path
-    */
-  case class MONITOR(path: Path)
-  case class FILE_EVENT(file: File, event: String)
-
-  def props(fileExtension: String): Props = {
-    Props(new DirectoryWatcherActor(fileExtension))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/6fbba635/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
index caf2ecc..da3c0bb 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
@@ -201,7 +201,7 @@ protected class Ensemble(val orchestrationID: String,
     */
   private def getPerformer(performerInfo: Performer, connections: Map[String, ActorRef]): Props = {
 
-    val clazz = loadClazzFromJar(performerInfo.classPath, performerInfo.jarName)
+    val clazz = loadClazzFromJar(performerInfo.classPath, s"${performerInfo.jarLocation}/${performerInfo.jarName}", performerInfo.jarName)
 
     val autoScale = if(performerInfo.autoScale > 0) true else false
 
@@ -220,15 +220,15 @@ protected class Ensemble(val orchestrationID: String,
     * Load a clazz instance of FeyGenericActor from a jar
     *
     * @param classPath class path
-    * @param jarName jar name
+    * @param jarLocation Full path where to load the jar from
     * @return clazz instance of FeyGenericActor
     */
-  private def loadClazzFromJar(classPath: String, jarName: String):Class[FeyGenericActor] = {
+  private def loadClazzFromJar(classPath: String, jarLocation: String, jarName: String):Class[FeyGenericActor] = {
     try {
-      Utils.loadActorClassFromJar(s"${CONFIG.JAR_REPOSITORY}/$jarName",classPath)
+      Utils.loadActorClassFromJar(jarLocation,classPath,jarName)
     }catch {
       case e: Exception =>
-        log.error(e,s"Could not load class $classPath from jar $jarName. Please, check the Jar repository path as well the jar name")
+        log.error(e,s"Could not load class $classPath from jar $jarLocation. Please, check the Jar repository path as well the jar name")
         throw e
     }
   }
@@ -280,7 +280,8 @@ object Ensemble {
       val classPath: String = (performer \ SOURCE \ SOURCE_CLASSPATH).as[String]
       val params:Map[String,String] = getMapOfParams((performer \ SOURCE \ SOURCE_PARAMS).as[JsObject])
       val controlAware:Boolean = if (performer.keys.contains(CONTROL_AWARE)) (performer \ CONTROL_AWARE).as[Boolean] else false
-      (id, new Performer(id, jarName, classPath,params,schedule.millisecond,backoff.millisecond, autoScale,controlAware))
+      val location: String = if ( (performer \ SOURCE).as[JsObject].keys.contains(JAR_LOCATION) ) CONFIG.DYNAMIC_JAR_REPO else CONFIG.JAR_REPOSITORY
+      (id, new Performer(id, jarName, classPath,params,schedule.millisecond,backoff.millisecond, autoScale,controlAware, location))
     }).toMap
   }
 
@@ -316,4 +317,4 @@ object Ensemble {
 case class Performer(uid: String, jarName: String,
                 classPath: String, parameters: Map[String,String],
                 schedule: FiniteDuration, backoff: FiniteDuration,
-                autoScale: Int, controlAware: Boolean)
+                autoScale: Int, controlAware: Boolean, jarLocation: String)

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/6fbba635/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala b/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
index 5985f26..41ba202 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
@@ -1,3 +1,4 @@
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -37,8 +38,8 @@ protected class FeyCore extends Actor with ActorLogging{
   import FeyCore._
   import CONFIG._
 
-  var watcherActor: ActorRef = null
-  var identifier: ActorRef = null
+  val identifier: ActorRef = context.actorOf(Props(classOf[IdentifyFeyActors]), name = IDENTIFIER_NAME)
+  context.watch(identifier)
 
   override def receive: Receive = {
 
@@ -46,14 +47,13 @@ protected class FeyCore extends Actor with ActorLogging{
       printActiveActors()
 
     case START =>
-      createIdentifierActor()
-      processInitialFiles(JSON_REPOSITORY)
-      self ! WATCH_DIR(JSON_REPOSITORY)
+      val jsonReceiverActor: ActorRef = context.actorOf(Props[JsonReceiverActor], name = JSON_RECEIVER_NAME)
+      context.watch(jsonReceiverActor)
 
-    case NEW_FILE_ACTION(file) =>
+    case ORCHESTRATION_RECEIVED(orchestrationJson, file) =>
       log.info(s"NEW FILE ${file.getAbsolutePath}")
       try{
-        processJson(file)
+        processJson(orchestrationJson)
         renameProcessedFile(file, "processed")
       }catch {
         case e: Exception =>
@@ -61,33 +61,11 @@ protected class FeyCore extends Actor with ActorLogging{
           log.error(e, s"JSON not processed ${file.getAbsolutePath}")
       }
 
-    case WATCH_DIR(path) =>
-      if(watcherActor == null) {
-        watcherActor = context.actorOf(DirectoryWatcherActor.props(JSON_EXTENSION), name = WATCHER_NAME)
-        context.watch(watcherActor)
-
-      }
-      watcherActor ! DirectoryWatcherActor.MONITOR(Paths.get(path))
-
     case STOP_EMPTY_ORCHESTRATION(orchID) =>
       log.warning(s"Deleting Empty Orchestration $orchID")
       deleteOrchestration(orchID)
 
-    case Terminated(actor) =>
-      SYSTEM_ACTORS.monitoring ! Monitor.TERMINATE(actor.path.toString, Utils.getTimestamp)
-      actor.path.name match {
-        case IDENTIFIER_NAME =>
-          createIdentifierActor()
-        case WATCHER_NAME =>
-          watcherActor = null
-          self ! WATCH_DIR(JSON_REPOSITORY)
-        case guid: String =>
-          log.info(s"TERMINATED ${guid}")
-          FEY_CACHE.activeOrchestrations.remove(guid)
-          if(!FEY_CACHE.orchestrationsAwaitingTermination.isEmpty) {
-            checkForOrchestrationWaitingForTermination(guid)
-          }
-      }
+    case Terminated(actor) => processTerminatedMessage(actor)
 
     case GetRoutees => //Discard
 
@@ -96,25 +74,33 @@ protected class FeyCore extends Actor with ActorLogging{
 
   }
 
+  private def processTerminatedMessage(actorRef: ActorRef) = {
+    SYSTEM_ACTORS.monitoring ! Monitor.TERMINATE(actorRef.path.toString, Utils.getTimestamp)
+    log.info(s"TERMINATED ${actorRef.path.name}")
+    FEY_CACHE.activeOrchestrations.remove(actorRef.path.name)
+    if(!FEY_CACHE.orchestrationsAwaitingTermination.isEmpty) {
+      checkForOrchestrationWaitingForTermination(actorRef.path.name)
+    }
+  }
+
   /**
     * Clean up Fey Cache
     */
-  override def postStop() = {
+  override def postStop(): Unit = {
     SYSTEM_ACTORS.monitoring ! Monitor.STOP(Utils.getTimestamp)
     FEY_CACHE.activeOrchestrations.clear()
     FEY_CACHE.orchestrationsAwaitingTermination.clear()
     ORCHESTRATION_CACHE.orchestration_metadata.clear()
   }
 
-  override def preStart() = {
+  override def preStart(): Unit = {
     SYSTEM_ACTORS.monitoring ! Monitor.START(Utils.getTimestamp)
     log.info("Starting Fey Core")
-    if (CHEKPOINT_ENABLED) {
-      processInitialFiles(CHECKPOINT_DIR, true)
-    }
     self ! START
   }
 
+
+
   override def postRestart(reason: Throwable): Unit = {
     SYSTEM_ACTORS.monitoring ! Monitor.RESTART(reason, Utils.getTimestamp)
     preStart()
@@ -123,45 +109,12 @@ protected class FeyCore extends Actor with ActorLogging{
   override val supervisorStrategy =
     OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 minute) {
       case _: Exception =>
-        if(sender() == watcherActor) Stop
-        else Restart
+        Restart
     }
 
   /**
-    * Actor that prints all the actors in the fey core tree
-    */
-  private def createIdentifierActor() = {
-      identifier = context.actorOf(Props(classOf[IdentifyFeyActors]), name = IDENTIFIER_NAME)
-  }
-
-  /**
-    * Process all the files that are already in the dir
-    * before starting watching for new files
-    */
-  private def processInitialFiles(directory: String, delete: Boolean = false) = {
-    getFilesInDirectory(directory)
-      .filter(file => file.getName.endsWith(JSON_EXTENSION))
-      .foreach(file => {
-        try {
-          processJson(file)
-          if(delete){
-            file.delete()
-          }else {
-            renameProcessedFile(file, "processed")
-          }
-        } catch {
-          case e: Exception =>
-            renameProcessedFile(file, "failed")
-            log.error(e, s"JSON not processed ${file.getAbsolutePath}")
-        }
-      })
-  }
-
-  /**
-    * Process the JSON in the file.
     * Process the JSON is a binary operation.
     * The network only will be established if the entire JSON can be processed.
-    * Throws IllegalArgumentException if json cannot be parsed.
     * JSON commands:
     *   CREATE: tells Fey that there is no previous orchestration active for this JSON.
     *           Fey will create the orchestration and all the Ensembles in the JSON.
@@ -173,49 +126,20 @@ protected class FeyCore extends Actor with ActorLogging{
     *   RECREATE: Tells Fey that might exists an active orchestration, if that is the case, delete the orchestration and recreate it
     *             otherwise, simply create it.
     *
-    * @param file
+    * @param orchestrationJSON
     */
-  private def processJson(file: File): Unit ={
-    log.info(s"File: ${file.getAbsolutePath}")
-    loadJsonFromFile(file) match {
-      case Some(json) =>
-        if(validJSONSchema(json)) {
-          val orchestrationName = (json \ ORCHESTRATION_NAME).as[String]
-          val orchestrationID = (json \ GUID).as[String]
-          val orchestrationCommand = (json \ COMMAND).as[String].toUpperCase()
-          val orchestrationTimestamp = (json \ ORCHESTRATION_TIMESTAMP).as[String]
-          val ensembles = (json \ ENSEMBLES).as[List[JsObject]]
-          orchestrationCommand match {
-            case "RECREATE" => recreateOrchestration(ensembles, orchestrationID, orchestrationName, orchestrationTimestamp)
-            case "CREATE" => createOrchestration(ensembles, orchestrationID, orchestrationName, orchestrationTimestamp)
-            case "UPDATE" => updateOrchestration(ensembles, orchestrationID, orchestrationName, orchestrationTimestamp)
-            case "DELETE" => deleteOrchestration(orchestrationID)
-            case x => throw new CommandNotRecognized(s"Command: $x")
-          }
-        }
-      case None =>
-        throw new IllegalArgumentException(s"Could not parser the JSON in the file ${file.getAbsolutePath}")
-    }
-  }
-
-  def validJSONSchema(json: JsValue):Boolean = {
-    try {
-      val result = SchemaValidator.validate(jsonSchemaSpec, json)
-      if (result.isError) {
-        log.error("Incorrect JSON schema")
-        log.error(result.asEither.left.get.toJson.as[List[JsObject]].map(error => {
-          val path = (error \ "instancePath").as[String]
-          val msg = (error \ "msgs").as[List[String]].mkString("\n\t")
-          s"$path \n\tErrors: $msg"
-        }).mkString("\n"))
-        false
-      } else {
-        true
-      }
-    }catch{
-      case e: Exception =>
-        log.error(e,"Error while validating JSON")
-        false
+  private def processJson(orchestrationJSON: JsValue): Unit ={
+    val orchestrationName = (orchestrationJSON \ ORCHESTRATION_NAME).as[String]
+    val orchestrationID = (orchestrationJSON \ GUID).as[String]
+    val orchestrationCommand = (orchestrationJSON \ COMMAND).as[String].toUpperCase()
+    val orchestrationTimestamp = (orchestrationJSON \ ORCHESTRATION_TIMESTAMP).as[String]
+    val ensembles = (orchestrationJSON \ ENSEMBLES).as[List[JsObject]]
+    orchestrationCommand match {
+      case "RECREATE" => recreateOrchestration(ensembles, orchestrationID, orchestrationName, orchestrationTimestamp)
+      case "CREATE" => createOrchestration(ensembles, orchestrationID, orchestrationName, orchestrationTimestamp)
+      case "UPDATE" => updateOrchestration(ensembles, orchestrationID, orchestrationName, orchestrationTimestamp)
+      case "DELETE" => deleteOrchestration(orchestrationID)
+      case x => throw new CommandNotRecognized(s"Command: $x")
     }
   }
 
@@ -372,23 +296,17 @@ protected object FeyCore{
   case object JSON_TREE
 
   /**
-    * Send this message to Start Directory Watcher Thread
-    *
-    * @param path
-    */
-  sealed case class WATCH_DIR(path: String)
-
-  /**
     * After creating an actorOf FeyCore send this message to configure.
     */
   case object START
 
   /**
-    * Used by the DirectoryWatcher to notify fey when a new file was added
-    *
-    * @param file java.io.File
+    * Json Receiver actor will send this message everytime a json is received
+    * Does not matter from where it was received
+    * @param json
+    * @param file
     */
-  case class NEW_FILE_ACTION(file: File)
+  case class ORCHESTRATION_RECEIVED(json: JsValue, file: File)
 
   case class STOP_EMPTY_ORCHESTRATION(orchID: String)
 
@@ -396,7 +314,7 @@ protected object FeyCore{
     Props(new FeyCore)
   }
 
-  final val WATCHER_NAME: String = "DIR_WATCHER"
+  final val JSON_RECEIVER_NAME: String = "JSON_RECEIVER"
   final val IDENTIFIER_NAME: String = "FEY_IDENTIFIER"
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/6fbba635/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiver.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiver.scala b/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiver.scala
new file mode 100644
index 0000000..cc14a81
--- /dev/null
+++ b/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiver.scala
@@ -0,0 +1,190 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iota.fey
+
+import java.io.FileOutputStream
+import java.net.URL
+import java.io.File
+
+import com.eclipsesource.schema._
+import org.slf4j.LoggerFactory
+import play.api.libs.json._
+import JSON_PATH._
+import java.nio.file.{Files, Paths}
+
+import org.apache.commons.io.IOUtils
+import org.apache.commons.codec.binary.Base64
+
+/**
+  * Basic class to be used when implementing a new JSON receiver
+  */
+trait JsonReceiver extends Runnable{
+
+  val log = LoggerFactory.getLogger(this.getClass)
+
+  /**
+    * Default Run if no one is specified on concret class
+    */
+  override def run(): Unit = {
+    try {
+      while (!Thread.currentThread().isInterrupted) {
+        execute()
+      }
+    }catch{
+      case e: Exception => exceptionOnRun(e)
+    }
+  }
+
+  /**
+    * Checks if JSON complies with defined Schema
+    *
+    * @param json
+    * @return true if it complies or false if it does not
+    */
+  final def validJson(json: JsValue): Boolean = {
+    try {
+      val result = SchemaValidator.validate(CONFIG.JSON_SPEC, json)
+      if (result.isError) {
+        log.error("Incorrect JSON schema \n" + result.asEither.left.get.toJson.as[List[JsObject]].map(error => {
+          val path = (error \ "instancePath").as[String]
+          val msg = (error \ "msgs").as[List[String]].mkString("\n\t")
+          s"$path \n\tErrors: $msg"
+        }).mkString("\n"))
+        false
+      } else {
+        true
+      }
+    }catch{
+      case e: Exception =>
+        log.error("Error while validating JSON", e)
+        false
+    }
+  }
+
+  /**
+    * Checks if any of the performers need to have its jar downloaded
+    * All the Receivers must call this method so the Jars can be downloaded at runtime
+    *
+    * @param json Orchestration JSON object
+    */
+  final def checkForLocation(json: JsValue): Unit = {
+    (json \ ENSEMBLES).as[List[JsObject]].foreach(ensemble => {
+      (ensemble \ PERFORMERS).as[List[JsObject]].foreach(performer => {
+        if((performer \ SOURCE).as[JsObject].keys.contains(JAR_LOCATION)){
+          val jarName = (performer \ SOURCE \ SOURCE_NAME).as[String]
+          val jarLocation = (performer \ SOURCE \ JAR_LOCATION).as[String].toLowerCase
+          if( (jarLocation.startsWith("https://") || jarLocation.startsWith("http://")) && !jarDownloaded(jarName)){
+            val jarLocation = (performer \ SOURCE \ JAR_LOCATION).as[String]
+            downloadJAR(jarLocation, jarName)
+          }
+        }
+      })
+    })
+  }
+
+  /**
+    * Checks if the jar already exists
+    *
+    * @param jarName
+    * @return
+    */
+  private final def jarDownloaded(jarName: String): Boolean = {
+    try {
+      Files.exists(Paths.get(s"${CONFIG.DYNAMIC_JAR_REPO}/$jarName"))
+    }catch{
+      case e: Exception =>
+        log.error(s"Could not check if $jarName exists", e)
+        true
+    }
+  }
+
+  private final def downloadJAR(url: String, jarName: String): Unit = {
+    var outputStream: FileOutputStream = null
+    try{
+      val extractedURL = extractCredentials(url)
+      log.info(s"Downloading $jarName from ${extractedURL._1}")
+
+      val connection = new URL(extractedURL._1).openConnection
+
+      // Add authentication Header if credentials is defined
+      extractedURL._2 match {
+        case Some(credentials) =>
+          connection.setRequestProperty(HttpBasicAuth.AUTHORIZATION, HttpBasicAuth.getHeader(credentials._1, credentials._2))
+        case None =>
+      }
+      outputStream = new FileOutputStream(s"${CONFIG.DYNAMIC_JAR_REPO}/$jarName")
+      IOUtils.copy(connection.getInputStream,outputStream)
+      outputStream.close()
+    }catch{
+      case e: Exception =>
+        if(outputStream != null) {
+          outputStream.close()
+          (new File(s"${CONFIG.DYNAMIC_JAR_REPO}/$jarName")).delete()
+        }
+        log.error(s"Could not download $jarName from $url", e)
+    }
+  }
+
+  /**
+    *
+    * @param url
+    * @return (NO_CRED_URL, (USER, PASSWORD))
+    */
+  private final def extractCredentials(url: String): (String, Option[(String, String)]) = {
+    if(url.contains("@")) {
+      val atIndex = url.indexOf("@")
+      if (url.startsWith("https")) {
+        val cred = url.substring(8, atIndex)
+        val userPass = cred.split(":")
+        (url.replace(s"$cred@",""), Option(userPass(0),userPass(1)))
+      } else {
+        val cred = url.substring(7, atIndex)
+        val userPass = cred.split(":")
+        (url.replace(s"$cred@",""), Option(userPass(0),userPass(1)))
+      }
+    }else{
+      (url, None)
+    }
+  }
+
+  /**
+    * Called inside run method
+    */
+  def execute(): Unit = {}
+
+  /**
+    * Called when occurs an exception inside Run.
+    * For example: Thread.interrupt
+    *
+    * @param e
+    */
+  def exceptionOnRun(e: Exception): Unit = {}
+}
+
+object HttpBasicAuth {
+  val BASIC = "Basic";
+  val AUTHORIZATION = "Authorization";
+
+  def encodeCredentials(username: String, password: String): String = {
+    new String(Base64.encodeBase64String((username + ":" + password).getBytes))
+  }
+
+  def getHeader(username: String, password: String): String =
+    BASIC + " " + encodeCredentials(username, password)
+}

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/6fbba635/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiverActor.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiverActor.scala b/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiverActor.scala
new file mode 100644
index 0000000..2a05710
--- /dev/null
+++ b/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiverActor.scala
@@ -0,0 +1,87 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iota.fey
+
+import java.nio.file.Paths
+import java.io.File
+
+import akka.actor.{Actor, ActorLogging, ActorRef, Props}
+import play.api.libs.json.{JsValue, Json}
+
+class JsonReceiverActor extends Actor with ActorLogging {
+
+  import JsonReceiverActor._
+
+  val watchFileTask = new WatchServiceReceiver(self)
+  var watchThread = new Thread(watchFileTask, "WatchService")
+
+  override def preStart() {
+    prepareDynamicJarRepo()
+    processCheckpointFiles()
+
+    SYSTEM_ACTORS.monitoring  ! Monitor.START(Utils.getTimestamp)
+    watchThread.setDaemon(true)
+    watchThread.start()
+
+    watchFileTask.watch(Paths.get(CONFIG.JSON_REPOSITORY))
+  }
+
+  private def prepareDynamicJarRepo() = {
+    val jarDir = new File(CONFIG.DYNAMIC_JAR_REPO)
+    if (!jarDir.exists()){
+      jarDir.mkdir()
+    }else if(CONFIG.DYNAMIC_JAR_FORCE_PULL){
+      jarDir.listFiles().foreach(_.delete())
+    }
+  }
+
+
+  private def processCheckpointFiles() = {
+    if (CONFIG.CHEKPOINT_ENABLED) {
+      val checkpoint = new CheckpointProcessor(self)
+      checkpoint.run()
+    }
+  }
+
+  override def postStop() {
+    SYSTEM_ACTORS.monitoring  ! Monitor.STOP(Utils.getTimestamp)
+    watchThread.interrupt()
+    watchThread.join()
+  }
+
+  override def postRestart(reason: Throwable): Unit = {
+    SYSTEM_ACTORS.monitoring  ! Monitor.RESTART(reason, Utils.getTimestamp)
+    preStart()
+  }
+
+  override def receive: Receive = {
+    case JSON_RECEIVED(json, file) =>
+      log.info(s"JSON RECEIVED => ${Json.stringify(json)}")
+      context.parent ! FeyCore.ORCHESTRATION_RECEIVED(json, file)
+
+    case _ =>
+  }
+
+}
+
+object JsonReceiverActor {
+
+  case class JSON_RECEIVED(json: JsValue, file: File)
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/6fbba635/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
index 58b98d6..ebf7503 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
@@ -67,16 +67,17 @@ protected object Utils {
     * @param className class path inside the jar
     * @return class of FeyGenericActor
     */
-  def loadActorClassFromJar(path: String, className: String):Class[FeyGenericActor] = {
+  def loadActorClassFromJar(path: String, className: String, jarName: String):Class[FeyGenericActor] = {
 
-    loadedJars.get(path) match {
+    loadedJars.get(jarName) match {
 
       case None =>
         val urls:Array[URL] = Array(new URL(s"jar:file:$path!/"))
         val cl: URLClassLoader = URLClassLoader.newInstance(urls)
         val clazz = cl.loadClass(className)
         val feyClazz = clazz.asInstanceOf[Class[FeyGenericActor]]
-        loadedJars.put(path, (cl, Map(className -> feyClazz)))
+        log.info(s"$path -> $className")
+        loadedJars.put(jarName, (cl, Map(className -> feyClazz)))
         feyClazz
 
       case Some(loadedJar) =>
@@ -84,7 +85,7 @@ protected object Utils {
           case None =>
             val clazz = loadedJar._1.loadClass(className)
             val feyClazz = clazz.asInstanceOf[Class[FeyGenericActor]]
-            loadedJars.put(path, (loadedJar._1, Map(className -> feyClazz) ++ loadedJar._2))
+            loadedJars.put(jarName, (loadedJar._1, Map(className -> feyClazz) ++ loadedJar._2))
             feyClazz
           case Some(clazz) =>
             clazz

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/6fbba635/fey-core/src/main/scala/org/apache/iota/fey/WatchServiceReceiver.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/WatchServiceReceiver.scala b/fey-core/src/main/scala/org/apache/iota/fey/WatchServiceReceiver.scala
new file mode 100644
index 0000000..e42bcbb
--- /dev/null
+++ b/fey-core/src/main/scala/org/apache/iota/fey/WatchServiceReceiver.scala
@@ -0,0 +1,103 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iota.fey
+
+import java.nio.file.StandardWatchEventKinds._
+import java.nio.file.{FileSystems, Path}
+import java.io.File
+import akka.actor.ActorRef
+import org.apache.iota.fey.JsonReceiverActor.JSON_RECEIVED
+import play.api.libs.json._
+
+import scala.io.Source
+
+class WatchServiceReceiver(receiverActor: ActorRef) extends JsonReceiver{
+
+  private val watchService = FileSystems.getDefault.newWatchService()
+
+  def watch(path: Path) : Unit = path.register(watchService, ENTRY_CREATE, ENTRY_MODIFY)
+
+  def getJsonObject(params: String): Option[JsValue] = {
+    try{
+      val stringJson = Source.fromFile(params).getLines.mkString
+      Option(Json.parse(stringJson))
+    }catch{
+      case e: Exception =>
+        log.error("Could not parse JSON", e)
+        None
+    }
+  }
+
+  override def execute(): Unit = {
+    processInitialFiles()
+
+    val key = watchService.take()
+    val eventsIterator = key.pollEvents().iterator()
+
+    while(eventsIterator.hasNext) {
+      val event = eventsIterator.next()
+      val relativePath = event.context().asInstanceOf[Path]
+      val path = key.watchable().asInstanceOf[Path].resolve(relativePath)
+
+      event.kind() match {
+        case (ENTRY_CREATE | ENTRY_MODIFY) if path.toString.endsWith(CONFIG.JSON_EXTENSION) =>
+          processJson(path.toString, path.toFile)
+        case _ =>
+      }
+    }
+
+    key.reset()
+  }
+
+  private def processJson(path: String, file: File) = {
+    try{
+      getJsonObject(path) match {
+        case Some(orchestrationJSON) =>
+          val valid = validJson(orchestrationJSON)
+          if(valid && (orchestrationJSON \ JSON_PATH.COMMAND).as[String].toUpperCase != "DELETE"){
+            checkForLocation(orchestrationJSON)
+          }
+          if(valid) {
+            receiverActor ! JSON_RECEIVED(orchestrationJSON, file)
+          }else{
+            log.warn(s"File $path not processed. Incorrect JSON schema")
+          }
+        case None =>
+      }
+    } catch {
+      case e: Exception =>
+        log.error(s"File $path will not be processed", e)
+    }
+  }
+
+  private def processInitialFiles() = {
+    Utils.getFilesInDirectory(CONFIG.JSON_REPOSITORY)
+      .filter(file => file.getName.endsWith(CONFIG.JSON_EXTENSION))
+      .foreach(file => {
+        processJson(file.getAbsolutePath, file)
+      })
+  }
+
+  override def exceptionOnRun(e: Exception): Unit = {
+    log.error("Watch Service stopped", e)
+    watchService.close()
+    throw e
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/6fbba635/project/Build.scala
----------------------------------------------------------------------
diff --git a/project/Build.scala b/project/Build.scala
index 504fddb..3dc2706 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -21,7 +21,8 @@ import sbt.Keys._
 object ModuleDependencies {
 
   import Dependencies._
-  val FeyDependencies           = compile(akka_actor,typesafe_config,playJson,slf4j,log4jbind,sprayCan,sprayRouting,jsonValidator,javaFilter)
+  val FeyDependencies           = compile(akka_actor,typesafe_config,playJson,slf4j,log4jbind,sprayCan,
+                                            sprayRouting,jsonValidator,javaFilter,codec,apacheIO)
   val StreamDependencies        = provided(akka_actor, fey)
   val ZMQDependencies           = provided(akka_actor,  fey) ++ compile(zmq)
   val VirtualSensorDependencies = provided(akka_actor,  fey) ++ compile(math3)

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/6fbba635/project/Dependencies.scala
----------------------------------------------------------------------
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 72567a7..e46381f 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -26,6 +26,8 @@ object Dependencies {
   val fey             = "org.apache.iota"     %% "fey-core"                   % "1.0-SNAPSHOT"
   val zmq             = "org.zeromq"          %  "jeromq"                     % "0.3.5"
   val math3           = "org.apache.commons"  %  "commons-math3"              % "3.2"
+  val codec           = "commons-codec"       % "commons-codec"               % "1.10"
+  val apacheIO        = "commons-io"          % "commons-io"                  % "2.4"
 
   val akka_actor      = "com.typesafe.akka"   %% "akka-actor"                 % "2.4.2"
   val typesafe_config = "com.typesafe"        %  "config"                     % "1.3.0"
@@ -41,4 +43,6 @@ object Dependencies {
   //restapi
   val sprayCan        = "io.spray"            %%  "spray-can"                 % "1.3.3"
   val sprayRouting    = "io.spray"            %%  "spray-routing"             % "1.3.3"
+
+
 }