You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iota.apache.org by to...@apache.org on 2016/07/11 18:50:33 UTC

[3/8] incubator-iota git commit: Monitoring Actor

Monitoring Actor

- Implemented Monitoring Actor
- All actors in Fey send monitoring messages to the Monitor Actor
- Update Trie to support Events
- Implemented new endpoint for events


Project: http://git-wip-us.apache.org/repos/asf/incubator-iota/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-iota/commit/3743740c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-iota/tree/3743740c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-iota/diff/3743740c

Branch: refs/heads/master
Commit: 3743740c3cc2684e4e8ed2a53414bb89c078b1ab
Parents: 3d38f1a
Author: Barbara Gomes <ba...@gmail.com>
Authored: Fri Jul 8 13:25:14 2016 -0700
Committer: Barbara Gomes <ba...@gmail.com>
Committed: Mon Jul 11 10:51:30 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/iota/fey/Application.scala | 11 ++-
 .../scala/org/apache/iota/fey/DirWatcher.scala  |  7 ++
 .../scala/org/apache/iota/fey/Ensemble.scala    |  8 ++
 .../scala/org/apache/iota/fey/FeyCore.scala     |  9 +++
 .../org/apache/iota/fey/FeyGenericActor.scala   |  3 +
 .../org/apache/iota/fey/IdentifyFeyActors.scala |  2 +-
 .../scala/org/apache/iota/fey/Monitor.scala     | 80 ++++++++++++++++++++
 .../scala/org/apache/iota/fey/MyService.scala   | 13 +++-
 .../org/apache/iota/fey/Orchestration.scala     |  4 +
 .../scala/org/apache/iota/fey/TrieNode.scala    | 42 +++++++---
 .../main/scala/org/apache/iota/fey/Utils.scala  |  6 ++
 11 files changed, 173 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/3743740c/fey-core/src/main/scala/org/apache/iota/fey/Application.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Application.scala b/fey-core/src/main/scala/org/apache/iota/fey/Application.scala
index e9d5073..c11815a 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Application.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Application.scala
@@ -30,13 +30,22 @@ object Application extends App {
 
   CONFIG.loadUserConfiguration(if(args.length == 1) args(0) else "")
 
+  SYSTEM_ACTORS
+
+}
+
+object SYSTEM_ACTORS{
+
   implicit val system = ActorSystem("FEY-MANAGEMENT-SYSTEM")
 
   val fey = system.actorOf(FeyCore.props, name = "FEY-CORE")
 
   val service = system.actorOf(Props[MyServiceActor], name = "FEY_REST_API")
 
+  val monitoring = system.actorOf(Props[Monitor], "FEY-MONITOR")
+
   implicit val timeout = Timeout(800.seconds)
-  IO(Http) ? Http.Bind(service, interface = "0.0.0.0", port = 16666)
+  IO(Http) ? Http.Bind(SYSTEM_ACTORS.service, interface = "0.0.0.0", port = 16666)
+
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/3743740c/fey-core/src/main/scala/org/apache/iota/fey/DirWatcher.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/DirWatcher.scala b/fey-core/src/main/scala/org/apache/iota/fey/DirWatcher.scala
index 6a8c3c1..be779b0 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/DirWatcher.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/DirWatcher.scala
@@ -69,14 +69,21 @@ class DirectoryWatcherActor(val fileExtension: String) extends Actor with ActorL
   var watchThread = new Thread(watchFileTask, "WatchService")
 
   override def preStart() {
+    SYSTEM_ACTORS.monitoring  ! Monitor.START(Utils.getTimestamp)
     watchThread.setDaemon(true)
     watchThread.start()
   }
 
   override def postStop() {
+    SYSTEM_ACTORS.monitoring  ! Monitor.STOP(Utils.getTimestamp)
     watchThread.interrupt()
   }
 
+  override def postRestart(reason: Throwable): Unit = { // report the restart, then start up again
+    SYSTEM_ACTORS.monitoring  ! Monitor.RESTART(reason, Utils.getTimestamp) // cause + current time in ms
+    preStart() // sends START to the monitor and starts the watch thread
+  }
+
   override def receive: Receive = {
     case MONITOR(path) =>
       log.info(s"Start monitoring ${path.getFileName}")

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/3743740c/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
index 1a18bb9..f5726b5 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
@@ -50,6 +50,7 @@ protected class Ensemble(val orchestrationID: String,
       context.actorSelection(s"*") ! FeyGenericActor.PRINT_PATH
 
     case Terminated(actor) =>
+      SYSTEM_ACTORS.monitoring  ! Monitor.TERMINATE(actor.path.toString, Utils.getTimestamp)
       log.error(s"DEAD nPerformers ${actor.path.name}")
       context.children.foreach{ child =>
         context.unwatch(child)
@@ -79,6 +80,8 @@ protected class Ensemble(val orchestrationID: String,
     */
   override def preStart() = {
 
+    SYSTEM_ACTORS.monitoring  ! Monitor.START(Utils.getTimestamp)
+
     val connectors_js = (ensembleSpec \ CONNECTIONS).as[List[JsObject]]
     val performers_js = (ensembleSpec \ PERFORMERS).as[List[JsObject]]
 
@@ -96,7 +99,12 @@ protected class Ensemble(val orchestrationID: String,
 
   }
 
+  override def postStop() = { // only reports the stop; no other clean-up happens here
+    SYSTEM_ACTORS.monitoring  ! Monitor.STOP(Utils.getTimestamp) // STOP with the current time in ms
+  }
+
   override def postRestart(reason: Throwable): Unit = {
+    SYSTEM_ACTORS.monitoring  ! Monitor.RESTART(reason, Utils.getTimestamp)
     preStart()
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/3743740c/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala b/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
index fe9149a..5985f26 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
@@ -74,6 +74,7 @@ protected class FeyCore extends Actor with ActorLogging{
       deleteOrchestration(orchID)
 
     case Terminated(actor) =>
+      SYSTEM_ACTORS.monitoring ! Monitor.TERMINATE(actor.path.toString, Utils.getTimestamp)
       actor.path.name match {
         case IDENTIFIER_NAME =>
           createIdentifierActor()
@@ -99,12 +100,14 @@ protected class FeyCore extends Actor with ActorLogging{
     * Clean up Fey Cache
     */
   override def postStop() = {
+    SYSTEM_ACTORS.monitoring ! Monitor.STOP(Utils.getTimestamp)
     FEY_CACHE.activeOrchestrations.clear()
     FEY_CACHE.orchestrationsAwaitingTermination.clear()
     ORCHESTRATION_CACHE.orchestration_metadata.clear()
   }
 
   override def preStart() = {
+    SYSTEM_ACTORS.monitoring ! Monitor.START(Utils.getTimestamp)
     log.info("Starting Fey Core")
     if (CHEKPOINT_ENABLED) {
       processInitialFiles(CHECKPOINT_DIR, true)
@@ -112,6 +115,11 @@ protected class FeyCore extends Actor with ActorLogging{
     self ! START
   }
 
+  override def postRestart(reason: Throwable): Unit = { // report the restart, then start up again
+    SYSTEM_ACTORS.monitoring ! Monitor.RESTART(reason, Utils.getTimestamp) // cause + current time in ms
+    preStart() // sends START to the monitor again and re-runs the Fey Core start-up steps
+  }
+
   override val supervisorStrategy =
     OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 minute) {
       case _: Exception =>
@@ -400,6 +408,7 @@ protected object FeyCore{
       .getLines()
       .mkString(""))).get
   }
+
 }
 
 private object FEY_CACHE{

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/3743740c/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala b/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
index 98d8c7c..0e26252 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
@@ -82,17 +82,20 @@ abstract class FeyGenericActor(val params: Map[String,String] = Map.empty,
   }
 
   override final def preStart() = {
+    SYSTEM_ACTORS.monitoring  ! Monitor.START(Utils.getTimestamp)
     onStart()
     startScheduler()
   }
 
   override final def postStop() = {
+    SYSTEM_ACTORS.monitoring  ! Monitor.STOP(Utils.getTimestamp)
     log.info(s"STOPPED actor ${self.path.name}")
     stopScheduler()
     onStop()
   }
 
   override final def postRestart(reason: Throwable) = {
+    SYSTEM_ACTORS.monitoring  ! Monitor.RESTART(reason, Utils.getTimestamp)
     log.info(s"RESTARTED Actor ${self.path.name}")
     preStart()
     onRestart(reason)

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/3743740c/fey-core/src/main/scala/org/apache/iota/fey/IdentifyFeyActors.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/IdentifyFeyActors.scala b/fey-core/src/main/scala/org/apache/iota/fey/IdentifyFeyActors.scala
index 80387db..ac6e5b8 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/IdentifyFeyActors.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/IdentifyFeyActors.scala
@@ -82,7 +82,7 @@ protected object IdentifyFeyActors{
     */
   def generateTreeJson(): String = {
     val trie = new Trie()
-    actorsPath.map(_.replace("user/","")).foreach(trie.appendPath(_))
+    actorsPath.map(_.replace("user/","")).foreach(trie.append(_))
 
     Json.stringify(trie.print)
   }

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/3743740c/fey-core/src/main/scala/org/apache/iota/fey/Monitor.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Monitor.scala b/fey-core/src/main/scala/org/apache/iota/fey/Monitor.scala
new file mode 100644
index 0000000..619d97c
--- /dev/null
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Monitor.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iota.fey
+
+import akka.actor.{Actor, ActorLogging}
+
+/**
+  * Records each lifecycle message it receives as a MonitorEvent in Monitor.events. Created by barbaragomes on 7/8/16.
+  */
+protected class Monitor extends Actor with ActorLogging{
+
+  import Monitor._
+
+  override def receive: Receive = {
+
+    case START(timestamp, info) =>
+      //TODO: Log. START and STOP are stored under the sending actor's own path.
+      events.append(sender().path.toString,MonitorEvent(EVENTS.START, timestamp, info))
+
+    case STOP(timestamp, info) =>
+      //TODO: Log
+      events.append(sender().path.toString,MonitorEvent(EVENTS.STOP, timestamp, info))
+
+    case RESTART(reason, timestamp) =>
+      //TODO: Log. getMessage may be null (exception built without a message), so fall back to toString.
+      events.append(sender().path.toString,MonitorEvent(EVENTS.RESTART, timestamp, Option(reason.getMessage).getOrElse(reason.toString)))
+
+    case TERMINATE(actorPath, timestamp, info) =>
+      //TODO: Log. Stored under the path carried in the message, not the sender's: the watcher reports it.
+      events.append(actorPath,MonitorEvent(EVENTS.TERMINATE, timestamp, info))
+
+  }
+
+}
+
+protected object Monitor{
+  // Messages handled by the Monitor actor; the sender supplies `timestamp` (callers in this patch pass Utils.getTimestamp)
+  case class START(timestamp: Long, info: String = "")
+  case class STOP(timestamp: Long, info: String = "")
+  case class RESTART(reason: Throwable, timestamp: Long)
+  case class TERMINATE(actorPath: String, timestamp: Long, info: String = "")
+
+  // One stored event: `event` is one of the EVENTS names, `info` is free-form text
+  case class MonitorEvent(event: String, timestamp: Long, info: String)
+
+  /** Lifecycle events of the actors in Fey, stored in a Trie keyed by actor path.
+    * NOTE(review): the Monitor actor appends while the REST route reads (printWithEvents); Trie uses plain ArrayBuffers - confirm this is safe.
+    */
+  val events: Trie = new Trie()
+
+}
+
+/**
+  * Names of the lifecycle events; each is used as the `event` field of a Monitor.MonitorEvent.
+  */
+object EVENTS {
+
+  val START: String = "START"
+  val STOP: String = "STOP"
+  val TERMINATE: String = "TERMINATE"
+  val RESTART: String = "RESTART"
+
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/3743740c/fey-core/src/main/scala/org/apache/iota/fey/MyService.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/MyService.scala b/fey-core/src/main/scala/org/apache/iota/fey/MyService.scala
index 3987340..bc1cfa0 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/MyService.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/MyService.scala
@@ -19,6 +19,7 @@ package org.apache.iota.fey
 
 import akka.actor.Actor
 import org.apache.iota.fey.FeyCore.JSON_TREE
+import play.api.libs.json.Json
 import spray.http.MediaTypes._
 import spray.routing._
 
@@ -38,6 +39,7 @@ sealed trait MyService extends HttpService {
 
   val home = pathPrefix("fey")
   val activeActors = path("activeactors")
+  val actorLifecycle = path("actorslifecycle")
   val test = path("test")
 
   val myRoute =
@@ -46,13 +48,22 @@ sealed trait MyService extends HttpService {
         get{
           respondWithMediaType(`text/html`) {
             complete {
-              Application.fey ! JSON_TREE
+              SYSTEM_ACTORS.fey ! JSON_TREE
               Thread.sleep(2000)
               val json = IdentifyFeyActors.generateTreeJson()
               IdentifyFeyActors.getHTMLTree(json)
             }
           }
         }
+      } ~
+      actorLifecycle {
+        get{
+          respondWithMediaType(`application/json`) {
+            complete {
+              Json.stringify(Monitor.events.printWithEvents)
+            }
+          }
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/3743740c/fey-core/src/main/scala/org/apache/iota/fey/Orchestration.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Orchestration.scala b/fey-core/src/main/scala/org/apache/iota/fey/Orchestration.scala
index 15a9cdd..5ec7588 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Orchestration.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Orchestration.scala
@@ -50,6 +50,7 @@ protected class Orchestration(val name: String,
       context.actorSelection(s"*") ! Ensemble.PRINT_ENSEMBLE
 
     case Terminated(actor) =>
+      SYSTEM_ACTORS.monitoring  ! Monitor.TERMINATE(actor.path.toString, Utils.getTimestamp)
       context.unwatch(actor)
       log.warning(s"ACTOR DEAD ${actor.path}")
       ensembles.remove(actor.path.name)
@@ -68,16 +69,19 @@ protected class Orchestration(val name: String,
     }
 
   override def preStart(): Unit = {
+    SYSTEM_ACTORS.monitoring  ! Monitor.START(Utils.getTimestamp)
     if (ORCHESTRATION_CACHE.orchestration_metadata.contains(guid)){
       replayOrchestrationState()
     }
   }
 
   override def postStop() = {
+    SYSTEM_ACTORS.monitoring  ! Monitor.STOP(Utils.getTimestamp)
     log.info(s"STOPPED ${self.path.name}")
   }
 
   override def postRestart(reason: Throwable): Unit = {
+    SYSTEM_ACTORS.monitoring  ! Monitor.RESTART(reason, Utils.getTimestamp)
     log.info(s"RESTARTED ${self.path}")
     preStart()
   }

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/3743740c/fey-core/src/main/scala/org/apache/iota/fey/TrieNode.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/TrieNode.scala b/fey-core/src/main/scala/org/apache/iota/fey/TrieNode.scala
index 4fcb446..95deacc 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/TrieNode.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/TrieNode.scala
@@ -20,31 +20,34 @@ package org.apache.iota.fey
 import play.api.libs.json.{JsArray, JsObject, JsValue, Json}
 
 import scala.annotation.tailrec
+import scala.collection.mutable.ArrayBuffer
 
 /**
   * Trie data structure used to create actors hierarchy in Fey
  */
-case class TrieNode(path: String, var children: Array[TrieNode])
+case class TrieNode(path: String, children: ArrayBuffer[TrieNode], events:ArrayBuffer[Monitor.MonitorEvent])
 
 class Trie{
 
-  private val root: TrieNode = TrieNode("FEY-MANAGEMENT-SYSTEM", Array.empty)
+  private val root: TrieNode = TrieNode("FEY-MANAGEMENT-SYSTEM", ArrayBuffer.empty, ArrayBuffer.empty)
   var elements: Int = 0
 
-  def appendPath(path: String): Unit = {
-    appendPath(path.replaceFirst("akka://","").split("/"),root,1)
+  def append(path: String, event: Monitor.MonitorEvent = null): Unit = {
+    append(path.replaceFirst("akka://","").split("/"),root,1,event)
   }
 
-  @tailrec private def appendPath(path: Array[String], root: TrieNode, index: Int): Unit = {
+  @tailrec private def append(path: Array[String], root: TrieNode, index: Int, event: Monitor.MonitorEvent): Unit = {
     if(root != null && index < path.length){
       var nextRoot = root.children.filter(child => child.path == path(index))
       if(nextRoot.isEmpty){
-        nextRoot = Array(TrieNode(path(index), Array.empty))
-        val children = root.children ++: nextRoot
-        root.children = children
+        nextRoot = ArrayBuffer(TrieNode(path(index), ArrayBuffer.empty, ArrayBuffer.empty))
+        root.children += nextRoot(0)
         elements += 1
       }
-      appendPath(path, nextRoot(0),index+1)
+      if(event != null && index == path.length - 1){
+        nextRoot(0).events += event
+      }
+      append(path, nextRoot(0),index+1, event)
     }
   }
 
@@ -52,6 +55,10 @@ class Trie{
     getObject(root, null)
   }
 
+  def printWithEvents:JsValue = { // JSON tree of every node together with its recorded events
+    getObjectEvent(root, null) // null parent: the root is rendered with "parent" -> "null"
+  }
+
   private def getObject(root: TrieNode, parent: TrieNode):JsObject = {
     if(root != null) {
      Json.obj("name" -> root.path,
@@ -62,4 +69,21 @@ class Trie{
       Json.obj()
     }
   }
+
+  /** Renders `root` and its subtree as JSON (name, parent name or "null", events, children); a null `root` gives an empty object. */
+  private def getObjectEvent(root: TrieNode, parent: TrieNode):JsObject = {
+    if(root == null) Json.obj()
+    else {
+      val parentName = if(parent == null) "null" else parent.path
+      val eventsJson = root.events.map { recorded =>
+        Json.obj("type" -> recorded.event, "timestamp" -> recorded.timestamp, "info" -> recorded.info)
+      }
+      val subtrees = root.children.map(child => getObjectEvent(child, root))
+      Json.obj("name" -> root.path,
+        "parent" -> parentName,
+        "events" -> eventsJson,
+        "children" -> subtrees
+      )
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/3743740c/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
index 9cd1668..026bf89 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
@@ -158,6 +158,12 @@ protected object Utils {
       }
     }
   }
+
+  /**
+    * Current time in milliseconds, read from System.currentTimeMillis(); used to stamp monitoring events.
+    * @return the timestamp as a Long
+    */
+  def getTimestamp:Long = System.currentTimeMillis()
 }
 
 object JSON_PATH{