You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iota.apache.org by to...@apache.org on 2016/07/11 18:50:33 UTC
[3/8] incubator-iota git commit: Monitoring Actor
Monitoring Actor
- Implemented Monitoring Actor
- All actors on Fey will send monitoring messages to Monitor Actor
- Update Trie to support Events
- Implemented new endpoint for events
Project: http://git-wip-us.apache.org/repos/asf/incubator-iota/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-iota/commit/3743740c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-iota/tree/3743740c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-iota/diff/3743740c
Branch: refs/heads/master
Commit: 3743740c3cc2684e4e8ed2a53414bb89c078b1ab
Parents: 3d38f1a
Author: Barbara Gomes <ba...@gmail.com>
Authored: Fri Jul 8 13:25:14 2016 -0700
Committer: Barbara Gomes <ba...@gmail.com>
Committed: Mon Jul 11 10:51:30 2016 -0700
----------------------------------------------------------------------
.../scala/org/apache/iota/fey/Application.scala | 11 ++-
.../scala/org/apache/iota/fey/DirWatcher.scala | 7 ++
.../scala/org/apache/iota/fey/Ensemble.scala | 8 ++
.../scala/org/apache/iota/fey/FeyCore.scala | 9 +++
.../org/apache/iota/fey/FeyGenericActor.scala | 3 +
.../org/apache/iota/fey/IdentifyFeyActors.scala | 2 +-
.../scala/org/apache/iota/fey/Monitor.scala | 80 ++++++++++++++++++++
.../scala/org/apache/iota/fey/MyService.scala | 13 +++-
.../org/apache/iota/fey/Orchestration.scala | 4 +
.../scala/org/apache/iota/fey/TrieNode.scala | 42 +++++++---
.../main/scala/org/apache/iota/fey/Utils.scala | 6 ++
11 files changed, 173 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/3743740c/fey-core/src/main/scala/org/apache/iota/fey/Application.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Application.scala b/fey-core/src/main/scala/org/apache/iota/fey/Application.scala
index e9d5073..c11815a 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Application.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Application.scala
@@ -30,13 +30,22 @@ object Application extends App {
CONFIG.loadUserConfiguration(if(args.length == 1) args(0) else "")
+ SYSTEM_ACTORS
+
+}
+
+object SYSTEM_ACTORS{
+
implicit val system = ActorSystem("FEY-MANAGEMENT-SYSTEM")
val fey = system.actorOf(FeyCore.props, name = "FEY-CORE")
val service = system.actorOf(Props[MyServiceActor], name = "FEY_REST_API")
+ val monitoring = system.actorOf(Props[Monitor], "FEY-MONITOR")
+
implicit val timeout = Timeout(800.seconds)
- IO(Http) ? Http.Bind(service, interface = "0.0.0.0", port = 16666)
+ IO(Http) ? Http.Bind(SYSTEM_ACTORS.service, interface = "0.0.0.0", port = 16666)
+
}
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/3743740c/fey-core/src/main/scala/org/apache/iota/fey/DirWatcher.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/DirWatcher.scala b/fey-core/src/main/scala/org/apache/iota/fey/DirWatcher.scala
index 6a8c3c1..be779b0 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/DirWatcher.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/DirWatcher.scala
@@ -69,14 +69,21 @@ class DirectoryWatcherActor(val fileExtension: String) extends Actor with ActorL
var watchThread = new Thread(watchFileTask, "WatchService")
override def preStart() {
+ SYSTEM_ACTORS.monitoring ! Monitor.START(Utils.getTimestamp)
watchThread.setDaemon(true)
watchThread.start()
}
override def postStop() {
+ SYSTEM_ACTORS.monitoring ! Monitor.STOP(Utils.getTimestamp)
watchThread.interrupt()
}
+ override def postRestart(reason: Throwable): Unit = {
+ SYSTEM_ACTORS.monitoring ! Monitor.RESTART(reason, Utils.getTimestamp)
+ preStart()
+ }
+
override def receive: Receive = {
case MONITOR(path) =>
log.info(s"Start monitoring ${path.getFileName}")
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/3743740c/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
index 1a18bb9..f5726b5 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
@@ -50,6 +50,7 @@ protected class Ensemble(val orchestrationID: String,
context.actorSelection(s"*") ! FeyGenericActor.PRINT_PATH
case Terminated(actor) =>
+ SYSTEM_ACTORS.monitoring ! Monitor.TERMINATE(actor.path.toString, Utils.getTimestamp)
log.error(s"DEAD nPerformers ${actor.path.name}")
context.children.foreach{ child =>
context.unwatch(child)
@@ -79,6 +80,8 @@ protected class Ensemble(val orchestrationID: String,
*/
override def preStart() = {
+ SYSTEM_ACTORS.monitoring ! Monitor.START(Utils.getTimestamp)
+
val connectors_js = (ensembleSpec \ CONNECTIONS).as[List[JsObject]]
val performers_js = (ensembleSpec \ PERFORMERS).as[List[JsObject]]
@@ -96,7 +99,12 @@ protected class Ensemble(val orchestrationID: String,
}
+ override def postStop() = {
+ SYSTEM_ACTORS.monitoring ! Monitor.STOP(Utils.getTimestamp)
+ }
+
override def postRestart(reason: Throwable): Unit = {
+ SYSTEM_ACTORS.monitoring ! Monitor.RESTART(reason, Utils.getTimestamp)
preStart()
}
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/3743740c/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala b/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
index fe9149a..5985f26 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
@@ -74,6 +74,7 @@ protected class FeyCore extends Actor with ActorLogging{
deleteOrchestration(orchID)
case Terminated(actor) =>
+ SYSTEM_ACTORS.monitoring ! Monitor.TERMINATE(actor.path.toString, Utils.getTimestamp)
actor.path.name match {
case IDENTIFIER_NAME =>
createIdentifierActor()
@@ -99,12 +100,14 @@ protected class FeyCore extends Actor with ActorLogging{
* Clean up Fey Cache
*/
override def postStop() = {
+ SYSTEM_ACTORS.monitoring ! Monitor.STOP(Utils.getTimestamp)
FEY_CACHE.activeOrchestrations.clear()
FEY_CACHE.orchestrationsAwaitingTermination.clear()
ORCHESTRATION_CACHE.orchestration_metadata.clear()
}
override def preStart() = {
+ SYSTEM_ACTORS.monitoring ! Monitor.START(Utils.getTimestamp)
log.info("Starting Fey Core")
if (CHEKPOINT_ENABLED) {
processInitialFiles(CHECKPOINT_DIR, true)
@@ -112,6 +115,11 @@ protected class FeyCore extends Actor with ActorLogging{
self ! START
}
+ override def postRestart(reason: Throwable): Unit = {
+ SYSTEM_ACTORS.monitoring ! Monitor.RESTART(reason, Utils.getTimestamp)
+ preStart()
+ }
+
override val supervisorStrategy =
OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 minute) {
case _: Exception =>
@@ -400,6 +408,7 @@ protected object FeyCore{
.getLines()
.mkString(""))).get
}
+
}
private object FEY_CACHE{
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/3743740c/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala b/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
index 98d8c7c..0e26252 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
@@ -82,17 +82,20 @@ abstract class FeyGenericActor(val params: Map[String,String] = Map.empty,
}
override final def preStart() = {
+ SYSTEM_ACTORS.monitoring ! Monitor.START(Utils.getTimestamp)
onStart()
startScheduler()
}
override final def postStop() = {
+ SYSTEM_ACTORS.monitoring ! Monitor.STOP(Utils.getTimestamp)
log.info(s"STOPPED actor ${self.path.name}")
stopScheduler()
onStop()
}
override final def postRestart(reason: Throwable) = {
+ SYSTEM_ACTORS.monitoring ! Monitor.RESTART(reason, Utils.getTimestamp)
log.info(s"RESTARTED Actor ${self.path.name}")
preStart()
onRestart(reason)
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/3743740c/fey-core/src/main/scala/org/apache/iota/fey/IdentifyFeyActors.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/IdentifyFeyActors.scala b/fey-core/src/main/scala/org/apache/iota/fey/IdentifyFeyActors.scala
index 80387db..ac6e5b8 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/IdentifyFeyActors.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/IdentifyFeyActors.scala
@@ -82,7 +82,7 @@ protected object IdentifyFeyActors{
*/
def generateTreeJson(): String = {
val trie = new Trie()
- actorsPath.map(_.replace("user/","")).foreach(trie.appendPath(_))
+ actorsPath.map(_.replace("user/","")).foreach(trie.append(_))
Json.stringify(trie.print)
}
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/3743740c/fey-core/src/main/scala/org/apache/iota/fey/Monitor.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Monitor.scala b/fey-core/src/main/scala/org/apache/iota/fey/Monitor.scala
new file mode 100644
index 0000000..619d97c
--- /dev/null
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Monitor.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iota.fey
+
+import akka.actor.{Actor, ActorLogging}
+
+/**
+ * Created by barbaragomes on 7/8/16.
+ */
+protected class Monitor extends Actor with ActorLogging{
+
+ import Monitor._
+
+ override def receive: Receive = {
+
+ case START(timestamp, info) =>
+ //TODO: Log
+ events.append(sender().path.toString,MonitorEvent(EVENTS.START, timestamp, info))
+
+ case STOP(timestamp, info) =>
+ //TODO: Log
+ events.append(sender().path.toString,MonitorEvent(EVENTS.STOP, timestamp, info))
+
+ case RESTART(reason, timestamp) =>
+ //TODO: Log
+ events.append(sender().path.toString,MonitorEvent(EVENTS.RESTART, timestamp, reason.getMessage))
+
+ case TERMINATE(actorPath, timestamp, info) =>
+ //TODO: Log
+ events.append(actorPath,MonitorEvent(EVENTS.TERMINATE, timestamp, info))
+
+ }
+
+}
+
+protected object Monitor{
+ // Monitoring Messages
+ case class START(timestamp: Long, info: String = "")
+ case class STOP(timestamp: Long, info: String = "")
+ case class RESTART(reason: Throwable, timestamp: Long)
+ case class TERMINATE(actorPath: String, timestamp: Long, info: String = "")
+
+ // Stores Monitoring event
+ case class MonitorEvent(event: String, timestamp: Long, info: String)
+
+ /**
+ * Contains the lifecycle events for actors in Fey
+ */
+ val events: Trie = new Trie()
+
+}
+
+/**
+ * Names of the actor lifecycle events recorded by the Monitor
+ */
+object EVENTS {
+
+ val START = "START"
+ val STOP = "STOP"
+ val TERMINATE = "TERMINATE"
+ val RESTART = "RESTART"
+
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/3743740c/fey-core/src/main/scala/org/apache/iota/fey/MyService.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/MyService.scala b/fey-core/src/main/scala/org/apache/iota/fey/MyService.scala
index 3987340..bc1cfa0 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/MyService.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/MyService.scala
@@ -19,6 +19,7 @@ package org.apache.iota.fey
import akka.actor.Actor
import org.apache.iota.fey.FeyCore.JSON_TREE
+import play.api.libs.json.Json
import spray.http.MediaTypes._
import spray.routing._
@@ -38,6 +39,7 @@ sealed trait MyService extends HttpService {
val home = pathPrefix("fey")
val activeActors = path("activeactors")
+ val actorLifecycle = path("actorslifecycle")
val test = path("test")
val myRoute =
@@ -46,13 +48,22 @@ sealed trait MyService extends HttpService {
get{
respondWithMediaType(`text/html`) {
complete {
- Application.fey ! JSON_TREE
+ SYSTEM_ACTORS.fey ! JSON_TREE
Thread.sleep(2000)
val json = IdentifyFeyActors.generateTreeJson()
IdentifyFeyActors.getHTMLTree(json)
}
}
}
+ } ~
+ actorLifecycle {
+ get{
+ respondWithMediaType(`application/json`) {
+ complete {
+ Json.stringify(Monitor.events.printWithEvents)
+ }
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/3743740c/fey-core/src/main/scala/org/apache/iota/fey/Orchestration.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Orchestration.scala b/fey-core/src/main/scala/org/apache/iota/fey/Orchestration.scala
index 15a9cdd..5ec7588 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Orchestration.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Orchestration.scala
@@ -50,6 +50,7 @@ protected class Orchestration(val name: String,
context.actorSelection(s"*") ! Ensemble.PRINT_ENSEMBLE
case Terminated(actor) =>
+ SYSTEM_ACTORS.monitoring ! Monitor.TERMINATE(actor.path.toString, Utils.getTimestamp)
context.unwatch(actor)
log.warning(s"ACTOR DEAD ${actor.path}")
ensembles.remove(actor.path.name)
@@ -68,16 +69,19 @@ protected class Orchestration(val name: String,
}
override def preStart(): Unit = {
+ SYSTEM_ACTORS.monitoring ! Monitor.START(Utils.getTimestamp)
if (ORCHESTRATION_CACHE.orchestration_metadata.contains(guid)){
replayOrchestrationState()
}
}
override def postStop() = {
+ SYSTEM_ACTORS.monitoring ! Monitor.STOP(Utils.getTimestamp)
log.info(s"STOPPED ${self.path.name}")
}
override def postRestart(reason: Throwable): Unit = {
+ SYSTEM_ACTORS.monitoring ! Monitor.RESTART(reason, Utils.getTimestamp)
log.info(s"RESTARTED ${self.path}")
preStart()
}
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/3743740c/fey-core/src/main/scala/org/apache/iota/fey/TrieNode.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/TrieNode.scala b/fey-core/src/main/scala/org/apache/iota/fey/TrieNode.scala
index 4fcb446..95deacc 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/TrieNode.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/TrieNode.scala
@@ -20,31 +20,34 @@ package org.apache.iota.fey
import play.api.libs.json.{JsArray, JsObject, JsValue, Json}
import scala.annotation.tailrec
+import scala.collection.mutable.ArrayBuffer
/**
* Trie data structure used to create actors hierarchy in Fey
*/
-case class TrieNode(path: String, var children: Array[TrieNode])
+case class TrieNode(path: String, children: ArrayBuffer[TrieNode], events:ArrayBuffer[Monitor.MonitorEvent])
class Trie{
- private val root: TrieNode = TrieNode("FEY-MANAGEMENT-SYSTEM", Array.empty)
+ private val root: TrieNode = TrieNode("FEY-MANAGEMENT-SYSTEM", ArrayBuffer.empty, ArrayBuffer.empty)
var elements: Int = 0
- def appendPath(path: String): Unit = {
- appendPath(path.replaceFirst("akka://","").split("/"),root,1)
+ def append(path: String, event: Monitor.MonitorEvent = null): Unit = {
+ append(path.replaceFirst("akka://","").split("/"),root,1,event)
}
- @tailrec private def appendPath(path: Array[String], root: TrieNode, index: Int): Unit = {
+ @tailrec private def append(path: Array[String], root: TrieNode, index: Int, event: Monitor.MonitorEvent): Unit = {
if(root != null && index < path.length){
var nextRoot = root.children.filter(child => child.path == path(index))
if(nextRoot.isEmpty){
- nextRoot = Array(TrieNode(path(index), Array.empty))
- val children = root.children ++: nextRoot
- root.children = children
+ nextRoot = ArrayBuffer(TrieNode(path(index), ArrayBuffer.empty, ArrayBuffer.empty))
+ root.children += nextRoot(0)
elements += 1
}
- appendPath(path, nextRoot(0),index+1)
+ if(event != null && index == path.length - 1){
+ nextRoot(0).events += event
+ }
+ append(path, nextRoot(0),index+1, event)
}
}
@@ -52,6 +55,10 @@ class Trie{
getObject(root, null)
}
+ def printWithEvents:JsValue = {
+ getObjectEvent(root, null)
+ }
+
private def getObject(root: TrieNode, parent: TrieNode):JsObject = {
if(root != null) {
Json.obj("name" -> root.path,
@@ -62,4 +69,21 @@ class Trie{
Json.obj()
}
}
+
+ private def getObjectEvent(root: TrieNode, parent: TrieNode):JsObject = {
+ if(root != null) {
+ Json.obj("name" -> root.path,
+ "parent" -> (if(parent != null) parent.path else "null"),
+ "events" -> root.events.map(event => {
+ Json.obj("type" -> event.event,
+ "timestamp" -> event.timestamp,
+ "info" -> event.info
+ )
+ }),
+ "children" -> root.children.map(getObjectEvent(_, root))
+ )
+ }else{
+ Json.obj()
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/3743740c/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
index 9cd1668..026bf89 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
@@ -158,6 +158,12 @@ protected object Utils {
}
}
}
+
+ /**
+ * Current time in milliseconds, as returned by System.currentTimeMillis()
+ * @return Long
+ */
+ def getTimestamp:Long = System.currentTimeMillis()
}
object JSON_PATH{