You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:55 UTC

[45/49] incubator-gearpump git commit: fix GEARPUMP-118 change package name to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/util/Constants.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/Constants.scala b/core/src/main/scala/io/gearpump/util/Constants.scala
deleted file mode 100644
index 342cd87..0000000
--- a/core/src/main/scala/io/gearpump/util/Constants.scala
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import java.util.concurrent.TimeUnit
-
-import io.gearpump.partitioner._
-
-object Constants {
-  val MASTER_WATCHER = "masterwatcher"
-  val SINGLETON_MANAGER = "singleton"
-
-  val MASTER_CONFIG = "gearpump-master"
-  val WORKER_CONFIG = "gearpump-worker"
-  val UI_CONFIG = "gearpump-ui"
-  val LINUX_CONFIG = "gearpump-linux" // linux or Mac
-
-  val MASTER = "master"
-  val WORKER = "worker"
-
-  val GEARPUMP_WORKER_SLOTS = "gearpump.worker.slots"
-  val GEARPUMP_EXECUTOR_PROCESS_LAUNCHER = "gearpump.worker.executor-process-launcher"
-  val GEARPUMP_SCHEDULING_SCHEDULER = "gearpump.scheduling.scheduler-class"
-  val GEARPUMP_SCHEDULING_REQUEST = "gearpump.scheduling.requests"
-  val GEARPUMP_TRANSPORT_SERIALIZER = "gearpump.transport.serializer"
-  val GEARPUMP_SERIALIZER_POOL = "gearpump.serialization-framework"
-  val GEARPUMP_SERIALIZERS = "gearpump.serializers"
-  val GEARPUMP_TASK_DISPATCHER = "gearpump.task-dispatcher"
-  val GEARPUMP_CLUSTER_MASTERS = "gearpump.cluster.masters"
-  val GEARPUMP_MASTERCLIENT_TIMEOUT = "gearpump.masterclient.timeout"
-  val GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS =
-    "gearpump.worker.executor-share-same-jvm-as-worker"
-
-  val GEARPUMP_HOME = "gearpump.home"
-  val GEARPUMP_FULL_SCALA_VERSION = "gearpump.binary-version-with-scala-version"
-  val GEARPUMP_HOSTNAME = "gearpump.hostname"
-  val GEARPUMP_APPMASTER_ARGS = "gearpump.appmaster.vmargs"
-  val GEARPUMP_APPMASTER_EXTRA_CLASSPATH = "gearpump.appmaster.extraClasspath"
-  val GEARPUMP_EXECUTOR_ARGS = "gearpump.executor.vmargs"
-  val GEARPUMP_EXECUTOR_EXTRA_CLASSPATH = "gearpump.executor.extraClasspath"
-  val GEARPUMP_LOG_DAEMON_DIR = "gearpump.log.daemon.dir"
-  val GEARPUMP_LOG_APPLICATION_DIR = "gearpump.log.application.dir"
-  val HADOOP_CONF = "hadoopConf"
-
-  // Id used to identify the Master JVM process in a low-level resource manager like YARN.
-  // In YARN, it means the container Id.
-  val GEARPUMP_MASTER_RESOURCE_MANAGER_CONTAINER_ID =
-    "gearpump.master-resource-manager-container-id"
-
-  // Id used to identify the Worker JVM process in a low-level resource manager like YARN.
-  // In YARN, it means the container Id.
-  val GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID =
-    "gearpump.worker-resource-manager-container-id"
-
-  // true or false
-  val GEARPUMP_REMOTE_DEBUG_EXECUTOR_JVM = "gearpump.remote-debug-executor-jvm"
-  val GEARPUMP_REMOTE_DEBUG_PORT = "gearpump.remote-debug-port"
-
-  // Whether to turn on GC log, true or false
-  val GEARPUMP_VERBOSE_GC = "gearpump.verbose-gc"
-
-  // The timeout for Futures, such as ask.
-  // !Important! This global timeout setting also affects UI
-  // responsiveness if it is set too high. Please make sure you have
-  // enough justification to change this global setting; otherwise,
-  // please use your own local timeout setting instead.
-  val FUTURE_TIMEOUT = akka.util.Timeout(15, TimeUnit.SECONDS)
-
-  val GEARPUMP_START_EXECUTOR_SYSTEM_TIMEOUT_MS = "gearpump.start-executor-system-timeout-ms"
-
-  val APPMASTER_DEFAULT_EXECUTOR_ID = -1
-
-  val NETTY_BUFFER_SIZE = "gearpump.netty.buffer-size"
-  val NETTY_MAX_RETRIES = "gearpump.netty.max-retries"
-  val NETTY_BASE_SLEEP_MS = "gearpump.netty.base-sleep-ms"
-  val NETTY_MAX_SLEEP_MS = "gearpump.netty.max-sleep-ms"
-  val NETTY_MESSAGE_BATCH_SIZE = "gearpump.netty.message-batch-size"
-  val NETTY_FLUSH_CHECK_INTERVAL = "gearpump.netty.flush-check-interval"
-  val NETTY_TCP_HOSTNAME = "akka.remote.netty.tcp.hostname"
-  val NETTY_DISPATCHER = "gearpump.netty.dispatcher"
-
-  val GEARPUMP_USERNAME = "gearpump.username"
-  val GEARPUMP_APPLICATION_ID = "gearpump.applicationId"
-  val GEARPUMP_MASTER_STARTTIME = "gearpump.master.starttime"
-  val GEARPUMP_EXECUTOR_ID = "gearpump.executorId"
-  // Application jar property
-  val GEARPUMP_APP_JAR = "gearpump.app.jar"
-  val GEARPUMP_APP_NAME_PREFIX = "gearpump.app.name.prefix"
-
-  // Where the jar is stored. It can be on HDFS or on a local disk.
-  val GEARPUMP_APP_JAR_STORE_ROOT_PATH = "gearpump.jarstore.rootpath"
-
-  // Uses java property -Dgearpump.config.file=xxx.conf to set customized configuration
-  // Otherwise application.conf in classpath will be loaded
-  val GEARPUMP_CUSTOM_CONFIG_FILE = "gearpump.config.file"
-
-  // Metrics related
-  val GEARPUMP_METRIC_ENABLED = "gearpump.metrics.enabled"
-  val GEARPUMP_METRIC_SAMPLE_RATE = "gearpump.metrics.sample-rate"
-  val GEARPUMP_METRIC_REPORT_INTERVAL = "gearpump.metrics.report-interval-ms"
-  val GEARPUMP_METRIC_GRAPHITE_HOST = "gearpump.metrics.graphite.host"
-  val GEARPUMP_METRIC_GRAPHITE_PORT = "gearpump.metrics.graphite.port"
-  val GEARPUMP_METRIC_REPORTER = "gearpump.metrics.reporter"
-
-  // Retains at max @RETAIN_HISTORY_HOURS history data
-  val GEARPUMP_METRIC_RETAIN_HISTORY_DATA_HOURS = "gearpump.metrics.retainHistoryData.hours"
-
-  // Time interval between two history data points.
-  val GEARPUMP_RETAIN_HISTORY_DATA_INTERVAL_MS = "gearpump.metrics.retainHistoryData.intervalMs"
-
-  // Retains at max @RETAIN_LATEST_SECONDS recent data points
-  val GEARPUMP_RETAIN_RECENT_DATA_SECONDS = "gearpump.metrics.retainRecentData.seconds"
-
-  // time interval between two recent data points.
-  val GEARPUMP_RETAIN_RECENT_DATA_INTERVAL_MS = "gearpump.metrics.retainRecentData.intervalMs"
-
-  // AppMaster waits at most this long before declaring that the resource cannot be allocated
-  // and shutting itself down.
-  val GEARPUMP_RESOURCE_ALLOCATION_TIMEOUT = "gearpump.resource-allocation-timeout-seconds"
-
-  // Service related
-  val GEARPUMP_SERVICE_HTTP = "gearpump.services.http"
-  val GEARPUMP_SERVICE_HOST = "gearpump.services.host"
-  val GEARPUMP_SERVICE_SUPERVISOR_PATH = "gearpump.services.supervisor-actor-path"
-  val GEARPUMP_SERVICE_RENDER_CONFIG_CONCISE = "gearpump.services.config-render-option-concise"
-
-  // The partitioners provided by Gearpump
-  val BUILTIN_PARTITIONERS = Array(
-    classOf[BroadcastPartitioner],
-    classOf[CoLocationPartitioner],
-    classOf[HashPartitioner],
-    classOf[ShuffleGroupingPartitioner],
-    classOf[ShufflePartitioner])
-
-  // Security related
-  val GEARPUMP_KEYTAB_FILE = "gearpump.keytab.file"
-  val GEARPUMP_KERBEROS_PRINCIPAL = "gearpump.kerberos.principal"
-
-  val GEARPUMP_METRICS_MAX_LIMIT = "gearpump.metrics.akka.max-limit-on-query"
-  val GEARPUMP_METRICS_AGGREGATORS = "gearpump.metrics.akka.metrics-aggregator-class"
-
-  val GEARPUMP_UI_SECURITY = "gearpump.ui-security"
-  val GEARPUMP_UI_SECURITY_AUTHENTICATION_ENABLED = "gearpump.ui-security.authentication-enabled"
-  val GEARPUMP_UI_AUTHENTICATOR_CLASS = "gearpump.ui-security.authenticator"
-  // OAuth Authentication Factory for UI server.
-  val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_ENABLED = "gearpump.ui-security.oauth2-authenticator-enabled"
-  val GEARPUMP_UI_OAUTH2_AUTHENTICATORS = "gearpump.ui-security.oauth2-authenticators"
-  val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLASS = "class"
-  val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CALLBACK = "callback"
-  val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLIENT_ID = "clientid"
-  val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLIENT_SECRET = "clientsecret"
-  val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_DEFAULT_USER_ROLE = "default-userrole"
-  val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_AUTHORIZATION_CODE = "code"
-  val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_ACCESS_TOKEN = "accesstoken"
-
-  val PREFER_IPV4 = "java.net.preferIPv4Stack"
-
-  val APPLICATION_EXECUTOR_NUMBER = "gearpump.application.executor-num"
-
-  val AKKA_SCHEDULER_TICK_DURATION = "akka.scheduler.tick-duration"
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/util/FileUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/FileUtils.scala b/core/src/main/scala/io/gearpump/util/FileUtils.scala
deleted file mode 100644
index 1561587..0000000
--- a/core/src/main/scala/io/gearpump/util/FileUtils.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import java.io.{File, IOException}
-import java.nio.charset.Charset
-
-import io.gearpump.google.common.io.Files
-
-object FileUtils {
-  private val UTF8 = Charset.forName("UTF-8")
-
-  def write(file: File, str: String): Unit = {
-    Files.write(str, file, UTF8)
-  }
-
-  def read(file: File): String = {
-    Files.asCharSource(file, UTF8).read()
-  }
-
-  def writeByteArrayToFile(file: File, bytes: Array[Byte]): Unit = {
-    Files.write(bytes, file)
-  }
-
-  def readFileToByteArray(file: File): Array[Byte] = {
-    Files.toByteArray(file)
-  }
-
-  /** Recursively creates the directory and all of its missing parent directories. */
-  def forceMkdir(directory: File): Unit = {
-    if (directory.exists() && directory.isFile) {
-      throw new IOException(s"Failed to create directory ${directory.toString}, it already exist")
-    }
-    Files.createParentDirs(directory)
-    val result = directory.mkdir()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/util/Graph.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/Graph.scala b/core/src/main/scala/io/gearpump/util/Graph.scala
deleted file mode 100644
index 8c34329..0000000
--- a/core/src/main/scala/io/gearpump/util/Graph.scala
+++ /dev/null
@@ -1,497 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-import scala.annotation.tailrec
-import scala.collection.mutable
-import scala.language.implicitConversions
-
-/**
- * Generic mutable graph library.
- */
-class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serializable {
-
-  private val _vertices = mutable.Set.empty[N]
-  private val _edges = mutable.Set.empty[(N, E, N)]
-
-  // This is used to ensure the output of this Graph is always stable
-  // Like method vertices(), or edges()
-  private var _indexs = Map.empty[Any, Int]
-  private var _nextIndex = 0
-  private def nextId: Int = {
-    val result = _nextIndex
-    _nextIndex += 1
-    result
-  }
-
-  private def init(): Unit = {
-    Option(vertexList).getOrElse(List.empty[N]).foreach(addVertex(_))
-    Option(edgeList).getOrElse(List.empty[(N, E, N)]).foreach(addEdge(_))
-  }
-
-  init()
-
-  /**
-   * Add a vertex
-   * Current Graph is changed.
-   */
-  def addVertex(vertex: N): Unit = {
-    val result = _vertices.add(vertex)
-    if (result) {
-      _indexs += vertex -> nextId
-    }
-  }
-
-  /**
-   * Add an edge
-   * Current Graph is changed.
-   */
-  def addEdge(edge: (N, E, N)): Unit = {
-    val result = _edges.add(edge)
-    if (result) {
-      _indexs += edge -> nextId
-    }
-  }
-
-  /**
-   * return all vertices.
-   * The result is stable
-   */
-  def vertices: List[N] = {
-    // Sorts the vertex so that we can keep the order for mapVertex
-    _vertices.toList.sortBy(_indexs(_))
-  }
-
-  /**
-   * out degree
-   */
-  def outDegreeOf(node: N): Int = {
-    edges.count(_._1 == node)
-  }
-
-  /**
-   * in degree
-   */
-  def inDegreeOf(node: N): Int = {
-    edges.count(_._3 == node)
-  }
-
-  /**
-   * out going edges.
-   */
-  def outgoingEdgesOf(node: N): List[(N, E, N)] = {
-    edges.filter(_._1 == node)
-  }
-
-  /**
-   * incoming edges.
-   */
-  def incomingEdgesOf(node: N): List[(N, E, N)] = {
-    edges.filter(_._3 == node)
-  }
-
-  /**
-   * Remove vertex
-   * Current Graph is changed.
-   */
-  def removeVertex(node: N): Unit = {
-    _vertices.remove(node)
-    _indexs -= node
-    val toBeRemoved = incomingEdgesOf(node) ++ outgoingEdgesOf(node)
-    toBeRemoved.foreach(removeEdge(_))
-  }
-
-  /**
-   * Remove edge
-   * Current Graph is changed.
-   */
-  private def removeEdge(edge: (N, E, N)): Unit = {
-    _indexs -= edge
-    _edges.remove(edge)
-  }
-
-  /**
-   * add edge
-   * Current Graph is changed.
-   */
-  def addEdge(node1: N, edge: E, node2: N): Unit = {
-    addVertex(node1)
-    addVertex(node2)
-    addEdge((node1, edge, node2))
-  }
-
-  /**
-   * Map a graph to a new graph, with vertex converted to a new type
-   * Current Graph is not changed.
-   */
-  def mapVertex[NewNode](fun: N => NewNode): Graph[NewNode, E] = {
-    val vertexes = vertices.map(node => (node, fun(node)))
-
-    val vertexMap: Map[N, NewNode] = vertexes.toMap
-
-    val newEdges = edges.map { edge =>
-      (vertexMap(edge._1), edge._2, vertexMap(edge._3))
-    }
-    new Graph(vertexes.map(_._2), newEdges)
-  }
-
-  /**
-   * Map a graph to a new graph, with edge converted to new type
-   * Current graph is not changed.
-   */
-  def mapEdge[NewEdge](fun: (N, E, N) => NewEdge): Graph[N, NewEdge] = {
-    val newEdges = edges.map { edge =>
-      (edge._1, fun(edge._1, edge._2, edge._3), edge._3)
-    }
-    new Graph(vertices, newEdges)
-  }
-
-  /**
-   * edges connected to node
-   */
-  def edgesOf(node: N): List[(N, E, N)] = {
-    (incomingEdgesOf(node) ++ outgoingEdgesOf(node)).toSet[(N, E, N)].toList.sortBy(_indexs(_))
-  }
-
-  /**
-   * all edges
-   */
-  def edges: List[(N, E, N)] = {
-    _edges.toList.sortBy(_indexs(_))
-  }
-
-  /**
-   * Add another graph
-   * Current graph is changed.
-   */
-  def addGraph(other: Graph[N, E]): Graph[N, E] = {
-    (vertices ++ other.vertices).foreach(addVertex(_))
-    (edges ++ other.edges).foreach(edge => addEdge(edge._1, edge._2, edge._3))
-    this
-  }
-
-  /**
-   * clone the graph
-   */
-  def copy: Graph[N, E] = {
-    new Graph(vertices, edges)
-  }
-
-  /**
-   * check empty
-   */
-  def isEmpty: Boolean = {
-    val vertexCount = vertices.size
-    val edgeCount = edges.length
-    if (vertexCount + edgeCount == 0) {
-      true
-    } else {
-      false
-    }
-  }
-
-  /**
-   * sub-graph which contains current node and all neighbour
-   * nodes and edges.
-   *
-   */
-  def subGraph(node: N): Graph[N, E] = {
-    val newGraph = Graph.empty[N, E]
-    for (edge <- edgesOf(node)) {
-      newGraph.addEdge(edge._1, edge._2, edge._3)
-    }
-    newGraph
-  }
-
-  /**
-   * replace vertex, the current Graph is mutated.
-   */
-  def replaceVertex(node: N, newNode: N): Graph[N, E] = {
-    for (edge <- incomingEdgesOf(node)) {
-      addEdge(edge._1, edge._2, newNode)
-    }
-
-    for (edge <- outgoingEdgesOf(node)) {
-      addEdge(newNode, edge._2, edge._3)
-    }
-    removeVertex(node)
-    this
-  }
-
-  private def removeZeroInDegree: List[N] = {
-    val toBeRemoved = vertices.filter(inDegreeOf(_) == 0).sortBy(_indexs(_))
-    toBeRemoved.foreach(removeVertex(_))
-    toBeRemoved
-  }
-
-  /**
-   * Return an iterator of vertex in topological order
-   * The node returned by Iterator is stable sorted.
-   */
-  def topologicalOrderIterator: Iterator[N] = {
-    val newGraph = copy
-    var output = List.empty[N]
-
-    while (!newGraph.isEmpty) {
-      output ++= newGraph.removeZeroInDegree
-    }
-    output.iterator
-  }
-
-  /**
-   * Return all strongly connected components ("circles") of the graph, singletons included.
-   *
-   * The reference of this algorithm is:
-   * https://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm
-   */
-  private def findCircles: mutable.MutableList[mutable.MutableList[N]] = {
-    val inStack = mutable.Map.empty[N, Boolean]
-    val stack = mutable.Stack[N]()
-    val indexMap = mutable.Map.empty[N, Int]
-    val lowLink = mutable.Map.empty[N, Int]
-    var index = 0
-
-    val circles = mutable.MutableList.empty[mutable.MutableList[N]]
-
-    def tarjan(node: N): Unit = {
-      indexMap(node) = index
-      lowLink(node) = index
-      index += 1
-      inStack(node) = true
-      stack.push(node)
-
-      outgoingEdgesOf(node).foreach {
-        edge => {
-          if (!indexMap.contains(edge._3)) {
-            tarjan(edge._3)
-            if (lowLink.get(edge._3).get < lowLink.get(node).get) {
-              lowLink(node) = lowLink(edge._3)
-            }
-          } else {
-            if (inStack.get(edge._3).get && (indexMap.get(edge._3).get < lowLink.get(node).get)) {
-              lowLink(node) = indexMap(edge._3)
-            }
-          }
-        }
-      }
-
-      if (indexMap.get(node).get == lowLink.get(node).get) {
-        val circle = mutable.MutableList.empty[N]
-        var n = node
-        do {
-          n = stack.pop()
-          inStack(n) = false
-          circle += n
-        } while (n != node)
-        circles += circle
-      }
-    }
-
-    vertices.foreach {
-      node => {
-        if (!indexMap.contains(node)) tarjan(node)
-      }
-    }
-
-    circles
-  }
-
-  /**
-   * Return an iterator of vertex in topological order of graph with circles
-   * The node returned by Iterator is stable sorted.
-   *
-   * The reference of this algorithm is:
-   * http://www.drdobbs.com/database/topological-sorting/184410262
-   */
-  def topologicalOrderWithCirclesIterator: Iterator[N] = {
-    val circles = findCircles
-    val newGraph = Graph.empty[mutable.MutableList[N], E]
-    circles.foreach {
-      circle => {
-        newGraph.addVertex(circle)
-      }
-    }
-
-    for (circle1 <- circles; circle2 <- circles; if circle1 != circle2) yield {
-      for (node1 <- circle1; node2 <- circle2) yield {
-        var edges = outgoingEdgesOf(node1)
-        for (edge <- edges; if edge._3 == node2) yield {
-          newGraph.addEdge(circle1, edge._2, circle2)
-        }
-
-        edges = outgoingEdgesOf(node2)
-        for (edge <- edges; if edge._3 == node1) yield {
-          newGraph.addEdge(circle2, edge._2, circle1)
-        }
-      }
-    }
-
-    val topo = newGraph.topologicalOrderIterator
-    topo.flatMap(_.sortBy(_indexs(_)).iterator)
-  }
-
-  /**
-   * check whether there is a loop
-   */
-  def hasCycle(): Boolean = {
-    @tailrec
-    def detectCycle(graph: Graph[N, E]): Boolean = {
-      if (graph.edges.isEmpty) {
-        false
-      } else if (graph.vertices.nonEmpty && !graph.vertices.exists(graph.inDegreeOf(_) == 0)) {
-        true
-      } else {
-        graph.removeZeroInDegree
-        detectCycle(graph)
-      }
-    }
-
-    detectCycle(copy)
-  }
-
-  /**
-   * Check whether there are two edges connecting two nodes.
-   */
-  def hasDuplicatedEdge(): Boolean = {
-    edges.groupBy(edge => (edge._1, edge._3)).values.exists(_.size > 1)
-  }
-
-  /**
-   * Generate a level map for each vertex such that:
-   * {{{
-   * if vertex A -> B, then level(A) < level(B)
-   * }}}
-   */
-  def vertexHierarchyLevelMap(): Map[N, Int] = {
-    val newGraph = copy
-    var output = Map.empty[N, Int]
-    var level = 0
-    while (!newGraph.isEmpty) {
-      output ++= newGraph.removeZeroInDegree.map((_, level)).toMap
-      level += 1
-    }
-    output
-  }
-
-  override def toString: String = {
-    Map("vertices" -> vertices.mkString(","),
-      "edges" -> edges.mkString(",")).toString()
-  }
-}
-
-object Graph {
-
-  /**
-   * Example:
-   *
-   * {{{
-   * Graph(1 ~ 2 ~> 4 ~ 5 ~> 7, 8~9~>55, 11)
-   * Will create a graph with:
-   * nodes:
-   * 1, 4, 7, 8, 55, 11
-   * edge:
-   * 2: (1->4)
-   * 5: (4->7)
-   * 9: (8->55)
-   * }}}
-   */
-  def apply[N, E](elems: Path[_ <: N, _ <: E]*): Graph[N, E] = {
-    val graph = empty[N, E]
-    elems.foreach { path =>
-      path.updategraph(graph)
-    }
-    graph
-  }
-
-  def apply[N, E](vertices: List[N], edges: List[(N, E, N)]): Graph[N, E] = {
-    new Graph(vertices, edges)
-  }
-
-  def unapply[N, E](graph: Graph[N, E]): Option[(List[N], List[(N, E, N)])] = {
-    Some((graph.vertices, graph.edges))
-  }
-
-  def empty[N, E]: Graph[N, E] = {
-    new Graph(List.empty[N], List.empty[(N, E, N)])
-  }
-
-  class Path[N, + E](path: List[Either[N, E]]) {
-
-    def ~[Edge >: E](edge: Edge): Path[N, Edge] = {
-      new Path(path :+ Right(edge))
-    }
-
-    def ~>[Node >: N](node: Node): Path[Node, E] = {
-      new Path(path :+ Left(node))
-    }
-
-    def to[Node >: N, Edge >: E](node: Node, edge: Edge): Path[Node, Edge] = {
-      this ~ edge ~> node
-    }
-
-    private[Graph] def updategraph[Node >: N, Edge >: E](graph: Graph[Node, Edge]): Unit = {
-      val nodeEdgePair: Tuple2[Option[N], Option[E]] = (None, None)
-      path.foldLeft(nodeEdgePair) { (pair, either) =>
-        val (lastNode, lastEdge) = pair
-        either match {
-          case Left(node) =>
-            graph.addVertex(node)
-            if (lastNode.isDefined) {
-              graph.addEdge(lastNode.get, lastEdge.getOrElse(null.asInstanceOf[Edge]), node)
-            }
-            (Some(node), None)
-          case Right(edge) =>
-            (lastNode, Some(edge))
-        }
-      }
-    }
-  }
-
-  object Path {
-    implicit def anyToPath[N, E](any: N): Path[N, E] = Node(any)
-  }
-
-  implicit class Node[N, E](self: N) extends Path[N, E](List(Left(self))) {
-
-    override def ~[Edge](edge: Edge): Path[N, Edge] = {
-      new Path(List(Left(self), Right(edge)))
-    }
-
-    override def ~>[Node >: N](node: Node): Path[Node, E] = {
-      new NodeList(List(self, node))
-    }
-
-    override def to[Node >: N, Edge >: E](node: Node, edge: Edge): Path[Node, Edge] = {
-      this ~ edge ~> node
-    }
-  }
-
-  class NodeList[N, E](nodes: List[N]) extends Path[N, E](nodes.map(Left(_))) {
-    override def ~[Edge](edge: Edge): Path[N, Edge] = {
-      new Path(nodes.map(Left(_)) :+ Right(edge))
-    }
-
-    override def ~>[Node >: N](node: Node): Path[Node, E] = {
-      new NodeList(nodes :+ node)
-    }
-
-    override def to[Node >: N, Edge >: E](node: Node, edge: Edge): Path[Node, Edge] = {
-      this ~ edge ~> node
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/util/HistoryMetricsService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/HistoryMetricsService.scala b/core/src/main/scala/io/gearpump/util/HistoryMetricsService.scala
deleted file mode 100644
index 7552444..0000000
--- a/core/src/main/scala/io/gearpump/util/HistoryMetricsService.scala
+++ /dev/null
@@ -1,404 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import java.util
-import scala.collection.mutable.ListBuffer
-
-import akka.actor.Actor
-import com.typesafe.config.Config
-import org.slf4j.Logger
-
-import io.gearpump.TimeStamp
-import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, ReadOption}
-import io.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem}
-import io.gearpump.metrics.Metrics._
-import io.gearpump.metrics.MetricsAggregator
-import io.gearpump.util.Constants._
-import io.gearpump.util.HistoryMetricsService.{DummyMetricsAggregator, HistoryMetricsConfig, HistoryMetricsStore, SkipAllAggregator}
-
-/**
- *
- * Metrics service to serve history metrics data
- *
- * For simplicity, HistoryMetricsService will maintain coarse-grained data
- * for the last 72 hours, and fine-grained data for the past 5 min.
- *
- * For the coarse-grained data of the past 72 hours, one or two sample points will be stored
- * for each hour.
- *
- * For fine-grained data in last 5 min, there will be 1 sample point per 15 seconds.
- */
-class HistoryMetricsService(name: String, config: HistoryMetricsConfig) extends Actor {
-  private val LOG: Logger = LogUtil.getLogger(getClass, name = name)
-  private var metricsStore = Map.empty[String, HistoryMetricsStore]
-  private val systemConfig = context.system.settings.config
-
-  def receive: Receive = metricHandler orElse commandHandler
-  def metricHandler: Receive = {
-    case ReportMetrics =>
-      sender ! DemandMoreMetrics(self)
-    case metrics: MetricType =>
-      val name = metrics.name
-      if (metricsStore.contains(name)) {
-        metricsStore(name).add(metrics)
-      } else {
-        val store = HistoryMetricsStore(name, metrics, config)
-        metricsStore += name -> store
-        store.add(metrics)
-      }
-  }
-
-  private def toRegularExpression(input: String): String = {
-    "^" + input.flatMap {
-      case '*' => ".*"
-      case '?' => "."
-      case char if "()[]$^.{}|\\".contains(char) => "\\" + char
-      case other => s"$other"
-    } + ".*$"
-  }
-
-  private def fetchMetricsHistory(pathPattern: String, readOption: ReadOption.ReadOption)
-  : List[HistoryMetricsItem] = {
-
-    val result = new ListBuffer[HistoryMetricsItem]
-
-    val regex = toRegularExpression(pathPattern).r.pattern
-
-    val iter = metricsStore.iterator
-    while (iter.hasNext) {
-      val (name, store) = iter.next()
-
-      val matcher = regex.matcher(name)
-      if (matcher.matches()) {
-        readOption match {
-          case ReadOption.ReadLatest =>
-            result.append(store.readLatest: _*)
-          case ReadOption.ReadRecent =>
-            result.append(store.readRecent: _*)
-          case ReadOption.ReadHistory =>
-            result.append(store.readHistory: _*)
-          case _ =>
-          // Skip all other options.
-        }
-      }
-    }
-    result.toList
-  }
-
-  val dummyAggregator = new DummyMetricsAggregator
-  private var aggregators: Map[String, MetricsAggregator] = Map.empty[String, MetricsAggregator]
-
-  import scala.collection.JavaConverters._
-  private val validAggregators: Set[String] = {
-    val rootConfig = systemConfig.getConfig(Constants.GEARPUMP_METRICS_AGGREGATORS).root.unwrapped
-    rootConfig.keySet().asScala.toSet
-  }
-
-  def commandHandler: Receive = {
-    // Path wildcards: ? matches exactly one char, * matches zero or more chars
-    case QueryHistoryMetrics(inputPath, readOption, aggregatorClazz, options) =>
-
-      val aggregator = {
-        if (aggregatorClazz == null || aggregatorClazz.isEmpty) {
-          dummyAggregator
-        } else if (aggregators.contains(aggregatorClazz)) {
-          aggregators(aggregatorClazz)
-        } else if (validAggregators.contains(aggregatorClazz)) {
-          val clazz = Class.forName(aggregatorClazz)
-          val constructor = clazz.getConstructor(classOf[Config])
-          val aggregator = constructor.newInstance(systemConfig).asInstanceOf[MetricsAggregator]
-          aggregators += aggregatorClazz -> aggregator
-          aggregator
-        } else {
-          LOG.error(s"Aggregator $aggregatorClazz is not in the white list ${validAggregators}, " +
-            s"we will drop all messages. Please see config at ${GEARPUMP_METRICS_AGGREGATORS}")
-          val skipAll = new SkipAllAggregator
-          aggregators += aggregatorClazz -> new SkipAllAggregator
-          skipAll
-        }
-      }
-
-      val metrics = fetchMetricsHistory(inputPath, readOption).iterator
-      sender ! HistoryMetrics(inputPath, aggregator.aggregate(options, metrics))
-  }
-}
-
-object HistoryMetricsService {
-
-  trait MetricsStore {
-    def add(inputMetrics: MetricType): Unit
-
-    def read: List[HistoryMetricsItem]
-
-    /**
-     * read latest inserted records
-     * @return
-     */
-    def readLatest: List[HistoryMetricsItem]
-  }
-
-  trait HistoryMetricsStore {
-    def add(inputMetrics: MetricType): Unit
-
-    /**
-     * read latest inserted records
-     * @return
-     */
-    def readLatest: List[HistoryMetricsItem]
-
-    def readRecent: List[HistoryMetricsItem]
-
-    def readHistory: List[HistoryMetricsItem]
-  }
-
-  class DummyHistoryMetricsStore extends HistoryMetricsStore {
-
-    val empty = List.empty[HistoryMetricsItem]
-
-    override def add(inputMetrics: MetricType): Unit = Unit
-
-    override def readRecent: List[HistoryMetricsItem] = empty
-
-    /**
-     * read latest inserted records
-     * @return
-     */
-    override def readLatest: List[HistoryMetricsItem] = empty
-
-    override def readHistory: List[HistoryMetricsItem] = empty
-  }
-
-  object HistoryMetricsStore {
-    /**
-     * Creates the store implementation that matches the runtime type of `metric`.
-     *
-     * @param name not used when selecting the store
-     * @param metric sample whose type decides which store is created
-     * @param config retention settings passed to the created store
-     * @return a dedicated store for Histogram, Meter, Counter and Gauge samples, otherwise a
-     *         [[DummyHistoryMetricsStore]]
-     */
-    def apply(name: String, metric: MetricType, config: HistoryMetricsConfig)
-      : HistoryMetricsStore = metric match {
-      case _: Histogram => new HistogramMetricsStore(config)
-      case _: Meter => new MeterMetricsStore(config)
-      case _: Counter => new CounterMetricsStore(config)
-      case _: Gauge => new GaugeMetricsStore(config)
-      // other metrics are not supported
-      case _ => new DummyHistoryMetricsStore
-    }
-  }
-
-  /**
-   * Metrics store that keeps a bounded series of history data points, at most one
-   * data point per time window.
-   *
-   * @param retainCount how many data points to retain; the oldest one is removed when exceeded
-   * @param retainIntervalMs length of a time window, i.e. the interval between two data points
-   */
-  class SingleValueMetricsStore(retainCount: Int, retainIntervalMs: Long) extends MetricsStore {
-
-    // The newest data point sits at the head of the deque, the oldest at the tail.
-    private val queue = new util.ArrayDeque[HistoryMetricsItem]()
-    private var latest = List.empty[HistoryMetricsItem]
-
-    // End of the time window we are tracking
-    private var endTime = 0L
-
-    /** Adds a sample stamped with the current system time. */
-    override def add(inputMetrics: MetricType): Unit = {
-      add(inputMetrics, System.currentTimeMillis())
-    }
-
-    /**
-     * Records `inputMetrics` as the latest sample. The sample is also queued as a history
-     * data point when `now` has reached the end of the current time window.
-     */
-    def add(inputMetrics: MetricType, now: TimeStamp): Unit = {
-      val item = HistoryMetricsItem(now, inputMetrics)
-      latest = item :: Nil
-
-      val windowElapsed = now >= endTime
-      if (windowElapsed) {
-        queue.addFirst(item)
-        // The next window ends on the following multiple of retainIntervalMs.
-        endTime = (now / retainIntervalMs + 1) * retainIntervalMs
-
-        // Removes old data
-        if (queue.size() > retainCount) {
-          queue.removeLast()
-        }
-      }
-    }
-
-    /** Returns the retained data points, ordered from the oldest to the newest insertion. */
-    def read: List[HistoryMetricsItem] = {
-      import scala.collection.JavaConverters._
-      queue.descendingIterator().asScala.toList
-    }
-
-    /** Returns the sample passed to the most recent `add` call, or nothing before the first. */
-    override def readLatest: List[HistoryMetricsItem] = latest
-  }
-
-  /**
-   * Config for how long to keep history metrics data.
-   *
-   * @param retainHistoryDataHours how many hours of history data to retain (unit: hour)
-   * @param retainHistoryDataIntervalMs time interval between two history data points (unit: ms)
-   * @param retainRecentDataSeconds how many seconds of recent data to retain (unit: seconds)
-   * @param retainRecentDataIntervalMs time interval between two recent data points (unit: ms)
-   */
-  case class HistoryMetricsConfig(
-      retainHistoryDataHours: Int,
-      retainHistoryDataIntervalMs: Int,
-      retainRecentDataSeconds: Int,
-      retainRecentDataIntervalMs: Int)
-
-  object HistoryMetricsConfig {
-    /** Builds the retention settings by reading the four retention entries from `config`. */
-    def apply(config: Config): HistoryMetricsConfig = {
-      new HistoryMetricsConfig(
-        retainHistoryDataHours = config.getInt(GEARPUMP_METRIC_RETAIN_HISTORY_DATA_HOURS),
-        retainHistoryDataIntervalMs = config.getInt(GEARPUMP_RETAIN_HISTORY_DATA_INTERVAL_MS),
-        retainRecentDataSeconds = config.getInt(GEARPUMP_RETAIN_RECENT_DATA_SECONDS),
-        retainRecentDataIntervalMs = config.getInt(GEARPUMP_RETAIN_RECENT_DATA_INTERVAL_MS))
-    }
-  }
-
-  /**
-   * Store for Histogram samples. Every sample is added to two series: a "history" series
-   * and a "recent" series, each with its own retention and sampling interval taken
-   * from `config`.
-   */
-  class HistogramMetricsStore(config: HistoryMetricsConfig) extends HistoryMetricsStore {
-
-    // The retain counts are computed in Long: in Int arithmetic hours * 3600 * 1000 overflows
-    // once the retention exceeds 596 hours, which yields a wrong (possibly negative) count.
-    private val history = new SingleValueMetricsStore(
-      (config.retainHistoryDataHours * 3600L * 1000L / config.retainHistoryDataIntervalMs).toInt,
-      config.retainHistoryDataIntervalMs)
-
-    private val recent = new SingleValueMetricsStore(
-      (config.retainRecentDataSeconds * 1000L / config.retainRecentDataIntervalMs).toInt,
-      config.retainRecentDataIntervalMs)
-
-    /** Adds the sample to both the recent and the history series. */
-    override def add(inputMetrics: MetricType): Unit = {
-      recent.add(inputMetrics)
-      history.add(inputMetrics)
-    }
-
-    override def readRecent: List[HistoryMetricsItem] = {
-      recent.read
-    }
-
-    override def readHistory: List[HistoryMetricsItem] = {
-      history.read
-    }
-
-    /** Returns the latest sample as tracked by the recent series. */
-    override def readLatest: List[HistoryMetricsItem] = {
-      recent.readLatest
-    }
-  }
-
-  /**
-   * Store for Meter samples. Every sample is added to two series: a "history" series
-   * and a "recent" series, each with its own retention and sampling interval taken
-   * from `config`.
-   */
-  class MeterMetricsStore(config: HistoryMetricsConfig) extends HistoryMetricsStore {
-
-    // The retain counts are computed in Long: in Int arithmetic hours * 3600 * 1000 overflows
-    // once the retention exceeds 596 hours, which yields a wrong (possibly negative) count.
-    private val history = new SingleValueMetricsStore(
-      (config.retainHistoryDataHours * 3600L * 1000L / config.retainHistoryDataIntervalMs).toInt,
-      config.retainHistoryDataIntervalMs)
-
-    private val recent = new SingleValueMetricsStore(
-      (config.retainRecentDataSeconds * 1000L / config.retainRecentDataIntervalMs).toInt,
-      config.retainRecentDataIntervalMs)
-
-    /** Adds the sample to both the recent and the history series. */
-    override def add(inputMetrics: MetricType): Unit = {
-      recent.add(inputMetrics)
-      history.add(inputMetrics)
-    }
-
-    override def readRecent: List[HistoryMetricsItem] = {
-      recent.read
-    }
-
-    override def readHistory: List[HistoryMetricsItem] = {
-      history.read
-    }
-
-    /** Returns the latest sample as tracked by the recent series. */
-    override def readLatest: List[HistoryMetricsItem] = {
-      recent.readLatest
-    }
-  }
-
-  /**
-   * Store for Counter samples. Every sample is added to two series: a "history" series
-   * and a "recent" series, each with its own retention and sampling interval taken
-   * from `config`.
-   */
-  class CounterMetricsStore(config: HistoryMetricsConfig) extends HistoryMetricsStore {
-
-    // The retain counts are computed in Long: in Int arithmetic hours * 3600 * 1000 overflows
-    // once the retention exceeds 596 hours, which yields a wrong (possibly negative) count.
-    private val history = new SingleValueMetricsStore(
-      (config.retainHistoryDataHours * 3600L * 1000L / config.retainHistoryDataIntervalMs).toInt,
-      config.retainHistoryDataIntervalMs)
-
-    private val recent = new SingleValueMetricsStore(
-      (config.retainRecentDataSeconds * 1000L / config.retainRecentDataIntervalMs).toInt,
-      config.retainRecentDataIntervalMs)
-
-    /** Adds the sample to both the history and the recent series. */
-    override def add(inputMetrics: MetricType): Unit = {
-      history.add(inputMetrics)
-      recent.add(inputMetrics)
-    }
-
-    override def readRecent: List[HistoryMetricsItem] = {
-      recent.read
-    }
-
-    override def readHistory: List[HistoryMetricsItem] = {
-      history.read
-    }
-
-    /** Returns the latest sample as tracked by the recent series. */
-    override def readLatest: List[HistoryMetricsItem] = {
-      recent.readLatest
-    }
-  }
-
-  /**
-   * Store for Gauge samples. Every sample is added to two series: a "history" series
-   * and a "recent" series, each with its own retention and sampling interval taken
-   * from `config`.
-   */
-  class GaugeMetricsStore(config: HistoryMetricsConfig) extends HistoryMetricsStore {
-
-    // The retain counts are computed in Long: in Int arithmetic hours * 3600 * 1000 overflows
-    // once the retention exceeds 596 hours, which yields a wrong (possibly negative) count.
-    private val history = new SingleValueMetricsStore(
-      (config.retainHistoryDataHours * 3600L * 1000L / config.retainHistoryDataIntervalMs).toInt,
-      config.retainHistoryDataIntervalMs)
-
-    private val recent = new SingleValueMetricsStore(
-      (config.retainRecentDataSeconds * 1000L / config.retainRecentDataIntervalMs).toInt,
-      config.retainRecentDataIntervalMs)
-
-    /** Adds the sample to both the recent and the history series. */
-    override def add(inputMetrics: MetricType): Unit = {
-      recent.add(inputMetrics)
-      history.add(inputMetrics)
-    }
-
-    override def readRecent: List[HistoryMetricsItem] = {
-      recent.read
-    }
-
-    override def readHistory: List[HistoryMetricsItem] = {
-      history.read
-    }
-
-    /** Returns the latest sample as tracked by the recent series. */
-    override def readLatest: List[HistoryMetricsItem] = {
-      recent.readLatest
-    }
-  }
-
-  /** Aggregator that performs no aggregation: all input items are returned as a list. */
-  class DummyMetricsAggregator extends MetricsAggregator {
-    def aggregate(options: Map[String, String], inputs: Iterator[HistoryMetricsItem])
-      : List[HistoryMetricsItem] = inputs.toList
-  }
-
-  /** Aggregator that ignores its input and always answers with an empty list. */
-  class SkipAllAggregator extends MetricsAggregator {
-    private val empty = List.empty[HistoryMetricsItem]
-    def aggregate(options: Map[String, String], inputs: Iterator[HistoryMetricsItem])
-      : List[HistoryMetricsItem] = empty
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/util/LogUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/LogUtil.scala b/core/src/main/scala/io/gearpump/util/LogUtil.scala
deleted file mode 100644
index 1669129..0000000
--- a/core/src/main/scala/io/gearpump/util/LogUtil.scala
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import java.io.File
-import java.net.InetAddress
-import java.util.Properties
-import scala.util.Try
-
-import com.typesafe.config.Config
-import org.apache.log4j.PropertyConfigurator
-import org.slf4j.{Logger, LoggerFactory}
-
-object LogUtil {
-  /** Kinds of processes for which loadConfiguration can set up logging. */
-  object ProcessType extends Enumeration {
-    type ProcessType = Value
-    val MASTER, WORKER, LOCAL, APPLICATION, UI = Value
-  }
-
-  /**
-   * Creates a logger named after the simple name of `clazz`. When at least one of the
-   * optional arguments is given, the name is suffixed with "@" followed by the non-null
-   * arguments concatenated in this order: context, master, worker, app, executor, task, name.
-   * master, worker, app and executor are tagged with "master", "worker", "app" and "exec".
-   */
-  def getLogger[T](
-      clazz: Class[T], context: String = null, master: Any = null, worker: Any = null,
-      executor: Any = null, task: Any = null, app: Any = null, name: String = null): Logger = {
-    val env = new StringBuilder
-
-    // Appends prefix + value, skipping arguments that were not provided.
-    def tag(prefix: String, value: Any): Unit = {
-      if (value != null) {
-        env.append(prefix).append(value)
-      }
-    }
-
-    tag("", context)
-    tag("master", master)
-    tag("worker", worker)
-    tag("app", app)
-    tag("exec", executor)
-    tag("", task)
-    tag("", name)
-
-    val simpleName = clazz.getSimpleName
-    if (env.isEmpty) {
-      LoggerFactory.getLogger(simpleName)
-    } else {
-      LoggerFactory.getLogger(simpleName + "@" + env)
-    }
-  }
-
-  /**
-   * Configures log4j for the given process type. The properties come from
-   * log4j.properties and are extended with the log file name placeholder, the JVM name,
-   * the root appender and the log directory read from `config`.
-   */
-  def loadConfiguration(config: Config, processType: ProcessType.ProcessType): Unit = {
-    // Set log file name
-    val propName = s"gearpump.${processType.toString.toLowerCase}.log.file"
-    val props = loadConfiguration
-
-    props.setProperty("gearpump.log.file", "${" + propName + "}")
-    props.setProperty("JVM_NAME", jvmName)
-
-    // Applications log below their own root dir, every other process uses the daemon log dir.
-    val (rootAppender, dirKey, dir) = processType match {
-      case ProcessType.APPLICATION =>
-        ("${gearpump.application.logger}",
-          "gearpump.application.log.rootdir",
-          applicationLogDir(config))
-      case _ =>
-        ("${gearpump.root.logger}", "gearpump.log.dir", daemonLogDir(config))
-    }
-    props.setProperty("log4j.rootAppender", rootAppender)
-    props.setProperty(dirKey, dir.getAbsolutePath)
-
-    PropertyConfigurator.configure(props)
-  }
-
-  /** Returns the daemon log directory configured under Constants.GEARPUMP_LOG_DAEMON_DIR. */
-  def daemonLogDir(config: Config): File =
-    new File(config.getString(Constants.GEARPUMP_LOG_DAEMON_DIR))
-
-  /** Reconfigures log4j so that the root logger is set to "DEBUG,console". */
-  def verboseLogToConsole(): Unit = {
-    val consoleProps = loadConfiguration
-    consoleProps.setProperty("log4j.rootLogger", "DEBUG,console")
-    PropertyConfigurator.configure(consoleProps)
-  }
-
-  /**
-   * Loads log4j.properties from the classpath.
-   *
-   * @return the loaded properties, or empty properties when the resource cannot be found
-   */
-  def loadConfiguration: Properties = {
-    val props = new Properties()
-    val log4jConfStream = getClass().getClassLoader.getResourceAsStream("log4j.properties")
-    // getResourceAsStream returns null for a missing resource, so the stream must only be
-    // closed inside the null check; finally closes it even when loading fails.
-    if (log4jConfStream != null) {
-      try {
-        props.load(log4jConfStream)
-      } finally {
-        log4jConfStream.close()
-      }
-    }
-    props
-  }
-
-  /**
-   * Returns the name of the running JVM as reported by the runtime MXBean.
-   * The unused local host lookup was removed; it never contributed to the result.
-   */
-  private def jvmName: String = {
-    java.lang.management.ManagementFactory.getRuntimeMXBean().getName()
-  }
-
-  /** Returns the log directory configured under Constants.GEARPUMP_LOG_APPLICATION_DIR. */
-  def applicationLogDir(config: Config): File =
-    new File(config.getString(Constants.GEARPUMP_LOG_APPLICATION_DIR))
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/util/ProcessLogRedirector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/ProcessLogRedirector.scala b/core/src/main/scala/io/gearpump/util/ProcessLogRedirector.scala
deleted file mode 100644
index 0b843f3..0000000
--- a/core/src/main/scala/io/gearpump/util/ProcessLogRedirector.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import java.io.{Closeable, Flushable}
-import scala.sys.process.ProcessLogger
-
-import org.slf4j.LoggerFactory
-
-/**
- * Process logger that forwards every output line to the "redirect" logger and keeps
- * the beginning of the standard output and error text available through ConsoleOutput.
- */
-class ProcessLogRedirector extends ProcessLogger with Closeable with Flushable with ConsoleOutput {
-  private val LOG = LoggerFactory.getLogger("redirect")
-
-  // Capturing stops once the captured text has grown beyond this many chars
-  private final val LENGTH = 1000
-  private var _error: String = ""
-  private var _output: String = ""
-
-  /** Captured beginning of the error output, each line prefixed with a newline. */
-  def error: String = _error
-
-  /** Captured beginning of the standard output, each line prefixed with a newline. */
-  def output: String = _output
-
-  /** Captures (while below the limit) and logs one line of standard output at info level. */
-  def out(s: => String): Unit = {
-    // `s` is a by-name parameter: evaluate it exactly once instead of once for the
-    // capture and once more for the logger.
-    val line = s
-    if (_output.length <= LENGTH) {
-      _output += "\n" + line
-    }
-    LOG.info(line)
-  }
-
-  /** Captures (while below the limit) and logs one line of error output at error level. */
-  def err(s: => String): Unit = {
-    val line = s
-    if (_error.length <= LENGTH) {
-      _error += "\n" + line
-    }
-    LOG.error(line)
-  }
-
-  /** Runs `f` directly, no buffering is applied. */
-  def buffer[T](f: => T): T = f
-
-  // Nothing to release or flush.
-  def close(): Unit = ()
-  def flush(): Unit = ()
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/util/ReferenceEqual.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/ReferenceEqual.scala b/core/src/main/scala/io/gearpump/util/ReferenceEqual.scala
deleted file mode 100644
index f6c7a2b..0000000
--- a/core/src/main/scala/io/gearpump/util/ReferenceEqual.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-/**
- * Mix-in that defines equality as reference identity: an instance is only equal to itself.
- */
-trait ReferenceEqual extends AnyRef {
-
-  override def equals(other: Any): Boolean = other match {
-    case ref: AnyRef => this eq ref
-    // null never refers to this instance
-    case _ => false
-  }
-
-  // Delegates to the inherited hash code.
-  override def hashCode(): Int = super.hashCode()
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/util/RestartPolicy.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/RestartPolicy.scala b/core/src/main/scala/io/gearpump/util/RestartPolicy.scala
deleted file mode 100644
index 245cb1b..0000000
--- a/core/src/main/scala/io/gearpump/util/RestartPolicy.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import scala.concurrent.duration.Duration
-
-import akka.actor.ChildRestartStats
-
-/**
- * When one executor or task fails, Gearpump will try to start. However, if it fails after
- * multiple retries, then we abort.
- *
- * @param maxNrOfRetries The number of times is allowed to be restarted, negative value means no
- *                       limit, if the limit is exceeded the policy will not allow to restart
- * @param withinTimeRange Duration of the time window for maxNrOfRetries.
- *                        Duration.Inf means no window
- */
-class RestartPolicy(maxNrOfRetries: Int, withinTimeRange: Duration) {
-  private val status = new ChildRestartStats(null, 0, 0L)
-
-  // "No limit" and "no window" have to be passed to Akka as None, the same way Akka's own
-  // supervisor strategies do it. Passing Some(negative) makes requestRestartPermission deny
-  // every restart, and calling toMillis on an infinite Duration throws.
-  private val retriesWindow: (Option[Int], Option[Int]) = {
-    val retries = if (maxNrOfRetries < 0) None else Some(maxNrOfRetries)
-    val window =
-      if (withinTimeRange.isFinite && withinTimeRange >= Duration.Zero) {
-        // Clamp instead of silently wrapping around for windows longer than Int.MaxValue ms.
-        Some(math.min(withinTimeRange.toMillis, Int.MaxValue.toLong).toInt)
-      } else {
-        None
-      }
-    (retries, window)
-  }
-
-  /**
-   * Asks whether one more restart is permitted. Every call counts as a restart attempt.
-   *
-   * @return true when the restart stays within the configured retries and time window
-   */
-  def allowRestart: Boolean = {
-    status.requestRestartPermission(retriesWindow)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/util/RichProcess.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/RichProcess.scala b/core/src/main/scala/io/gearpump/util/RichProcess.scala
deleted file mode 100644
index ab5611f..0000000
--- a/core/src/main/scala/io/gearpump/util/RichProcess.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import scala.sys.process.Process
-
-/** Access to the console text (standard output and error) collected for a process. */
-trait ConsoleOutput {
-  /** The collected standard output text. */
-  def output: String
-  /** The collected error output text. */
-  def error: String
-}
-
-/**
- * Wraps a Process and additionally exposes the ConsoleOutput it was created with.
- * exitValue and destroy are delegated to the wrapped process.
- */
-class RichProcess(process: Process, _logger: ConsoleOutput) extends Process {
-  def exitValue(): Int = process.exitValue()
-  def destroy(): Unit = process.destroy()
-
-  /** The console output associated with the wrapped process. */
-  def logger: ConsoleOutput = _logger
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/util/TimeOutScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/TimeOutScheduler.scala b/core/src/main/scala/io/gearpump/util/TimeOutScheduler.scala
deleted file mode 100644
index 64b920c..0000000
--- a/core/src/main/scala/io/gearpump/util/TimeOutScheduler.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import java.util.concurrent.TimeUnit
-import scala.concurrent.duration._
-
-import akka.actor.{Actor, ActorRef}
-import akka.pattern.ask
-
-/**
- * Helper for actors: asks a target actor with a timeout, forwards the reply to the
- * actor itself and runs a callback when the ask fails.
- */
-trait TimeOutScheduler {
-  this: Actor =>
-  import context.dispatcher
-
-  /**
-   * Sends `msg` to `target` using the ask pattern.
-   *
-   * @param target actor that receives the message
-   * @param msg message to send
-   * @param milliSeconds ask timeout in milliseconds
-   * @param timeOutHandler evaluated when the ask completes with a failure; it runs in the
-   *                       future callback, not while the actor processes a message
-   */
-  def sendMsgWithTimeOutCallBack(
-      target: ActorRef, msg: AnyRef, milliSeconds: Long, timeOutHandler: => Unit): Unit = {
-    val timeout = FiniteDuration(milliSeconds, TimeUnit.MILLISECONDS)
-    target.ask(msg)(timeout).onComplete {
-      // Hand the reply over to this actor's mailbox.
-      case scala.util.Success(reply) => self ! reply
-      case scala.util.Failure(_) => timeOutHandler
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/util/Util.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/Util.scala b/core/src/main/scala/io/gearpump/util/Util.scala
deleted file mode 100644
index 8ed9bb3..0000000
--- a/core/src/main/scala/io/gearpump/util/Util.scala
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import java.io.{BufferedReader, File, FileInputStream, InputStreamReader}
-import java.net.{ServerSocket, URI}
-import scala.concurrent.forkjoin.ThreadLocalRandom
-import scala.sys.process.Process
-import scala.util.{Failure, Success, Try}
-
-import com.typesafe.config.{Config, ConfigFactory}
-
-import io.gearpump.cluster.AppJar
-import io.gearpump.jarstore.JarStoreService
-import io.gearpump.transport.HostPort
-
-object Util {
-  val LOG = LogUtil.getLogger(getClass)
-  // Reference URI whose scheme ("file") is what isLocalPath treats as local.
-  private val defaultUri = new URI("file:///")
-  // A letter or underscore followed by one or more letters, digits or underscores,
-  // so a matching name has at least two characters.
-  private val appNamePattern = "^[a-zA-Z_][a-zA-Z0-9_]+$".r.pattern
-
-  /** Checks whether the whole `appName` matches the application name pattern. */
-  def validApplicationName(appName: String): Boolean = {
-    appNamePattern.matcher(appName).matches()
-  }
-
-  /** Returns the entries of the "java.class.path" system property, split by path separator. */
-  def getCurrentClassPath: Array[String] =
-    System.getProperty("java.class.path").split(File.pathSeparator)
-
-  /**
-   * Reads the version from the first line of the VERSION file located in the directory
-   * given by the Constants.GEARPUMP_HOME system property.
-   *
-   * @return the first line with "version:=" removed, or "Unknown-Version" when the file
-   *         cannot be read or is empty
-   */
-  def version: String = {
-    val home = System.getProperty(Constants.GEARPUMP_HOME)
-    val parsed = Try {
-      val reader = new BufferedReader(
-        new InputStreamReader(new FileInputStream(new File(home, "VERSION"))))
-      // finally closes the reader (and the underlying file stream) even when reading fails.
-      try {
-        val firstLine = reader.readLine()
-        if (firstLine == null) {
-          throw new java.io.IOException("VERSION file is empty")
-        }
-        firstLine.replace("version:=", "")
-      } finally {
-        reader.close()
-      }
-    }
-    parsed match {
-      case Success(value) =>
-        value
-      case Failure(ex) =>
-        LOG.error("failed to read VERSION file, " + ex.getMessage)
-        "Unknown-Version"
-    }
-  }
-
-  /**
-   * Starts a new JVM process using the java binary below the "java.home" system property.
-   *
-   * @param options JVM options placed before the classpath
-   * @param classPath classpath entries, joined with the platform path separator
-   * @param mainClass main class to run
-   * @param arguments arguments passed to the main class
-   * @return the started process together with the logger that receives its console output
-   */
-  def startProcess(options: Array[String], classPath: Array[String], mainClass: String,
-      arguments: Array[String]): RichProcess = {
-    val javaBin = System.getProperty("java.home") + "/bin/java"
-    val classPathArgs = List("-cp", classPath.mkString(File.pathSeparator), mainClass)
-    val command = (javaBin :: options.toList) ++ classPathArgs ++ arguments
-    LOG.info(s"Starting executor process java $mainClass ${arguments.mkString(" ")} " +
-      s"\n ${options.mkString(" ")}")
-    val redirector = new ProcessLogRedirector()
-    new RichProcess(Process(command).run(redirector), redirector)
-  }
-
-  /**
-   * Parses a comma separated list of host:port pairs.
-   *
-   * hostList format: host1:port1,host2:port2,host3:port3...
-   */
-  def parseHostList(hostList: String): List[HostPort] = {
-    val addresses = hostList.trim.split(",").toList
-    for (address <- addresses) yield {
-      val parts = address.split(":")
-      HostPort(parts(0), parts(1).toInt)
-    }
-  }
-
-  /**
-   * Turns a path without URI scheme and fragment into a "file://" string built from its
-   * canonical path (backslashes replaced by slashes). Any other path is returned as is.
-   */
-  def resolvePath(path: String): String = {
-    val uri = new URI(path)
-    val isPlainPath = uri.getScheme == null && uri.getFragment == null
-    if (!isPlainPath) {
-      path
-    } else {
-      val canonical = new File(path).getCanonicalPath
-      "file://" + canonical.replaceAll("\\\\", "/")
-    }
-  }
-
-  /**
-   * Tells whether `path` is local: it either has neither scheme nor authority, or its
-   * scheme equals the scheme of the default "file:///" URI.
-   */
-  def isLocalPath(path: String): Boolean = {
-    val uri = new URI(path)
-    val scheme = uri.getScheme
-    val hasNoSchemeOrAuthority = scheme == null && uri.getAuthority == null
-    hasNoSchemeOrAuthority || scheme == defaultUri.getScheme
-  }
-
-  /**
-   * Returns a random non-negative Int.
-   *
-   * The sign bit is masked off instead of using Math.abs, because Math.abs(Int.MinValue)
-   * is still negative.
-   */
-  def randInt(): Int = {
-    ThreadLocalRandom.current.nextInt() & Int.MaxValue
-  }
-
-  /**
-   * Finds a currently free port by binding a server socket to port 0 and releasing it again.
-   *
-   * @return the port the socket was bound to, or the failure raised while probing
-   */
-  def findFreePort(): Try[Int] = {
-    Try {
-      val socket = new ServerSocket(0)
-      // finally releases the socket even when one of the calls below throws.
-      try {
-        // NOTE(review): the socket is already bound here; confirm that enabling address
-        // reuse at this point has the intended effect.
-        socket.setReuseAddress(true)
-        socket.getLocalPort()
-      } finally {
-        socket.close()
-      }
-    }
-  }
-
-  /**
-   * Copies `jarFile` into the jar store and describes the uploaded jar.
-   *
-   * @return an AppJar holding the file name and the path returned by the jar store
-   */
-  def uploadJar(jarFile: File, jarStoreService: JarStoreService): AppJar =
-    AppJar(jarFile.getName, jarStoreService.copyFromLocal(jarFile))
-
-  /**
-   * This util can be used to filter out configuration from specific origin
-   *
-   * For example, if you want to filter out configuration from reference.conf
-   * Then you can use like this:
-   *
-   * filterOutOrigin(config, "reference.conf")
-   *
-   * @param config configuration to filter
-   * @param originFile entries whose origin resource equals this name are dropped
-   * @return a new config containing only the remaining entries
-   */
-  import scala.collection.JavaConverters._
-  def filterOutOrigin(config: Config, originFile: String): Config = {
-    val entries = config.entrySet().asScala
-    entries.foldLeft(ConfigFactory.empty()) { (kept, entry) =>
-      val value = entry.getValue
-      if (value.origin().resource() != originFile) {
-        kept.withValue(entry.getKey, value)
-      } else {
-        kept
-      }
-    }
-  }
-
-  /** JVM arguments and classpath entries for one kind of process. */
-  case class JvmSetting(vmargs: Array[String], classPath: Array[String])
-
-  /** JVM settings of the application master (field `appMater`) and of the executors. */
-  case class AppJvmSettings(appMater: JvmSetting, executor: JvmSetting)
-
-  /**
-   * Get an effective AppJvmSettings from Config.
-   *
-   * VM arguments are split on whitespace. Settings that are missing or cannot be read
-   * resolve to an empty array.
-   */
-  def resolveJvmSetting(conf: Config): AppJvmSettings = {
-
-    import io.gearpump.util.Constants._
-
-    // Reads `key` and splits it by `separatorRegex`, dropping empty tokens.
-    def tokens(key: String, separatorRegex: String): Array[String] = {
-      Try(conf.getString(key).split(separatorRegex).filter(_.nonEmpty))
-        .getOrElse(Array.empty[String])
-    }
-
-    val appMasterVMArgs = tokens(GEARPUMP_APPMASTER_ARGS, "\\s+")
-    val executorVMArgs = tokens(GEARPUMP_EXECUTOR_ARGS, "\\s+")
-
-    // NOTE(review): the app master classpath is split on both ';' and ':' while the
-    // executor classpath is split on the platform path separator - confirm this is intended.
-    val appMasterClassPath = tokens(GEARPUMP_APPMASTER_EXTRA_CLASSPATH, "[;:]")
-    val executorClassPath = tokens(GEARPUMP_EXECUTOR_EXTRA_CLASSPATH, File.pathSeparator)
-
-    AppJvmSettings(
-      JvmSetting(appMasterVMArgs, appMasterClassPath),
-      JvmSetting(executorVMArgs, executorClassPath))
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/Message.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/Message.scala b/core/src/main/scala/org/apache/gearpump/Message.scala
new file mode 100644
index 0000000..871ebe1
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/Message.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump
+
+/**
+ * Each message contains an immutable timestamp.
+ *
+ * For example, if you take a picture, the time you take the picture is the
+ * message's timestamp.
+ * @param msg Accept any type except Null, Nothing and Unit
+ * @param timestamp timestamp of the message, defaults to [[Message.noTimeStamp]]
+ */
+case class Message(msg: Any, timestamp: TimeStamp = Message.noTimeStamp)
+
+object Message {
+  /** Timestamp value used when a message is created without an explicit timestamp. */
+  val noTimeStamp: TimeStamp = 0L
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala b/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala
new file mode 100644
index 0000000..91c2675
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster
+
+import scala.reflect.ClassTag
+
+import akka.actor.{Actor, ActorRef, ActorSystem}
+import com.typesafe.config.{Config, ConfigFactory}
+
+import org.apache.gearpump.cluster.appmaster.WorkerInfo
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.jarstore.FilePath
+
+/**
+ * This contains all information to run an application
+ *
+ * @param name The name of this application
+ * @param appMaster The class name of AppMaster Actor
+ * @param userConfig user configuration.
+ * @param clusterConfig User provided cluster config, it overrides gear.conf when starting
+ *                      new applications. In most cases, you should not need to change it. If you do
+ *                      really need to change it, please use ClusterConfigSource(filePath) to
+ *                      construct the object, while filePath points to the .conf file.
+ */
+case class AppDescription(
+    name: String, appMaster: String, userConfig: UserConfig,
+    clusterConfig: Config = ConfigFactory.empty())
+
+/**
+ * Each job, streaming or not streaming, needs to provide an Application class.
+ * The master uses this class to start AppMaster.
+ */
+trait Application {
+
+  /** Name of this application, must be unique in the system */
+  def name: String
+
+  /** Custom user configuration  */
+  def userConfig(implicit system: ActorSystem): UserConfig
+
+  /**
+   * AppMaster class, must have a constructor like this:
+   * this(appContext: AppMasterContext, app: AppDescription)
+   */
+  def appMaster: Class[_ <: ApplicationMaster]
+}
+
+object Application {
+  def apply[T <: ApplicationMaster](
+      name: String, userConfig: UserConfig)(implicit tag: ClassTag[T]): Application = {
+    new DefaultApplication(name, userConfig,
+      tag.runtimeClass.asInstanceOf[Class[_ <: ApplicationMaster]])
+  }
+
+  class DefaultApplication(
+      override val name: String, inputUserConfig: UserConfig,
+      val appMaster: Class[_ <: ApplicationMaster]) extends Application {
+    override def userConfig(implicit system: ActorSystem): UserConfig = inputUserConfig
+  }
+
+  def ApplicationToAppDescription(app: Application)(implicit system: ActorSystem)
+    : AppDescription = {
+    val filterJvmReservedKeys = ClusterConfig.filterOutDefaultConfig(system.settings.config)
+    AppDescription(app.name, app.appMaster.getName, app.userConfig, filterJvmReservedKeys)
+  }
+}
+
+/**
+ * Used for verification. All AppMaster must extend this interface
+ */
+abstract class ApplicationMaster extends Actor
+
+/**
+ * This contains context information when starting an AppMaster
+ *
+ * @param appId application instance id assigned, it is unique in the cluster
+ * @param username The username who submitted this application
+ * @param resource Resource allocated to start this AppMaster daemon. AppMaster is allowed to
+ *                 request more resources from Master.
+ * @param appJar application Jar. If the jar is already in classpath, then it can be None.
+ * @param masterProxy The proxy to master actor, it bridges the messages between appmaster
+ *                    and master
+ * @param registerData AppMaster is required to send this data to Master when doing
+ *                     RegisterAppMaster.
+ */
+case class AppMasterContext(
+    appId: Int,
+    username: String,
+    resource: Resource,
+    workerInfo: WorkerInfo,
+    appJar: Option[AppJar],
+    masterProxy: ActorRef,
+    registerData: AppMasterRegisterData)
+
+/**
+ * Jar file container in the cluster
+ *
+ * @param name A meaningful name to represent this jar
+ * @param filePath Where the jar file is stored.
+ */
+case class AppJar(name: String, filePath: FilePath)
+
+/**
+ * Serves as the context to start an Executor JVM.
+ */
+// TODO: ExecutorContext doesn't belong to this package in logic.
+case class ExecutorContext(
+    executorId: Int, worker: WorkerInfo, appId: Int, appName: String,
+    appMaster: ActorRef, resource: Resource)
+
+/**
+ * JVM configurations to start an Executor JVM.
+ *
+ * @param classPath When executor is created by a worker JVM, executor automatically inherits
+ *                  parent worker's classpath. Sometimes, you still want to add some extra
+ *                  classpath; you can do this by specifying the classPath option.
+ * @param jvmArguments java arguments like -Dxx=yy
+ * @param mainClass Executor main class name like org.apache.gearpump.xx.AppMaster
+ * @param arguments Executor command line arguments
+ * @param jar application jar
+ * @param executorAkkaConfig Akka config used to initialize the actor system of this executor.
+ *                           It uses org.apache.gearpump.util.Constants.GEARPUMP_CUSTOM_CONFIG_FILE
+ *                           to pass the config to executor process
+ */
+// TODO: ExecutorJVMConfig doesn't belong to this package in logic.
+case class ExecutorJVMConfig(
+    classPath: Array[String], jvmArguments: Array[String], mainClass: String,
+    arguments: Array[String], jar: Option[AppJar], username: String,
+    executorAkkaConfig: Config = ConfigFactory.empty())
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/ClusterConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/ClusterConfig.scala b/core/src/main/scala/org/apache/gearpump/cluster/ClusterConfig.scala
new file mode 100644
index 0000000..332e770
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/ClusterConfig.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster
+
+import java.io.File
+
+import com.typesafe.config._
+
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.{Constants, FileUtils, LogUtil, Util}
+
+/**
+ *
+ * All Gearpump applications should use this class to load configurations.
+ *
+ * Compared with Akka built-in com.typesafe.config.ConfigFactory, this class also
+ * resolves config from the files gear.conf and geardefault.conf.
+ *
+ * Overriding order:
+ * {{{
+ *   System Properties
+ *     > Custom configuration file (by using system property -Dgearpump.config.file)
+ *     > gear.conf
+ *     > geardefault.conf
+ *     > reference.conf
+ * }}}
+ */
+
+object ClusterConfig {
+  /**
+   * alias for default
+   * default is a reserved word for java
+   */
+  def defaultConfig: Config = {
+    default(APPLICATION)
+  }
+
+  /**
+   * default application for user.
+   * Usually used when a user wants to start a client application.
+   */
+  def default(configFile: String = APPLICATION): Config = {
+    load(configFile).default
+  }
+
+  /**
+   * configuration for master node
+   */
+  def master(configFile: String = null): Config = {
+    load(configFile).master
+  }
+
+  /**
+   * configuration for worker node
+   */
+  def worker(configFile: String = null): Config = {
+    load(configFile).worker
+  }
+
+  /**
+   * configuration for UI server
+   */
+  def ui(configFile: String = null): Config = {
+    load(configFile).ui
+  }
+
+  /**
+   * Loads from the file named by system property GEARPUMP_CUSTOM_CONFIG_FILE, else configFile.
+   */
+  private def load(configFile: String): Configs = {
+    val file = Option(System.getProperty(GEARPUMP_CUSTOM_CONFIG_FILE)) // unset property => None
+    file match {
+      case Some(path) => // the system property overrides the configFile argument
+        LOG.info("loading config file " + path + "..........")
+        load(ClusterConfigSource(path))
+      case None => // configFile may be null, in which case ClusterConfigSource is empty
+        LOG.info("loading config file " + Option(configFile).getOrElse("<none>") + "...")
+        load(ClusterConfigSource(configFile))
+    }
+  }
+
+  val APPLICATION = "application.conf"
+  val LOG = LogUtil.getLogger(getClass)
+
+  def saveConfig(conf: Config, file: File): Unit = {
+    val serialized = conf.root().render()
+    FileUtils.write(file, serialized)
+  }
+
+  def render(config: Config, concise: Boolean = false): String = {
+    if (concise) {
+      config.root().render(ConfigRenderOptions.concise().setFormatted(true))
+    } else {
+      config.root().render(ConfigRenderOptions.defaults())
+    }
+  }
+
+  /** filter JVM reserved keys and akka default reference.conf */
+  def filterOutDefaultConfig(input: Config): Config = {
+    val updated = filterOutJvmReservedKeys(input)
+    Util.filterOutOrigin(updated, "reference.conf")
+  }
+
+  private[gearpump] def load(source: ClusterConfigSource): Configs = {
+
+    val systemProperties = getSystemProperties
+
+    val user = source.getConfig
+
+    val gear = ConfigFactory.parseResourcesAnySyntax("gear.conf",
+      ConfigParseOptions.defaults.setAllowMissing(true))
+
+    val gearDefault = ConfigFactory.parseResourcesAnySyntax("geardefault.conf",
+      ConfigParseOptions.defaults.setAllowMissing(true))
+
+    val all = systemProperties.withFallback(user).withFallback(gear).withFallback(gearDefault)
+
+    val linux = all.getConfig(LINUX_CONFIG)
+
+    var basic = all.withoutPath(MASTER_CONFIG).withoutPath(WORKER_CONFIG).
+      withoutPath(UI_CONFIG).withoutPath(LINUX_CONFIG)
+
+    if (!akka.util.Helpers.isWindows) {
+
+      // Change the akka.scheduler.tick-duration to 1 ms for Linux or Mac
+      basic = linux.withFallback(basic)
+    }
+
+    val master = replaceHost(all.getConfig(MASTER_CONFIG).withFallback(basic))
+    val worker = replaceHost(all.getConfig(WORKER_CONFIG).withFallback(basic))
+    val ui = replaceHost(all.getConfig(UI_CONFIG).withFallback(basic))
+    val app = replaceHost(basic)
+
+    new Configs(master, worker, ui, app)
+  }
+
+  private def replaceHost(config: Config): Config = {
+    val hostName = config.getString(Constants.GEARPUMP_HOSTNAME)
+    config.withValue(NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(hostName))
+  }
+
+  val JVM_RESERVED_PROPERTIES = List(
+    "os", "java", "sun", "boot", "user", "prog", "path", "line", "awt", "file"
+  )
+
+  private def getSystemProperties: Config = {
+    // Excludes default java system properties
+    JVM_RESERVED_PROPERTIES.foldLeft(ConfigFactory.systemProperties()) { (config, property) =>
+      config.withoutPath(property)
+    }
+  }
+
+  class ConfigValidationException(msg: String) extends Exception(msg: String)
+
+  private def filterOutJvmReservedKeys(input: Config): Config = {
+    val filterJvmReservedKeys = JVM_RESERVED_PROPERTIES.foldLeft(input) { (config, key) =>
+      config.withoutPath(key)
+    }
+    filterJvmReservedKeys
+  }
+
+  protected class Configs(
+      val master: Config, val worker: Config, val ui: Config, val default: Config)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/ClusterConfigSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/ClusterConfigSource.scala b/core/src/main/scala/org/apache/gearpump/cluster/ClusterConfigSource.scala
new file mode 100644
index 0000000..920ceae
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/ClusterConfigSource.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster
+
+import java.io.File
+import scala.language.implicitConversions
+
+import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
+
+/**
+ * Data Source of ClusterConfig
+ *
+ * Please use ClusterConfigSource.apply(filePath) to construct this object
+ */
+sealed trait ClusterConfigSource extends Serializable {
+  def getConfig: Config
+}
+
+object ClusterConfigSource {
+
+  /**
+   * Construct ClusterConfigSource from resource name or file path
+   */
+  def apply(filePath: String): ClusterConfigSource = {
+
+    if (null == filePath) {
+      new ClusterConfigSourceImpl(ConfigFactory.empty())
+    } else {
+      var config = ConfigFactory.parseFileAnySyntax(new File(filePath),
+        ConfigParseOptions.defaults.setAllowMissing(true))
+
+      if (null == config || config.isEmpty) {
+        config = ConfigFactory.parseResourcesAnySyntax(filePath,
+          ConfigParseOptions.defaults.setAllowMissing(true))
+      }
+      new ClusterConfigSourceImpl(config)
+    }
+  }
+
+  implicit def FilePathToClusterConfigSource(filePath: String): ClusterConfigSource = {
+    apply(filePath)
+  }
+
+  private class ClusterConfigSourceImpl(config: Config) extends ClusterConfigSource {
+    override def getConfig: Config = config
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala b/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala
new file mode 100644
index 0000000..e6ab13b
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster
+
+import org.apache.gearpump.cluster.worker.{WorkerSummary, WorkerId}
+
+import scala.util.Try
+
+import akka.actor.ActorRef
+import com.typesafe.config.Config
+
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterStatus
+import org.apache.gearpump.cluster.master.{MasterNode, MasterSummary}
+import org.apache.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest}
+import org.apache.gearpump.metrics.Metrics.MetricType
+
+object ClientToMaster {
+  case object AddMaster
+  case class AddWorker(count: Int)
+  case class RemoveMaster(masterContainerId: String)
+  case class RemoveWorker(workerContainerId: String)
+
+  /** Command result of AddMaster, RemoveMaster, etc. */
+  case class CommandResult(success: Boolean, exception: String = null) {
+    override def toString: String = {
+      val tag = getClass.getSimpleName
+      if (success) {
+        s"$tag(success)"
+      } else {
+        s"$tag(failure, $exception)"
+      }
+    }
+  }
+
+  /** Submit an application to master */
+  case class SubmitApplication(
+      appDescription: AppDescription, appJar: Option[AppJar],
+      username: String = System.getProperty("user.name"))
+
+  case class RestartApplication(appId: Int)
+  case class ShutdownApplication(appId: Int)
+
+  /** Client sends ResolveAppId to Master to resolve AppMaster actor path by providing appId */
+  case class ResolveAppId(appId: Int)
+
+  /** Client sends ResolveWorkerId to master to get the Actor path of worker. */
+  case class ResolveWorkerId(workerId: WorkerId)
+
+  /** Get an active Jar store to upload job jars, like wordcount.jar */
+  case object GetJarStoreServer
+
+  /** Service address of JarStore */
+  case class JarStoreServerAddress(url: String)
+
+  /** Query AppMaster config by providing appId */
+  case class QueryAppMasterConfig(appId: Int)
+
+  /** Query worker config */
+  case class QueryWorkerConfig(workerId: WorkerId)
+
+  /** Query master config */
+  case object QueryMasterConfig
+
+  /** Options for reading the metrics from the cluster */
+  object ReadOption {
+    type ReadOption = String
+
+    val Key: String = "readOption"
+
+    /** Read the latest record of the metrics, only return 1 record for one metric name (id) */
+    val ReadLatest: ReadOption = "readLatest"
+
+    /** Read recent metrics from cluster, typically it contains metrics in 5 minutes */
+    val ReadRecent = "readRecent"
+
+    /**
+     * Read the history metrics, typically it contains metrics for 48 hours
+     *
+     * NOTE: Each hour only contains one or two data points.
+     */
+    val ReadHistory = "readHistory"
+  }
+
+  /** Query history metrics from master or app master. */
+  case class QueryHistoryMetrics(
+      path: String, readOption: ReadOption.ReadOption = ReadOption.ReadLatest,
+      aggregatorClazz: String = "", options: Map[String, String] = Map.empty[String, String])
+
+  /**
+   * If there is message loss, the clock would pause for a while. This message is used to
+   * pin-point which task has stalling clock value, and usually it means something wrong on
+   * that machine.
+   */
+  case class GetStallingTasks(appId: Int)
+
+  /**
+   * Request app master for a short list of cluster app that administrators should be aware of.
+   */
+  case class GetLastFailure(appId: Int)
+}
+
+object MasterToClient {
+
+  /** Result of SubmitApplication */
+  // TODO: Merge with SubmitApplicationResultValue and change this to (appId: Option, ex: Exception)
+  case class SubmitApplicationResult(appId: Try[Int])
+
+  case class SubmitApplicationResultValue(appId: Int)
+
+  case class ShutdownApplicationResult(appId: Try[Int])
+  case class ReplayApplicationResult(appId: Try[Int])
+
+  /** Return Actor ref of app master */
+  case class ResolveAppIdResult(appMaster: Try[ActorRef])
+
+  /** Return Actor ref of worker */
+  case class ResolveWorkerIdResult(worker: Try[ActorRef])
+
+  case class AppMasterConfig(config: Config)
+
+  case class WorkerConfig(config: Config)
+
+  case class MasterConfig(config: Config)
+
+  case class HistoryMetricsItem(time: TimeStamp, value: MetricType)
+
+  /**
+   * History metrics returned from master, worker, or app master.
+   *
+   * All metric items are organized like a tree, path is used to navigate through the tree.
+   * For example, when querying with path == "executor0.task1.throughput*", the metrics
+   * provider picks metrics whose source matches the path.
+   *
+   * @param path The path client provided. The returned metrics are the query result for this path.
+   * @param metrics The detailed metrics.
+   */
+  case class HistoryMetrics(path: String, metrics: List[HistoryMetricsItem])
+
+  /** Return the last error of this streaming application job */
+  case class LastFailure(time: TimeStamp, error: String)
+}
+
+trait AppMasterRegisterData
+
+object AppMasterToMaster {
+
+  /**
+   * Register an AppMaster by providing an ActorRef and registerData
+   * @param registerData The registerData is provided by Master when starting the app master.
+   *                     App master should return the registerData back to master.
+   *                     Typically registerData holds some context information for this app Master.
+   */
+
+  case class RegisterAppMaster(appMaster: ActorRef, registerData: AppMasterRegisterData)
+
+  case class InvalidAppMaster(appId: Int, appMaster: String, reason: Throwable)
+
+  case class RequestResource(appId: Int, request: ResourceRequest)
+
+  /**
+   * Each application job can save some data in the distributed cluster storage on master nodes.
+   *
+   * @param appId App Id of the client application who send the request.
+   * @param key Key name
+   * @param value Value to store on distributed cluster storage on master nodes
+   */
+  case class SaveAppData(appId: Int, key: String, value: Any)
+
+  /** The application specific data is successfully stored */
+  case object AppDataSaved
+
+  /** Fail to store the application data */
+  case object SaveAppDataFailed
+
+  /** Fetch the application specific data that was stored previously */
+  case class GetAppData(appId: Int, key: String)
+
+  /** The KV data returned for query GetAppData */
+  case class GetAppDataResult(key: String, value: Any)
+
+  /**
+   * AppMasterSummary returned to REST API query. Streaming and Non-streaming
+   * have very different application info. AppMasterSummary is the common interface.
+   */
+  trait AppMasterSummary {
+    def appType: String
+    def appId: Int
+    def appName: String
+    def actorPath: String
+    def status: AppMasterStatus
+    def startTime: TimeStamp
+    def uptime: TimeStamp
+    def user: String
+  }
+
+  /** Represents a generic application that is not a streaming job */
+  case class GeneralAppMasterSummary(
+      appId: Int,
+      appType: String = "general",
+      appName: String = null,
+      actorPath: String = null,
+      status: AppMasterStatus = MasterToAppMaster.AppMasterActive,
+      startTime: TimeStamp = 0L,
+      uptime: TimeStamp = 0L,
+      user: String = null)
+    extends AppMasterSummary
+
+  /** Fetches the list of workers from Master */
+  case object GetAllWorkers
+
+  /** Get worker data of workerId */
+  case class GetWorkerData(workerId: WorkerId)
+
+  /** Response to GetWorkerData */
+  case class WorkerData(workerDescription: WorkerSummary)
+
+  /** Get Master data */
+  case object GetMasterData
+
+  /** Response to GetMasterData */
+  case class MasterData(masterDescription: MasterSummary)
+}
+
+object MasterToAppMaster {
+
+  /** Resource allocated for application xx */
+  case class ResourceAllocated(allocations: Array[ResourceAllocation])
+
+  /** Master confirms reception of the RegisterAppMaster message */
+  case class AppMasterRegistered(appId: Int)
+
+  /** Shutdown the application job */
+  case object ShutdownAppMaster
+
+  type AppMasterStatus = String
+  val AppMasterActive: AppMasterStatus = "active"
+  val AppMasterInActive: AppMasterStatus = "inactive"
+  val AppMasterNonExist: AppMasterStatus = "nonexist"
+
+  sealed trait StreamingType
+  case class AppMasterData(
+      status: AppMasterStatus, appId: Int = 0, appName: String = null, appMasterPath: String = null,
+      workerPath: String = null, submissionTime: TimeStamp = 0, startTime: TimeStamp = 0,
+      finishTime: TimeStamp = 0, user: String = null)
+
+  case class AppMasterDataRequest(appId: Int, detail: Boolean = false)
+
+  case class AppMastersData(appMasters: List[AppMasterData])
+  case object AppMastersDataRequest
+  case class AppMasterDataDetailRequest(appId: Int)
+  case class AppMasterMetricsRequest(appId: Int) extends StreamingType
+
+  case class ReplayFromTimestampWindowTrailingEdge(appId: Int)
+
+  case class WorkerList(workers: List[WorkerId])
+}
+
+object AppMasterToWorker {
+  case class LaunchExecutor(
+      appId: Int, executorId: Int, resource: Resource, executorJvmConfig: ExecutorJVMConfig)
+
+  case class ShutdownExecutor(appId: Int, executorId: Int, reason: String)
+  case class ChangeExecutorResource(appId: Int, executorId: Int, resource: Resource)
+}
+
+object WorkerToAppMaster {
+  case class ExecutorLaunchRejected(reason: String = null, ex: Throwable = null)
+  case class ShutdownExecutorSucceed(appId: Int, executorId: Int)
+  case class ShutdownExecutorFailed(reason: String = null, ex: Throwable = null)
+}
+