You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/04/26 11:42:51 UTC
[41/49] incubator-gearpump git commit: GEARPUMP-11, fix code style
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala
----------------------------------------------------------------------
diff --git a/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala b/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala
index fcdbf14..5bafef1 100644
--- a/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala
+++ b/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,23 +17,25 @@
*/
package io.gearpump.experiments.distributeservice
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
import akka.actor.ActorSystem
import akka.testkit.{TestActorRef, TestProbe}
-import io.gearpump.WorkerId
-import io.gearpump.cluster.AppMasterToMaster.{RequestResource, GetAllWorkers, RegisterAppMaster}
+import org.scalatest.{BeforeAndAfter, Matchers, WordSpec}
+
+import io.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, RegisterAppMaster, RequestResource}
import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor
-import io.gearpump.cluster.MasterToAppMaster.{ResourceAllocated, WorkerList, AppMasterRegistered}
-import io.gearpump.cluster.appmaster.{AppMasterRuntimeInfo, AppMasterRuntimeEnvironment}
-import io.gearpump.cluster.{AppMasterContext, UserConfig, AppDescription, TestUtil}
-import io.gearpump.cluster.scheduler.{ResourceAllocation, Relaxation, ResourceRequest, Resource}
-import DistServiceAppMaster.{FileContainer, GetFileContainer}
+import io.gearpump.cluster.MasterToAppMaster.{AppMasterRegistered, ResourceAllocated, WorkerList}
+import io.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, AppMasterRuntimeInfo}
+import io.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceAllocation, ResourceRequest}
+import io.gearpump.cluster.worker.WorkerId
+import io.gearpump.cluster.{AppDescription, AppMasterContext, TestUtil, UserConfig}
+import io.gearpump.experiments.distributeservice.DistServiceAppMaster.{FileContainer, GetFileContainer}
import io.gearpump.util.ActorSystemBooter.RegisterActorSystem
import io.gearpump.util.ActorUtil
-import org.scalatest.{BeforeAndAfter, Matchers, WordSpec}
-
-import scala.concurrent.duration._
-class DistServiceAppMasterSpec extends WordSpec with Matchers with BeforeAndAfter{
+class DistServiceAppMasterSpec extends WordSpec with Matchers with BeforeAndAfter {
implicit val system = ActorSystem("AppMasterSpec", TestUtil.DEFAULT_CONFIG)
val mockMaster = TestProbe()(system)
val mockWorker1 = TestProbe()(system)
@@ -45,37 +47,41 @@ class DistServiceAppMasterSpec extends WordSpec with Matchers with BeforeAndAfte
val workerList = List(WorkerId(1, 0L), WorkerId(2, 0L), WorkerId(3, 0L))
val resource = Resource(1)
val appJar = None
- val appDescription = AppDescription("app0", classOf[DistServiceAppMaster].getName, UserConfig.empty)
+ val appDescription = AppDescription("app0", classOf[DistServiceAppMaster].getName,
+ UserConfig.empty)
"DistService AppMaster" should {
"responsable for service distributing" in {
val appMasterInfo = AppMasterRuntimeInfo(appId, "appName", mockWorker1.ref)
- val appMasterContext = AppMasterContext(appId, userName, resource, null, appJar, masterProxy, appMasterInfo)
+ val appMasterContext = AppMasterContext(appId, userName, resource, null, appJar, masterProxy,
+ appMasterInfo)
TestActorRef[DistServiceAppMaster](
AppMasterRuntimeEnvironment.props(List(masterProxy.path), appDescription, appMasterContext))
- val registerAppMaster = mockMaster.receiveOne(15 seconds)
+ val registerAppMaster = mockMaster.receiveOne(15.seconds)
assert(registerAppMaster.isInstanceOf[RegisterAppMaster])
val appMaster = registerAppMaster.asInstanceOf[RegisterAppMaster].appMaster
mockMaster.reply(AppMasterRegistered(appId))
- //The DistributedShell AppMaster will ask for worker list
+ // The DistributedShell AppMaster will ask for worker list
mockMaster.expectMsg(GetAllWorkers)
mockMaster.reply(WorkerList(workerList))
- //After worker list is ready, DistributedShell AppMaster will request resouce on each worker
+ // After worker list is ready, DistributedShell AppMaster will request resouce on each worker
workerList.foreach { workerId =>
- mockMaster.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), workerId, relaxation = Relaxation.SPECIFICWORKER)))
+ mockMaster.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), workerId,
+ relaxation = Relaxation.SPECIFICWORKER)))
}
- mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(resource, mockWorker1.ref, WorkerId(1, 0L)))))
+ mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(resource, mockWorker1.ref,
+ WorkerId(1, 0L)))))
mockWorker1.expectMsgClass(classOf[LaunchExecutor])
mockWorker1.reply(RegisterActorSystem(ActorUtil.getSystemAddress(system).toString))
appMaster.tell(GetFileContainer, client.ref)
- client.expectMsgClass(15 seconds, classOf[FileContainer])
+ client.expectMsgClass(15.seconds, classOf[FileContainer])
}
}
after {
- system.shutdown()
- system.awaitTermination()
+ system.terminate()
+ Await.result(system.whenTerminated, Duration.Inf)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankApplication.scala
----------------------------------------------------------------------
diff --git a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankApplication.scala b/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankApplication.scala
index 2f9928c..2e37091 100644
--- a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankApplication.scala
+++ b/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankApplication.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,11 +18,12 @@
package io.gearpump.experiments.pagerank
import akka.actor.ActorSystem
-import io.gearpump.streaming.{StreamApplication, Processor}
-import io.gearpump.streaming.appmaster.AppMaster
+
import io.gearpump.cluster.{Application, ApplicationMaster, UserConfig}
-import PageRankApplication.NodeWithTaskId
+import io.gearpump.experiments.pagerank.PageRankApplication.NodeWithTaskId
import io.gearpump.partitioner.HashPartitioner
+import io.gearpump.streaming.appmaster.AppMaster
+import io.gearpump.streaming.{Processor, StreamApplication}
import io.gearpump.util.Graph
import io.gearpump.util.Graph.Node
@@ -30,21 +31,19 @@ import io.gearpump.util.Graph.Node
*
* A simple and naive pagerank implementation.
*
- * We will continue to optimize this to able to run page rank of tens of millions of nodes
- *
* @param name name of the application
* @param iteration max iteration count
* @param delta decide the accuracy when the page rank example stops.
* @param dag the page rank graph
- * @tparam T
*/
-class PageRankApplication[T] (override val name : String, iteration: Int, delta: Double, dag: Graph[T, _])
+class PageRankApplication[T](
+ override val name: String, iteration: Int, delta: Double, dag: Graph[T, _])
extends Application {
override def appMaster: Class[_ <: ApplicationMaster] = classOf[AppMaster]
override def userConfig(implicit system: ActorSystem): UserConfig = {
- // map node with taskId
+ // Map node with taskId
var taskId = 0
val pageRankDag = dag.mapVertex { node =>
val updatedNode = NodeWithTaskId(taskId, node)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankController.scala
----------------------------------------------------------------------
diff --git a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankController.scala b/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankController.scala
index 8771a40..0fb689d 100644
--- a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankController.scala
+++ b/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankController.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,12 +18,14 @@
package io.gearpump.experiments.pagerank
import akka.actor.Actor.Receive
-import io.gearpump.streaming.task._
+
import io.gearpump.cluster.UserConfig
-import PageRankController.Tick
-import PageRankWorker.LatestWeight
+import io.gearpump.experiments.pagerank.PageRankController.Tick
+import io.gearpump.experiments.pagerank.PageRankWorker.LatestWeight
+import io.gearpump.streaming.task._
-class PageRankController (taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
+class PageRankController(taskContext: TaskContext, conf: UserConfig)
+ extends Task(taskContext, conf) {
val taskCount = conf.getInt(PageRankApplication.COUNT).get
val iterationMax = conf.getInt(PageRankApplication.ITERATION).get
@@ -37,11 +39,11 @@ class PageRankController (taskContext : TaskContext, conf: UserConfig) extends T
var weights = Map.empty[TaskId, Double]
var deltas = Map.empty[TaskId, Double]
- override def onStart(startTime : StartTime) : Unit = {
+ override def onStart(startTime: StartTime): Unit = {
output(Tick(tick), tasks: _*)
}
- private def output(msg: AnyRef, tasks: TaskId *): Unit = {
+ private def output(msg: AnyRef, tasks: TaskId*): Unit = {
taskContext.asInstanceOf[TaskWrapper].outputUnManaged(msg, tasks: _*)
}
@@ -49,7 +51,7 @@ class PageRankController (taskContext : TaskContext, conf: UserConfig) extends T
case LatestWeight(taskId, weight, replyTick) =>
if (this.tick == replyTick) {
- deltas += taskId -> Math.abs(weight - weights.getOrElse(taskId, 0.0))
+ deltas += taskId -> Math.abs(weight - weights.getOrElse(taskId, 0.0))
weights += taskId -> weight
receivedWeightForCurrentTick += 1
if (receivedWeightForCurrentTick == taskCount) {
@@ -66,7 +68,7 @@ class PageRankController (taskContext : TaskContext, conf: UserConfig) extends T
}
private def continueIteration: Boolean = {
- (tick < iterationMax) && deltas.values.foldLeft(false) {(deltaExceed, value) =>
+ (tick < iterationMax) && deltas.values.foldLeft(false) { (deltaExceed, value) =>
deltaExceed || value > delta
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankWorker.scala
----------------------------------------------------------------------
diff --git a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankWorker.scala b/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankWorker.scala
index 87c30c2..9f38cd7 100644
--- a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankWorker.scala
+++ b/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/PageRankWorker.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,17 +18,17 @@
package io.gearpump.experiments.pagerank
import akka.actor.Actor.Receive
-import io.gearpump.streaming.task.{Task, TaskContext, TaskId, TaskWrapper}
+
import io.gearpump.cluster.UserConfig
-import PageRankApplication.NodeWithTaskId
-import PageRankController.Tick
-import PageRankWorker.{LatestWeight, UpdateWeight}
+import io.gearpump.experiments.pagerank.PageRankApplication.NodeWithTaskId
+import io.gearpump.experiments.pagerank.PageRankController.Tick
+import io.gearpump.experiments.pagerank.PageRankWorker.{LatestWeight, UpdateWeight}
+import io.gearpump.streaming.task.{Task, TaskContext, TaskId, TaskWrapper}
import io.gearpump.util.Graph
-class PageRankWorker(taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
+class PageRankWorker(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
import taskContext.taskId
-
private var weight: Double = 1.0
private var upstreamWeights = Map.empty[TaskId, Double]
@@ -41,14 +41,15 @@ class PageRankWorker(taskContext : TaskContext, conf: UserConfig) extends Task(t
node.taskId == taskContext.taskId.index
}.get
- private val downstream = graph.outgoingEdgesOf(node).map(_._3).map(id => taskId.copy(index = id.taskId)).toSeq
+ private val downstream = graph.outgoingEdgesOf(node).map(_._3)
+ .map(id => taskId.copy(index = id.taskId)).toSeq
private val upstreamCount = graph.incomingEdgesOf(node).map(_._1).length
- LOG.info(s"downstream nodes: $downstream" )
+ LOG.info(s"downstream nodes: $downstream")
private var tick = 0
- private def output(msg: AnyRef, tasks: TaskId *): Unit = {
+ private def output(msg: AnyRef, tasks: TaskId*): Unit = {
taskContext.asInstanceOf[TaskWrapper].outputUnManaged(msg, tasks: _*)
}
@@ -57,7 +58,7 @@ class PageRankWorker(taskContext : TaskContext, conf: UserConfig) extends Task(t
this.tick = tick
if (downstream.length == 0) {
- // if there is no downstream, we will evenly distribute our page rank to
+ // If there is no downstream, we will evenly distribute our page rank to
// every node in the graph
val update = UpdateWeight(taskId, weight / taskCount)
output(update, allTasks: _*)
@@ -65,10 +66,10 @@ class PageRankWorker(taskContext : TaskContext, conf: UserConfig) extends Task(t
val update = UpdateWeight(taskId, weight / downstream.length)
output(update, downstream: _*)
}
- case update@ UpdateWeight(upstreamTaskId, weight) =>
+ case update@UpdateWeight(upstreamTaskId, weight) =>
upstreamWeights += upstreamTaskId -> weight
if (upstreamWeights.size == upstreamCount) {
- val nextWeight = upstreamWeights.foldLeft(0.0) {(sum, item) => sum + item._2}
+ val nextWeight = upstreamWeights.foldLeft(0.0) { (sum, item) => sum + item._2 }
this.upstreamWeights = Map.empty[TaskId, Double]
this.weight = nextWeight
output(LatestWeight(taskId, weight, tick), TaskId(0, 0))
@@ -76,7 +77,7 @@ class PageRankWorker(taskContext : TaskContext, conf: UserConfig) extends Task(t
}
}
-object PageRankWorker{
+object PageRankWorker {
case class UpdateWeight(taskId: TaskId, weight: Double)
case class LatestWeight(taskId: TaskId, weight: Double, tick: Int)
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/example/PageRankExample.scala
----------------------------------------------------------------------
diff --git a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/example/PageRankExample.scala b/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/example/PageRankExample.scala
index 2fb9b81..3877974 100644
--- a/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/example/PageRankExample.scala
+++ b/examples/pagerank/src/main/scala/io/gearpump/experiments/pagerank/example/PageRankExample.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,11 +17,12 @@
*/
package io.gearpump.experiments.pagerank.example
-import io.gearpump.experiments.pagerank.PageRankApplication
import io.gearpump.cluster.client.ClientContext
-import io.gearpump.util.{AkkaApp, Graph}
+import io.gearpump.experiments.pagerank.PageRankApplication
import io.gearpump.util.Graph.Node
+import io.gearpump.util.{AkkaApp, Graph}
+/** A very simple PageRank example, Cyclic graph is not supported */
object PageRankExample extends AkkaApp {
val a = "a"
@@ -29,7 +30,7 @@ object PageRankExample extends AkkaApp {
val c = "c"
val d = "d"
- def help: Unit = Unit
+ def help(): Unit = Unit
def main(akkaConf: Config, args: Array[String]): Unit = {
val pageRankGraph = Graph(a ~> b, a ~> c, a ~> d, b ~> a, b ~> d, d ~> b, d ~> c, c ~> b)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Dag.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Dag.scala b/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Dag.scala
index a7e1f92..b517126 100644
--- a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Dag.scala
+++ b/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Dag.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,57 +18,58 @@
package io.gearpump.streaming.examples.complexdag
-import io.gearpump.streaming.{StreamApplication, Processor}
-import io.gearpump.streaming.task.TaskContext
+import org.slf4j.Logger
+
import io.gearpump.cluster.UserConfig
import io.gearpump.cluster.client.ClientContext
import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
import io.gearpump.partitioner.HashPartitioner
+import io.gearpump.streaming.task.TaskContext
+import io.gearpump.streaming.{Processor, StreamApplication}
import io.gearpump.util.Graph.{Node => GraphNode}
import io.gearpump.util.{AkkaApp, Graph, LogUtil}
-import org.slf4j.Logger
-/*
- digraph flow {
- Source_0 -> Sink_0;
- Source_0 -> Sink_1;
- Source_0 -> Sink_2;
- Source_0 -> Node_1;
- Source_1 -> Node_0;
- Node_0 -> Sink_3;
- Node_1 -> Sink_3;
- Node_1 -> Sink_4;
- Node_1 -> Node_4;
- Node_2 -> Node_3;
- Node_1 -> Node_3;
- Source_0 -> Node_2;
- Source_0 -> Node_3;
- Node_3 -> Sink_3;
- Node_4 -> Sink_3;
- Source_1 -> Sink_4;
- }
-*/
-case class Source_0(_context : TaskContext, _conf: UserConfig) extends Source(_context, _conf)
-case class Source_1(_context : TaskContext, _conf: UserConfig) extends Source(_context, _conf)
-case class Node_0(_context : TaskContext, _conf: UserConfig) extends Node(_context, _conf)
-case class Node_1(_context : TaskContext, _conf: UserConfig) extends Node(_context, _conf)
-case class Node_2(_context : TaskContext, _conf: UserConfig) extends Node(_context, _conf)
-case class Node_3(_context : TaskContext, _conf: UserConfig) extends Node(_context, _conf)
-case class Node_4(_context : TaskContext, _conf: UserConfig) extends Node(_context, _conf)
-case class Sink_0(_context : TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
-case class Sink_1(_context : TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
-case class Sink_2(_context : TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
-case class Sink_3(_context : TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
-case class Sink_4(_context : TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
+case class Source_0(_context: TaskContext, _conf: UserConfig) extends Source(_context, _conf)
+case class Source_1(_context: TaskContext, _conf: UserConfig) extends Source(_context, _conf)
+case class Node_0(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf)
+case class Node_1(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf)
+case class Node_2(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf)
+case class Node_3(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf)
+case class Node_4(_context: TaskContext, _conf: UserConfig) extends Node(_context, _conf)
+case class Sink_0(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
+case class Sink_1(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
+case class Sink_2(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
+case class Sink_3(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
+case class Sink_4(_context: TaskContext, _conf: UserConfig) extends Sink(_context, _conf)
+/**
+ * digraph flow {
+ * Source_0 -> Sink_0;
+ * Source_0 -> Sink_1;
+ * Source_0 -> Sink_2;
+ * Source_0 -> Node_1;
+ * Source_1 -> Node_0;
+ * Node_0 -> Sink_3;
+ * Node_1 -> Sink_3;
+ * Node_1 -> Sink_4;
+ * Node_1 -> Node_4;
+ * Node_2 -> Node_3;
+ * Node_1 -> Node_3;
+ * Source_0 -> Node_2;
+ * Source_0 -> Node_3;
+ * Node_3 -> Sink_3;
+ * Node_4 -> Sink_3;
+ * Source_1 -> Sink_4;
+ * }
+ */
object Dag extends AkkaApp with ArgumentsParser {
private val LOG: Logger = LogUtil.getLogger(getClass)
val RUN_FOR_EVER = -1
override val options: Array[(String, CLIOption[Any])] = Array.empty
- def application(config: ParseResult) : StreamApplication = {
-
+ def application(config: ParseResult): StreamApplication = {
+
val source_0 = Processor[Source_0](1)
val source_1 = Processor[Source_1](1)
val node_0 = Processor[Node_0](1)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Node.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Node.scala b/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Node.scala
index e8837ed..dfad6c8 100644
--- a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Node.scala
+++ b/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Node.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,16 +18,16 @@
package io.gearpump.streaming.examples.complexdag
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
import io.gearpump.Message
import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
-class Node (taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
+class Node(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
import taskContext.output
- override def onStart(startTime : StartTime) : Unit = {}
+ override def onStart(startTime: StartTime): Unit = {}
- override def onNext(msg : Message) : Unit = {
+ override def onNext(msg: Message): Unit = {
val list = msg.msg.asInstanceOf[Vector[String]]
output(new Message(list :+ getClass.getCanonicalName, System.currentTimeMillis()))
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Sink.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Sink.scala b/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Sink.scala
index adc3115..b091a17 100644
--- a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Sink.scala
+++ b/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Sink.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,11 +18,11 @@
package io.gearpump.streaming.examples.complexdag
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import scala.collection.mutable
+
import io.gearpump.Message
import io.gearpump.cluster.UserConfig
-
-import scala.collection.mutable
+import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
class Sink(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
@@ -42,5 +42,4 @@ class Sink(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext,
case _ =>
}
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Source.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Source.scala b/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Source.scala
index 80cd8d2..df656ac 100644
--- a/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Source.scala
+++ b/examples/streaming/complexdag/src/main/scala/io/gearpump/streaming/examples/complexdag/Source.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,12 +18,12 @@
package io.gearpump.streaming.examples.complexdag
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
import io.gearpump.Message
import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
class Source(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
- import taskContext.{output, self}
+ import taskContext.output
override def onStart(startTime: StartTime): Unit = {
self ! Message("start")
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/DagSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/DagSpec.scala b/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/DagSpec.scala
index 02a1017..b142d8d 100644
--- a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/DagSpec.scala
+++ b/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/DagSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,16 +18,18 @@
package io.gearpump.streaming.examples.complexdag
-import io.gearpump.cluster.ClientToMaster.SubmitApplication
-import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
-import io.gearpump.cluster.{TestUtil, MasterHarness}
-import io.gearpump.util.Util
+import scala.concurrent.Future
+import scala.util.Success
+
import org.scalatest._
import org.scalatest.prop.PropertyChecks
-import scala.util.Success
-import scala.concurrent.Future
-class DagSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfterAll with MasterHarness {
+import io.gearpump.cluster.ClientToMaster.SubmitApplication
+import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
+import io.gearpump.cluster.{MasterHarness, TestUtil}
+
+class DagSpec extends PropSpec with PropertyChecks
+ with Matchers with BeforeAndAfterAll with MasterHarness {
override def beforeAll {
startActorSystem()
@@ -37,7 +39,7 @@ class DagSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndA
shutdownActorSystem()
}
- override def config = TestUtil.DEFAULT_CONFIG
+ protected override def config = TestUtil.DEFAULT_CONFIG
property("Dag should succeed to submit application with required arguments") {
val requiredArgs = Array.empty[String]
@@ -45,7 +47,9 @@ class DagSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndA
val masterReceiver = createMockMaster()
val args = requiredArgs
- Future{Dag.main(masterConfig, args)}
+ Future {
+ Dag.main(masterConfig, args)
+ }
masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
masterReceiver.reply(SubmitApplicationResult(Success(0)))
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/NodeSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/NodeSpec.scala b/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/NodeSpec.scala
index d151651..35c5824 100644
--- a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/NodeSpec.scala
+++ b/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/NodeSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,25 +17,25 @@
*/
package io.gearpump.streaming.examples.complexdag
-import io.gearpump.streaming.MockUtil
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import MockUtil._
-import org.mockito.ArgumentMatcher
-import org.mockito.Matchers._
import org.mockito.Mockito._
import org.scalatest.prop.PropertyChecks
import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
+import io.gearpump.Message
+import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.MockUtil
+import io.gearpump.streaming.MockUtil._
+
class NodeSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter {
val context = MockUtil.mockTaskContext
val node = new Node(context, UserConfig.empty)
- property("Node should send a Vector[String](classOf[Node].getCanonicalName, classOf[Node].getCanonicalName"){
+ property("Node should send a Vector[String](classOf[Node].getCanonicalName, " +
+ "classOf[Node].getCanonicalName") {
val list = Vector(classOf[Node].getCanonicalName)
- val expected = Vector(classOf[Node].getCanonicalName,classOf[Node].getCanonicalName)
+ val expected = Vector(classOf[Node].getCanonicalName, classOf[Node].getCanonicalName)
node.onNext(Message(list))
verify(context).output(argMatch[Message](_.msg == expected))
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SinkSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SinkSpec.scala b/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SinkSpec.scala
index b26f639..341f6c6 100644
--- a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SinkSpec.scala
+++ b/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SinkSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,19 +17,21 @@
*/
package io.gearpump.streaming.examples.complexdag
-import io.gearpump.streaming.MockUtil
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
import org.scalatest.prop.PropertyChecks
import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
+import io.gearpump.Message
+import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.MockUtil
+
class SinkSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter {
val context = MockUtil.mockTaskContext
val sink = new Sink(context, UserConfig.empty)
- property("Sink should send a Vector[String](classOf[Sink].getCanonicalName, classOf[Sink].getCanonicalName"){
+ property("Sink should send a Vector[String](classOf[Sink].getCanonicalName, " +
+ "classOf[Sink].getCanonicalName") {
val list = Vector(classOf[Sink].getCanonicalName)
val expected = Vector(classOf[Sink].getCanonicalName, classOf[Sink].getCanonicalName)
sink.onNext(Message(list))
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SourceSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SourceSpec.scala b/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SourceSpec.scala
index 21ae6ab..faa7aa7 100644
--- a/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SourceSpec.scala
+++ b/examples/streaming/complexdag/src/test/scala/io/gearpump/streaming/examples/complexdag/SourceSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,15 +18,14 @@
package io.gearpump.streaming.examples.complexdag
import akka.actor.ActorSystem
-import io.gearpump.streaming.MockUtil
-import io.gearpump.Message
-import io.gearpump.cluster.{TestUtil, UserConfig}
-import MockUtil._
-import org.mockito.ArgumentMatcher
-import org.mockito.Matchers._
import org.mockito.Mockito._
import org.scalatest.{Matchers, WordSpec}
+import io.gearpump.Message
+import io.gearpump.cluster.{TestUtil, UserConfig}
+import io.gearpump.streaming.MockUtil
+import io.gearpump.streaming.MockUtil._
+
class SourceSpec extends WordSpec with Matchers {
"Source" should {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/HadoopConfig.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/HadoopConfig.scala b/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/HadoopConfig.scala
index c8c1135..3b53c9c 100644
--- a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/HadoopConfig.scala
+++ b/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/HadoopConfig.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,19 +18,22 @@
package io.gearpump.streaming.examples.fsio
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
+import scala.language.implicitConversions
+
+import org.apache.hadoop.conf.Configuration
import io.gearpump.cluster.UserConfig
import io.gearpump.util.Constants._
-import org.apache.hadoop.conf.Configuration
-import scala.language.implicitConversions
+class HadoopConfig(config: UserConfig) {
-class HadoopConfig(config: UserConfig) {
+ def withHadoopConf(conf: Configuration): UserConfig = {
+ config.withBytes(HADOOP_CONF, serializeHadoopConf(conf))
+ }
- def withHadoopConf(conf : Configuration) : UserConfig = config.withBytes(HADOOP_CONF, serializeHadoopConf(conf))
- def hadoopConf : Configuration = deserializeHadoopConf(config.getBytes(HADOOP_CONF).get)
+ def hadoopConf: Configuration = deserializeHadoopConf(config.getBytes(HADOOP_CONF).get)
- private def serializeHadoopConf(conf: Configuration) : Array[Byte] = {
+ private def serializeHadoopConf(conf: Configuration): Array[Byte] = {
val out = new ByteArrayOutputStream()
val dataOut = new DataOutputStream(out)
conf.write(dataOut)
@@ -38,10 +41,10 @@ class HadoopConfig(config: UserConfig) {
out.toByteArray
}
- private def deserializeHadoopConf(bytes: Array[Byte]) : Configuration = {
+ private def deserializeHadoopConf(bytes: Array[Byte]): Configuration = {
val in = new ByteArrayInputStream(bytes)
val dataIn = new DataInputStream(in)
- val result= new Configuration()
+ val result = new Configuration()
result.readFields(dataIn)
dataIn.close()
result
@@ -49,8 +52,8 @@ class HadoopConfig(config: UserConfig) {
}
object HadoopConfig {
- def empty = new HadoopConfig(UserConfig.empty)
- def apply(config: UserConfig) = new HadoopConfig(config)
+ def empty: HadoopConfig = new HadoopConfig(UserConfig.empty)
+ def apply(config: UserConfig): HadoopConfig = new HadoopConfig(config)
implicit def userConfigToHadoopConfig(userConf: UserConfig): HadoopConfig = {
HadoopConfig(userConf)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala b/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
index 808f3b1..13fc3f9 100644
--- a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
+++ b/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,20 +19,21 @@ package io.gearpump.streaming.examples.fsio
import java.io.File
import java.util.concurrent.TimeUnit
+import scala.concurrent.duration.FiniteDuration
import akka.actor.Cancellable
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import SeqFileStreamProcessor._
-import HadoopConfig._
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.SequenceFile._
import org.apache.hadoop.io.{SequenceFile, Text}
-import scala.concurrent.duration.FiniteDuration
+import io.gearpump.Message
+import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.examples.fsio.HadoopConfig._
+import io.gearpump.streaming.examples.fsio.SeqFileStreamProcessor._
+import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
-class SeqFileStreamProcessor(taskContext : TaskContext, config: UserConfig) extends Task(taskContext, config){
+class SeqFileStreamProcessor(taskContext: TaskContext, config: UserConfig)
+ extends Task(taskContext, config) {
import taskContext.taskId
@@ -43,16 +44,17 @@ class SeqFileStreamProcessor(taskContext : TaskContext, config: UserConfig) exte
val value = new Text()
val hadoopConf = config.hadoopConf
- private var msgCount : Long = 0
- private var snapShotKVCount : Long = 0
- private var snapShotTime : Long = 0
+ private var msgCount: Long = 0
+ private var snapShotKVCount: Long = 0
+ private var snapShotTime: Long = 0
private var scheduler: Cancellable = null
- override def onStart(startTime : StartTime) = {
+ override def onStart(startTime: StartTime): Unit = {
val fs = FileSystem.get(hadoopConf)
fs.deleteOnExit(outputPath)
- writer = SequenceFile.createWriter(hadoopConf, Writer.file(outputPath), Writer.keyClass(textClass), Writer.valueClass(textClass))
+ writer = SequenceFile.createWriter(hadoopConf, Writer.file(outputPath),
+ Writer.keyClass(textClass), Writer.valueClass(textClass))
scheduler = taskContext.schedule(new FiniteDuration(5, TimeUnit.SECONDS),
new FiniteDuration(5, TimeUnit.SECONDS))(reportStatus())
@@ -62,7 +64,7 @@ class SeqFileStreamProcessor(taskContext : TaskContext, config: UserConfig) exte
override def onNext(msg: Message): Unit = {
val kv = msg.msg.asInstanceOf[String].split("\\+\\+")
- if(kv.length >= 2) {
+ if (kv.length >= 2) {
key.set(kv(0))
value.set(kv(1))
writer.append(key, value)
@@ -70,7 +72,7 @@ class SeqFileStreamProcessor(taskContext : TaskContext, config: UserConfig) exte
msgCount += 1
}
- override def onStop(): Unit ={
+ override def onStop(): Unit = {
if (scheduler != null) {
scheduler.cancel()
}
@@ -78,14 +80,17 @@ class SeqFileStreamProcessor(taskContext : TaskContext, config: UserConfig) exte
LOG.info("sequence file bolt stopped")
}
- def reportStatus() = {
- val current : Long = System.currentTimeMillis()
- LOG.info(s"Task $taskId Throughput: ${(msgCount - snapShotKVCount, (current - snapShotTime) / 1000)} (KVPairs, second)")
+ private def reportStatus() = {
+ val current: Long = System.currentTimeMillis()
+ LOG.info(s"Task $taskId Throughput: ${
+ (msgCount - snapShotKVCount,
+ (current - snapShotTime) / 1000)
+ } (KVPairs, second)")
snapShotKVCount = msgCount
snapShotTime = current
}
}
-object SeqFileStreamProcessor{
+object SeqFileStreamProcessor {
val OUTPUT_PATH = "outputpath"
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala b/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
index 5d32f74..f9b0d22 100644
--- a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
+++ b/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,18 +17,20 @@
*/
package io.gearpump.streaming.examples.fsio
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import SeqFileStreamProducer._
-import HadoopConfig._
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.SequenceFile._
import org.apache.hadoop.io.{SequenceFile, Text}
-class SeqFileStreamProducer(taskContext : TaskContext, config: UserConfig) extends Task(taskContext, config){
+import io.gearpump.Message
+import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.examples.fsio.HadoopConfig._
+import io.gearpump.streaming.examples.fsio.SeqFileStreamProducer._
+import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
+
+class SeqFileStreamProducer(taskContext: TaskContext, config: UserConfig)
+ extends Task(taskContext, config) {
- import taskContext.{output, self}
+ import taskContext.output
val value = new Text()
val key = new Text()
@@ -37,14 +39,14 @@ class SeqFileStreamProducer(taskContext : TaskContext, config: UserConfig) exten
val fs = FileSystem.get(hadoopConf)
val inputPath = new Path(config.getString(INPUT_PATH).get)
- override def onStart(startTime : StartTime) = {
+ override def onStart(startTime: StartTime): Unit = {
reader = new SequenceFile.Reader(hadoopConf, Reader.file(inputPath))
self ! Start
LOG.info("sequence file spout initiated")
}
- override def onNext(msg: Message) = {
- if(reader.next(key, value)){
+ override def onNext(msg: Message): Unit = {
+ if (reader.next(key, value)) {
output(Message(key + "++" + value))
} else {
reader.close()
@@ -53,13 +55,13 @@ class SeqFileStreamProducer(taskContext : TaskContext, config: UserConfig) exten
self ! Continue
}
- override def onStop(): Unit ={
+ override def onStop(): Unit = {
reader.close()
}
}
-object SeqFileStreamProducer{
- def INPUT_PATH = "inputpath"
+object SeqFileStreamProducer {
+ def INPUT_PATH: String = "inputpath"
val Start = Message("start")
val Continue = Message("continue")
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SequenceFileIO.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SequenceFileIO.scala b/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SequenceFileIO.scala
index c752bba..7272f2b 100644
--- a/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SequenceFileIO.scala
+++ b/examples/streaming/fsio/src/main/scala/io/gearpump/streaming/examples/fsio/SequenceFileIO.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,39 +17,44 @@
*/
package io.gearpump.streaming.examples.fsio
-import io.gearpump.streaming.{StreamApplication, Processor}
+import org.apache.hadoop.conf.Configuration
+import org.slf4j.Logger
+
import io.gearpump.cluster.UserConfig
import io.gearpump.cluster.client.ClientContext
import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
import io.gearpump.partitioner.ShufflePartitioner
+import io.gearpump.streaming.examples.fsio.HadoopConfig._
+import io.gearpump.streaming.{Processor, StreamApplication}
import io.gearpump.util.Graph._
-import HadoopConfig._
import io.gearpump.util.{AkkaApp, Graph, LogUtil}
-import org.apache.hadoop.conf.Configuration
-import org.slf4j.Logger
object SequenceFileIO extends AkkaApp with ArgumentsParser {
private val LOG: Logger = LogUtil.getLogger(getClass)
override val options: Array[(String, CLIOption[Any])] = Array(
- "source"-> CLIOption[Int]("<sequence file reader number>", required = false, defaultValue = Some(1)),
- "sink"-> CLIOption[Int]("<sequence file writer number>", required = false, defaultValue = Some(1)),
- "input"-> CLIOption[String]("<input file path>", required = true),
- "output"-> CLIOption[String]("<output file directory>", required = true)
+ "source" -> CLIOption[Int]("<sequence file reader number>", required = false,
+ defaultValue = Some(1)),
+ "sink" -> CLIOption[Int]("<sequence file writer number>", required = false,
+ defaultValue = Some(1)),
+ "input" -> CLIOption[String]("<input file path>", required = true),
+ "output" -> CLIOption[String]("<output file directory>", required = true)
)
- def application(config: ParseResult) : StreamApplication = {
+ def application(config: ParseResult): StreamApplication = {
val spoutNum = config.getInt("source")
val boltNum = config.getInt("sink")
val input = config.getString("input")
val output = config.getString("output")
- val appConfig = UserConfig.empty.withString(SeqFileStreamProducer.INPUT_PATH, input).withString(SeqFileStreamProcessor.OUTPUT_PATH, output)
+ val appConfig = UserConfig.empty.withString(SeqFileStreamProducer.INPUT_PATH, input)
+ .withString(SeqFileStreamProcessor.OUTPUT_PATH, output)
val hadoopConfig = appConfig.withHadoopConf(new Configuration())
val partitioner = new ShufflePartitioner()
val streamProducer = Processor[SeqFileStreamProducer](spoutNum)
val streamProcessor = Processor[SeqFileStreamProcessor](boltNum)
- val app = StreamApplication("SequenceFileIO", Graph(streamProducer ~ partitioner ~> streamProcessor), hadoopConfig)
+ val app = StreamApplication("SequenceFileIO",
+ Graph(streamProducer ~ partitioner ~> streamProcessor), hadoopConfig)
app
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/HadoopConfigSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/HadoopConfigSpec.scala b/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/HadoopConfigSpec.scala
index 64ba697..e5dbe0b 100644
--- a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/HadoopConfigSpec.scala
+++ b/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/HadoopConfigSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,10 +17,11 @@
*/
package io.gearpump.streaming.examples.fsio
-import io.gearpump.cluster.UserConfig
import org.apache.hadoop.conf.Configuration
import org.scalatest.{Matchers, WordSpec}
+import io.gearpump.cluster.UserConfig
+
class HadoopConfigSpec extends WordSpec with Matchers {
"HadoopConfig" should {
@@ -32,7 +33,7 @@ class HadoopConfigSpec extends WordSpec with Matchers {
val user = UserConfig.empty
- import HadoopConfig._
+ import io.gearpump.streaming.examples.fsio.HadoopConfig._
assert(user.withHadoopConf(hadoopConf).hadoopConf.get(key) == value)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala b/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala
index 99c7113..bb0d26b 100644
--- a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala
+++ b/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,15 +18,10 @@
package io.gearpump.streaming.examples.fsio
import java.io.File
+import scala.collection.mutable.ArrayBuffer
import akka.actor.ActorSystem
import akka.testkit.TestProbe
-import io.gearpump.streaming.{Processor, MockUtil}
-import io.gearpump.streaming.task.{StartTime, TaskId}
-import io.gearpump.Message
-import io.gearpump.cluster.{TestUtil, UserConfig}
-import io.gearpump.streaming.Processor
-import io.gearpump.streaming.task.StartTime
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.SequenceFile.Reader
@@ -36,8 +31,13 @@ import org.scalacheck.Gen
import org.scalatest.prop.PropertyChecks
import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
-import scala.collection.mutable.ArrayBuffer
-class SeqFileStreamProcessorSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter {
+import io.gearpump.Message
+import io.gearpump.cluster.{TestUtil, UserConfig}
+import io.gearpump.streaming.task.{StartTime, TaskId}
+import io.gearpump.streaming.{MockUtil, Processor}
+class SeqFileStreamProcessorSpec
+ extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter {
+
val kvPairs = new ArrayBuffer[(String, String)]
val outputDirectory = "SeqFileStreamProcessor_Test"
val sequenceFilePath = new Path(outputDirectory + File.separator + TaskId(0, 0))
@@ -56,7 +56,8 @@ class SeqFileStreamProcessorSpec extends PropSpec with PropertyChecks with Match
implicit val system1 = ActorSystem("SeqFileStreamProcessor", TestUtil.DEFAULT_CONFIG)
val system2 = ActorSystem("Reporter", TestUtil.DEFAULT_CONFIG)
val watcher = TestProbe()(system1)
- val conf = HadoopConfig(UserConfig.empty.withString(SeqFileStreamProcessor.OUTPUT_PATH, outputDirectory)).withHadoopConf(new Configuration())
+ val conf = HadoopConfig(UserConfig.empty.withString(SeqFileStreamProcessor.OUTPUT_PATH,
+ outputDirectory)).withHadoopConf(new Configuration())
val context = MockUtil.mockTaskContext
val processorDescription =
@@ -80,7 +81,7 @@ class SeqFileStreamProcessorSpec extends PropSpec with PropertyChecks with Match
val reader = new SequenceFile.Reader(hadoopConf, Reader.file(sequenceFilePath))
kvPairs.foreach { kv =>
val (key, value) = kv
- if(value.length > 0 && reader.next(_key, _value)) {
+ if (value.length > 0 && reader.next(_key, _value)) {
assert(_key.toString == key && _value.toString == value)
}
}
@@ -90,4 +91,4 @@ class SeqFileStreamProcessorSpec extends PropSpec with PropertyChecks with Match
after {
fs.deleteOnExit(new Path(outputDirectory))
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala b/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
index cb6f553..04dafa7 100644
--- a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
+++ b/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,26 +17,26 @@
*/
package io.gearpump.streaming.examples.fsio
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.examples.fsio.HadoopConfig
-import io.gearpump.streaming.task.StartTime
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import MockUtil._
+import scala.collection.mutable.ArrayBuffer
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.SequenceFile.Writer
import org.apache.hadoop.io.{SequenceFile, Text}
-import org.mockito.ArgumentMatcher
-import org.mockito.Matchers._
import org.mockito.Mockito._
import org.scalacheck.Gen
import org.scalatest.prop.PropertyChecks
import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
-import scala.collection.mutable.ArrayBuffer
+import io.gearpump.Message
+import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.MockUtil
+import io.gearpump.streaming.MockUtil._
+import io.gearpump.streaming.task.StartTime
+
+class SeqFileStreamProducerSpec
+ extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter {
-class SeqFileStreamProducerSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter{
val kvPairs = new ArrayBuffer[(String, String)]
val inputFile = "SeqFileStreamProducer_Test"
val sequenceFilePath = new Path(inputFile)
@@ -53,7 +53,8 @@ class SeqFileStreamProducerSpec extends PropSpec with PropertyChecks with Matche
before {
fs.deleteOnExit(sequenceFilePath)
- val writer = SequenceFile.createWriter(hadoopConf, Writer.file(sequenceFilePath), Writer.keyClass(textClass), Writer.valueClass(textClass))
+ val writer = SequenceFile.createWriter(hadoopConf, Writer.file(sequenceFilePath),
+ Writer.keyClass(textClass), Writer.valueClass(textClass))
forAll(kvGenerator) { kv =>
_key.set(kv._1)
_value.set(kv._2)
@@ -63,9 +64,11 @@ class SeqFileStreamProducerSpec extends PropSpec with PropertyChecks with Matche
writer.close()
}
- property("SeqFileStreamProducer should read the key-value pairs from a sequence file and deliver them") {
+ property("SeqFileStreamProducer should read the key-value pairs from " +
+ "a sequence file and deliver them") {
- val conf = HadoopConfig(UserConfig.empty.withString(SeqFileStreamProducer.INPUT_PATH, inputFile)).withHadoopConf(new Configuration())
+ val conf = HadoopConfig(UserConfig.empty.withString(SeqFileStreamProducer.INPUT_PATH,
+ inputFile)).withHadoopConf(new Configuration())
val context = MockUtil.mockTaskContext
@@ -74,7 +77,8 @@ class SeqFileStreamProducerSpec extends PropSpec with PropertyChecks with Matche
producer.onNext(Message("start"))
val expected = kvPairs.map(kv => kv._1 + "++" + kv._2).toSet
- verify(context).output(argMatch[Message](msg => expected.contains(msg.msg.asInstanceOf[String])))
+ verify(context).output(argMatch[Message](msg =>
+ expected.contains(msg.msg.asInstanceOf[String])))
}
after {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SequenceFileIOSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SequenceFileIOSpec.scala b/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SequenceFileIOSpec.scala
index f10227d..efb5e44 100644
--- a/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SequenceFileIOSpec.scala
+++ b/examples/streaming/fsio/src/test/scala/io/gearpump/streaming/examples/fsio/SequenceFileIOSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,17 +18,20 @@
package io.gearpump.streaming.examples.fsio
+import scala.concurrent.Future
+import scala.util.{Success, Try}
+
+import com.typesafe.config.Config
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{BeforeAndAfterAll, Matchers, PropSpec}
+
import io.gearpump.cluster.ClientToMaster.SubmitApplication
import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
import io.gearpump.cluster.{MasterHarness, TestUtil}
-import io.gearpump.util.Util
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{BeforeAndAfterAll, Matchers, PropSpec}
-import scala.util.{Success, Try}
-import scala.concurrent.Future
+class SequenceFileIOSpec
+ extends PropSpec with PropertyChecks with Matchers with BeforeAndAfterAll with MasterHarness {
-class SequenceFileIOSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfterAll with MasterHarness {
override def beforeAll {
startActorSystem()
}
@@ -37,7 +40,7 @@ class SequenceFileIOSpec extends PropSpec with PropertyChecks with Matchers with
shutdownActorSystem()
}
- override def config = TestUtil.DEFAULT_CONFIG
+ override def config: Config = TestUtil.DEFAULT_CONFIG
property("SequenceFileIO should succeed to submit application with required arguments") {
val requiredArgs = Array(
@@ -58,7 +61,9 @@ class SequenceFileIOSpec extends PropSpec with PropertyChecks with Matchers with
forAll(validArgs) { (requiredArgs: Array[String], optionalArgs: Array[String]) =>
val args = requiredArgs ++ optionalArgs
- Future {SequenceFileIO.main(masterConfig, args)}
+ Future {
+ SequenceFileIO.main(masterConfig, args)
+ }
masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
masterReceiver.reply(SubmitApplicationResult(Success(0)))
}
@@ -75,5 +80,4 @@ class SequenceFileIOSpec extends PropSpec with PropertyChecks with Matchers with
assert(Try(SequenceFileIO.main(args)).isFailure, "missing required arguments, print usage")
}
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/KafkaReadWrite.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/KafkaReadWrite.scala b/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/KafkaReadWrite.scala
index b4d9a04..35b6594 100644
--- a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/KafkaReadWrite.scala
+++ b/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/KafkaReadWrite.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,6 +19,8 @@
package io.gearpump.streaming.examples.kafka
import akka.actor.ActorSystem
+import org.slf4j.Logger
+
import io.gearpump.cluster.UserConfig
import io.gearpump.cluster.client.ClientContext
import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
@@ -29,21 +31,26 @@ import io.gearpump.streaming.sink.DataSinkProcessor
import io.gearpump.streaming.source.DataSourceProcessor
import io.gearpump.util.Graph._
import io.gearpump.util.{AkkaApp, Graph, LogUtil}
-import org.slf4j.Logger
object KafkaReadWrite extends AkkaApp with ArgumentsParser {
private val LOG: Logger = LogUtil.getLogger(getClass)
override val options: Array[(String, CLIOption[Any])] = Array(
- "source" -> CLIOption[Int]("<hom many kafka producer tasks>", required = false, defaultValue = Some(1)),
- "sink" -> CLIOption[Int]("<hom many kafka processor tasks>", required = false, defaultValue = Some(1)),
- "zookeeperConnect" -> CLIOption[String]("<zookeeper connect string>", required = false, defaultValue = Some("localhost:2181")),
- "brokerList" -> CLIOption[String]("<broker server list string>", required = false, defaultValue = Some("localhost:9092")),
- "sourceTopic" -> CLIOption[String]("<kafka source topic>", required = false, defaultValue = Some("topic1")),
- "sinkTopic" -> CLIOption[String]("<kafka sink topic>", required = false, defaultValue = Some("topic2"))
+ "source" -> CLIOption[Int]("<hom many kafka producer tasks>", required = false,
+ defaultValue = Some(1)),
+ "sink" -> CLIOption[Int]("<hom many kafka processor tasks>", required = false,
+ defaultValue = Some(1)),
+ "zookeeperConnect" -> CLIOption[String]("<zookeeper connect string>", required = false,
+ defaultValue = Some("localhost:2181")),
+ "brokerList" -> CLIOption[String]("<broker server list string>", required = false,
+ defaultValue = Some("localhost:9092")),
+ "sourceTopic" -> CLIOption[String]("<kafka source topic>", required = false,
+ defaultValue = Some("topic1")),
+ "sinkTopic" -> CLIOption[String]("<kafka sink topic>", required = false,
+ defaultValue = Some("topic2"))
)
- def application(config: ParseResult, system: ActorSystem) : StreamApplication = {
+ def application(config: ParseResult, system: ActorSystem): StreamApplication = {
implicit val actorSystem = system
val sourceNum = config.getInt("source")
val sinkNum = config.getInt("sink")
@@ -59,7 +66,7 @@ object KafkaReadWrite extends AkkaApp with ArgumentsParser {
val sink = new KafkaSink(sinkTopic, brokerList)
val sinkProcessor = DataSinkProcessor(sink, sinkNum)
val partitioner = new ShufflePartitioner
- val computation = sourceProcessor ~ partitioner ~> sinkProcessor
+ val computation = sourceProcessor ~ partitioner ~> sinkProcessor
val app = StreamApplication("KafkaReadWrite", Graph(computation), appConfig)
app
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala b/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala
index 51bc3d6..6955bcc 100644
--- a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala
+++ b/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,31 +19,34 @@
package io.gearpump.streaming.examples.kafka.wordcount
import akka.actor.ActorSystem
-import io.gearpump.streaming.kafka.lib.KafkaSourceConfig
-import io.gearpump.streaming.{StreamApplication, Processor}
-import io.gearpump.streaming.kafka.{KafkaSink, KafkaStorageFactory, KafkaSource}
-import io.gearpump.streaming.sink.DataSinkProcessor
-import io.gearpump.streaming.source.DataSourceProcessor
+import kafka.api.OffsetRequest
+import org.slf4j.Logger
+
import io.gearpump.cluster.UserConfig
import io.gearpump.cluster.client.ClientContext
import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
import io.gearpump.partitioner.HashPartitioner
+import io.gearpump.streaming.kafka.lib.KafkaSourceConfig
+import io.gearpump.streaming.kafka.{KafkaSink, KafkaSource, KafkaStorageFactory}
+import io.gearpump.streaming.sink.DataSinkProcessor
+import io.gearpump.streaming.source.DataSourceProcessor
+import io.gearpump.streaming.{Processor, StreamApplication}
import io.gearpump.util.Graph._
import io.gearpump.util.{AkkaApp, Graph, LogUtil}
-import kafka.api.OffsetRequest
-import org.slf4j.Logger
object KafkaWordCount extends AkkaApp with ArgumentsParser {
private val LOG: Logger = LogUtil.getLogger(getClass)
override val options: Array[(String, CLIOption[Any])] = Array(
- "source" -> CLIOption[Int]("<how many kafka source tasks>", required = false, defaultValue = Some(1)),
+ "source" -> CLIOption[Int]("<how many kafka source tasks>", required = false,
+ defaultValue = Some(1)),
"split" -> CLIOption[Int]("<how many split tasks>", required = false, defaultValue = Some(1)),
"sum" -> CLIOption[Int]("<how many sum tasks>", required = false, defaultValue = Some(1)),
- "sink" -> CLIOption[Int]("<how many kafka sink tasks>", required = false, defaultValue = Some(1))
- )
+ "sink" -> CLIOption[Int]("<how many kafka sink tasks>", required = false,
+ defaultValue = Some(1))
+ )
- def application(config: ParseResult, system: ActorSystem) : StreamApplication = {
+ def application(config: ParseResult, system: ActorSystem): StreamApplication = {
implicit val actorSystem = system
val sourceNum = config.getInt("source")
val splitNum = config.getInt("split")
@@ -61,7 +64,8 @@ object KafkaWordCount extends AkkaApp with ArgumentsParser {
val sink = new KafkaSink("topic2", "localhost:9092")
val sinkProcessor = DataSinkProcessor(sink, sinkNum)
val partitioner = new HashPartitioner
- val computation = sourceProcessor ~ partitioner ~> split ~ partitioner ~> sum ~ partitioner ~> sinkProcessor
+ val computation = sourceProcessor ~ partitioner ~> split ~ partitioner ~>
+ sum ~ partitioner ~> sinkProcessor
val app = StreamApplication("KafkaWordCount", Graph(computation), appConfig)
app
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Split.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Split.scala b/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Split.scala
index 499f1ac..b46d170 100644
--- a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Split.scala
+++ b/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Split.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,18 +19,20 @@
package io.gearpump.streaming.examples.kafka.wordcount
import com.twitter.bijection.Injection
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
+
import io.gearpump.Message
import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
-class Split(taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
+class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
import taskContext.output
- override def onStart(startTime : StartTime) : Unit = {
+ override def onStart(startTime: StartTime): Unit = {
}
- override def onNext(msg : Message) : Unit = {
- Injection.invert[String, Array[Byte]](msg.msg.asInstanceOf[Array[Byte]]).foreach(_.split("\\s+").foreach(
- word => output(new Message(word, msg.timestamp))))
+ override def onNext(msg: Message): Unit = {
+ Injection.invert[String, Array[Byte]](msg.msg.asInstanceOf[Array[Byte]])
+ .foreach(_.split("\\s+").foreach(
+ word => output(new Message(word, msg.timestamp))))
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Sum.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Sum.scala b/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Sum.scala
index 465754e..9c67733 100644
--- a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Sum.scala
+++ b/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/Sum.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,18 +19,19 @@
package io.gearpump.streaming.examples.kafka.wordcount
import com.twitter.bijection.Injection
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
+
import io.gearpump.Message
import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
-class Sum(taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
+class Sum(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
import taskContext.output
private[wordcount] var wordcount = Map.empty[String, Long]
- override def onStart(startTime : StartTime) : Unit = {}
+ override def onStart(startTime: StartTime): Unit = {}
- override def onNext(message : Message) : Unit = {
+ override def onNext(message: Message): Unit = {
val word = message.msg.asInstanceOf[String]
val count = wordcount.getOrElse(word, 0L) + 1
wordcount += word -> count
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCountSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCountSpec.scala b/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCountSpec.scala
index 9e79d7e..35f7a62 100644
--- a/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCountSpec.scala
+++ b/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCountSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,17 +18,19 @@
package io.gearpump.streaming.examples.kafka.wordcount
-import io.gearpump.cluster.ClientToMaster.SubmitApplication
-import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
-import io.gearpump.cluster.{MasterHarness, TestUtil}
-import io.gearpump.util.Util
+import scala.concurrent.Future
+import scala.util.Success
+
+import com.typesafe.config.Config
import org.scalatest.prop.PropertyChecks
import org.scalatest.{BeforeAndAfter, Matchers, PropSpec}
-import scala.util.Success
-import scala.concurrent.Future
+import io.gearpump.cluster.ClientToMaster.SubmitApplication
+import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
+import io.gearpump.cluster.{MasterHarness, TestUtil}
-class KafkaWordCountSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness {
+class KafkaWordCountSpec
+ extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness {
before {
startActorSystem()
@@ -38,7 +40,7 @@ class KafkaWordCountSpec extends PropSpec with PropertyChecks with Matchers with
shutdownActorSystem()
}
- override def config = TestUtil.DEFAULT_CONFIG
+ override def config: Config = TestUtil.DEFAULT_CONFIG
property("KafkaWordCount should succeed to submit application with required arguments") {
val requiredArgs = Array.empty[String]
@@ -58,7 +60,9 @@ class KafkaWordCountSpec extends PropSpec with PropertyChecks with Matchers with
forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) =>
val args = requiredArgs ++ optionalArgs
- Future {KafkaWordCount.main(masterConfig, args)}
+ Future {
+ KafkaWordCount.main(masterConfig, args)
+ }
masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME)
masterReceiver.reply(SubmitApplicationResult(Success(0)))
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SplitSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SplitSpec.scala b/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SplitSpec.scala
index 864f97d..2cc6a16 100644
--- a/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SplitSpec.scala
+++ b/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SplitSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,15 +18,14 @@
package io.gearpump.streaming.examples.kafka.wordcount
import com.twitter.bijection.Injection
-import io.gearpump.streaming.task.TaskContext
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.scalatest._
import org.scalatest.mock.MockitoSugar
-import scala.language.postfixOps
+import io.gearpump.Message
+import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.task.TaskContext
class SplitSpec extends FlatSpec with Matchers with MockitoSugar {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala b/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala
index 6014d0f..4dcb9d7 100644
--- a/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala
+++ b/examples/streaming/kafka/src/test/scala/io/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,23 +17,24 @@
*/
package io.gearpump.streaming.examples.kafka.wordcount
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.task.StartTime
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
+import scala.collection.mutable
+
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.scalacheck.Gen
import org.scalatest.{FlatSpec, Matchers}
-import scala.collection.mutable
+import io.gearpump.Message
+import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.MockUtil
+import io.gearpump.streaming.task.StartTime
class SumSpec extends FlatSpec with Matchers {
it should "sum should calculate the frequency of the word correctly" in {
val stringGenerator = Gen.alphaStr
- val expectedWordCountMap : mutable.HashMap[String, Long] = new mutable.HashMap[String, Long]()
+ val expectedWordCountMap: mutable.HashMap[String, Long] = new mutable.HashMap[String, Long]()
val taskContext = MockUtil.mockTaskContext
@@ -42,7 +43,7 @@ class SumSpec extends FlatSpec with Matchers {
val str = "once two two three three three"
var totalWordCount = 0
- stringGenerator.map {word =>
+ stringGenerator.map { word =>
totalWordCount += 1
expectedWordCountMap.put(word, expectedWordCountMap.getOrElse(word, 0L) + 1)
sum.onNext(Message(word))
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/sol/README.md
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/README.md b/examples/streaming/sol/README.md
index e772af4..a8b10b3 100644
--- a/examples/streaming/sol/README.md
+++ b/examples/streaming/sol/README.md
@@ -1,6 +1,6 @@
SOL is a throughput test. It will create multiple layers, and then do random shuffling between these layers.
-SOLPRoducer -> SOLProcessor -> SOLProcessor -> ...
+SOLProducer -> SOLProcessor -> SOLProcessor -> ...
The original code comes from: https://github.com/yahoo/storm-perf-test
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOL.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOL.scala b/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOL.scala
index b531250..10c190c 100644
--- a/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOL.scala
+++ b/examples/streaming/sol/src/main/scala/io/gearpump/streaming/examples/sol/SOL.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,25 +18,30 @@
package io.gearpump.streaming.examples.sol
-import io.gearpump.streaming.{StreamApplication, Processor}
+import org.slf4j.Logger
+
import io.gearpump.cluster.UserConfig
import io.gearpump.cluster.client.ClientContext
import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
import io.gearpump.partitioner.ShufflePartitioner
+import io.gearpump.streaming.{Processor, StreamApplication}
import io.gearpump.util.Graph._
import io.gearpump.util.{AkkaApp, Graph, LogUtil}
-import org.slf4j.Logger
object SOL extends AkkaApp with ArgumentsParser {
private val LOG: Logger = LogUtil.getLogger(getClass)
override val options: Array[(String, CLIOption[Any])] = Array(
- "streamProducer"-> CLIOption[Int]("<stream producer number>", required = false, defaultValue = Some(1)),
- "streamProcessor"-> CLIOption[Int]("<stream processor number>", required = false, defaultValue = Some(1)),
- "bytesPerMessage" -> CLIOption[Int]("<size of each message>", required = false, defaultValue = Some(100)),
- "stages"-> CLIOption[Int]("<how many stages to run>", required = false, defaultValue = Some(2)))
+ "streamProducer" -> CLIOption[Int]("<stream producer number>", required = false,
+ defaultValue = Some(1)),
+ "streamProcessor" -> CLIOption[Int]("<stream processor number>", required = false,
+ defaultValue = Some(1)),
+ "bytesPerMessage" -> CLIOption[Int]("<size of each message>", required = false,
+ defaultValue = Some(100)),
+ "stages" -> CLIOption[Int]("<how many stages to run>", required = false,
+ defaultValue = Some(2)))
- def application(config: ParseResult) : StreamApplication = {
+ def application(config: ParseResult): StreamApplication = {
val spoutNum = config.getInt("streamProducer")
val boltNum = config.getInt("streamProcessor")
val bytesPerMessage = config.getInt("bytesPerMessage")