You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:43 UTC
[33/49] incubator-gearpump git commit: fix GEARPUMP-118 change
package name to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankWorker.scala
----------------------------------------------------------------------
diff --git a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankWorker.scala b/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankWorker.scala
deleted file mode 100644
index 9f38cd7..0000000
--- a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankWorker.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.experiments.pagerank
-
-import akka.actor.Actor.Receive
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.experiments.pagerank.PageRankApplication.NodeWithTaskId
-import io.gearpump.experiments.pagerank.PageRankController.Tick
-import io.gearpump.experiments.pagerank.PageRankWorker.{LatestWeight, UpdateWeight}
-import io.gearpump.streaming.task.{Task, TaskContext, TaskId, TaskWrapper}
-import io.gearpump.util.Graph
-
-class PageRankWorker(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
- import taskContext.taskId
-
- private var weight: Double = 1.0
- private var upstreamWeights = Map.empty[TaskId, Double]
-
- val taskCount = conf.getInt(PageRankApplication.COUNT).get
- lazy val allTasks = (0 until taskCount).toList.map(TaskId(processorId = 1, _))
-
- private val graph = conf.getValue[Graph[NodeWithTaskId[_], AnyRef]](PageRankApplication.DAG).get
-
- private val node = graph.vertices.find { node =>
- node.taskId == taskContext.taskId.index
- }.get
-
- private val downstream = graph.outgoingEdgesOf(node).map(_._3)
- .map(id => taskId.copy(index = id.taskId)).toSeq
- private val upstreamCount = graph.incomingEdgesOf(node).map(_._1).length
-
- LOG.info(s"downstream nodes: $downstream")
-
- private var tick = 0
-
- private def output(msg: AnyRef, tasks: TaskId*): Unit = {
- taskContext.asInstanceOf[TaskWrapper].outputUnManaged(msg, tasks: _*)
- }
-
- override def receiveUnManagedMessage: Receive = {
- case Tick(tick) =>
- this.tick = tick
-
- if (downstream.length == 0) {
- // If there is no downstream, we will evenly distribute our page rank to
- // every node in the graph
- val update = UpdateWeight(taskId, weight / taskCount)
- output(update, allTasks: _*)
- } else {
- val update = UpdateWeight(taskId, weight / downstream.length)
- output(update, downstream: _*)
- }
- case update@UpdateWeight(upstreamTaskId, weight) =>
- upstreamWeights += upstreamTaskId -> weight
- if (upstreamWeights.size == upstreamCount) {
- val nextWeight = upstreamWeights.foldLeft(0.0) { (sum, item) => sum + item._2 }
- this.upstreamWeights = Map.empty[TaskId, Double]
- this.weight = nextWeight
- output(LatestWeight(taskId, weight, tick), TaskId(0, 0))
- }
- }
-}
-
-object PageRankWorker {
- case class UpdateWeight(taskId: TaskId, weight: Double)
- case class LatestWeight(taskId: TaskId, weight: Double, tick: Int)
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/example/PageRankExample.scala
----------------------------------------------------------------------
diff --git a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/example/PageRankExample.scala b/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/example/PageRankExample.scala
deleted file mode 100644
index 3877974..0000000
--- a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/example/PageRankExample.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.experiments.pagerank.example
-
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.experiments.pagerank.PageRankApplication
-import io.gearpump.util.Graph.Node
-import io.gearpump.util.{AkkaApp, Graph}
-
-/** A very simple PageRank example, Cyclic graph is not supported */
-object PageRankExample extends AkkaApp {
-
- val a = "a"
- val b = "b"
- val c = "c"
- val d = "d"
-
- def help(): Unit = Unit
-
- def main(akkaConf: Config, args: Array[String]): Unit = {
- val pageRankGraph = Graph(a ~> b, a ~> c, a ~> d, b ~> a, b ~> d, d ~> b, d ~> c, c ~> b)
- val app = new PageRankApplication("pagerank", iteration = 100, delta = 0.001, pageRankGraph)
- val context = ClientContext(akkaConf)
- val appId = context.submit(app)
- context.close()
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankApplication.scala
----------------------------------------------------------------------
diff --git a/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankApplication.scala b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankApplication.scala
new file mode 100644
index 0000000..023ee35
--- /dev/null
+++ b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankApplication.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.experiments.pagerank
+
+import akka.actor.ActorSystem
+
+import org.apache.gearpump.cluster.{Application, ApplicationMaster, UserConfig}
+import org.apache.gearpump.experiments.pagerank.PageRankApplication.NodeWithTaskId
+import org.apache.gearpump.partitioner.HashPartitioner
+import org.apache.gearpump.streaming.appmaster.AppMaster
+import org.apache.gearpump.streaming.{Processor, StreamApplication}
+import org.apache.gearpump.util.Graph
+import org.apache.gearpump.util.Graph.Node
+
+/**
+ * A small, unoptimised PageRank application.
+ *
+ * @param name name of the application
+ * @param iteration upper bound on the number of iterations
+ * @param delta accuracy threshold that decides when the page rank computation stops
+ * @param dag the graph to rank
+ * @tparam T vertex type of `dag`
+ */
+class PageRankApplication[T](
+    override val name: String, iteration: Int, delta: Double, dag: Graph[T, _])
+  extends Application {
+
+  override def appMaster: Class[_ <: ApplicationMaster] = classOf[AppMaster]
+
+  /**
+   * Builds the user configuration: the vertex-indexed graph, the iteration limit, the number
+   * of workers and the delta threshold.
+   */
+  override def userConfig(implicit system: ActorSystem): UserConfig = {
+    // Tag each vertex with a running index; the counter's final value becomes the worker count.
+    var nextIndex = 0
+    val indexedDag = dag.mapVertex { vertex =>
+      val tagged = NodeWithTaskId(nextIndex, vertex)
+      nextIndex += 1
+      tagged
+    }
+    val workerCount = nextIndex
+
+    val settings = UserConfig.empty
+      .withValue(PageRankApplication.DAG, indexedDag)
+      .withInt(PageRankApplication.ITERATION, iteration)
+      .withInt(PageRankApplication.COUNT, workerCount)
+      .withDouble(PageRankApplication.DELTA, delta)
+
+    // One controller task feeding `workerCount` worker tasks.
+    val controller = Processor[PageRankController](1)
+    val workers = Processor[PageRankWorker](workerCount)
+    val partitioner = new HashPartitioner
+
+    // The result is read back from the assembled StreamApplication, not returned directly.
+    StreamApplication(name, Graph(controller ~ partitioner ~> workers), settings).userConfig
+  }
+}
+
+object PageRankApplication {
+  // Keys used to store and look up values in the UserConfig.
+  val DAG: String = "PageRank.DAG"
+  val ITERATION: String = "PageRank.Iteration"
+  val COUNT: String = "PageRank.COUNT"
+  val DELTA: String = "PageRank.DELTA"
+  // NOTE(review): not referenced by the code visible in this change.
+  val REPORTER: String = "PageRank.Reporter"
+
+  /** A graph vertex together with the index assigned to it in `userConfig`. */
+  case class NodeWithTaskId[T](taskId: Int, node: T)
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankController.scala
----------------------------------------------------------------------
diff --git a/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankController.scala b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankController.scala
new file mode 100644
index 0000000..d461876
--- /dev/null
+++ b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankController.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.experiments.pagerank
+
+import akka.actor.Actor.Receive
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.experiments.pagerank.PageRankController.Tick
+import org.apache.gearpump.experiments.pagerank.PageRankWorker.LatestWeight
+import org.apache.gearpump.streaming.task._
+
+/**
+ * Task that paces the PageRank computation.
+ *
+ * It sends a [[PageRankController.Tick]] to every worker task, waits until `taskCount`
+ * [[PageRankWorker.LatestWeight]] replies for the current tick have arrived, and then either
+ * starts the next round or logs the final weights.
+ *
+ * @param taskContext context of this task; cast to [[TaskWrapper]] for unmanaged output
+ * @param conf user configuration holding the worker count, iteration limit and delta
+ */
+class PageRankController(taskContext: TaskContext, conf: UserConfig)
+  extends Task(taskContext, conf) {
+
+  val taskCount = conf.getInt(PageRankApplication.COUNT).get
+  val iterationMax = conf.getInt(PageRankApplication.ITERATION).get
+  val delta = conf.getDouble(PageRankApplication.DELTA).get
+
+  // Worker tasks are addressed as processor 1, index 0 until taskCount.
+  val tasks = List.tabulate(taskCount)(TaskId(1, _))
+
+  var tick: Int = 0
+  var receivedWeightForCurrentTick = 0
+
+  // Last reported weight per worker, and how much it moved compared to the report before.
+  var weights = Map.empty[TaskId, Double]
+  var deltas = Map.empty[TaskId, Double]
+
+  override def onStart(startTime: StartTime): Unit = output(Tick(tick), tasks: _*)
+
+  private def output(msg: AnyRef, tasks: TaskId*): Unit = {
+    val wrapper = taskContext.asInstanceOf[TaskWrapper]
+    wrapper.outputUnManaged(msg, tasks: _*)
+  }
+
+  override def receiveUnManagedMessage: Receive = {
+    case LatestWeight(worker, latest, replyTick) =>
+      // Replies tagged with any other tick are ignored.
+      if (replyTick == tick) {
+        recordReply(worker, latest)
+        if (receivedWeightForCurrentTick == taskCount) {
+          completeRound()
+        }
+      }
+  }
+
+  /** Stores one worker's reply and the absolute change against its previous weight. */
+  private def recordReply(worker: TaskId, latest: Double): Unit = {
+    val previous = weights.getOrElse(worker, 0.0)
+    deltas += worker -> Math.abs(latest - previous)
+    weights += worker -> latest
+    receivedWeightForCurrentTick += 1
+  }
+
+  /** Advances the tick, then either triggers the next round or logs the final state. */
+  private def completeRound(): Unit = {
+    tick += 1
+    receivedWeightForCurrentTick = 0
+    if (continueIteration) {
+      LOG.debug(s"next iteration: $tick, weight: $weights, delta: $deltas")
+      output(Tick(tick), tasks: _*)
+    } else {
+      LOG.info(s"iterations: $tick, weight: $weights, delta: $deltas")
+    }
+  }
+
+  /** True while the iteration limit is not reached and some weight moved by more than delta. */
+  private def continueIteration: Boolean =
+    tick < iterationMax && deltas.values.exists(_ > delta)
+}
+
+object PageRankController {
+  /** Message sent to the workers to start the round numbered `iteration`. */
+  case class Tick(iteration: Int)
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankWorker.scala
----------------------------------------------------------------------
diff --git a/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankWorker.scala b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankWorker.scala
new file mode 100644
index 0000000..e033bf1
--- /dev/null
+++ b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankWorker.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.experiments.pagerank
+
+import akka.actor.Actor.Receive
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.experiments.pagerank.PageRankApplication.NodeWithTaskId
+import org.apache.gearpump.experiments.pagerank.PageRankController.Tick
+import org.apache.gearpump.experiments.pagerank.PageRankWorker.{LatestWeight, UpdateWeight}
+import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskId, TaskWrapper}
+import org.apache.gearpump.util.Graph
+
+/**
+ * Worker task that owns one vertex of the PageRank graph.
+ *
+ * On every [[PageRankController.Tick]] the worker splits its current weight evenly between its
+ * downstream vertices (or between all workers when it has none) and sends each share as an
+ * [[PageRankWorker.UpdateWeight]]. Once shares from `upstreamCount` distinct senders have
+ * arrived, they are summed into the new weight, which is sent to `TaskId(0, 0)` (presumably
+ * the controller task) as a [[PageRankWorker.LatestWeight]].
+ *
+ * NOTE(review): a vertex without incoming edges has `upstreamCount == 0`; the reporting branch
+ * compares against a map that holds at least one entry, so such a worker never reports.
+ * Confirm whether graphs with source vertices are meant to be supported.
+ *
+ * @param taskContext context of this task; cast to [[TaskWrapper]] for unmanaged output
+ * @param conf user configuration carrying the vertex-indexed graph and the worker count
+ */
+class PageRankWorker(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
+  import taskContext.taskId
+
+  // Current page rank of this vertex.
+  private var weight: Double = 1.0
+  // Shares received in the current round, keyed by the sending task.
+  private var upstreamWeights = Map.empty[TaskId, Double]
+
+  val taskCount = conf.getInt(PageRankApplication.COUNT).get
+  lazy val allTasks = (0 until taskCount).toList.map(TaskId(processorId = 1, _))
+
+  private val graph = conf.getValue[Graph[NodeWithTaskId[_], AnyRef]](PageRankApplication.DAG).get
+
+  // The vertex whose assigned index equals this task's index.
+  private val node = graph.vertices.find { node =>
+    node.taskId == taskContext.taskId.index
+  }.get
+
+  private val downstream = graph.outgoingEdgesOf(node).map(_._3)
+    .map(id => taskId.copy(index = id.taskId)).toSeq
+  private val upstreamCount = graph.incomingEdgesOf(node).map(_._1).length
+
+  LOG.info(s"downstream nodes: $downstream")
+
+  // Iteration number taken from the most recent Tick.
+  private var tick = 0
+
+  private def output(msg: AnyRef, tasks: TaskId*): Unit = {
+    taskContext.asInstanceOf[TaskWrapper].outputUnManaged(msg, tasks: _*)
+  }
+
+  override def receiveUnManagedMessage: Receive = {
+    case Tick(iteration) =>
+      this.tick = iteration
+
+      if (downstream.isEmpty) {
+        // Without downstream vertices the page rank is spread evenly over every worker.
+        // NOTE(review): receivers compare the number of distinct senders with their
+        // incoming-edge count, so these broadcast shares count towards that total too.
+        output(UpdateWeight(taskId, weight / taskCount), allTasks: _*)
+      } else {
+        output(UpdateWeight(taskId, weight / downstream.length), downstream: _*)
+      }
+
+    // The share is deliberately NOT named `weight`: a pattern variable of that name shadows the
+    // field and caused the last received share to be reported instead of the new page rank.
+    case UpdateWeight(upstreamTaskId, share) =>
+      upstreamWeights += upstreamTaskId -> share
+      if (upstreamWeights.size == upstreamCount) {
+        val nextWeight = upstreamWeights.values.sum
+        upstreamWeights = Map.empty[TaskId, Double]
+        weight = nextWeight
+        output(LatestWeight(taskId, nextWeight, tick), TaskId(0, 0))
+      }
+  }
+}
+
+object PageRankWorker {
+  /** Share of `taskId`'s weight, sent from one worker to another. */
+  case class UpdateWeight(taskId: TaskId, weight: Double)
+
+  /** Weight computed by worker `taskId` for iteration `tick`. */
+  case class LatestWeight(taskId: TaskId, weight: Double, tick: Int)
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/example/PageRankExample.scala
----------------------------------------------------------------------
diff --git a/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/example/PageRankExample.scala b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/example/PageRankExample.scala
new file mode 100644
index 0000000..9dc311f
--- /dev/null
+++ b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/example/PageRankExample.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.experiments.pagerank.example
+
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.experiments.pagerank.PageRankApplication
+import org.apache.gearpump.util.Graph.Node
+import org.apache.gearpump.util.{AkkaApp, Graph}
+
+/**
+ * A very simple PageRank example that submits a four-vertex graph.
+ *
+ * NOTE(review): this example was documented as not supporting cyclic graphs, yet the graph
+ * built in `main` contains cycles (for instance a ~> b together with b ~> a) — confirm which
+ * of the two is intended.
+ */
+object PageRankExample extends AkkaApp {
+
+  val a = "a"
+  val b = "b"
+  val c = "c"
+  val d = "d"
+
+  /** Prints nothing; `main` does not read its arguments. */
+  def help(): Unit = ()
+
+  /**
+   * Builds the example graph and submits it as a PageRank application.
+   *
+   * @param akkaConf configuration used to create the client context
+   * @param args command line arguments (unused)
+   */
+  def main(akkaConf: Config, args: Array[String]): Unit = {
+    val pageRankGraph = Graph(a ~> b, a ~> c, a ~> d, b ~> a, b ~> d, d ~> b, d ~> c, c ~> b)
+    val app = new PageRankApplication("pagerank", iteration = 100, delta = 0.001, pageRankGraph)
+    val context = ClientContext(akkaConf)
+    try {
+      context.submit(app)
+    } finally {
+      // Close the client even when the submission throws.
+      context.close()
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Dag.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Dag.scala b/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Dag.scala
deleted file mode 100644
index b517126..0000000
--- a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Dag.scala
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.complexdag
-
-import org.slf4j.Logger
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
-import io.gearpump.partitioner.HashPartitioner
-import io.gearpump.streaming.task.TaskContext
-import io.gearpump.streaming.{Processor, StreamApplication}
-import io.gearpump.util.Graph.{Node => GraphNode}
-import io.gearpump.util.{AkkaApp, Graph, LogUtil}
-
-case class Source_0(_context: TaskContext, _conf: UserConfig) extends Source(_context, _conf)
-case class Source_1(_context: TaskContext, _conf: UserConfig) extends Source(_context, _conf)
-case class Node_0(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf)
-case class Node_1(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf)
-case class Node_2(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf)
-case class Node_3(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf)
-case class Node_4(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf)
-case class Sink_0(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
-case class Sink_1(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
-case class Sink_2(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
-case class Sink_3(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
-case class Sink_4(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
-
-/**
- * digraph flow {
- * Source_0 -> Sink_0;
- * Source_0 -> Sink_1;
- * Source_0 -> Sink_2;
- * Source_0 -> Node_1;
- * Source_1 -> Node_0;
- * Node_0 -> Sink_3;
- * Node_1 -> Sink_3;
- * Node_1 -> Sink_4;
- * Node_1 -> Node_4;
- * Node_2 -> Node_3;
- * Node_1 -> Node_3;
- * Source_0 -> Node_2;
- * Source_0 -> Node_3;
- * Node_3 -> Sink_3;
- * Node_4 -> Sink_3;
- * Source_1 -> Sink_4;
- * }
- */
-object Dag extends AkkaApp with ArgumentsParser {
- private val LOG: Logger = LogUtil.getLogger(getClass)
- val RUN_FOR_EVER = -1
-
- override val options: Array[(String, CLIOption[Any])] = Array.empty
-
- def application(config: ParseResult): StreamApplication = {
-
- val source_0 = Processor[Source_0](1)
- val source_1 = Processor[Source_1](1)
- val node_0 = Processor[Node_0](1)
- val node_1 = Processor[Node_1](1)
- val node_2 = Processor[Node_2](1)
- val node_3 = Processor[Node_3](1)
- val node_4 = Processor[Node_4](1)
- val sink_0 = Processor[Sink_0](1)
- val sink_1 = Processor[Sink_1](1)
- val sink_2 = Processor[Sink_2](1)
- val sink_3 = Processor[Sink_3](1)
- val sink_4 = Processor[Sink_4](1)
- val partitioner = new HashPartitioner
- val app = StreamApplication("dag", Graph(
- source_0 ~ partitioner ~> sink_1,
- source_0 ~ partitioner ~> sink_2,
- source_0 ~ partitioner ~> node_2,
- source_0 ~ partitioner ~> node_3,
- source_0 ~ partitioner ~> node_1,
- source_0 ~ partitioner ~> sink_0,
- node_2 ~ partitioner ~> node_3,
- node_1 ~ partitioner ~> node_3,
- node_1 ~ partitioner ~> sink_3,
- node_1 ~ partitioner ~> node_4,
- source_1 ~ partitioner ~> sink_4,
- source_1 ~ partitioner ~> node_0,
- node_3 ~ partitioner ~> sink_3,
- node_4 ~ partitioner ~> sink_3,
- node_0 ~ partitioner ~> sink_3
- ), UserConfig.empty)
- app
- }
-
- override def main(akkaConf: Config, args: Array[String]): Unit = {
- val userConf = parse(args)
- val context = ClientContext(akkaConf)
- val appId = context.submit(application(userConf))
- context.close()
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Node.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Node.scala b/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Node.scala
deleted file mode 100644
index dfad6c8..0000000
--- a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Node.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.complexdag
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
-
-class Node(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
- import taskContext.output
-
- override def onStart(startTime: StartTime): Unit = {}
-
- override def onNext(msg: Message): Unit = {
- val list = msg.msg.asInstanceOf[Vector[String]]
- output(new Message(list :+ getClass.getCanonicalName, System.currentTimeMillis()))
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Sink.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Sink.scala b/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Sink.scala
deleted file mode 100644
index b091a17..0000000
--- a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Sink.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.complexdag
-
-import scala.collection.mutable
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
-
-class Sink(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
-
- var list = mutable.MutableList[String]()
-
- override def onStart(startTime: StartTime): Unit = {
- list += getClass.getCanonicalName
- }
-
- override def onNext(msg: Message): Unit = {
- val l = msg.msg.asInstanceOf[Vector[String]]
- list.size match {
- case 1 =>
- l.foreach(f => {
- list += f
- })
- case _ =>
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Source.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Source.scala b/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Source.scala
deleted file mode 100644
index df656ac..0000000
--- a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Source.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.complexdag
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
-
-class Source(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
- import taskContext.output
-
- override def onStart(startTime: StartTime): Unit = {
- self ! Message("start")
- }
-
- override def onNext(msg: Message): Unit = {
- val list = Vector(getClass.getCanonicalName)
- output(new Message(list, System.currentTimeMillis))
- self ! Message("continue", System.currentTimeMillis())
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Dag.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Dag.scala b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Dag.scala
new file mode 100644
index 0000000..3b6ceb8
--- /dev/null
+++ b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Dag.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.complexdag
+
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
+import org.apache.gearpump.partitioner.HashPartitioner
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.streaming.{Processor, StreamApplication}
+import org.apache.gearpump.util.Graph.{Node => GraphNode}
+import org.apache.gearpump.util.{AkkaApp, Graph, LogUtil}
+
+// Concrete task types for the example topology. Each class only forwards its constructor
+// arguments to Source, Node or Sink; the distinct class names give every vertex built in
+// Dag.application its own type to hand to Processor[T].
+case class Source_0(_context: TaskContext, _conf: UserConfig) extends Source(_context, _conf)
+case class Source_1(_context: TaskContext, _conf: UserConfig) extends Source(_context, _conf)
+case class Node_0(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf)
+case class Node_1(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf)
+case class Node_2(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf)
+case class Node_3(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf)
+case class Node_4(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf)
+case class Sink_0(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
+case class Sink_1(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
+case class Sink_2(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
+case class Sink_3(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
+case class Sink_4(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
+
+/**
+ * Example that submits a fixed topology of two sources, five intermediate nodes and five
+ * sinks. The intended shape of the graph, in Graphviz notation:
+ *
+ * {{{
+ * digraph flow {
+ *   Source_0 -> Sink_0;
+ *   Source_0 -> Sink_1;
+ *   Source_0 -> Sink_2;
+ *   Source_0 -> Node_1;
+ *   Source_1 -> Node_0;
+ *   Node_0 -> Sink_3;
+ *   Node_1 -> Sink_3;
+ *   Node_1 -> Sink_4;
+ *   Node_1 -> Node_4;
+ *   Node_2 -> Node_3;
+ *   Node_1 -> Node_3;
+ *   Source_0 -> Node_2;
+ *   Source_0 -> Node_3;
+ *   Node_3 -> Sink_3;
+ *   Node_4 -> Sink_3;
+ *   Source_1 -> Sink_4;
+ * }
+ * }}}
+ *
+ * NOTE(review): `application` creates every edge above except `Node_1 -> Sink_4`; confirm
+ * whether the diagram or the graph definition is the one that is out of date.
+ */
+object Dag extends AkkaApp with ArgumentsParser {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+  val RUN_FOR_EVER = -1
+
+  // This example declares no command line options.
+  override val options: Array[(String, CLIOption[Any])] = Array.empty
+
+  /**
+   * Builds the stream application named "dag".
+   *
+   * @param config parsed command line arguments; not consulted, the topology is fixed
+   * @return the application description, created with an empty user config
+   */
+  def application(config: ParseResult): StreamApplication = {
+
+    // One processor per vertex of the graph.
+    val source_0 = Processor[Source_0](1)
+    val source_1 = Processor[Source_1](1)
+    val node_0 = Processor[Node_0](1)
+    val node_1 = Processor[Node_1](1)
+    val node_2 = Processor[Node_2](1)
+    val node_3 = Processor[Node_3](1)
+    val node_4 = Processor[Node_4](1)
+    val sink_0 = Processor[Sink_0](1)
+    val sink_1 = Processor[Sink_1](1)
+    val sink_2 = Processor[Sink_2](1)
+    val sink_3 = Processor[Sink_3](1)
+    val sink_4 = Processor[Sink_4](1)
+
+    // Every edge shares the same hash partitioner instance.
+    val partitioner = new HashPartitioner
+    StreamApplication("dag", Graph(
+      source_0 ~ partitioner ~> sink_1,
+      source_0 ~ partitioner ~> sink_2,
+      source_0 ~ partitioner ~> node_2,
+      source_0 ~ partitioner ~> node_3,
+      source_0 ~ partitioner ~> node_1,
+      source_0 ~ partitioner ~> sink_0,
+      node_2 ~ partitioner ~> node_3,
+      node_1 ~ partitioner ~> node_3,
+      node_1 ~ partitioner ~> sink_3,
+      node_1 ~ partitioner ~> node_4,
+      source_1 ~ partitioner ~> sink_4,
+      source_1 ~ partitioner ~> node_0,
+      node_3 ~ partitioner ~> sink_3,
+      node_4 ~ partitioner ~> sink_3,
+      node_0 ~ partitioner ~> sink_3
+    ), UserConfig.empty)
+  }
+
+  /**
+   * Parses the arguments, submits the application and closes the client context.
+   *
+   * @param akkaConf configuration used to create the ClientContext
+   * @param args raw command line arguments
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val userConf = parse(args)
+    val context = ClientContext(akkaConf)
+    try {
+      context.submit(application(userConf))
+    } finally {
+      // Close the client even when the submission throws, so the context is never left open.
+      context.close()
+    }
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala
new file mode 100644
index 0000000..8d163f9
--- /dev/null
+++ b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.complexdag
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+
+/**
+ * Intermediate task of the example topology: extends the vector of class names carried by
+ * each incoming message with its own canonical class name and emits the result.
+ */
+class Node(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
+
+  /** Nothing to prepare; this task only reacts to incoming messages. */
+  override def onStart(startTime: StartTime): Unit = ()
+
+  /**
+   * Emits the received names plus this task's class name, stamped with the current time.
+   *
+   * @param msg message whose payload is cast to Vector[String]
+   */
+  override def onNext(msg: Message): Unit = {
+    val path = msg.msg.asInstanceOf[Vector[String]] :+ getClass.getCanonicalName
+    taskContext.output(new Message(path, System.currentTimeMillis()))
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala
new file mode 100644
index 0000000..8dfa565
--- /dev/null
+++ b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.complexdag
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+
+import scala.collection.mutable
+
+/**
+ * Terminal task of the example topology. It keeps a list of class names: `onStart` adds this
+ * task's own name, and `onNext` appends the names of a received message only while the list
+ * still holds exactly one entry.
+ */
+class Sink(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
+
+  // Names recorded so far; read directly by the spec for this class.
+  var list = mutable.MutableList[String]()
+
+  /** Records this task's canonical class name. */
+  override def onStart(startTime: StartTime): Unit = {
+    list += getClass.getCanonicalName
+  }
+
+  /**
+   * Appends the names carried by `msg` when the list has exactly one entry; otherwise the
+   * message is ignored. The payload is cast to Vector[String] in either case.
+   */
+  override def onNext(msg: Message): Unit = {
+    val names = msg.msg.asInstanceOf[Vector[String]]
+    if (list.size == 1) {
+      list ++= names
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala
new file mode 100644
index 0000000..0359519
--- /dev/null
+++ b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.complexdag
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+
+/**
+ * Source task of the example topology. It sends itself a "start" message on start-up and a
+ * "continue" message after every emission (presumably so that `onNext` keeps being invoked),
+ * and emits a one-element vector holding its canonical class name for each message received.
+ */
+class Source(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
+
+  /** Sends the initial "start" message to this task itself. */
+  override def onStart(startTime: StartTime): Unit = {
+    self ! Message("start")
+  }
+
+  /**
+   * Emits this task's class name, then sends itself a "continue" message.
+   *
+   * @param msg the received message; its content is not inspected
+   */
+  override def onNext(msg: Message): Unit = {
+    val names = Vector(getClass.getCanonicalName)
+    taskContext.output(new Message(names, System.currentTimeMillis))
+    self ! Message("continue", System.currentTimeMillis())
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/DagSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/DagSpec.scala b/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/DagSpec.scala
deleted file mode 100644
index b142d8d..0000000
--- a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/DagSpec.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.examples.complexdag
-
-import scala.concurrent.Future
-import scala.util.Success
-
-import org.scalatest._
-import org.scalatest.prop.PropertyChecks
-
-import io.gearpump.cluster.ClientToMaster.SubmitApplication
-import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
-import io.gearpump.cluster.{MasterHarness, TestUtil}
-
-class DagSpec extends PropSpec with PropertyChecks
- with Matchers with BeforeAndAfterAll with MasterHarness {
-
- override def beforeAll {
- startActorSystem()
- }
-
- override def afterAll {
- shutdownActorSystem()
- }
-
- protected override def config = TestUtil.DEFAULT_CONFIG
-
- property("Dag should succeed to submit application with required arguments") {
- val requiredArgs = Array.empty[String]
-
- val masterReceiver = createMockMaster()
- val args = requiredArgs
-
- Future {
- Dag.main(masterConfig, args)
- }
- masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
- masterReceiver.reply(SubmitApplicationResult(Success(0)))
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/NodeSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/NodeSpec.scala b/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/NodeSpec.scala
deleted file mode 100644
index 35c5824..0000000
--- a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/NodeSpec.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.complexdag
-
-import org.mockito.Mockito._
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.MockUtil._
-
-class NodeSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter {
-
- val context = MockUtil.mockTaskContext
-
- val node = new Node(context, UserConfig.empty)
-
- property("Node should send a Vector[String](classOf[Node].getCanonicalName, " +
- "classOf[Node].getCanonicalName") {
- val list = Vector(classOf[Node].getCanonicalName)
- val expected = Vector(classOf[Node].getCanonicalName, classOf[Node].getCanonicalName)
- node.onNext(Message(list))
- verify(context).output(argMatch[Message](_.msg == expected))
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SinkSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SinkSpec.scala b/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SinkSpec.scala
deleted file mode 100644
index 341f6c6..0000000
--- a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SinkSpec.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.complexdag
-
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.MockUtil
-
-class SinkSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter {
-
- val context = MockUtil.mockTaskContext
-
- val sink = new Sink(context, UserConfig.empty)
-
- property("Sink should send a Vector[String](classOf[Sink].getCanonicalName, " +
- "classOf[Sink].getCanonicalName") {
- val list = Vector(classOf[Sink].getCanonicalName)
- val expected = Vector(classOf[Sink].getCanonicalName, classOf[Sink].getCanonicalName)
- sink.onNext(Message(list))
-
- (0 until sink.list.size).map(i => {
- assert(sink.list(i).equals(expected(i)))
- })
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SourceSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SourceSpec.scala b/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SourceSpec.scala
deleted file mode 100644
index faa7aa7..0000000
--- a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SourceSpec.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.complexdag
-
-import akka.actor.ActorSystem
-import org.mockito.Mockito._
-import org.scalatest.{Matchers, WordSpec}
-
-import io.gearpump.Message
-import io.gearpump.cluster.{TestUtil, UserConfig}
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.MockUtil._
-
-class SourceSpec extends WordSpec with Matchers {
-
- "Source" should {
- "Source should send a msg of Vector[String](classOf[Source].getCanonicalName)" in {
- val system1 = ActorSystem("Source", TestUtil.DEFAULT_CONFIG)
-
- val system2 = ActorSystem("Reporter", TestUtil.DEFAULT_CONFIG)
-
- val context = MockUtil.mockTaskContext
-
- val source = new Source(context, UserConfig.empty)
- source.onNext(Message("start"))
-
- verify(context).output(argMatch[Message](Vector(classOf[Source].getCanonicalName) == _.msg))
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/DagSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/DagSpec.scala b/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/DagSpec.scala
new file mode 100644
index 0000000..cf8ae63
--- /dev/null
+++ b/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/DagSpec.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.complexdag
+
+import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication
+import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult
+import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
+import org.scalatest._
+import org.scalatest.prop.PropertyChecks
+
+import scala.concurrent.Future
+import scala.util.Success
+
+/**
+ * Runs `Dag.main` against a mock master and checks that a SubmitApplication request arrives.
+ */
+class DagSpec extends PropSpec with PropertyChecks
+  with Matchers with BeforeAndAfterAll with MasterHarness {
+
+  protected override def config = TestUtil.DEFAULT_CONFIG
+
+  override def beforeAll(): Unit = {
+    startActorSystem()
+  }
+
+  override def afterAll(): Unit = {
+    shutdownActorSystem()
+  }
+
+  property("Dag should succeed to submit application with required arguments") {
+    // The example declares no options, so an empty argument list is all it requires.
+    val requiredArgs = Array.empty[String]
+    val masterReceiver = createMockMaster()
+
+    // main is started in a Future; the assertions below run on the test thread meanwhile.
+    Future {
+      Dag.main(masterConfig, requiredArgs)
+    }
+
+    masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
+    masterReceiver.reply(SubmitApplicationResult(Success(0)))
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/NodeSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/NodeSpec.scala b/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/NodeSpec.scala
new file mode 100644
index 0000000..241e0f6
--- /dev/null
+++ b/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/NodeSpec.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.complexdag
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.MockUtil._
+import org.mockito.Mockito._
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
+
+/**
+ * Checks that Node appends its own class name to the names it receives before emitting them.
+ */
+class NodeSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter {
+
+  val context = MockUtil.mockTaskContext
+
+  val node = new Node(context, UserConfig.empty)
+
+  property("Node should send a Vector[String](classOf[Node].getCanonicalName, " +
+    "classOf[Node].getCanonicalName") {
+    val nodeName = classOf[Node].getCanonicalName
+
+    node.onNext(Message(Vector(nodeName)))
+
+    // The emitted payload is the received name followed by the node's own name.
+    verify(context).output(argMatch[Message](_.msg == Vector(nodeName, nodeName)))
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/SinkSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/SinkSpec.scala b/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/SinkSpec.scala
new file mode 100644
index 0000000..e7bed30
--- /dev/null
+++ b/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/SinkSpec.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.complexdag
+
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+
+/**
+ * Checks that a started Sink records the names carried by the first message it receives.
+ */
+class SinkSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter {
+
+  val context = MockUtil.mockTaskContext
+
+  val sink = new Sink(context, UserConfig.empty)
+
+  property("Sink should send a Vector[String](classOf[Sink].getCanonicalName, " +
+    "classOf[Sink].getCanonicalName") {
+    val list = Vector(classOf[Sink].getCanonicalName)
+    val expected = Vector(classOf[Sink].getCanonicalName, classOf[Sink].getCanonicalName)
+
+    // Sink.onNext only records a message while its list holds the single entry added by
+    // onStart, so the task has to be started first; without this nothing is recorded and
+    // the check below would have nothing to compare.
+    sink.onStart(org.apache.gearpump.streaming.task.StartTime(0))
+    sink.onNext(Message(list))
+
+    // Compare the whole sequence so that a missing or extra element fails the test.
+    sink.list.toVector shouldBe expected
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/SourceSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/SourceSpec.scala b/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/SourceSpec.scala
new file mode 100644
index 0000000..20cad1c
--- /dev/null
+++ b/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/SourceSpec.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.complexdag
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.{TestUtil, UserConfig}
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.MockUtil._
+import org.mockito.Mockito._
+import org.scalatest.{Matchers, WordSpec}
+
+/**
+ * Checks that Source emits a one-element vector holding its own class name.
+ */
+class SourceSpec extends WordSpec with Matchers {
+
+  "Source" should {
+    "Source should send a msg of Vector[String](classOf[Source].getCanonicalName)" in {
+      // Source is exercised through the mocked TaskContext, so the test does not start any
+      // ActorSystem of its own; a system created here would also have to be shut down.
+      val context = MockUtil.mockTaskContext
+
+      val source = new Source(context, UserConfig.empty)
+      source.onNext(Message("start"))
+
+      verify(context).output(argMatch[Message](Vector(classOf[Source].getCanonicalName) == _.msg))
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/README.md
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/README.md b/examples/streaming/fsio/README.md
index 06fefb6..200cdb1 100644
--- a/examples/streaming/fsio/README.md
+++ b/examples/streaming/fsio/README.md
@@ -16,7 +16,7 @@ In order to run the example:
3. Submit the application:<br>
```bash
- ./target/pack/bin/gear app -jar ./examples/target/$SCALA_VERSION_MAJOR/gearpump-examples-assembly-$VERSION.jar io.gearpump.streaming.examples.sol.SOL -input $INPUT_FILE_PATH -output $OUTPUT_DIRECTORY
+ ./target/pack/bin/gear app -jar ./examples/target/$SCALA_VERSION_MAJOR/gearpump-examples-assembly-$VERSION.jar org.apache.gearpump.streaming.examples.sol.SOL -input $INPUT_FILE_PATH -output $OUTPUT_DIRECTORY
```
4. Stop the application:<br>
```bash
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/HadoopConfig.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/HadoopConfig.scala b/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/HadoopConfig.scala
deleted file mode 100644
index 3b53c9c..0000000
--- a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/HadoopConfig.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.fsio
-
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
-import scala.language.implicitConversions
-
-import org.apache.hadoop.conf.Configuration
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.util.Constants._
-
-class HadoopConfig(config: UserConfig) {
-
- def withHadoopConf(conf: Configuration): UserConfig = {
- config.withBytes(HADOOP_CONF, serializeHadoopConf(conf))
- }
-
- def hadoopConf: Configuration = deserializeHadoopConf(config.getBytes(HADOOP_CONF).get)
-
- private def serializeHadoopConf(conf: Configuration): Array[Byte] = {
- val out = new ByteArrayOutputStream()
- val dataOut = new DataOutputStream(out)
- conf.write(dataOut)
- dataOut.close()
- out.toByteArray
- }
-
- private def deserializeHadoopConf(bytes: Array[Byte]): Configuration = {
- val in = new ByteArrayInputStream(bytes)
- val dataIn = new DataInputStream(in)
- val result = new Configuration()
- result.readFields(dataIn)
- dataIn.close()
- result
- }
-}
-
-object HadoopConfig {
- def empty: HadoopConfig = new HadoopConfig(UserConfig.empty)
- def apply(config: UserConfig): HadoopConfig = new HadoopConfig(config)
-
- implicit def userConfigToHadoopConfig(userConf: UserConfig): HadoopConfig = {
- HadoopConfig(userConf)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala b/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
deleted file mode 100644
index 13fc3f9..0000000
--- a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.fsio
-
-import java.io.File
-import java.util.concurrent.TimeUnit
-import scala.concurrent.duration.FiniteDuration
-
-import akka.actor.Cancellable
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.io.SequenceFile._
-import org.apache.hadoop.io.{SequenceFile, Text}
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.examples.fsio.HadoopConfig._
-import io.gearpump.streaming.examples.fsio.SeqFileStreamProcessor._
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
-
-class SeqFileStreamProcessor(taskContext: TaskContext, config: UserConfig)
- extends Task(taskContext, config) {
-
- import taskContext.taskId
-
- val outputPath = new Path(config.getString(OUTPUT_PATH).get + File.separator + taskId)
- var writer: SequenceFile.Writer = null
- val textClass = new Text().getClass
- val key = new Text()
- val value = new Text()
- val hadoopConf = config.hadoopConf
-
- private var msgCount: Long = 0
- private var snapShotKVCount: Long = 0
- private var snapShotTime: Long = 0
- private var scheduler: Cancellable = null
-
- override def onStart(startTime: StartTime): Unit = {
-
- val fs = FileSystem.get(hadoopConf)
- fs.deleteOnExit(outputPath)
- writer = SequenceFile.createWriter(hadoopConf, Writer.file(outputPath),
- Writer.keyClass(textClass), Writer.valueClass(textClass))
-
- scheduler = taskContext.schedule(new FiniteDuration(5, TimeUnit.SECONDS),
- new FiniteDuration(5, TimeUnit.SECONDS))(reportStatus())
- snapShotTime = System.currentTimeMillis()
- LOG.info("sequence file bolt initiated")
- }
-
- override def onNext(msg: Message): Unit = {
- val kv = msg.msg.asInstanceOf[String].split("\\+\\+")
- if (kv.length >= 2) {
- key.set(kv(0))
- value.set(kv(1))
- writer.append(key, value)
- }
- msgCount += 1
- }
-
- override def onStop(): Unit = {
- if (scheduler != null) {
- scheduler.cancel()
- }
- writer.close()
- LOG.info("sequence file bolt stopped")
- }
-
- private def reportStatus() = {
- val current: Long = System.currentTimeMillis()
- LOG.info(s"Task $taskId Throughput: ${
- (msgCount - snapShotKVCount,
- (current - snapShotTime) / 1000)
- } (KVPairs, second)")
- snapShotKVCount = msgCount
- snapShotTime = current
- }
-}
-
-object SeqFileStreamProcessor {
- val OUTPUT_PATH = "outputpath"
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala b/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
deleted file mode 100644
index f9b0d22..0000000
--- a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.fsio
-
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.io.SequenceFile._
-import org.apache.hadoop.io.{SequenceFile, Text}
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.examples.fsio.HadoopConfig._
-import io.gearpump.streaming.examples.fsio.SeqFileStreamProducer._
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
-
-class SeqFileStreamProducer(taskContext: TaskContext, config: UserConfig)
- extends Task(taskContext, config) {
-
- import taskContext.output
-
- val value = new Text()
- val key = new Text()
- var reader: SequenceFile.Reader = null
- val hadoopConf = config.hadoopConf
- val fs = FileSystem.get(hadoopConf)
- val inputPath = new Path(config.getString(INPUT_PATH).get)
-
- override def onStart(startTime: StartTime): Unit = {
- reader = new SequenceFile.Reader(hadoopConf, Reader.file(inputPath))
- self ! Start
- LOG.info("sequence file spout initiated")
- }
-
- override def onNext(msg: Message): Unit = {
- if (reader.next(key, value)) {
- output(Message(key + "++" + value))
- } else {
- reader.close()
- reader = new SequenceFile.Reader(hadoopConf, Reader.file(inputPath))
- }
- self ! Continue
- }
-
- override def onStop(): Unit = {
- reader.close()
- }
-}
-
-object SeqFileStreamProducer {
- def INPUT_PATH: String = "inputpath"
-
- val Start = Message("start")
- val Continue = Message("continue")
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SequenceFileIO.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SequenceFileIO.scala b/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SequenceFileIO.scala
deleted file mode 100644
index 7272f2b..0000000
--- a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SequenceFileIO.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.examples.fsio
-
-import org.apache.hadoop.conf.Configuration
-import org.slf4j.Logger
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
-import io.gearpump.partitioner.ShufflePartitioner
-import io.gearpump.streaming.examples.fsio.HadoopConfig._
-import io.gearpump.streaming.{Processor, StreamApplication}
-import io.gearpump.util.Graph._
-import io.gearpump.util.{AkkaApp, Graph, LogUtil}
-
-object SequenceFileIO extends AkkaApp with ArgumentsParser {
- private val LOG: Logger = LogUtil.getLogger(getClass)
-
- override val options: Array[(String, CLIOption[Any])] = Array(
- "source" -> CLIOption[Int]("<sequence file reader number>", required = false,
- defaultValue = Some(1)),
- "sink" -> CLIOption[Int]("<sequence file writer number>", required = false,
- defaultValue = Some(1)),
- "input" -> CLIOption[String]("<input file path>", required = true),
- "output" -> CLIOption[String]("<output file directory>", required = true)
- )
-
- def application(config: ParseResult): StreamApplication = {
- val spoutNum = config.getInt("source")
- val boltNum = config.getInt("sink")
- val input = config.getString("input")
- val output = config.getString("output")
- val appConfig = UserConfig.empty.withString(SeqFileStreamProducer.INPUT_PATH, input)
- .withString(SeqFileStreamProcessor.OUTPUT_PATH, output)
- val hadoopConfig = appConfig.withHadoopConf(new Configuration())
- val partitioner = new ShufflePartitioner()
- val streamProducer = Processor[SeqFileStreamProducer](spoutNum)
- val streamProcessor = Processor[SeqFileStreamProcessor](boltNum)
-
- val app = StreamApplication("SequenceFileIO",
- Graph(streamProducer ~ partitioner ~> streamProcessor), hadoopConfig)
- app
- }
-
- override def main(akkaConf: Config, args: Array[String]): Unit = {
- val config = parse(args)
- val context = ClientContext(akkaConf)
- val appId = context.submit(application(config))
- context.close()
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/HadoopConfig.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/HadoopConfig.scala b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/HadoopConfig.scala
new file mode 100644
index 0000000..144dd78
--- /dev/null
+++ b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/HadoopConfig.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.fsio
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
+import scala.language.implicitConversions
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.util.Constants._
+
+/**
+ * Wraps a [[UserConfig]] so that a Hadoop [[Configuration]] can be stored in it, and read
+ * back from it, as a serialized byte array kept under the `HADOOP_CONF` key.
+ */
+class HadoopConfig(config: UserConfig) {
+
+  /** Returns the [[UserConfig]] obtained by storing the serialized `conf` in the wrapped one. */
+  def withHadoopConf(conf: Configuration): UserConfig = {
+    config.withBytes(HADOOP_CONF, serializeHadoopConf(conf))
+  }
+
+  /**
+   * Rebuilds the Hadoop configuration stored by [[withHadoopConf]].
+   *
+   * @throws NoSuchElementException if nothing is stored under `HADOOP_CONF`
+   */
+  def hadoopConf: Configuration = {
+    val bytes = config.getBytes(HADOOP_CONF).getOrElse(
+      throw new NoSuchElementException(
+        s"No Hadoop configuration found under '$HADOOP_CONF'; call withHadoopConf first"))
+    deserializeHadoopConf(bytes)
+  }
+
+  /** Writes `conf` through its own `write` method and returns the resulting bytes. */
+  private def serializeHadoopConf(conf: Configuration): Array[Byte] = {
+    val out = new ByteArrayOutputStream()
+    val dataOut = new DataOutputStream(out)
+    // Close (and thereby flush) the stream even when write() throws.
+    try conf.write(dataOut) finally dataOut.close()
+    out.toByteArray
+  }
+
+  /** Creates a fresh [[Configuration]] and populates it from `bytes` via `readFields`. */
+  private def deserializeHadoopConf(bytes: Array[Byte]): Configuration = {
+    val dataIn = new DataInputStream(new ByteArrayInputStream(bytes))
+    val result = new Configuration()
+    try result.readFields(dataIn) finally dataIn.close()
+    result
+  }
+}
+
+/** Factory methods and the implicit [[UserConfig]] to [[HadoopConfig]] conversion. */
+object HadoopConfig {
+  /** Wraps `UserConfig.empty`. */
+  def empty: HadoopConfig = new HadoopConfig(UserConfig.empty)
+
+  def apply(config: UserConfig): HadoopConfig = new HadoopConfig(config)
+
+  /** Lets `withHadoopConf` and `hadoopConf` be called directly on a [[UserConfig]]. */
+  implicit def userConfigToHadoopConfig(userConf: UserConfig): HadoopConfig = {
+    HadoopConfig(userConf)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
new file mode 100644
index 0000000..2e4a556
--- /dev/null
+++ b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.fsio
+
+import java.io.File
+import java.util.concurrent.TimeUnit
+import scala.concurrent.duration.FiniteDuration
+
+import akka.actor.Cancellable
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.io.SequenceFile._
+import org.apache.hadoop.io.{SequenceFile, Text}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.examples.fsio.HadoopConfig._
+import org.apache.gearpump.streaming.examples.fsio.SeqFileStreamProcessor._
+import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+
+/**
+ * Task that appends every incoming `key++value` string message to a Hadoop sequence file.
+ * The file is named after the task id and placed under the configured output path. A
+ * throughput report is scheduled with a 5 second delay and a 5 second interval.
+ */
+class SeqFileStreamProcessor(taskContext: TaskContext, config: UserConfig)
+  extends Task(taskContext, config) {
+
+  import taskContext.taskId
+
+  val outputPath = new Path(config.getString(OUTPUT_PATH).get + File.separator + taskId)
+  // Stays null until onStart has opened the file.
+  var writer: SequenceFile.Writer = null
+  val textClass = new Text().getClass
+  // Reused for every appended record.
+  val key = new Text()
+  val value = new Text()
+  val hadoopConf = config.hadoopConf
+
+  private var msgCount: Long = 0
+  // Message count and wall-clock time captured by the previous status report.
+  private var snapShotKVCount: Long = 0
+  private var snapShotTime: Long = 0
+  private var scheduler: Cancellable = null
+
+  /** Opens the sequence file writer and schedules the periodic throughput report. */
+  override def onStart(startTime: StartTime): Unit = {
+    val fs = FileSystem.get(hadoopConf)
+    // NOTE(review): Hadoop documents deleteOnExit as removing the path when the FileSystem
+    // is closed or the JVM exits, so the output is presumably meant to be temporary - confirm.
+    fs.deleteOnExit(outputPath)
+    writer = SequenceFile.createWriter(hadoopConf, Writer.file(outputPath),
+      Writer.keyClass(textClass), Writer.valueClass(textClass))
+
+    scheduler = taskContext.schedule(new FiniteDuration(5, TimeUnit.SECONDS),
+      new FiniteDuration(5, TimeUnit.SECONDS))(reportStatus())
+    snapShotTime = System.currentTimeMillis()
+    LOG.info("sequence file bolt initiated")
+  }
+
+  /**
+   * Appends the message to the sequence file when it has the form `key++value`. Messages
+   * without a `++` separator are counted but not written.
+   */
+  override def onNext(msg: Message): Unit = {
+    // Limit the split to two parts: everything after the first "++" is the value, so a value
+    // that is empty or itself contains "++" is written in full instead of dropped or truncated.
+    val kv = msg.msg.asInstanceOf[String].split("\\+\\+", 2)
+    if (kv.length >= 2) {
+      key.set(kv(0))
+      value.set(kv(1))
+      writer.append(key, value)
+    }
+    msgCount += 1
+  }
+
+  /** Cancels the status report and closes the writer, if they were ever created. */
+  override def onStop(): Unit = {
+    if (scheduler != null) {
+      scheduler.cancel()
+    }
+    // onStart may have failed before the writer was opened.
+    if (writer != null) {
+      writer.close()
+    }
+    LOG.info("sequence file bolt stopped")
+  }
+
+  /** Logs the messages received and the seconds elapsed since the previous report. */
+  private def reportStatus() = {
+    val current: Long = System.currentTimeMillis()
+    LOG.info(s"Task $taskId Throughput: ${
+      (msgCount - snapShotKVCount,
+        (current - snapShotTime) / 1000)
+    } (KVPairs, second)")
+    snapShotKVCount = msgCount
+    snapShotTime = current
+  }
+}
+
+object SeqFileStreamProcessor {
+  /** UserConfig key of the directory under which each task writes its file. */
+  val OUTPUT_PATH = "outputpath"
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
new file mode 100644
index 0000000..02d2434
--- /dev/null
+++ b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.fsio
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.io.SequenceFile._
+import org.apache.hadoop.io.{SequenceFile, Text}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.examples.fsio.HadoopConfig._
+import org.apache.gearpump.streaming.examples.fsio.SeqFileStreamProducer._
+import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+
+/**
+ * Task that reads key/value records from a Hadoop sequence file and emits each one as a
+ * `key++value` string message. When the end of the file is reached the file is reopened, so
+ * the same records are read again.
+ */
+class SeqFileStreamProducer(taskContext: TaskContext, config: UserConfig)
+  extends Task(taskContext, config) {
+
+  import taskContext.output
+
+  val value = new Text()
+  val key = new Text()
+  // Stays null until onStart has opened the input file.
+  var reader: SequenceFile.Reader = null
+  val hadoopConf = config.hadoopConf
+  val fs = FileSystem.get(hadoopConf)
+  val inputPath = new Path(config.getString(INPUT_PATH).get)
+
+  /** Opens the input file and sends this task the `Start` message. */
+  override def onStart(startTime: StartTime): Unit = {
+    reader = new SequenceFile.Reader(hadoopConf, Reader.file(inputPath))
+    self ! Start
+    LOG.info("sequence file spout initiated")
+  }
+
+  /**
+   * Emits the next record, or reopens the file at end of input, then sends this task a
+   * `Continue` message (presumably delivered back to onNext - confirm against Task).
+   */
+  override def onNext(msg: Message): Unit = {
+    if (reader.next(key, value)) {
+      output(Message(s"${key}++${value}"))
+    } else {
+      reader.close()
+      reader = new SequenceFile.Reader(hadoopConf, Reader.file(inputPath))
+    }
+    self ! Continue
+  }
+
+  /** Closes the reader if onStart got far enough to open it. */
+  override def onStop(): Unit = {
+    if (reader != null) {
+      reader.close()
+    }
+  }
+}
+
+object SeqFileStreamProducer {
+  /** UserConfig key holding the path of the sequence file to read. */
+  def INPUT_PATH: String = "inputpath"
+
+  // Messages the task sends to itself from onStart and onNext.
+  val Start = Message("start")
+  val Continue = Message("continue")
+}