You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iota.apache.org by to...@apache.org on 2016/09/28 01:19:12 UTC
[02/10] incubator-iota git commit: [IOTA-28] Implementing
GenericReceiver actor
[IOTA-28] Implementing GenericReceiver actor
Project: http://git-wip-us.apache.org/repos/asf/incubator-iota/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-iota/commit/a865f2fb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-iota/tree/a865f2fb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-iota/diff/a865f2fb
Branch: refs/heads/master
Commit: a865f2fb25d3f8657ddae6620588eff232574b45
Parents: 000ca91
Author: Barbara Gomes <ba...@gmail.com>
Authored: Fri Jul 22 14:36:32 2016 -0700
Committer: Barbara Gomes <ba...@gmail.com>
Committed: Fri Jul 22 14:36:32 2016 -0700
----------------------------------------------------------------------
.../iota/fey/FeyGenericActorReceiver.scala | 198 +++++++++++++++++++
1 file changed, 198 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/a865f2fb/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActorReceiver.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActorReceiver.scala b/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActorReceiver.scala
new file mode 100644
index 0000000..07c7f25
--- /dev/null
+++ b/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActorReceiver.scala
@@ -0,0 +1,198 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iota.fey
+
+import java.io.{File, FileOutputStream}
+import java.net.URL
+import java.nio.file.{Files, Paths}
+import com.eclipsesource.schema._
+import akka.actor.ActorRef
+import com.eclipsesource.schema.SchemaValidator
+import org.apache.commons.io.IOUtils
+import play.api.libs.json._
+import scala.concurrent.duration._
+import scala.util.Properties._
+
+abstract class FeyGenericActorReceiver(override val params: Map[String,String] = Map.empty,
+ override val backoff: FiniteDuration = 1.minutes,
+ override val connectTo: Map[String,ActorRef] = Map.empty,
+ override val schedulerTimeInterval: FiniteDuration = 2.seconds,
+ override val orchestrationName: String = "",
+ override val orchestrationID: String = "",
+ override val autoScale: Boolean = false) extends FeyGenericActor{
+
+ private[fey] val feyCore = FEY_CORE_ACTOR.actorRef
+
+ override final def processMessage[T](message: T, sender: ActorRef): Unit = {
+ try {
+ val jsonString = getJSONString(message)
+ processJson(jsonString)
+ startBackoff()
+ }catch{
+ case e: Exception => log.error(e, s"Could not process message $message")
+ }
+ }
+
+ private[fey] def processJson(jsonString: String) = {
+ var orchID:String = "None"
+ try{
+ val orchestrationJSON = Json.parse(jsonString)
+ orchID = (orchestrationJSON \ JSON_PATH.GUID).as[String]
+ val valid = validJson(orchestrationJSON)
+ if(valid && (orchestrationJSON \ JSON_PATH.COMMAND).as[String].toUpperCase != "DELETE"){
+ checkForLocation(orchestrationJSON)
+ }
+ if(valid) {
+ feyCore ! FeyCore.ORCHESTRATION_RECEIVED(orchestrationJSON, None)
+ }else{
+ log.warning(s"Could not forward Orchestration $orchID. Invalid JSON schema")
+ }
+ } catch {
+ case e: Exception =>
+ log.error(e, s"Orchestration $orchID could not be forwarded")
+ }
+ }
+
+ /**
+ * Return a JSON string
+ * @param input the received process message
+ * @tparam T
+ * @return String that can be converted to JSON
+ */
+ def getJSONString[T](input: T): String
+
+ /**
+ * Checks if JSON complies with defined Schema
+ *
+ * @param json
+ * @return true if it complies or false if it does not
+ */
+ final def validJson(json: JsValue): Boolean = {
+ try {
+ val result = SchemaValidator.validate(CONFIG.JSON_SPEC, json)
+ if (result.isError) {
+ log.error("Incorrect JSON schema \n" + result.asEither.left.get.toJson.as[List[JsObject]].map(error => {
+ val path = (error \ "instancePath").as[String]
+ val msg = (error \ "msgs").as[List[String]].mkString("\n\t")
+ s"$path \n\tErrors: $msg"
+ }).mkString("\n"))
+ false
+ } else {
+ true
+ }
+ }catch{
+ case e: Exception =>
+ log.error("Error while validating JSON", e)
+ false
+ }
+ }
+
+ /**
+ * Checks if any of the performers need to have its jar downloaded
+ * All the Receivers must call this method so the Jars can be downloaded at runtime
+ *
+ * @param json Orchestration JSON object
+ */
+ final def checkForLocation(json: JsValue): Unit = {
+ (json \ JSON_PATH.ENSEMBLES).as[List[JsObject]].foreach(ensemble => {
+ (ensemble \ JSON_PATH.PERFORMERS).as[List[JsObject]].foreach(performer => {
+ if((performer \ JSON_PATH.SOURCE).as[JsObject].keys.contains(JSON_PATH.JAR_LOCATION)){
+ val location = (performer \ JSON_PATH.SOURCE \ JSON_PATH.JAR_LOCATION).as[JsObject]
+ val jarName = (performer \ JSON_PATH.SOURCE \ JSON_PATH.SOURCE_NAME).as[String]
+ val url = (location \ JSON_PATH.JAR_LOCATION_URL).as[String].toLowerCase
+ if( (url.startsWith("https://") || url.startsWith("http://")) && !jarDownloaded(jarName)){
+
+ val credentials:Option[JsObject] = {
+ if(location.keys.contains(JSON_PATH.JAR_CREDENTIALS_URL)){
+ Option((location \ JSON_PATH.JAR_CREDENTIALS_URL).as[JsObject])
+ }else{
+ None
+ }
+ }
+
+ downloadJAR(url, jarName, credentials)
+ }
+ }else{
+ log.debug("Location not defined in JSON")
+ }
+ })
+ })
+ }
+
+ /**
+ * Checks if the jar already exists
+ *
+ * @param jarName
+ * @return
+ */
+ private final def jarDownloaded(jarName: String): Boolean = {
+ try {
+ Files.exists(Paths.get(s"${CONFIG.DYNAMIC_JAR_REPO}/$jarName"))
+ }catch{
+ case e: Exception =>
+ log.error(s"Could not check if $jarName exists", e)
+ true
+ }
+ }
+
+ private final def downloadJAR(url: String, jarName: String, credentials: Option[JsObject]): Unit = {
+ var outputStream: FileOutputStream = null
+ try{
+ log.info(s"Downloading $jarName from $url")
+
+ val connection = new URL(s"$url/$jarName").openConnection
+
+ resolveCredentials(credentials) match{
+ case Some(userpass) =>
+ connection.setRequestProperty(HttpBasicAuth.AUTHORIZATION, HttpBasicAuth.getHeader(userpass._1, userpass._2))
+ case None =>
+ }
+
+ // Download Jar
+ outputStream = new FileOutputStream(s"${CONFIG.DYNAMIC_JAR_REPO}/$jarName")
+ IOUtils.copy(connection.getInputStream,outputStream)
+ outputStream.close()
+
+ }catch{
+ case e: Exception =>
+ if(outputStream != null) {
+ outputStream.close()
+ (new File(s"${CONFIG.DYNAMIC_JAR_REPO}/$jarName")).delete()
+ }
+ log.error(s"Could not download $jarName from $url", e)
+ }
+ }
+
+ /**
+ * Tries to resolve the credentials looking to the environment variable
+ * If it is not possible to find a env var with that name, then use the name itself
+ * @param credentials
+ * @return (user, password)
+ */
+ def resolveCredentials(credentials: Option[JsObject]):Option[(String, String)] = {
+ credentials match {
+ case None => None
+ case Some(cred) =>
+ val user = (cred \ JSON_PATH.JAR_CRED_USER).as[String]
+ val password = (cred \ JSON_PATH.JAR_CRED_PASSWORD).as[String]
+ Option(envOrElse(user,user), envOrElse(password,password))
+ }
+ }
+
+}