You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iota.apache.org by to...@apache.org on 2016/07/14 19:54:33 UTC
[4/9] incubator-iota git commit: Dynamically download jars
Dynamically download jars
- Restructure how to receive a JSON
- Json analysis will be done when a new JSON is received, before Fey actually process it
Affected:
- Fey checkpoint
- Fey Json receiver
- Fey Json process
Project: http://git-wip-us.apache.org/repos/asf/incubator-iota/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-iota/commit/6fbba635
Tree: http://git-wip-us.apache.org/repos/asf/incubator-iota/tree/6fbba635
Diff: http://git-wip-us.apache.org/repos/asf/incubator-iota/diff/6fbba635
Branch: refs/heads/master
Commit: 6fbba6351663c3a65d69eaae1251226cbcbc7b2f
Parents: a2cc5b6
Author: Barbara Gomes <ba...@gmail.com>
Authored: Wed Jul 13 18:06:27 2016 -0700
Committer: Barbara Gomes <ba...@gmail.com>
Committed: Wed Jul 13 18:06:27 2016 -0700
----------------------------------------------------------------------
.../apache/iota/fey/CheckpointProcessor.scala | 81 ++++++++
.../scala/org/apache/iota/fey/DirWatcher.scala | 110 -----------
.../scala/org/apache/iota/fey/Ensemble.scala | 15 +-
.../scala/org/apache/iota/fey/FeyCore.scala | 164 ++++------------
.../org/apache/iota/fey/JsonReceiver.scala | 190 +++++++++++++++++++
.../org/apache/iota/fey/JsonReceiverActor.scala | 87 +++++++++
.../main/scala/org/apache/iota/fey/Utils.scala | 9 +-
.../apache/iota/fey/WatchServiceReceiver.scala | 103 ++++++++++
project/Build.scala | 3 +-
project/Dependencies.scala | 4 +
10 files changed, 521 insertions(+), 245 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/6fbba635/fey-core/src/main/scala/org/apache/iota/fey/CheckpointProcessor.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/CheckpointProcessor.scala b/fey-core/src/main/scala/org/apache/iota/fey/CheckpointProcessor.scala
new file mode 100644
index 0000000..987d9e6
--- /dev/null
+++ b/fey-core/src/main/scala/org/apache/iota/fey/CheckpointProcessor.scala
@@ -0,0 +1,81 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iota.fey
+
+import java.io.File
+
+import akka.actor.ActorRef
+import org.apache.iota.fey.JsonReceiverActor.JSON_RECEIVED
+import play.api.libs.json.{JsValue, Json}
+
+import scala.io.Source
+
+/**
+ * Altough checkpoint processor is not a receiver, it will use the same principle
+ * as a receiver.
+ * It will run just once, when the application starts.
+ * @param receiverActor
+ */
+class CheckpointProcessor(receiverActor: ActorRef) extends JsonReceiver{
+
+ override def run(): Unit = {
+ processCheckpointFiles()
+ }
+
+ def getJsonObject(params: String): Option[JsValue] = {
+ try{
+ val stringJson = Source.fromFile(params).getLines.mkString
+ Option(Json.parse(stringJson))
+ }catch{
+ case e: Exception =>
+ log.error("Could not parse JSON", e)
+ None
+ }
+ }
+
+ private def processJson(path: String, file: File) = {
+ try{
+ getJsonObject(path) match {
+ case Some(orchestrationJSON) =>
+ val valid = validJson(orchestrationJSON)
+ if(valid && (orchestrationJSON \ JSON_PATH.COMMAND).as[String].toUpperCase != "DELETE"){
+ checkForLocation(orchestrationJSON)
+ }
+ if(valid) {
+ receiverActor ! JSON_RECEIVED(orchestrationJSON, file)
+ }else{
+ log.warn(s"File $path not processed. Incorrect JSON schema")
+ }
+ file.delete()
+ case None =>
+ }
+ } catch {
+ case e: Exception =>
+ log.error(s"File $path will not be processed", e)
+ }
+ }
+
+ private def processCheckpointFiles() = {
+ Utils.getFilesInDirectory(CONFIG.CHECKPOINT_DIR)
+ .filter(file => file.getName.endsWith(CONFIG.JSON_EXTENSION))
+ .foreach(file => {
+ processJson(file.getAbsolutePath, file)
+ })
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/6fbba635/fey-core/src/main/scala/org/apache/iota/fey/DirWatcher.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/DirWatcher.scala b/fey-core/src/main/scala/org/apache/iota/fey/DirWatcher.scala
deleted file mode 100644
index cf2b685..0000000
--- a/fey-core/src/main/scala/org/apache/iota/fey/DirWatcher.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.iota.fey
-
-import java.io.File
-import java.nio.file.StandardWatchEventKinds._
-import java.nio.file.{FileSystems, Path}
-
-import akka.actor.{Actor, ActorLogging, ActorRef, Props}
-
-/**
- *
- * @param watcherActor
- */
-class WatchDirectoryTask(watcherActor: ActorRef) extends Runnable {
-
- private val watchService = FileSystems.getDefault.newWatchService()
-
- def watch(path: Path) : Unit = path.register(watchService, ENTRY_CREATE, ENTRY_MODIFY)
-
- override def run() {
- try {
- while (!Thread.currentThread().isInterrupted) {
- val key = watchService.take()
- val eventsIterator = key.pollEvents().iterator()
- while(eventsIterator.hasNext) {
- val event = eventsIterator.next()
- val relativePath = event.context().asInstanceOf[Path]
- val path = key.watchable().asInstanceOf[Path].resolve(relativePath)
- event.kind() match {
- case ENTRY_CREATE =>
- watcherActor ! DirectoryWatcherActor.FILE_EVENT(path.toFile, "CREATED")
- case ENTRY_MODIFY =>
- watcherActor ! DirectoryWatcherActor.FILE_EVENT(path.toFile, "UPDATED")
- case x => println("UNDEFINED MESSAGE")
- }
- }
- key.reset()
- }
- } catch {
- case e: InterruptedException =>
- throw e
- } finally {
- watchService.close()
- }
- }
-}
-
-class DirectoryWatcherActor(val fileExtension: String) extends Actor with ActorLogging {
-
- import DirectoryWatcherActor._
-
- val watchFileTask = new WatchDirectoryTask(self)
- var watchThread = new Thread(watchFileTask, "WatchService")
-
- override def preStart() {
- SYSTEM_ACTORS.monitoring ! Monitor.START(Utils.getTimestamp)
- watchThread.setDaemon(true)
- watchThread.start()
- }
-
- override def postStop() {
- SYSTEM_ACTORS.monitoring ! Monitor.STOP(Utils.getTimestamp)
- watchThread.interrupt()
- }
-
- override def postRestart(reason: Throwable): Unit = {
- SYSTEM_ACTORS.monitoring ! Monitor.RESTART(reason, Utils.getTimestamp)
- preStart()
- }
-
- override def receive: Receive = {
- case MONITOR(path) =>
- log.info(s"Start monitoring ${path.getFileName}")
- watchFileTask.watch(path)
- case FILE_EVENT(file, eventType) if file.getAbsolutePath.endsWith(fileExtension) =>
- log.info(s"$eventType = ${file.getAbsolutePath}")
- context.parent ! FeyCore.NEW_FILE_ACTION(file)
- }
-}
-
-object DirectoryWatcherActor {
-
- /**
- * Start monitoring directory
- *
- * @param path directory path
- */
- case class MONITOR(path: Path)
- case class FILE_EVENT(file: File, event: String)
-
- def props(fileExtension: String): Props = {
- Props(new DirectoryWatcherActor(fileExtension))
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/6fbba635/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
index caf2ecc..da3c0bb 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
@@ -201,7 +201,7 @@ protected class Ensemble(val orchestrationID: String,
*/
private def getPerformer(performerInfo: Performer, connections: Map[String, ActorRef]): Props = {
- val clazz = loadClazzFromJar(performerInfo.classPath, performerInfo.jarName)
+ val clazz = loadClazzFromJar(performerInfo.classPath, s"${performerInfo.jarLocation}/${performerInfo.jarName}", performerInfo.jarName)
val autoScale = if(performerInfo.autoScale > 0) true else false
@@ -220,15 +220,15 @@ protected class Ensemble(val orchestrationID: String,
* Load a clazz instance of FeyGenericActor from a jar
*
* @param classPath class path
- * @param jarName jar name
+ * @param jarLocation Full path where to load the jar from
* @return clazz instance of FeyGenericActor
*/
- private def loadClazzFromJar(classPath: String, jarName: String):Class[FeyGenericActor] = {
+ private def loadClazzFromJar(classPath: String, jarLocation: String, jarName: String):Class[FeyGenericActor] = {
try {
- Utils.loadActorClassFromJar(s"${CONFIG.JAR_REPOSITORY}/$jarName",classPath)
+ Utils.loadActorClassFromJar(jarLocation,classPath,jarName)
}catch {
case e: Exception =>
- log.error(e,s"Could not load class $classPath from jar $jarName. Please, check the Jar repository path as well the jar name")
+ log.error(e,s"Could not load class $classPath from jar $jarLocation. Please, check the Jar repository path as well the jar name")
throw e
}
}
@@ -280,7 +280,8 @@ object Ensemble {
val classPath: String = (performer \ SOURCE \ SOURCE_CLASSPATH).as[String]
val params:Map[String,String] = getMapOfParams((performer \ SOURCE \ SOURCE_PARAMS).as[JsObject])
val controlAware:Boolean = if (performer.keys.contains(CONTROL_AWARE)) (performer \ CONTROL_AWARE).as[Boolean] else false
- (id, new Performer(id, jarName, classPath,params,schedule.millisecond,backoff.millisecond, autoScale,controlAware))
+ val location: String = if ( (performer \ SOURCE).as[JsObject].keys.contains(JAR_LOCATION) ) CONFIG.DYNAMIC_JAR_REPO else CONFIG.JAR_REPOSITORY
+ (id, new Performer(id, jarName, classPath,params,schedule.millisecond,backoff.millisecond, autoScale,controlAware, location))
}).toMap
}
@@ -316,4 +317,4 @@ object Ensemble {
case class Performer(uid: String, jarName: String,
classPath: String, parameters: Map[String,String],
schedule: FiniteDuration, backoff: FiniteDuration,
- autoScale: Int, controlAware: Boolean)
+ autoScale: Int, controlAware: Boolean, jarLocation: String)
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/6fbba635/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala b/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
index 5985f26..41ba202 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
@@ -1,3 +1,4 @@
+
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -37,8 +38,8 @@ protected class FeyCore extends Actor with ActorLogging{
import FeyCore._
import CONFIG._
- var watcherActor: ActorRef = null
- var identifier: ActorRef = null
+ val identifier: ActorRef = context.actorOf(Props(classOf[IdentifyFeyActors]), name = IDENTIFIER_NAME)
+ context.watch(identifier)
override def receive: Receive = {
@@ -46,14 +47,13 @@ protected class FeyCore extends Actor with ActorLogging{
printActiveActors()
case START =>
- createIdentifierActor()
- processInitialFiles(JSON_REPOSITORY)
- self ! WATCH_DIR(JSON_REPOSITORY)
+ val jsonReceiverActor: ActorRef = context.actorOf(Props[JsonReceiverActor], name = JSON_RECEIVER_NAME)
+ context.watch(jsonReceiverActor)
- case NEW_FILE_ACTION(file) =>
+ case ORCHESTRATION_RECEIVED(orchestrationJson, file) =>
log.info(s"NEW FILE ${file.getAbsolutePath}")
try{
- processJson(file)
+ processJson(orchestrationJson)
renameProcessedFile(file, "processed")
}catch {
case e: Exception =>
@@ -61,33 +61,11 @@ protected class FeyCore extends Actor with ActorLogging{
log.error(e, s"JSON not processed ${file.getAbsolutePath}")
}
- case WATCH_DIR(path) =>
- if(watcherActor == null) {
- watcherActor = context.actorOf(DirectoryWatcherActor.props(JSON_EXTENSION), name = WATCHER_NAME)
- context.watch(watcherActor)
-
- }
- watcherActor ! DirectoryWatcherActor.MONITOR(Paths.get(path))
-
case STOP_EMPTY_ORCHESTRATION(orchID) =>
log.warning(s"Deleting Empty Orchestration $orchID")
deleteOrchestration(orchID)
- case Terminated(actor) =>
- SYSTEM_ACTORS.monitoring ! Monitor.TERMINATE(actor.path.toString, Utils.getTimestamp)
- actor.path.name match {
- case IDENTIFIER_NAME =>
- createIdentifierActor()
- case WATCHER_NAME =>
- watcherActor = null
- self ! WATCH_DIR(JSON_REPOSITORY)
- case guid: String =>
- log.info(s"TERMINATED ${guid}")
- FEY_CACHE.activeOrchestrations.remove(guid)
- if(!FEY_CACHE.orchestrationsAwaitingTermination.isEmpty) {
- checkForOrchestrationWaitingForTermination(guid)
- }
- }
+ case Terminated(actor) => processTerminatedMessage(actor)
case GetRoutees => //Discard
@@ -96,25 +74,33 @@ protected class FeyCore extends Actor with ActorLogging{
}
+ private def processTerminatedMessage(actorRef: ActorRef) = {
+ SYSTEM_ACTORS.monitoring ! Monitor.TERMINATE(actorRef.path.toString, Utils.getTimestamp)
+ log.info(s"TERMINATED ${actorRef.path.name}")
+ FEY_CACHE.activeOrchestrations.remove(actorRef.path.name)
+ if(!FEY_CACHE.orchestrationsAwaitingTermination.isEmpty) {
+ checkForOrchestrationWaitingForTermination(actorRef.path.name)
+ }
+ }
+
/**
* Clean up Fey Cache
*/
- override def postStop() = {
+ override def postStop(): Unit = {
SYSTEM_ACTORS.monitoring ! Monitor.STOP(Utils.getTimestamp)
FEY_CACHE.activeOrchestrations.clear()
FEY_CACHE.orchestrationsAwaitingTermination.clear()
ORCHESTRATION_CACHE.orchestration_metadata.clear()
}
- override def preStart() = {
+ override def preStart(): Unit = {
SYSTEM_ACTORS.monitoring ! Monitor.START(Utils.getTimestamp)
log.info("Starting Fey Core")
- if (CHEKPOINT_ENABLED) {
- processInitialFiles(CHECKPOINT_DIR, true)
- }
self ! START
}
+
+
override def postRestart(reason: Throwable): Unit = {
SYSTEM_ACTORS.monitoring ! Monitor.RESTART(reason, Utils.getTimestamp)
preStart()
@@ -123,45 +109,12 @@ protected class FeyCore extends Actor with ActorLogging{
override val supervisorStrategy =
OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 minute) {
case _: Exception =>
- if(sender() == watcherActor) Stop
- else Restart
+ Restart
}
/**
- * Actor that prints all the actors in the fey core tree
- */
- private def createIdentifierActor() = {
- identifier = context.actorOf(Props(classOf[IdentifyFeyActors]), name = IDENTIFIER_NAME)
- }
-
- /**
- * Process all the files that are already in the dir
- * before starting watching for new files
- */
- private def processInitialFiles(directory: String, delete: Boolean = false) = {
- getFilesInDirectory(directory)
- .filter(file => file.getName.endsWith(JSON_EXTENSION))
- .foreach(file => {
- try {
- processJson(file)
- if(delete){
- file.delete()
- }else {
- renameProcessedFile(file, "processed")
- }
- } catch {
- case e: Exception =>
- renameProcessedFile(file, "failed")
- log.error(e, s"JSON not processed ${file.getAbsolutePath}")
- }
- })
- }
-
- /**
- * Process the JSON in the file.
* Process the JSON is a binary operation.
* The network only will be established if the entire JSON can be processed.
- * Throws IllegalArgumentException if json cannot be parsed.
* JSON commands:
* CREATE: tells Fey that there is no previous orchestration active for this JSON.
* Fey will create the orchestration and all the Ensembles in the JSON.
@@ -173,49 +126,20 @@ protected class FeyCore extends Actor with ActorLogging{
* RECREATE: Tells Fey that might exists an active orchestration, if that is the case, delete the orchestration and recreate it
* otherwise, simply create it.
*
- * @param file
+ * @param orchestrationJSON
*/
- private def processJson(file: File): Unit ={
- log.info(s"File: ${file.getAbsolutePath}")
- loadJsonFromFile(file) match {
- case Some(json) =>
- if(validJSONSchema(json)) {
- val orchestrationName = (json \ ORCHESTRATION_NAME).as[String]
- val orchestrationID = (json \ GUID).as[String]
- val orchestrationCommand = (json \ COMMAND).as[String].toUpperCase()
- val orchestrationTimestamp = (json \ ORCHESTRATION_TIMESTAMP).as[String]
- val ensembles = (json \ ENSEMBLES).as[List[JsObject]]
- orchestrationCommand match {
- case "RECREATE" => recreateOrchestration(ensembles, orchestrationID, orchestrationName, orchestrationTimestamp)
- case "CREATE" => createOrchestration(ensembles, orchestrationID, orchestrationName, orchestrationTimestamp)
- case "UPDATE" => updateOrchestration(ensembles, orchestrationID, orchestrationName, orchestrationTimestamp)
- case "DELETE" => deleteOrchestration(orchestrationID)
- case x => throw new CommandNotRecognized(s"Command: $x")
- }
- }
- case None =>
- throw new IllegalArgumentException(s"Could not parser the JSON in the file ${file.getAbsolutePath}")
- }
- }
-
- def validJSONSchema(json: JsValue):Boolean = {
- try {
- val result = SchemaValidator.validate(jsonSchemaSpec, json)
- if (result.isError) {
- log.error("Incorrect JSON schema")
- log.error(result.asEither.left.get.toJson.as[List[JsObject]].map(error => {
- val path = (error \ "instancePath").as[String]
- val msg = (error \ "msgs").as[List[String]].mkString("\n\t")
- s"$path \n\tErrors: $msg"
- }).mkString("\n"))
- false
- } else {
- true
- }
- }catch{
- case e: Exception =>
- log.error(e,"Error while validating JSON")
- false
+ private def processJson(orchestrationJSON: JsValue): Unit ={
+ val orchestrationName = (orchestrationJSON \ ORCHESTRATION_NAME).as[String]
+ val orchestrationID = (orchestrationJSON \ GUID).as[String]
+ val orchestrationCommand = (orchestrationJSON \ COMMAND).as[String].toUpperCase()
+ val orchestrationTimestamp = (orchestrationJSON \ ORCHESTRATION_TIMESTAMP).as[String]
+ val ensembles = (orchestrationJSON \ ENSEMBLES).as[List[JsObject]]
+ orchestrationCommand match {
+ case "RECREATE" => recreateOrchestration(ensembles, orchestrationID, orchestrationName, orchestrationTimestamp)
+ case "CREATE" => createOrchestration(ensembles, orchestrationID, orchestrationName, orchestrationTimestamp)
+ case "UPDATE" => updateOrchestration(ensembles, orchestrationID, orchestrationName, orchestrationTimestamp)
+ case "DELETE" => deleteOrchestration(orchestrationID)
+ case x => throw new CommandNotRecognized(s"Command: $x")
}
}
@@ -372,23 +296,17 @@ protected object FeyCore{
case object JSON_TREE
/**
- * Send this message to Start Directory Watcher Thread
- *
- * @param path
- */
- sealed case class WATCH_DIR(path: String)
-
- /**
* After creating an actorOf FeyCore send this message to configure.
*/
case object START
/**
- * Used by the DirectoryWatcher to notify fey when a new file was added
- *
- * @param file java.io.File
+ * Json Receiver actor will send this message everytime a json is received
+ * Does not matter from where it was received
+ * @param json
+ * @param file
*/
- case class NEW_FILE_ACTION(file: File)
+ case class ORCHESTRATION_RECEIVED(json: JsValue, file: File)
case class STOP_EMPTY_ORCHESTRATION(orchID: String)
@@ -396,7 +314,7 @@ protected object FeyCore{
Props(new FeyCore)
}
- final val WATCHER_NAME: String = "DIR_WATCHER"
+ final val JSON_RECEIVER_NAME: String = "JSON_RECEIVER"
final val IDENTIFIER_NAME: String = "FEY_IDENTIFIER"
/**
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/6fbba635/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiver.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiver.scala b/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiver.scala
new file mode 100644
index 0000000..cc14a81
--- /dev/null
+++ b/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiver.scala
@@ -0,0 +1,190 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iota.fey
+
+import java.io.FileOutputStream
+import java.net.URL
+import java.io.File
+
+import com.eclipsesource.schema._
+import org.slf4j.LoggerFactory
+import play.api.libs.json._
+import JSON_PATH._
+import java.nio.file.{Files, Paths}
+
+import org.apache.commons.io.IOUtils
+import org.apache.commons.codec.binary.Base64
+
+/**
+ * Basic class to be used when implementing a new JSON receiver
+ */
+trait JsonReceiver extends Runnable{
+
+ val log = LoggerFactory.getLogger(this.getClass)
+
+ /**
+ * Default Run if no one is specified on concret class
+ */
+ override def run(): Unit = {
+ try {
+ while (!Thread.currentThread().isInterrupted) {
+ execute()
+ }
+ }catch{
+ case e: Exception => exceptionOnRun(e)
+ }
+ }
+
+ /**
+ * Checks if JSON complies with defined Schema
+ *
+ * @param json
+ * @return true if it complies or false if it does not
+ */
+ final def validJson(json: JsValue): Boolean = {
+ try {
+ val result = SchemaValidator.validate(CONFIG.JSON_SPEC, json)
+ if (result.isError) {
+ log.error("Incorrect JSON schema \n" + result.asEither.left.get.toJson.as[List[JsObject]].map(error => {
+ val path = (error \ "instancePath").as[String]
+ val msg = (error \ "msgs").as[List[String]].mkString("\n\t")
+ s"$path \n\tErrors: $msg"
+ }).mkString("\n"))
+ false
+ } else {
+ true
+ }
+ }catch{
+ case e: Exception =>
+ log.error("Error while validating JSON", e)
+ false
+ }
+ }
+
+ /**
+ * Checks if any of the performers need to have its jar downloaded
+ * All the Receivers must call this method so the Jars can be downloaded at runtime
+ *
+ * @param json Orchestration JSON object
+ */
+ final def checkForLocation(json: JsValue): Unit = {
+ (json \ ENSEMBLES).as[List[JsObject]].foreach(ensemble => {
+ (ensemble \ PERFORMERS).as[List[JsObject]].foreach(performer => {
+ if((performer \ SOURCE).as[JsObject].keys.contains(JAR_LOCATION)){
+ val jarName = (performer \ SOURCE \ SOURCE_NAME).as[String]
+ val jarLocation = (performer \ SOURCE \ JAR_LOCATION).as[String].toLowerCase
+ if( (jarLocation.startsWith("https://") || jarLocation.startsWith("http://")) && !jarDownloaded(jarName)){
+ val jarLocation = (performer \ SOURCE \ JAR_LOCATION).as[String]
+ downloadJAR(jarLocation, jarName)
+ }
+ }
+ })
+ })
+ }
+
+ /**
+ * Checks if the jar already exists
+ *
+ * @param jarName
+ * @return
+ */
+ private final def jarDownloaded(jarName: String): Boolean = {
+ try {
+ Files.exists(Paths.get(s"${CONFIG.DYNAMIC_JAR_REPO}/$jarName"))
+ }catch{
+ case e: Exception =>
+ log.error(s"Could not check if $jarName exists", e)
+ true
+ }
+ }
+
+ private final def downloadJAR(url: String, jarName: String): Unit = {
+ var outputStream: FileOutputStream = null
+ try{
+ val extractedURL = extractCredentials(url)
+ log.info(s"Downloading $jarName from ${extractedURL._1}")
+
+ val connection = new URL(extractedURL._1).openConnection
+
+ // Add authentication Header if credentials is defined
+ extractedURL._2 match {
+ case Some(credentials) =>
+ connection.setRequestProperty(HttpBasicAuth.AUTHORIZATION, HttpBasicAuth.getHeader(credentials._1, credentials._2))
+ case None =>
+ }
+ outputStream = new FileOutputStream(s"${CONFIG.DYNAMIC_JAR_REPO}/$jarName")
+ IOUtils.copy(connection.getInputStream,outputStream)
+ outputStream.close()
+ }catch{
+ case e: Exception =>
+ if(outputStream != null) {
+ outputStream.close()
+ (new File(s"${CONFIG.DYNAMIC_JAR_REPO}/$jarName")).delete()
+ }
+ log.error(s"Could not download $jarName from $url", e)
+ }
+ }
+
+ /**
+ *
+ * @param url
+ * @return (NO_CRED_URL, (USER, PASSWORD))
+ */
+ private final def extractCredentials(url: String): (String, Option[(String, String)]) = {
+ if(url.contains("@")) {
+ val atIndex = url.indexOf("@")
+ if (url.startsWith("https")) {
+ val cred = url.substring(8, atIndex)
+ val userPass = cred.split(":")
+ (url.replace(s"$cred@",""), Option(userPass(0),userPass(1)))
+ } else {
+ val cred = url.substring(7, atIndex)
+ val userPass = cred.split(":")
+ (url.replace(s"$cred@",""), Option(userPass(0),userPass(1)))
+ }
+ }else{
+ (url, None)
+ }
+ }
+
+ /**
+ * Called inside run method
+ */
+ def execute(): Unit = {}
+
+ /**
+ * Called when occurs an exception inside Run.
+ * For example: Thread.interrupt
+ *
+ * @param e
+ */
+ def exceptionOnRun(e: Exception): Unit = {}
+}
+
+object HttpBasicAuth {
+ val BASIC = "Basic";
+ val AUTHORIZATION = "Authorization";
+
+ def encodeCredentials(username: String, password: String): String = {
+ new String(Base64.encodeBase64String((username + ":" + password).getBytes))
+ }
+
+ def getHeader(username: String, password: String): String =
+ BASIC + " " + encodeCredentials(username, password)
+}
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/6fbba635/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiverActor.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiverActor.scala b/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiverActor.scala
new file mode 100644
index 0000000..2a05710
--- /dev/null
+++ b/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiverActor.scala
@@ -0,0 +1,87 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iota.fey
+
+import java.nio.file.Paths
+import java.io.File
+
+import akka.actor.{Actor, ActorLogging, ActorRef, Props}
+import play.api.libs.json.{JsValue, Json}
+
+class JsonReceiverActor extends Actor with ActorLogging {
+
+ import JsonReceiverActor._
+
+ val watchFileTask = new WatchServiceReceiver(self)
+ var watchThread = new Thread(watchFileTask, "WatchService")
+
+ override def preStart() {
+ prepareDynamicJarRepo()
+ processCheckpointFiles()
+
+ SYSTEM_ACTORS.monitoring ! Monitor.START(Utils.getTimestamp)
+ watchThread.setDaemon(true)
+ watchThread.start()
+
+ watchFileTask.watch(Paths.get(CONFIG.JSON_REPOSITORY))
+ }
+
+ private def prepareDynamicJarRepo() = {
+ val jarDir = new File(CONFIG.DYNAMIC_JAR_REPO)
+ if (!jarDir.exists()){
+ jarDir.mkdir()
+ }else if(CONFIG.DYNAMIC_JAR_FORCE_PULL){
+ jarDir.listFiles().foreach(_.delete())
+ }
+ }
+
+
+ private def processCheckpointFiles() = {
+ if (CONFIG.CHEKPOINT_ENABLED) {
+ val checkpoint = new CheckpointProcessor(self)
+ checkpoint.run()
+ }
+ }
+
+ override def postStop() {
+ SYSTEM_ACTORS.monitoring ! Monitor.STOP(Utils.getTimestamp)
+ watchThread.interrupt()
+ watchThread.join()
+ }
+
+ override def postRestart(reason: Throwable): Unit = {
+ SYSTEM_ACTORS.monitoring ! Monitor.RESTART(reason, Utils.getTimestamp)
+ preStart()
+ }
+
+ override def receive: Receive = {
+ case JSON_RECEIVED(json, file) =>
+ log.info(s"JSON RECEIVED => ${Json.stringify(json)}")
+ context.parent ! FeyCore.ORCHESTRATION_RECEIVED(json, file)
+
+ case _ =>
+ }
+
+}
+
+object JsonReceiverActor {
+
+ case class JSON_RECEIVED(json: JsValue, file: File)
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/6fbba635/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
index 58b98d6..ebf7503 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
@@ -67,16 +67,17 @@ protected object Utils {
* @param className class path inside the jar
* @return class of FeyGenericActor
*/
- def loadActorClassFromJar(path: String, className: String):Class[FeyGenericActor] = {
+ def loadActorClassFromJar(path: String, className: String, jarName: String):Class[FeyGenericActor] = {
- loadedJars.get(path) match {
+ loadedJars.get(jarName) match {
case None =>
val urls:Array[URL] = Array(new URL(s"jar:file:$path!/"))
val cl: URLClassLoader = URLClassLoader.newInstance(urls)
val clazz = cl.loadClass(className)
val feyClazz = clazz.asInstanceOf[Class[FeyGenericActor]]
- loadedJars.put(path, (cl, Map(className -> feyClazz)))
+ log.info(s"$path -> $className")
+ loadedJars.put(jarName, (cl, Map(className -> feyClazz)))
feyClazz
case Some(loadedJar) =>
@@ -84,7 +85,7 @@ protected object Utils {
case None =>
val clazz = loadedJar._1.loadClass(className)
val feyClazz = clazz.asInstanceOf[Class[FeyGenericActor]]
- loadedJars.put(path, (loadedJar._1, Map(className -> feyClazz) ++ loadedJar._2))
+ loadedJars.put(jarName, (loadedJar._1, Map(className -> feyClazz) ++ loadedJar._2))
feyClazz
case Some(clazz) =>
clazz
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/6fbba635/fey-core/src/main/scala/org/apache/iota/fey/WatchServiceReceiver.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/WatchServiceReceiver.scala b/fey-core/src/main/scala/org/apache/iota/fey/WatchServiceReceiver.scala
new file mode 100644
index 0000000..e42bcbb
--- /dev/null
+++ b/fey-core/src/main/scala/org/apache/iota/fey/WatchServiceReceiver.scala
@@ -0,0 +1,103 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iota.fey
+
+import java.nio.file.StandardWatchEventKinds._
+import java.nio.file.{FileSystems, Path}
+import java.io.File
+import akka.actor.ActorRef
+import org.apache.iota.fey.JsonReceiverActor.JSON_RECEIVED
+import play.api.libs.json._
+
+import scala.io.Source
+
+class WatchServiceReceiver(receiverActor: ActorRef) extends JsonReceiver{
+
+ private val watchService = FileSystems.getDefault.newWatchService()
+
+ def watch(path: Path) : Unit = path.register(watchService, ENTRY_CREATE, ENTRY_MODIFY)
+
+ def getJsonObject(params: String): Option[JsValue] = {
+ try{
+ val stringJson = Source.fromFile(params).getLines.mkString
+ Option(Json.parse(stringJson))
+ }catch{
+ case e: Exception =>
+ log.error("Could not parse JSON", e)
+ None
+ }
+ }
+
+ override def execute(): Unit = {
+ processInitialFiles()
+
+ val key = watchService.take()
+ val eventsIterator = key.pollEvents().iterator()
+
+ while(eventsIterator.hasNext) {
+ val event = eventsIterator.next()
+ val relativePath = event.context().asInstanceOf[Path]
+ val path = key.watchable().asInstanceOf[Path].resolve(relativePath)
+
+ event.kind() match {
+ case (ENTRY_CREATE | ENTRY_MODIFY) if path.toString.endsWith(CONFIG.JSON_EXTENSION) =>
+ processJson(path.toString, path.toFile)
+ case _ =>
+ }
+ }
+
+ key.reset()
+ }
+
+ private def processJson(path: String, file: File) = {
+ try{
+ getJsonObject(path) match {
+ case Some(orchestrationJSON) =>
+ val valid = validJson(orchestrationJSON)
+ if(valid && (orchestrationJSON \ JSON_PATH.COMMAND).as[String].toUpperCase != "DELETE"){
+ checkForLocation(orchestrationJSON)
+ }
+ if(valid) {
+ receiverActor ! JSON_RECEIVED(orchestrationJSON, file)
+ }else{
+ log.warn(s"File $path not processed. Incorrect JSON schema")
+ }
+ case None =>
+ }
+ } catch {
+ case e: Exception =>
+ log.error(s"File $path will not be processed", e)
+ }
+ }
+
+ private def processInitialFiles() = {
+ Utils.getFilesInDirectory(CONFIG.JSON_REPOSITORY)
+ .filter(file => file.getName.endsWith(CONFIG.JSON_EXTENSION))
+ .foreach(file => {
+ processJson(file.getAbsolutePath, file)
+ })
+ }
+
+ override def exceptionOnRun(e: Exception): Unit = {
+ log.error("Watch Service stopped", e)
+ watchService.close()
+ throw e
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/6fbba635/project/Build.scala
----------------------------------------------------------------------
diff --git a/project/Build.scala b/project/Build.scala
index 504fddb..3dc2706 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -21,7 +21,8 @@ import sbt.Keys._
object ModuleDependencies {
import Dependencies._
- val FeyDependencies = compile(akka_actor,typesafe_config,playJson,slf4j,log4jbind,sprayCan,sprayRouting,jsonValidator,javaFilter)
+ val FeyDependencies = compile(akka_actor,typesafe_config,playJson,slf4j,log4jbind,sprayCan,
+ sprayRouting,jsonValidator,javaFilter,codec,apacheIO)
val StreamDependencies = provided(akka_actor, fey)
val ZMQDependencies = provided(akka_actor, fey) ++ compile(zmq)
val VirtualSensorDependencies = provided(akka_actor, fey) ++ compile(math3)
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/6fbba635/project/Dependencies.scala
----------------------------------------------------------------------
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 72567a7..e46381f 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -26,6 +26,8 @@ object Dependencies {
val fey = "org.apache.iota" %% "fey-core" % "1.0-SNAPSHOT"
val zmq = "org.zeromq" % "jeromq" % "0.3.5"
val math3 = "org.apache.commons" % "commons-math3" % "3.2"
+ val codec = "commons-codec" % "commons-codec" % "1.10"
+ val apacheIO = "commons-io" % "commons-io" % "2.4"
val akka_actor = "com.typesafe.akka" %% "akka-actor" % "2.4.2"
val typesafe_config = "com.typesafe" % "config" % "1.3.0"
@@ -41,4 +43,6 @@ object Dependencies {
//restapi
val sprayCan = "io.spray" %% "spray-can" % "1.3.3"
val sprayRouting = "io.spray" %% "spray-routing" % "1.3.3"
+
+
}