You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:55 UTC
[45/49] incubator-gearpump git commit: fix GEARPUMP-118 change
package name to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/util/Constants.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/Constants.scala b/core/src/main/scala/io/gearpump/util/Constants.scala
deleted file mode 100644
index 342cd87..0000000
--- a/core/src/main/scala/io/gearpump/util/Constants.scala
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import java.util.concurrent.TimeUnit
-
-import io.gearpump.partitioner._
-
-object Constants {
- val MASTER_WATCHER = "masterwatcher"
- val SINGLETON_MANAGER = "singleton"
-
- val MASTER_CONFIG = "gearpump-master"
- val WORKER_CONFIG = "gearpump-worker"
- val UI_CONFIG = "gearpump-ui"
- val LINUX_CONFIG = "gearpump-linux" // linux or Mac
-
- val MASTER = "master"
- val WORKER = "worker"
-
- val GEARPUMP_WORKER_SLOTS = "gearpump.worker.slots"
- val GEARPUMP_EXECUTOR_PROCESS_LAUNCHER = "gearpump.worker.executor-process-launcher"
- val GEARPUMP_SCHEDULING_SCHEDULER = "gearpump.scheduling.scheduler-class"
- val GEARPUMP_SCHEDULING_REQUEST = "gearpump.scheduling.requests"
- val GEARPUMP_TRANSPORT_SERIALIZER = "gearpump.transport.serializer"
- val GEARPUMP_SERIALIZER_POOL = "gearpump.serialization-framework"
- val GEARPUMP_SERIALIZERS = "gearpump.serializers"
- val GEARPUMP_TASK_DISPATCHER = "gearpump.task-dispatcher"
- val GEARPUMP_CLUSTER_MASTERS = "gearpump.cluster.masters"
- val GEARPUMP_MASTERCLIENT_TIMEOUT = "gearpump.masterclient.timeout"
- val GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS =
- "gearpump.worker.executor-share-same-jvm-as-worker"
-
- val GEARPUMP_HOME = "gearpump.home"
- val GEARPUMP_FULL_SCALA_VERSION = "gearpump.binary-version-with-scala-version"
- val GEARPUMP_HOSTNAME = "gearpump.hostname"
- val GEARPUMP_APPMASTER_ARGS = "gearpump.appmaster.vmargs"
- val GEARPUMP_APPMASTER_EXTRA_CLASSPATH = "gearpump.appmaster.extraClasspath"
- val GEARPUMP_EXECUTOR_ARGS = "gearpump.executor.vmargs"
- val GEARPUMP_EXECUTOR_EXTRA_CLASSPATH = "gearpump.executor.extraClasspath"
- val GEARPUMP_LOG_DAEMON_DIR = "gearpump.log.daemon.dir"
- val GEARPUMP_LOG_APPLICATION_DIR = "gearpump.log.application.dir"
- val HADOOP_CONF = "hadoopConf"
-
- // Id used to identity Master JVM process in low level resource manager like YARN.
- // In YARN, it means the container Id.
- val GEARPUMP_MASTER_RESOURCE_MANAGER_CONTAINER_ID =
- "gearpump.master-resource-manager-container-id"
-
- // Id used to identity Worker JVM process in low level resource manager like YARN.
- // In YARN, it means the container Id.
- val GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID =
- "gearpump.worker-resource-manager-container-id"
-
- // true or false
- val GEARPUMP_REMOTE_DEBUG_EXECUTOR_JVM = "gearpump.remote-debug-executor-jvm"
- val GEARPUMP_REMOTE_DEBUG_PORT = "gearpump.remote-debug-port"
-
- // Whether to turn on GC log, true or false
- val GEARPUMP_VERBOSE_GC = "gearpump.verbose-gc"
-
- // The time out for Future, like ask.
- // !Important! This global timeout setting will also impact the UI
- // responsive time if set to too big. Please make sure you have
- // enough justification to change this global setting, otherwise
- // please use your local timeout setting instead.
- val FUTURE_TIMEOUT = akka.util.Timeout(15, TimeUnit.SECONDS)
-
- val GEARPUMP_START_EXECUTOR_SYSTEM_TIMEOUT_MS = "gearpump.start-executor-system-timeout-ms"
-
- val APPMASTER_DEFAULT_EXECUTOR_ID = -1
-
- val NETTY_BUFFER_SIZE = "gearpump.netty.buffer-size"
- val NETTY_MAX_RETRIES = "gearpump.netty.max-retries"
- val NETTY_BASE_SLEEP_MS = "gearpump.netty.base-sleep-ms"
- val NETTY_MAX_SLEEP_MS = "gearpump.netty.max-sleep-ms"
- val NETTY_MESSAGE_BATCH_SIZE = "gearpump.netty.message-batch-size"
- val NETTY_FLUSH_CHECK_INTERVAL = "gearpump.netty.flush-check-interval"
- val NETTY_TCP_HOSTNAME = "akka.remote.netty.tcp.hostname"
- val NETTY_DISPATCHER = "gearpump.netty.dispatcher"
-
- val GEARPUMP_USERNAME = "gearpump.username"
- val GEARPUMP_APPLICATION_ID = "gearpump.applicationId"
- val GEARPUMP_MASTER_STARTTIME = "gearpump.master.starttime"
- val GEARPUMP_EXECUTOR_ID = "gearpump.executorId"
- // Application jar property
- val GEARPUMP_APP_JAR = "gearpump.app.jar"
- val GEARPUMP_APP_NAME_PREFIX = "gearpump.app.name.prefix"
-
- // Where the jar is stored at. It can be a HDFS, or a local disk.
- val GEARPUMP_APP_JAR_STORE_ROOT_PATH = "gearpump.jarstore.rootpath"
-
- // Uses java property -Dgearpump.config.file=xxx.conf to set customized configuration
- // Otherwise application.conf in classpath will be loaded
- val GEARPUMP_CUSTOM_CONFIG_FILE = "gearpump.config.file"
-
- // Metrics related
- val GEARPUMP_METRIC_ENABLED = "gearpump.metrics.enabled"
- val GEARPUMP_METRIC_SAMPLE_RATE = "gearpump.metrics.sample-rate"
- val GEARPUMP_METRIC_REPORT_INTERVAL = "gearpump.metrics.report-interval-ms"
- val GEARPUMP_METRIC_GRAPHITE_HOST = "gearpump.metrics.graphite.host"
- val GEARPUMP_METRIC_GRAPHITE_PORT = "gearpump.metrics.graphite.port"
- val GEARPUMP_METRIC_REPORTER = "gearpump.metrics.reporter"
-
- // Retains at max @RETAIN_HISTORY_HOURS history data
- val GEARPUMP_METRIC_RETAIN_HISTORY_DATA_HOURS = "gearpump.metrics.retainHistoryData.hours"
-
- // Time interval between two history data points.
- val GEARPUMP_RETAIN_HISTORY_DATA_INTERVAL_MS = "gearpump.metrics.retainHistoryData.intervalMs"
-
- // Retains at max @RETAIN_LATEST_SECONDS recent data points
- val GEARPUMP_RETAIN_RECENT_DATA_SECONDS = "gearpump.metrics.retainRecentData.seconds"
-
- // time interval between two recent data points.
- val GEARPUMP_RETAIN_RECENT_DATA_INTERVAL_MS = "gearpump.metrics.retainRecentData.intervalMs"
-
- // AppMaster will max wait this time until it declare the resource cannot be allocated,
- // and shutdown itself
- val GEARPUMP_RESOURCE_ALLOCATION_TIMEOUT = "gearpump.resource-allocation-timeout-seconds"
-
- // Service related
- val GEARPUMP_SERVICE_HTTP = "gearpump.services.http"
- val GEARPUMP_SERVICE_HOST = "gearpump.services.host"
- val GEARPUMP_SERVICE_SUPERVISOR_PATH = "gearpump.services.supervisor-actor-path"
- val GEARPUMP_SERVICE_RENDER_CONFIG_CONCISE = "gearpump.services.config-render-option-concise"
-
- // The partitioners provided by Gearpump
- val BUILTIN_PARTITIONERS = Array(
- classOf[BroadcastPartitioner],
- classOf[CoLocationPartitioner],
- classOf[HashPartitioner],
- classOf[ShuffleGroupingPartitioner],
- classOf[ShufflePartitioner])
-
- // Security related
- val GEARPUMP_KEYTAB_FILE = "gearpump.keytab.file"
- val GEARPUMP_KERBEROS_PRINCIPAL = "gearpump.kerberos.principal"
-
- val GEARPUMP_METRICS_MAX_LIMIT = "gearpump.metrics.akka.max-limit-on-query"
- val GEARPUMP_METRICS_AGGREGATORS = "gearpump.metrics.akka.metrics-aggregator-class"
-
- val GEARPUMP_UI_SECURITY = "gearpump.ui-security"
- val GEARPUMP_UI_SECURITY_AUTHENTICATION_ENABLED = "gearpump.ui-security.authentication-enabled"
- val GEARPUMP_UI_AUTHENTICATOR_CLASS = "gearpump.ui-security.authenticator"
- // OAuth Authentication Factory for UI server.
- val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_ENABLED = "gearpump.ui-security.oauth2-authenticator-enabled"
- val GEARPUMP_UI_OAUTH2_AUTHENTICATORS = "gearpump.ui-security.oauth2-authenticators"
- val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLASS = "class"
- val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CALLBACK = "callback"
- val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLIENT_ID = "clientid"
- val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLIENT_SECRET = "clientsecret"
- val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_DEFAULT_USER_ROLE = "default-userrole"
- val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_AUTHORIZATION_CODE = "code"
- val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_ACCESS_TOKEN = "accesstoken"
-
- val PREFER_IPV4 = "java.net.preferIPv4Stack"
-
- val APPLICATION_EXECUTOR_NUMBER = "gearpump.application.executor-num"
-
- val AKKA_SCHEDULER_TICK_DURATION = "akka.scheduler.tick-duration"
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/util/FileUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/FileUtils.scala b/core/src/main/scala/io/gearpump/util/FileUtils.scala
deleted file mode 100644
index 1561587..0000000
--- a/core/src/main/scala/io/gearpump/util/FileUtils.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import java.io.{File, IOException}
-import java.nio.charset.Charset
-
-import io.gearpump.google.common.io.Files
-
-object FileUtils {
- private val UTF8 = Charset.forName("UTF-8")
-
- def write(file: File, str: String): Unit = {
- Files.write(str, file, UTF8)
- }
-
- def read(file: File): String = {
- Files.asCharSource(file, UTF8).read()
- }
-
- def writeByteArrayToFile(file: File, bytes: Array[Byte]): Unit = {
- Files.write(bytes, file)
- }
-
- def readFileToByteArray(file: File): Array[Byte] = {
- Files.toByteArray(file)
- }
-
- /** recursively making all parent directories including itself */
- def forceMkdir(directory: File): Unit = {
- if (directory.exists() && directory.isFile) {
- throw new IOException(s"Failed to create directory ${directory.toString}, it already exist")
- }
- Files.createParentDirs(directory)
- val result = directory.mkdir()
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/util/Graph.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/Graph.scala b/core/src/main/scala/io/gearpump/util/Graph.scala
deleted file mode 100644
index 8c34329..0000000
--- a/core/src/main/scala/io/gearpump/util/Graph.scala
+++ /dev/null
@@ -1,497 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-import scala.annotation.tailrec
-import scala.collection.mutable
-import scala.language.implicitConversions
-
-/**
- * Generic mutable Graph libraries.
- */
-class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serializable {
-
- private val _vertices = mutable.Set.empty[N]
- private val _edges = mutable.Set.empty[(N, E, N)]
-
- // This is used to ensure the output of this Graph is always stable
- // Like method vertices(), or edges()
- private var _indexs = Map.empty[Any, Int]
- private var _nextIndex = 0
- private def nextId: Int = {
- val result = _nextIndex
- _nextIndex += 1
- result
- }
-
- private def init(): Unit = {
- Option(vertexList).getOrElse(List.empty[N]).foreach(addVertex(_))
- Option(edgeList).getOrElse(List.empty[(N, E, N)]).foreach(addEdge(_))
- }
-
- init()
-
- /**
- * Add a vertex
- * Current Graph is changed.
- */
- def addVertex(vertex: N): Unit = {
- val result = _vertices.add(vertex)
- if (result) {
- _indexs += vertex -> nextId
- }
- }
-
- /**
- * Add a edge
- * Current Graph is changed.
- */
- def addEdge(edge: (N, E, N)): Unit = {
- val result = _edges.add(edge)
- if (result) {
- _indexs += edge -> nextId
- }
- }
-
- /**
- * return all vertices.
- * The result is stable
- */
- def vertices: List[N] = {
- // Sorts the vertex so that we can keep the order for mapVertex
- _vertices.toList.sortBy(_indexs(_))
- }
-
- /**
- * out degree
- */
- def outDegreeOf(node: N): Int = {
- edges.count(_._1 == node)
- }
-
- /**
- * in degree
- */
- def inDegreeOf(node: N): Int = {
- edges.count(_._3 == node)
- }
-
- /**
- * out going edges.
- */
- def outgoingEdgesOf(node: N): List[(N, E, N)] = {
- edges.filter(_._1 == node)
- }
-
- /**
- * incoming edges.
- */
- def incomingEdgesOf(node: N): List[(N, E, N)] = {
- edges.filter(_._3 == node)
- }
-
- /**
- * Remove vertex
- * Current Graph is changed.
- */
- def removeVertex(node: N): Unit = {
- _vertices.remove(node)
- _indexs -= node
- val toBeRemoved = incomingEdgesOf(node) ++ outgoingEdgesOf(node)
- toBeRemoved.foreach(removeEdge(_))
- }
-
- /**
- * Remove edge
- * Current Graph is changed.
- */
- private def removeEdge(edge: (N, E, N)): Unit = {
- _indexs -= edge
- _edges.remove(edge)
- }
-
- /**
- * add edge
- * Current Graph is changed.
- */
- def addEdge(node1: N, edge: E, node2: N): Unit = {
- addVertex(node1)
- addVertex(node2)
- addEdge((node1, edge, node2))
- }
-
- /**
- * Map a graph to a new graph, with vertex converted to a new type
- * Current Graph is not changed.
- */
- def mapVertex[NewNode](fun: N => NewNode): Graph[NewNode, E] = {
- val vertexes = vertices.map(node => (node, fun(node)))
-
- val vertexMap: Map[N, NewNode] = vertexes.toMap
-
- val newEdges = edges.map { edge =>
- (vertexMap(edge._1), edge._2, vertexMap(edge._3))
- }
- new Graph(vertexes.map(_._2), newEdges)
- }
-
- /**
- * Map a graph to a new graph, with edge converted to new type
- * Current graph is not changed.
- */
- def mapEdge[NewEdge](fun: (N, E, N) => NewEdge): Graph[N, NewEdge] = {
- val newEdges = edges.map { edge =>
- (edge._1, fun(edge._1, edge._2, edge._3), edge._3)
- }
- new Graph(vertices, newEdges)
- }
-
- /**
- * edges connected to node
- */
- def edgesOf(node: N): List[(N, E, N)] = {
- (incomingEdgesOf(node) ++ outgoingEdgesOf(node)).toSet[(N, E, N)].toList.sortBy(_indexs(_))
- }
-
- /**
- * all edges
- */
- def edges: List[(N, E, N)] = {
- _edges.toList.sortBy(_indexs(_))
- }
-
- /**
- * Add another graph
- * Current graph is changed.
- */
- def addGraph(other: Graph[N, E]): Graph[N, E] = {
- (vertices ++ other.vertices).foreach(addVertex(_))
- (edges ++ other.edges).foreach(edge => addEdge(edge._1, edge._2, edge._3))
- this
- }
-
- /**
- * clone the graph
- */
- def copy: Graph[N, E] = {
- new Graph(vertices, edges)
- }
-
- /**
- * check empty
- */
- def isEmpty: Boolean = {
- val vertexCount = vertices.size
- val edgeCount = edges.length
- if (vertexCount + edgeCount == 0) {
- true
- } else {
- false
- }
- }
-
- /**
- * sub-graph which contains current node and all neighbour
- * nodes and edges.
- *
- */
- def subGraph(node: N): Graph[N, E] = {
- val newGraph = Graph.empty[N, E]
- for (edge <- edgesOf(node)) {
- newGraph.addEdge(edge._1, edge._2, edge._3)
- }
- newGraph
- }
-
- /**
- * replace vertex, the current Graph is mutated.
- */
- def replaceVertex(node: N, newNode: N): Graph[N, E] = {
- for (edge <- incomingEdgesOf(node)) {
- addEdge(edge._1, edge._2, newNode)
- }
-
- for (edge <- outgoingEdgesOf(node)) {
- addEdge(newNode, edge._2, edge._3)
- }
- removeVertex(node)
- this
- }
-
- private def removeZeroInDegree: List[N] = {
- val toBeRemoved = vertices.filter(inDegreeOf(_) == 0).sortBy(_indexs(_))
- toBeRemoved.foreach(removeVertex(_))
- toBeRemoved
- }
-
- /**
- * Return an iterator of vertex in topological order
- * The node returned by Iterator is stable sorted.
- */
- def topologicalOrderIterator: Iterator[N] = {
- val newGraph = copy
- var output = List.empty[N]
-
- while (!newGraph.isEmpty) {
- output ++= newGraph.removeZeroInDegree
- }
- output.iterator
- }
-
- /**
- * Return all circles in graph.
- *
- * The reference of this algorithm is:
- * https://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm
- */
- private def findCircles: mutable.MutableList[mutable.MutableList[N]] = {
- val inStack = mutable.Map.empty[N, Boolean]
- val stack = mutable.Stack[N]()
- val indexMap = mutable.Map.empty[N, Int]
- val lowLink = mutable.Map.empty[N, Int]
- var index = 0
-
- val circles = mutable.MutableList.empty[mutable.MutableList[N]]
-
- def tarjan(node: N): Unit = {
- indexMap(node) = index
- lowLink(node) = index
- index += 1
- inStack(node) = true
- stack.push(node)
-
- outgoingEdgesOf(node).foreach {
- edge => {
- if (!indexMap.contains(edge._3)) {
- tarjan(edge._3)
- if (lowLink.get(edge._3).get < lowLink.get(node).get) {
- lowLink(node) = lowLink(edge._3)
- }
- } else {
- if (inStack.get(edge._3).get && (indexMap.get(edge._3).get < lowLink.get(node).get)) {
- lowLink(node) = indexMap(edge._3)
- }
- }
- }
- }
-
- if (indexMap.get(node).get == lowLink.get(node).get) {
- val circle = mutable.MutableList.empty[N]
- var n = node
- do {
- n = stack.pop()
- inStack(n) = false
- circle += n
- } while (n != node)
- circles += circle
- }
- }
-
- vertices.foreach {
- node => {
- if (!indexMap.contains(node)) tarjan(node)
- }
- }
-
- circles
- }
-
- /**
- * Return an iterator of vertex in topological order of graph with circles
- * The node returned by Iterator is stable sorted.
- *
- * The reference of this algorithm is:
- * http://www.drdobbs.com/database/topological-sorting/184410262
- */
- def topologicalOrderWithCirclesIterator: Iterator[N] = {
- val circles = findCircles
- val newGraph = Graph.empty[mutable.MutableList[N], E]
- circles.foreach {
- circle => {
- newGraph.addVertex(circle)
- }
- }
-
- for (circle1 <- circles; circle2 <- circles; if circle1 != circle2) yield {
- for (node1 <- circle1; node2 <- circle2) yield {
- var edges = outgoingEdgesOf(node1)
- for (edge <- edges; if edge._3 == node2) yield {
- newGraph.addEdge(circle1, edge._2, circle2)
- }
-
- edges = outgoingEdgesOf(node2)
- for (edge <- edges; if edge._3 == node1) yield {
- newGraph.addEdge(circle2, edge._2, circle1)
- }
- }
- }
-
- val topo = newGraph.topologicalOrderIterator
- topo.flatMap(_.sortBy(_indexs(_)).iterator)
- }
-
- /**
- * check whether there is a loop
- */
- def hasCycle(): Boolean = {
- @tailrec
- def detectCycle(graph: Graph[N, E]): Boolean = {
- if (graph.edges.isEmpty) {
- false
- } else if (graph.vertices.nonEmpty && !graph.vertices.exists(graph.inDegreeOf(_) == 0)) {
- true
- } else {
- graph.removeZeroInDegree
- detectCycle(graph)
- }
- }
-
- detectCycle(copy)
- }
-
- /**
- * Check whether there are two edges connecting two nodes.
- */
- def hasDuplicatedEdge(): Boolean = {
- edges.groupBy(edge => (edge._1, edge._3)).values.exists(_.size > 1)
- }
-
- /**
- * Generate a level map for each vertex withholding:
- * {{{
- * if vertex A -> B, then level(A) -> level(B)
- * }}}
- */
- def vertexHierarchyLevelMap(): Map[N, Int] = {
- val newGraph = copy
- var output = Map.empty[N, Int]
- var level = 0
- while (!newGraph.isEmpty) {
- output ++= newGraph.removeZeroInDegree.map((_, level)).toMap
- level += 1
- }
- output
- }
-
- override def toString: String = {
- Map("vertices" -> vertices.mkString(","),
- "edges" -> edges.mkString(",")).toString()
- }
-}
-
-object Graph {
-
- /**
- * Example:
- *
- * {{{
- * Graph(1 ~ 2 ~> 4 ~ 5 ~> 7, 8~9~>55, 11)
- * Will create a graph with:
- * nodes:
- * 1, 4, 7, 8, 55, 11
- * edge:
- * 2: (1->4)
- * 5: (4->7)
- * 9: (8->55)
- * }}}
- */
- def apply[N, E](elems: Path[_ <: N, _ <: E]*): Graph[N, E] = {
- val graph = empty[N, E]
- elems.foreach { path =>
- path.updategraph(graph)
- }
- graph
- }
-
- def apply[N, E](vertices: List[N], edges: List[(N, E, N)]): Graph[N, E] = {
- new Graph(vertices, edges)
- }
-
- def unapply[N, E](graph: Graph[N, E]): Option[(List[N], List[(N, E, N)])] = {
- Some((graph.vertices, graph.edges))
- }
-
- def empty[N, E]: Graph[N, E] = {
- new Graph(List.empty[N], List.empty[(N, E, N)])
- }
-
- class Path[N, + E](path: List[Either[N, E]]) {
-
- def ~[Edge >: E](edge: Edge): Path[N, Edge] = {
- new Path(path :+ Right(edge))
- }
-
- def ~>[Node >: N](node: Node): Path[Node, E] = {
- new Path(path :+ Left(node))
- }
-
- def to[Node >: N, Edge >: E](node: Node, edge: Edge): Path[Node, Edge] = {
- this ~ edge ~> node
- }
-
- private[Graph] def updategraph[Node >: N, Edge >: E](graph: Graph[Node, Edge]): Unit = {
- val nodeEdgePair: Tuple2[Option[N], Option[E]] = (None, None)
- path.foldLeft(nodeEdgePair) { (pair, either) =>
- val (lastNode, lastEdge) = pair
- either match {
- case Left(node) =>
- graph.addVertex(node)
- if (lastNode.isDefined) {
- graph.addEdge(lastNode.get, lastEdge.getOrElse(null.asInstanceOf[Edge]), node)
- }
- (Some(node), None)
- case Right(edge) =>
- (lastNode, Some(edge))
- }
- }
- }
- }
-
- object Path {
- implicit def anyToPath[N, E](any: N): Path[N, E] = Node(any)
- }
-
- implicit class Node[N, E](self: N) extends Path[N, E](List(Left(self))) {
-
- override def ~[Edge](edge: Edge): Path[N, Edge] = {
- new Path(List(Left(self), Right(edge)))
- }
-
- override def ~>[Node >: N](node: Node): Path[Node, E] = {
- new NodeList(List(self, node))
- }
-
- override def to[Node >: N, Edge >: E](node: Node, edge: Edge): Path[Node, Edge] = {
- this ~ edge ~> node
- }
- }
-
- class NodeList[N, E](nodes: List[N]) extends Path[N, E](nodes.map(Left(_))) {
- override def ~[Edge](edge: Edge): Path[N, Edge] = {
- new Path(nodes.map(Left(_)) :+ Right(edge))
- }
-
- override def ~>[Node >: N](node: Node): Path[Node, E] = {
- new NodeList(nodes :+ node)
- }
-
- override def to[Node >: N, Edge >: E](node: Node, edge: Edge): Path[Node, Edge] = {
- this ~ edge ~> node
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/util/HistoryMetricsService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/HistoryMetricsService.scala b/core/src/main/scala/io/gearpump/util/HistoryMetricsService.scala
deleted file mode 100644
index 7552444..0000000
--- a/core/src/main/scala/io/gearpump/util/HistoryMetricsService.scala
+++ /dev/null
@@ -1,404 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import java.util
-import scala.collection.mutable.ListBuffer
-
-import akka.actor.Actor
-import com.typesafe.config.Config
-import org.slf4j.Logger
-
-import io.gearpump.TimeStamp
-import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, ReadOption}
-import io.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem}
-import io.gearpump.metrics.Metrics._
-import io.gearpump.metrics.MetricsAggregator
-import io.gearpump.util.Constants._
-import io.gearpump.util.HistoryMetricsService.{DummyMetricsAggregator, HistoryMetricsConfig, HistoryMetricsStore, SkipAllAggregator}
-
-/**
- *
- * Metrics service to serve history metrics data
- *
- * For simplicity, HistoryMetricsService will maintain 72 hours coarse-grained data
- * for last 72 hours, and fine-grained data for past 5 min.
- *
- * For the coarse-grained data of past 72 hours, one or two sample point will be stored
- * for each hour.
- *
- * For fine-grained data in last 5 min, there will be 1 sample point per 15 seconds.
- */
-class HistoryMetricsService(name: String, config: HistoryMetricsConfig) extends Actor {
- private val LOG: Logger = LogUtil.getLogger(getClass, name = name)
- private var metricsStore = Map.empty[String, HistoryMetricsStore]
- private val systemConfig = context.system.settings.config
-
- def receive: Receive = metricHandler orElse commandHandler
- def metricHandler: Receive = {
- case ReportMetrics =>
- sender ! DemandMoreMetrics(self)
- case metrics: MetricType =>
- val name = metrics.name
- if (metricsStore.contains(name)) {
- metricsStore(name).add(metrics)
- } else {
- val store = HistoryMetricsStore(name, metrics, config)
- metricsStore += name -> store
- store.add(metrics)
- }
- }
-
- private def toRegularExpression(input: String): String = {
- "^" + input.flatMap {
- case '*' => ".*"
- case '?' => "."
- case char if "()[]$^.{}|\\".contains(char) => "\\" + char
- case other => s"$other"
- } + ".*$"
- }
-
- private def fetchMetricsHistory(pathPattern: String, readOption: ReadOption.ReadOption)
- : List[HistoryMetricsItem] = {
-
- val result = new ListBuffer[HistoryMetricsItem]
-
- val regex = toRegularExpression(pathPattern).r.pattern
-
- val iter = metricsStore.iterator
- while (iter.hasNext) {
- val (name, store) = iter.next()
-
- val matcher = regex.matcher(name)
- if (matcher.matches()) {
- readOption match {
- case ReadOption.ReadLatest =>
- result.append(store.readLatest: _*)
- case ReadOption.ReadRecent =>
- result.append(store.readRecent: _*)
- case ReadOption.ReadHistory =>
- result.append(store.readHistory: _*)
- case _ =>
- // Skip all other options.
- }
- }
- }
- result.toList
- }
-
- val dummyAggregator = new DummyMetricsAggregator
- private var aggregators: Map[String, MetricsAggregator] = Map.empty[String, MetricsAggregator]
-
- import scala.collection.JavaConverters._
- private val validAggregators: Set[String] = {
- val rootConfig = systemConfig.getConfig(Constants.GEARPUMP_METRICS_AGGREGATORS).root.unwrapped
- rootConfig.keySet().asScala.toSet
- }
-
- def commandHandler: Receive = {
- // Path accept syntax ? *, ? will match one char, * will match at least one char
- case QueryHistoryMetrics(inputPath, readOption, aggregatorClazz, options) =>
-
- val aggregator = {
- if (aggregatorClazz == null || aggregatorClazz.isEmpty) {
- dummyAggregator
- } else if (aggregators.contains(aggregatorClazz)) {
- aggregators(aggregatorClazz)
- } else if (validAggregators.contains(aggregatorClazz)) {
- val clazz = Class.forName(aggregatorClazz)
- val constructor = clazz.getConstructor(classOf[Config])
- val aggregator = constructor.newInstance(systemConfig).asInstanceOf[MetricsAggregator]
- aggregators += aggregatorClazz -> aggregator
- aggregator
- } else {
- LOG.error(s"Aggregator $aggregatorClazz is not in the white list ${validAggregators}, " +
- s"we will drop all messages. Please see config at ${GEARPUMP_METRICS_AGGREGATORS}")
- val skipAll = new SkipAllAggregator
- aggregators += aggregatorClazz -> new SkipAllAggregator
- skipAll
- }
- }
-
- val metrics = fetchMetricsHistory(inputPath, readOption).iterator
- sender ! HistoryMetrics(inputPath, aggregator.aggregate(options, metrics))
- }
-}
-
-object HistoryMetricsService {
-
- trait MetricsStore {
- def add(inputMetrics: MetricType): Unit
-
- def read: List[HistoryMetricsItem]
-
- /**
- * read latest inserted records
- * @return
- */
- def readLatest: List[HistoryMetricsItem]
- }
-
- trait HistoryMetricsStore {
- def add(inputMetrics: MetricType): Unit
-
- /**
- * read latest inserted records
- * @return
- */
- def readLatest: List[HistoryMetricsItem]
-
- def readRecent: List[HistoryMetricsItem]
-
- def readHistory: List[HistoryMetricsItem]
- }
-
- class DummyHistoryMetricsStore extends HistoryMetricsStore {
-
- val empty = List.empty[HistoryMetricsItem]
-
- override def add(inputMetrics: MetricType): Unit = Unit
-
- override def readRecent: List[HistoryMetricsItem] = empty
-
- /**
- * read latest inserted records
- * @return
- */
- override def readLatest: List[HistoryMetricsItem] = empty
-
- override def readHistory: List[HistoryMetricsItem] = empty
- }
-
- object HistoryMetricsStore {
- def apply(name: String, metric: MetricType, config: HistoryMetricsConfig)
- : HistoryMetricsStore = {
- metric match {
- case histogram: Histogram => new HistogramMetricsStore(config)
- case meter: Meter => new MeterMetricsStore(config)
- case counter: Counter => new CounterMetricsStore(config)
- case gauge: Gauge => new GaugeMetricsStore(config)
- case _ => new DummyHistoryMetricsStore // other metrics are not supported
- }
- }
- }
-
- /**
- * Metrics store to store history data points
- * For each time point, we will store single data point.
- *
- * @param retainCount how many data points to retain, old data will be removed
- * @param retainIntervalMs time interval between two data points.
- */
- class SingleValueMetricsStore(retainCount: Int, retainIntervalMs: Long) extends MetricsStore {
-
- private val queue = new util.ArrayDeque[HistoryMetricsItem]()
- private var latest = List.empty[HistoryMetricsItem]
-
- // End of the time window we are tracking
- private var endTime = 0L
-
- override def add(inputMetrics: MetricType): Unit = {
- add(inputMetrics, System.currentTimeMillis())
- }
-
- def add(inputMetrics: MetricType, now: TimeStamp): Unit = {
-
- val metrics = HistoryMetricsItem(now, inputMetrics)
- latest = List(metrics)
-
- if (now >= endTime) {
- queue.addFirst(metrics)
- endTime = (now / retainIntervalMs + 1) * retainIntervalMs
-
- // Removes old data
- if (queue.size() > retainCount) {
- queue.removeLast()
- }
- }
- }
-
- def read: List[HistoryMetricsItem] = {
- val result = new ListBuffer[HistoryMetricsItem]
- import scala.collection.JavaConverters._
- queue.iterator().asScala.foreach(result.prepend(_))
- result.toList
- }
-
- override def readLatest: List[HistoryMetricsItem] = {
- latest
- }
- }
-
- /**
- * Config for how long to keep history metrics data.
- *
- * @param retainHistoryDataHours Retain at max @RETAIN_HISTORY_HOURS history data(unit hour)
- * @param retainHistoryDataIntervalMs time interval between two history data points.(unit: ms)
- * @param retainRecentDataSeconds Retain at max @RETAIN_LATEST_SECONDS
- * recent data points(unit: seconds)
- * @param retainRecentDataIntervalMs Retain at max @RETAIN_LATEST_SECONDS recent
- * data points(unit: ms)
- */
- case class HistoryMetricsConfig(
- retainHistoryDataHours: Int,
- retainHistoryDataIntervalMs: Int,
- retainRecentDataSeconds: Int,
- retainRecentDataIntervalMs: Int)
-
- object HistoryMetricsConfig {
- def apply(config: Config): HistoryMetricsConfig = {
- val historyHour = config.getInt(GEARPUMP_METRIC_RETAIN_HISTORY_DATA_HOURS)
- val historyInterval = config.getInt(GEARPUMP_RETAIN_HISTORY_DATA_INTERVAL_MS)
-
- val recentSeconds = config.getInt(GEARPUMP_RETAIN_RECENT_DATA_SECONDS)
- val recentInterval = config.getInt(GEARPUMP_RETAIN_RECENT_DATA_INTERVAL_MS)
- HistoryMetricsConfig(historyHour, historyInterval, recentSeconds, recentInterval)
- }
- }
-
- class HistogramMetricsStore(config: HistoryMetricsConfig) extends HistoryMetricsStore {
-
- private val history = new SingleValueMetricsStore(
- config.retainHistoryDataHours * 3600 * 1000 / config.retainHistoryDataIntervalMs,
- config.retainHistoryDataIntervalMs)
-
- private val recent = new SingleValueMetricsStore(
- config.retainRecentDataSeconds * 1000 / config.retainRecentDataIntervalMs,
- config.retainRecentDataIntervalMs)
-
- override def add(inputMetrics: MetricType): Unit = {
- recent.add(inputMetrics)
- history.add(inputMetrics)
- }
-
- override def readRecent: List[HistoryMetricsItem] = {
- recent.read
- }
-
- override def readHistory: List[HistoryMetricsItem] = {
- history.read
- }
-
- override def readLatest: List[HistoryMetricsItem] = {
- recent.readLatest
- }
- }
-
- class MeterMetricsStore(config: HistoryMetricsConfig) extends HistoryMetricsStore {
-
- private val history = new SingleValueMetricsStore(
- config.retainHistoryDataHours * 3600 * 1000 / config.retainHistoryDataIntervalMs,
- config.retainHistoryDataIntervalMs)
-
- private val recent = new SingleValueMetricsStore(
- config.retainRecentDataSeconds * 1000 / config.retainRecentDataIntervalMs,
- config.retainRecentDataIntervalMs)
-
- override def add(inputMetrics: MetricType): Unit = {
- recent.add(inputMetrics)
- history.add(inputMetrics)
- }
-
- override def readRecent: List[HistoryMetricsItem] = {
- recent.read
- }
-
- override def readHistory: List[HistoryMetricsItem] = {
- history.read
- }
-
- override def readLatest: List[HistoryMetricsItem] = {
- recent.readLatest
- }
- }
-
- class CounterMetricsStore(config: HistoryMetricsConfig) extends HistoryMetricsStore {
-
- private val history = new SingleValueMetricsStore(
- config.retainHistoryDataHours * 3600 * 1000 / config.retainHistoryDataIntervalMs,
- config.retainHistoryDataIntervalMs)
-
- private val recent = new SingleValueMetricsStore(
- config.retainRecentDataSeconds * 1000 / config.retainRecentDataIntervalMs,
- config.retainRecentDataIntervalMs)
-
- override def add(inputMetrics: MetricType): Unit = {
- history.add(inputMetrics)
- recent.add(inputMetrics)
- }
-
- override def readRecent: List[HistoryMetricsItem] = {
- recent.read
- }
-
- override def readHistory: List[HistoryMetricsItem] = {
- history.read
- }
-
- override def readLatest: List[HistoryMetricsItem] = {
- recent.readLatest
- }
- }
-
- class GaugeMetricsStore(config: HistoryMetricsConfig) extends HistoryMetricsStore {
-
- private val compartor = (left: HistoryMetricsItem, right: HistoryMetricsItem) =>
- left.value.asInstanceOf[Gauge].value > right.value.asInstanceOf[Gauge].value
-
- private val history = new SingleValueMetricsStore(
- config.retainHistoryDataHours * 3600 * 1000 / config.retainHistoryDataIntervalMs,
- config.retainHistoryDataIntervalMs)
-
- private val recent = new SingleValueMetricsStore(
- config.retainRecentDataSeconds * 1000 / config.retainRecentDataIntervalMs,
- config.retainRecentDataIntervalMs)
-
- override def add(inputMetrics: MetricType): Unit = {
- recent.add(inputMetrics)
- history.add(inputMetrics)
- }
-
- override def readRecent: List[HistoryMetricsItem] = {
- recent.read
- }
-
- override def readHistory: List[HistoryMetricsItem] = {
- history.read
- }
-
- override def readLatest: List[HistoryMetricsItem] = {
- recent.readLatest
- }
- }
-
- class DummyMetricsAggregator extends MetricsAggregator {
- def aggregate(options: Map[String, String], inputs: Iterator[HistoryMetricsItem])
- : List[HistoryMetricsItem] = {
- inputs.toList
- }
- }
-
- class SkipAllAggregator extends MetricsAggregator {
- private val empty = List.empty[HistoryMetricsItem]
- def aggregate(options: Map[String, String], inputs: Iterator[HistoryMetricsItem])
- : List[HistoryMetricsItem] = {
- empty
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/util/LogUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/LogUtil.scala b/core/src/main/scala/io/gearpump/util/LogUtil.scala
deleted file mode 100644
index 1669129..0000000
--- a/core/src/main/scala/io/gearpump/util/LogUtil.scala
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import java.io.File
-import java.net.InetAddress
-import java.util.Properties
-import scala.util.Try
-
-import com.typesafe.config.Config
-import org.apache.log4j.PropertyConfigurator
-import org.slf4j.{Logger, LoggerFactory}
-
-object LogUtil {
- object ProcessType extends Enumeration {
- type ProcessType = Value
- val MASTER, WORKER, LOCAL, APPLICATION, UI = Value
- }
-
- def getLogger[T](
- clazz: Class[T], context: String = null, master: Any = null, worker: Any = null,
- executor: Any = null, task: Any = null, app: Any = null, name: String = null): Logger = {
- var env = ""
-
- if (null != context) {
- env += context
- }
- if (null != master) {
- env += "master" + master
- }
- if (null != worker) {
- env += "worker" + worker
- }
-
- if (null != app) {
- env += "app" + app
- }
-
- if (null != executor) {
- env += "exec" + executor
- }
- if (null != task) {
- env += task
- }
- if (null != name) {
- env += name
- }
-
- if (!env.isEmpty) {
- LoggerFactory.getLogger(clazz.getSimpleName + "@" + env)
- } else {
- LoggerFactory.getLogger(clazz.getSimpleName)
- }
- }
-
- /** Custom the log file locations by reading config from system properties */
- def loadConfiguration(config: Config, processType: ProcessType.ProcessType): Unit = {
- // Set log file name
- val propName = s"gearpump.${processType.toString.toLowerCase}.log.file"
- val props = loadConfiguration
-
- props.setProperty("gearpump.log.file", "${" + propName + "}")
-
- props.setProperty("JVM_NAME", jvmName)
-
- processType match {
- case ProcessType.APPLICATION =>
- props.setProperty("log4j.rootAppender", "${gearpump.application.logger}")
- props.setProperty("gearpump.application.log.rootdir",
- applicationLogDir(config).getAbsolutePath)
- case _ =>
- props.setProperty("log4j.rootAppender", "${gearpump.root.logger}")
- props.setProperty("gearpump.log.dir", daemonLogDir(config).getAbsolutePath)
- }
-
- PropertyConfigurator.configure(props)
- }
-
- def daemonLogDir(config: Config): File = {
- val dir = config.getString(Constants.GEARPUMP_LOG_DAEMON_DIR)
- new File(dir)
- }
-
- def verboseLogToConsole(): Unit = {
- val props = loadConfiguration
- props.setProperty("log4j.rootLogger", "DEBUG,console")
- PropertyConfigurator.configure(props)
- }
-
- def loadConfiguration: Properties = {
- val props = new Properties()
- val log4jConfStream = getClass().getClassLoader.getResourceAsStream("log4j.properties")
- if (log4jConfStream != null) {
- props.load(log4jConfStream)
- }
- log4jConfStream.close()
- props
- }
-
- private def jvmName: String = {
- val hostname = Try(InetAddress.getLocalHost.getHostName).getOrElse("local")
- java.lang.management.ManagementFactory.getRuntimeMXBean().getName()
- }
-
- def applicationLogDir(config: Config): File = {
- val appLogDir = config.getString(Constants.GEARPUMP_LOG_APPLICATION_DIR)
- new File(appLogDir)
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/util/ProcessLogRedirector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/ProcessLogRedirector.scala b/core/src/main/scala/io/gearpump/util/ProcessLogRedirector.scala
deleted file mode 100644
index 0b843f3..0000000
--- a/core/src/main/scala/io/gearpump/util/ProcessLogRedirector.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import java.io.{Closeable, Flushable}
-import scala.sys.process.ProcessLogger
-
-import org.slf4j.LoggerFactory
-
-/** Redirect the console output to parent process */
-class ProcessLogRedirector extends ProcessLogger with Closeable with Flushable with ConsoleOutput {
- private val LOG = LoggerFactory.getLogger("redirect")
-
- // We only capture the first 1K chars
- private final val LENGTH = 1000
- private var _error: String = ""
- private var _output: String = ""
-
- def error: String = _error
- def output: String = _output
-
- def out(s: => String): Unit = {
- if (_output.length <= LENGTH) {
- _output += "\n" + s
- }
- LOG.info(s)
- }
- def err(s: => String): Unit = {
- if (_error.length <= LENGTH) {
- _error += "\n" + s
- }
- LOG.error(s)
- }
- def buffer[T](f: => T): T = f
- def close(): Unit = Unit
- def flush(): Unit = Unit
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/util/ReferenceEqual.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/ReferenceEqual.scala b/core/src/main/scala/io/gearpump/util/ReferenceEqual.scala
deleted file mode 100644
index f6c7a2b..0000000
--- a/core/src/main/scala/io/gearpump/util/ReferenceEqual.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-/**
- * Check equal using reference-equal.
- */
-trait ReferenceEqual extends AnyRef {
-
- override def equals(other: Any): Boolean = {
- this.eq(other.asInstanceOf[AnyRef])
- }
-
- override def hashCode(): Int = {
- super.hashCode()
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/util/RestartPolicy.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/RestartPolicy.scala b/core/src/main/scala/io/gearpump/util/RestartPolicy.scala
deleted file mode 100644
index 245cb1b..0000000
--- a/core/src/main/scala/io/gearpump/util/RestartPolicy.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import scala.concurrent.duration.Duration
-
-import akka.actor.ChildRestartStats
-
-/**
- * When one executor or task fails, Gearpump will try to start. However, if it fails after
- * multiple retries, then we abort.
- *
- * @param maxNrOfRetries The number of times is allowed to be restarted, negative value means no
- * limit, if the limit is exceeded the policy will not allow to restart
- * @param withinTimeRange Duration of the time window for maxNrOfRetries.
- * Duration.Inf means no window
- */
-class RestartPolicy(maxNrOfRetries: Int, withinTimeRange: Duration) {
- private val status = new ChildRestartStats(null, 0, 0L)
- private val retriesWindow = (Some(maxNrOfRetries), Some(withinTimeRange.toMillis.toInt))
-
- def allowRestart: Boolean = {
- status.requestRestartPermission(retriesWindow)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/util/RichProcess.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/RichProcess.scala b/core/src/main/scala/io/gearpump/util/RichProcess.scala
deleted file mode 100644
index ab5611f..0000000
--- a/core/src/main/scala/io/gearpump/util/RichProcess.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import scala.sys.process.Process
-
-trait ConsoleOutput {
- def output: String
- def error: String
-}
-
-/** Extends Process by providing a additional logger: ConsoleOutput interface. */
-class RichProcess(process: Process, _logger: ConsoleOutput) extends Process {
- def exitValue(): scala.Int = process.exitValue()
- def destroy(): scala.Unit = process.destroy()
- def logger: ConsoleOutput = _logger
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/util/TimeOutScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/TimeOutScheduler.scala b/core/src/main/scala/io/gearpump/util/TimeOutScheduler.scala
deleted file mode 100644
index 64b920c..0000000
--- a/core/src/main/scala/io/gearpump/util/TimeOutScheduler.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import java.util.concurrent.TimeUnit
-import scala.concurrent.duration._
-
-import akka.actor.{Actor, ActorRef}
-import akka.pattern.ask
-
-/** A helper util to send a message to remote actor and notify callback when timeout */
-trait TimeOutScheduler {
- this: Actor =>
- import context.dispatcher
-
- def sendMsgWithTimeOutCallBack(
- target: ActorRef, msg: AnyRef, milliSeconds: Long, timeOutHandler: => Unit): Unit = {
- val result = target.ask(msg)(FiniteDuration(milliSeconds, TimeUnit.MILLISECONDS))
- result onSuccess {
- case msg =>
- self ! msg
- }
- result onFailure {
- case _ => timeOutHandler
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/util/Util.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/Util.scala b/core/src/main/scala/io/gearpump/util/Util.scala
deleted file mode 100644
index 8ed9bb3..0000000
--- a/core/src/main/scala/io/gearpump/util/Util.scala
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import java.io.{BufferedReader, File, FileInputStream, InputStreamReader}
-import java.net.{ServerSocket, URI}
-import scala.concurrent.forkjoin.ThreadLocalRandom
-import scala.sys.process.Process
-import scala.util.{Failure, Success, Try}
-
-import com.typesafe.config.{Config, ConfigFactory}
-
-import io.gearpump.cluster.AppJar
-import io.gearpump.jarstore.JarStoreService
-import io.gearpump.transport.HostPort
-
-object Util {
- val LOG = LogUtil.getLogger(getClass)
- private val defaultUri = new URI("file:///")
- private val appNamePattern = "^[a-zA-Z_][a-zA-Z0-9_]+$".r.pattern
-
- def validApplicationName(appName: String): Boolean = {
- appNamePattern.matcher(appName).matches()
- }
-
- def getCurrentClassPath: Array[String] = {
- val classpath = System.getProperty("java.class.path")
- val classpathList = classpath.split(File.pathSeparator)
- classpathList
- }
-
- def version: String = {
- val home = System.getProperty(Constants.GEARPUMP_HOME)
- val version = Try {
- val versionFile = new FileInputStream(new File(home, "VERSION"))
- val reader = new BufferedReader(new InputStreamReader(versionFile))
- val version = reader.readLine().replace("version:=", "")
- versionFile.close()
- version
- }
- version match {
- case Success(version) =>
- version
- case Failure(ex) =>
- LOG.error("failed to read VERSION file, " + ex.getMessage)
- "Unknown-Version"
- }
- }
-
- def startProcess(options: Array[String], classPath: Array[String], mainClass: String,
- arguments: Array[String]): RichProcess = {
- val java = System.getProperty("java.home") + "/bin/java"
- val command = List(java) ++ options ++
- List("-cp", classPath.mkString(File.pathSeparator), mainClass) ++ arguments
- LOG.info(s"Starting executor process java $mainClass ${arguments.mkString(" ")} " +
- s"\n ${options.mkString(" ")}")
- val logger = new ProcessLogRedirector()
- val process = Process(command).run(logger)
- new RichProcess(process, logger)
- }
-
- /**
- * hostList format: host1:port1,host2:port2,host3:port3...
- */
- def parseHostList(hostList: String): List[HostPort] = {
- val masters = hostList.trim.split(",").map { address =>
- val hostAndPort = address.split(":")
- HostPort(hostAndPort(0), hostAndPort(1).toInt)
- }
- masters.toList
- }
-
- def resolvePath(path: String): String = {
- val uri = new URI(path)
- if (uri.getScheme == null && uri.getFragment == null) {
- val absolutePath = new File(path).getCanonicalPath.replaceAll("\\\\", "/")
- "file://" + absolutePath
- } else {
- path
- }
- }
-
- def isLocalPath(path: String): Boolean = {
- val uri = new URI(path)
- val scheme = uri.getScheme
- val authority = uri.getAuthority
- if (scheme == null && authority == null) {
- true
- } else if (scheme == defaultUri.getScheme) {
- true
- } else {
- false
- }
- }
-
- def randInt(): Int = {
- Math.abs(ThreadLocalRandom.current.nextInt())
- }
-
- def findFreePort(): Try[Int] = {
- Try {
- val socket = new ServerSocket(0)
- socket.setReuseAddress(true)
- val port = socket.getLocalPort()
- socket.close
- port
- }
- }
-
- def uploadJar(jarFile: File, jarStoreService: JarStoreService): AppJar = {
- val remotePath = jarStoreService.copyFromLocal(jarFile)
- AppJar(jarFile.getName, remotePath)
- }
-
- /**
- * This util can be used to filter out configuration from specific origin
- *
- * For example, if you want to filter out configuration from reference.conf
- * Then you can use like this:
- *
- * filterOutOrigin(config, "reference.conf")
- */
- import scala.collection.JavaConverters._
- def filterOutOrigin(config: Config, originFile: String): Config = {
- config.entrySet().asScala.foldLeft(ConfigFactory.empty()) { (config, entry) =>
- val key = entry.getKey
- val value = entry.getValue
- val origin = value.origin()
- if (origin.resource() == originFile) {
- config
- } else {
- config.withValue(key, value)
- }
- }
- }
-
- case class JvmSetting(vmargs: Array[String], classPath: Array[String])
-
- case class AppJvmSettings(appMater: JvmSetting, executor: JvmSetting)
-
- /** Get an effective AppJvmSettings from Config */
- def resolveJvmSetting(conf: Config): AppJvmSettings = {
-
- import io.gearpump.util.Constants._
-
- val appMasterVMArgs = Try(conf.getString(GEARPUMP_APPMASTER_ARGS).split("\\s+")
- .filter(_.nonEmpty)).toOption
- val executorVMArgs = Try(conf.getString(GEARPUMP_EXECUTOR_ARGS).split("\\s+")
- .filter(_.nonEmpty)).toOption
-
- val appMasterClassPath = Try(
- conf.getString(GEARPUMP_APPMASTER_EXTRA_CLASSPATH)
- .split("[;:]").filter(_.nonEmpty)).toOption
-
- val executorClassPath = Try(
- conf.getString(GEARPUMP_EXECUTOR_EXTRA_CLASSPATH)
- .split(File.pathSeparator).filter(_.nonEmpty)).toOption
-
- AppJvmSettings(
- JvmSetting(appMasterVMArgs.getOrElse(Array.empty[String]),
- appMasterClassPath.getOrElse(Array.empty[String])),
- JvmSetting(executorVMArgs
- .getOrElse(Array.empty[String]), executorClassPath.getOrElse(Array.empty[String])))
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/Message.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/Message.scala b/core/src/main/scala/org/apache/gearpump/Message.scala
new file mode 100644
index 0000000..871ebe1
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/Message.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump
+
+/**
+ * Each message contains a immutable timestamp.
+ *
+ * For example, if you take a picture, the time you take the picture is the
+ * message's timestamp.
+ * @param msg Accept any type except Null, Nothing and Unit
+ */
+case class Message(msg: Any, timestamp: TimeStamp = Message.noTimeStamp)
+
+object Message {
+ val noTimeStamp: TimeStamp = 0L
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala b/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala
new file mode 100644
index 0000000..91c2675
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster
+
+import scala.reflect.ClassTag
+
+import akka.actor.{Actor, ActorRef, ActorSystem}
+import com.typesafe.config.{Config, ConfigFactory}
+
+import org.apache.gearpump.cluster.appmaster.WorkerInfo
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.jarstore.FilePath
+
+/**
+ * This contains all information to run an application
+ *
+ * @param name The name of this application
+ * @param appMaster The class name of AppMaster Actor
+ * @param userConfig user configuration.
+ * @param clusterConfig User provided cluster config, it overrides gear.conf when starting
+ * new applications. In most cases, you should not need to change it. If you do
+ * really need to change it, please use ClusterConfigSource(filePath) to
+ * construct the object, while filePath points to the .conf file.
+ */
+case class AppDescription(
+ name: String, appMaster: String, userConfig: UserConfig,
+ clusterConfig: Config = ConfigFactory.empty())
+
+/**
+ * Each job, streaming or not streaming, need to provide an Application class.
+ * The master uses this class to start AppMaster.
+ */
+trait Application {
+
+ /** Name of this application, must be unique in the system */
+ def name: String
+
+ /** Custom user configuration */
+ def userConfig(implicit system: ActorSystem): UserConfig
+
+ /**
+ * AppMaster class, must have a constructor like this:
+ * this(appContext: AppMasterContext, app: AppDescription)
+ */
+ def appMaster: Class[_ <: ApplicationMaster]
+}
+
+object Application {
+ def apply[T <: ApplicationMaster](
+ name: String, userConfig: UserConfig)(implicit tag: ClassTag[T]): Application = {
+ new DefaultApplication(name, userConfig,
+ tag.runtimeClass.asInstanceOf[Class[_ <: ApplicationMaster]])
+ }
+
+ class DefaultApplication(
+ override val name: String, inputUserConfig: UserConfig,
+ val appMaster: Class[_ <: ApplicationMaster]) extends Application {
+ override def userConfig(implicit system: ActorSystem): UserConfig = inputUserConfig
+ }
+
+ def ApplicationToAppDescription(app: Application)(implicit system: ActorSystem)
+ : AppDescription = {
+ val filterJvmReservedKeys = ClusterConfig.filterOutDefaultConfig(system.settings.config)
+ AppDescription(app.name, app.appMaster.getName, app.userConfig, filterJvmReservedKeys)
+ }
+}
+
+/**
+ * Used for verification. All AppMaster must extend this interface
+ */
+abstract class ApplicationMaster extends Actor
+
+/**
+ * This contains context information when starting an AppMaster
+ *
+ * @param appId application instance id assigned, it is unique in the cluster
+ * @param username The username who submitted this application
+ * @param resource Resouce allocated to start this AppMaster daemon. AppMaster are allowed to
+ * request more resource from Master.
+ * @param appJar application Jar. If the jar is already in classpath, then it can be None.
+ * @param masterProxy The proxy to master actor, it bridges the messages between appmaster
+ * and master
+ * @param registerData AppMaster are required to send this data to Master by when doing
+ * RegisterAppMaster.
+ */
+case class AppMasterContext(
+ appId: Int,
+ username: String,
+ resource: Resource,
+ workerInfo: WorkerInfo,
+ appJar: Option[AppJar],
+ masterProxy: ActorRef,
+ registerData: AppMasterRegisterData)
+
+/**
+ * Jar file container in the cluster
+ *
+ * @param name A meaningful name to represent this jar
+ * @param filePath Where the jar file is stored.
+ */
+case class AppJar(name: String, filePath: FilePath)
+
+/**
+ * Serves as the context to start an Executor JVM.
+ */
+// TODO: ExecutorContext doesn't belong to this package in logic.
+case class ExecutorContext(
+ executorId: Int, worker: WorkerInfo, appId: Int, appName: String,
+ appMaster: ActorRef, resource: Resource)
+
+/**
+ * JVM configurations to start an Executor JVM.
+ *
+ * @param classPath When executor is created by a worker JVM, executor automatically inherits
+ * parent worker's classpath. Sometimes, you still want to add some extra
+ * classpath, you can do this by specify classPath option.
+ * @param jvmArguments java arguments like -Dxx=yy
+ * @param mainClass Executor main class name like org.apache.gearpump.xx.AppMaster
+ * @param arguments Executor command line arguments
+ * @param jar application jar
+ * @param executorAkkaConfig Akka config used to initialize the actor system of this executor.
+ * It uses org.apache.gearpump.util.Constants.GEARPUMP_CUSTOM_CONFIG_FILE
+ * to pass the config to executor process
+ */
+// TODO: ExecutorContext doesn't belong to this package in logic.
+case class ExecutorJVMConfig(
+ classPath: Array[String], jvmArguments: Array[String], mainClass: String,
+ arguments: Array[String], jar: Option[AppJar], username: String,
+ executorAkkaConfig: Config = ConfigFactory.empty())
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/ClusterConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/ClusterConfig.scala b/core/src/main/scala/org/apache/gearpump/cluster/ClusterConfig.scala
new file mode 100644
index 0000000..332e770
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/ClusterConfig.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster
+
+import java.io.File
+
+import com.typesafe.config._
+
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.{Constants, FileUtils, LogUtil, Util}
+
+/**
+ *
+ * All Gearpump application should use this class to load configurations.
+ *
+ * Compared with Akka built-in com.typesafe.config.ConfigFactory, this class also
+ * resolve config from file gear.conf and geardefault.conf.
+ *
+ * Overriding order:
+ * {{{
+ * System Properties
+ * > Custom configuration file (by using system property -Dgearpump.config.file) >
+ * > gear.conf
+ * > geardefault.conf
+ * > reference.conf
+ * }}}
+ */
+
+object ClusterConfig {
+ /**
+ * alias for default
+ * default is a reserved word for java
+ */
+ def defaultConfig: Config = {
+ default(APPLICATION)
+ }
+
+ /**
+ * default application for user.
+ * Usually used when user want to start an client application.
+ */
+ def default(configFile: String = APPLICATION): Config = {
+ load(configFile).default
+ }
+
+ /**
+ * configuration for master node
+ */
+ def master(configFile: String = null): Config = {
+ load(configFile).master
+ }
+
+ /*
+ * configuration for worker node
+ */
+ def worker(configFile: String = null): Config = {
+ load(configFile).worker
+ }
+
+ /**
+ * configuration for UI server
+ */
+ def ui(configFile: String = null): Config = {
+ load(configFile).ui
+ }
+
+ /**
+ * try to load system property gearpump.config.file, or use configFile
+ */
+ private def load(configFile: String): Configs = {
+ val file = Option(System.getProperty(GEARPUMP_CUSTOM_CONFIG_FILE))
+ file match {
+ case Some(path) =>
+ LOG.info("loading config file " + path + "..........")
+ load(ClusterConfigSource(path))
+ case None =>
+ LOG.info("loading config file application.conf...")
+ load(ClusterConfigSource(configFile))
+ }
+ }
+
+ val APPLICATION = "application.conf"
+ val LOG = LogUtil.getLogger(getClass)
+
+ def saveConfig(conf: Config, file: File): Unit = {
+ val serialized = conf.root().render()
+ FileUtils.write(file, serialized)
+ }
+
+ def render(config: Config, concise: Boolean = false): String = {
+ if (concise) {
+ config.root().render(ConfigRenderOptions.concise().setFormatted(true))
+ } else {
+ config.root().render(ConfigRenderOptions.defaults())
+ }
+ }
+
+ /** filter JVM reserved keys and akka default reference.conf */
+ def filterOutDefaultConfig(input: Config): Config = {
+ val updated = filterOutJvmReservedKeys(input)
+ Util.filterOutOrigin(updated, "reference.conf")
+ }
+
+ private[gearpump] def load(source: ClusterConfigSource): Configs = {
+
+ val systemProperties = getSystemProperties
+
+ val user = source.getConfig
+
+ val gear = ConfigFactory.parseResourcesAnySyntax("gear.conf",
+ ConfigParseOptions.defaults.setAllowMissing(true))
+
+ val gearDefault = ConfigFactory.parseResourcesAnySyntax("geardefault.conf",
+ ConfigParseOptions.defaults.setAllowMissing(true))
+
+ val all = systemProperties.withFallback(user).withFallback(gear).withFallback(gearDefault)
+
+ val linux = all.getConfig(LINUX_CONFIG)
+
+ var basic = all.withoutPath(MASTER_CONFIG).withoutPath(WORKER_CONFIG).
+ withoutPath(UI_CONFIG).withoutPath(LINUX_CONFIG)
+
+ if (!akka.util.Helpers.isWindows) {
+
+ // Change the akka.scheduler.tick-duration to 1 ms for Linux or Mac
+ basic = linux.withFallback(basic)
+ }
+
+ val master = replaceHost(all.getConfig(MASTER_CONFIG).withFallback(basic))
+ val worker = replaceHost(all.getConfig(WORKER_CONFIG).withFallback(basic))
+ val ui = replaceHost(all.getConfig(UI_CONFIG).withFallback(basic))
+ val app = replaceHost(basic)
+
+ new Configs(master, worker, ui, app)
+ }
+
+ private def replaceHost(config: Config): Config = {
+ val hostName = config.getString(Constants.GEARPUMP_HOSTNAME)
+ config.withValue(NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(hostName))
+ }
+
+ val JVM_RESERVED_PROPERTIES = List(
+ "os", "java", "sun", "boot", "user", "prog", "path", "line", "awt", "file"
+ )
+
+ private def getSystemProperties: Config = {
+ // Excludes default java system properties
+ JVM_RESERVED_PROPERTIES.foldLeft(ConfigFactory.systemProperties()) { (config, property) =>
+ config.withoutPath(property)
+ }
+ }
+
+ class ConfigValidationException(msg: String) extends Exception(msg: String)
+
+ private def filterOutJvmReservedKeys(input: Config): Config = {
+ val filterJvmReservedKeys = JVM_RESERVED_PROPERTIES.foldLeft(input) { (config, key) =>
+ config.withoutPath(key)
+ }
+ filterJvmReservedKeys
+ }
+
+ protected class Configs(
+ val master: Config, val worker: Config, val ui: Config, val default: Config)
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/ClusterConfigSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/ClusterConfigSource.scala b/core/src/main/scala/org/apache/gearpump/cluster/ClusterConfigSource.scala
new file mode 100644
index 0000000..920ceae
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/ClusterConfigSource.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster
+
+import java.io.File
+import scala.language.implicitConversions
+
+import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
+
+/**
+ * Data Source of ClusterConfig
+ *
+ * Please use ClusterConfigSource.apply(filePath) to construct this object
+ */
+sealed trait ClusterConfigSource extends Serializable {
+ def getConfig: Config
+}
+
+object ClusterConfigSource {
+
+ /**
+ * Construct ClusterConfigSource from resource name or file path
+ */
+ def apply(filePath: String): ClusterConfigSource = {
+
+ if (null == filePath) {
+ new ClusterConfigSourceImpl(ConfigFactory.empty())
+ } else {
+ var config = ConfigFactory.parseFileAnySyntax(new File(filePath),
+ ConfigParseOptions.defaults.setAllowMissing(true))
+
+ if (null == config || config.isEmpty) {
+ config = ConfigFactory.parseResourcesAnySyntax(filePath,
+ ConfigParseOptions.defaults.setAllowMissing(true))
+ }
+ new ClusterConfigSourceImpl(config)
+ }
+ }
+
+ implicit def FilePathToClusterConfigSource(filePath: String): ClusterConfigSource = {
+ apply(filePath)
+ }
+
+ private class ClusterConfigSourceImpl(config: Config) extends ClusterConfigSource {
+ override def getConfig: Config = config
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala b/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala
new file mode 100644
index 0000000..e6ab13b
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster
+
+import org.apache.gearpump.cluster.worker.{WorkerSummary, WorkerId}
+
+import scala.util.Try
+
+import akka.actor.ActorRef
+import com.typesafe.config.Config
+
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterStatus
+import org.apache.gearpump.cluster.master.{MasterNode, MasterSummary}
+import org.apache.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest}
+import org.apache.gearpump.metrics.Metrics.MetricType
+
+object ClientToMaster {
+ case object AddMaster
+ case class AddWorker(count: Int)
+ case class RemoveMaster(masterContainerId: String)
+ case class RemoveWorker(workerContainerId: String)
+
+ /** Command result of AddMaster, RemoveMaster, and etc... */
+ case class CommandResult(success: Boolean, exception: String = null) {
+ override def toString: String = {
+ val tag = getClass.getSimpleName
+ if (success) {
+ s"$tag(success)"
+ } else {
+ s"$tag(failure, $exception)"
+ }
+ }
+ }
+
+ /** Submit an application to master */
+ case class SubmitApplication(
+ appDescription: AppDescription, appJar: Option[AppJar],
+ username: String = System.getProperty("user.name"))
+
+ case class RestartApplication(appId: Int)
+ case class ShutdownApplication(appId: Int)
+
+ /** Client send ResolveAppId to Master to resolves AppMaster actor path by providing appId */
+ case class ResolveAppId(appId: Int)
+
+ /** Client send ResolveWorkerId to master to get the Actor path of worker. */
+ case class ResolveWorkerId(workerId: WorkerId)
+
+ /** Get an active Jar store to upload job jars, like wordcount.jar */
+ case object GetJarStoreServer
+
+ /** Service address of JarStore */
+ case class JarStoreServerAddress(url: String)
+
+ /** Query AppMaster config by providing appId */
+ case class QueryAppMasterConfig(appId: Int)
+
+ /** Query worker config */
+ case class QueryWorkerConfig(workerId: WorkerId)
+
+ /** Query master config */
+ case object QueryMasterConfig
+
+ /** Options for read the metrics from the cluster */
+ object ReadOption {
+ type ReadOption = String
+
+ val Key: String = "readOption"
+
+ /** Read the latest record of the metrics, only return 1 record for one metric name (id) */
+ val ReadLatest: ReadOption = "readLatest"
+
+ /** Read recent metrics from cluster, typically it contains metrics in 5 minutes */
+ val ReadRecent = "readRecent"
+
+ /**
+ * Read the history metrics, typically it contains metrics for 48 hours
+ *
+ * NOTE: Each hour only contain one or two data points.
+ */
+ val ReadHistory = "readHistory"
+ }
+
+ /** Query history metrics from master or app master. */
+ case class QueryHistoryMetrics(
+ path: String, readOption: ReadOption.ReadOption = ReadOption.ReadLatest,
+ aggregatorClazz: String = "", options: Map[String, String] = Map.empty[String, String])
+
+ /**
+ * If there are message loss, the clock would pause for a while. This message is used to
+ * pin-point which task has stalling clock value, and usually it means something wrong on
+ * that machine.
+ */
+ case class GetStallingTasks(appId: Int)
+
+ /**
+ * Request app master for a short list of cluster app that administrators should be aware of.
+ */
+ case class GetLastFailure(appId: Int)
+}
+
+object MasterToClient {
+
+ /** Result of SubmitApplication */
+ // TODO: Merge with SubmitApplicationResultValue and change this to (appId: Option, ex: Exception)
+ case class SubmitApplicationResult(appId: Try[Int])
+
+ case class SubmitApplicationResultValue(appId: Int)
+
+ case class ShutdownApplicationResult(appId: Try[Int])
+ case class ReplayApplicationResult(appId: Try[Int])
+
+ /** Return Actor ref of app master */
+ case class ResolveAppIdResult(appMaster: Try[ActorRef])
+
+ /** Return Actor ref of worker */
+ case class ResolveWorkerIdResult(worker: Try[ActorRef])
+
+ case class AppMasterConfig(config: Config)
+
+ case class WorkerConfig(config: Config)
+
+ case class MasterConfig(config: Config)
+
+ case class HistoryMetricsItem(time: TimeStamp, value: MetricType)
+
+ /**
+ * History metrics returned from master, worker, or app master.
+ *
+ * All metric items are organized like a tree, path is used to navigate through the tree.
+ * For example, when querying with path == "executor0.task1.throughput*", the metrics
+ * provider picks metrics whose source matches the path.
+ *
+ * @param path The path client provided. The returned metrics are the result query of this path.
+ * @param metrics The detailed metrics.
+ */
+ case class HistoryMetrics(path: String, metrics: List[HistoryMetricsItem])
+
+ /** Return the last error of this streaming application job */
+ case class LastFailure(time: TimeStamp, error: String)
+}
+
+trait AppMasterRegisterData
+
+object AppMasterToMaster {
+
+ /**
+ * Register an AppMaster by providing a ActorRef, and registerData
+ * @param registerData The registerData is provided by Master when starting the app master.
+ * App master should return the registerData back to master.
+ * Typically registerData hold some context information for this app Master.
+ */
+
+ case class RegisterAppMaster(appMaster: ActorRef, registerData: AppMasterRegisterData)
+
+ case class InvalidAppMaster(appId: Int, appMaster: String, reason: Throwable)
+
+ case class RequestResource(appId: Int, request: ResourceRequest)
+
+ /**
+ * Each application job can save some data in the distributed cluster storage on master nodes.
+ *
+ * @param appId App Id of the client application who send the request.
+ * @param key Key name
+ * @param value Value to store on distributed cluster storage on master nodes
+ */
+ case class SaveAppData(appId: Int, key: String, value: Any)
+
+ /** The application specific data is successfully stored */
+ case object AppDataSaved
+
+ /** Fail to store the application data */
+ case object SaveAppDataFailed
+
+ /** Fetch the application specific data that stored previously */
+ case class GetAppData(appId: Int, key: String)
+
+ /** The KV data returned for query GetAppData */
+ case class GetAppDataResult(key: String, value: Any)
+
+ /**
+ * AppMasterSummary returned to REST API query. Streaming and Non-streaming
+ * have very different application info. AppMasterSummary is the common interface.
+ */
+ trait AppMasterSummary {
+ def appType: String
+ def appId: Int
+ def appName: String
+ def actorPath: String
+ def status: AppMasterStatus
+ def startTime: TimeStamp
+ def uptime: TimeStamp
+ def user: String
+ }
+
+ /** Represents a generic application that is not a streaming job */
+ case class GeneralAppMasterSummary(
+ appId: Int,
+ appType: String = "general",
+ appName: String = null,
+ actorPath: String = null,
+ status: AppMasterStatus = MasterToAppMaster.AppMasterActive,
+ startTime: TimeStamp = 0L,
+ uptime: TimeStamp = 0L,
+ user: String = null)
+ extends AppMasterSummary
+
+ /** Fetches the list of workers from Master */
+ case object GetAllWorkers
+
+ /** Get worker data of workerId */
+ case class GetWorkerData(workerId: WorkerId)
+
+ /** Response to GetWorkerData */
+ case class WorkerData(workerDescription: WorkerSummary)
+
+ /** Get Master data */
+ case object GetMasterData
+
+ /** Response to GetMasterData */
+ case class MasterData(masterDescription: MasterSummary)
+}
+
+object MasterToAppMaster {
+
+ /** Resource allocated for application xx */
+ case class ResourceAllocated(allocations: Array[ResourceAllocation])
+
+ /** Master confirm reception of RegisterAppMaster message */
+ case class AppMasterRegistered(appId: Int)
+
+ /** Shutdown the application job */
+ case object ShutdownAppMaster
+
+ type AppMasterStatus = String
+ val AppMasterActive: AppMasterStatus = "active"
+ val AppMasterInActive: AppMasterStatus = "inactive"
+ val AppMasterNonExist: AppMasterStatus = "nonexist"
+
+ sealed trait StreamingType
+ case class AppMasterData(
+ status: AppMasterStatus, appId: Int = 0, appName: String = null, appMasterPath: String = null,
+ workerPath: String = null, submissionTime: TimeStamp = 0, startTime: TimeStamp = 0,
+ finishTime: TimeStamp = 0, user: String = null)
+
+ case class AppMasterDataRequest(appId: Int, detail: Boolean = false)
+
+ case class AppMastersData(appMasters: List[AppMasterData])
+ case object AppMastersDataRequest
+ case class AppMasterDataDetailRequest(appId: Int)
+ case class AppMasterMetricsRequest(appId: Int) extends StreamingType
+
+ case class ReplayFromTimestampWindowTrailingEdge(appId: Int)
+
+ case class WorkerList(workers: List[WorkerId])
+}
+
+object AppMasterToWorker {
+ case class LaunchExecutor(
+ appId: Int, executorId: Int, resource: Resource, executorJvmConfig: ExecutorJVMConfig)
+
+ case class ShutdownExecutor(appId: Int, executorId: Int, reason: String)
+ case class ChangeExecutorResource(appId: Int, executorId: Int, resource: Resource)
+}
+
+object WorkerToAppMaster {
+ case class ExecutorLaunchRejected(reason: String = null, ex: Throwable = null)
+ case class ShutdownExecutorSucceed(appId: Int, executorId: Int)
+ case class ShutdownExecutorFailed(reason: String = null, ex: Throwable = null)
+}
+