You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2017/01/20 08:47:42 UTC
[02/19] incubator-gearpump git commit: merge master into akka-streams
branch
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
index 3003b98..7f3c250 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
@@ -15,14 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.gearpump.streaming.dsl.javaapi
-import scala.collection.JavaConverters._
import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction, ReduceFunction}
+import org.apache.gearpump.streaming.dsl.javaapi.functions.{FlatMapFunction => JFlatMapFunction, GroupByFunction}
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
+import org.apache.gearpump.streaming.dsl.scalaapi.{Stream, WindowStream}
import org.apache.gearpump.streaming.dsl.window.api.Window
-import org.apache.gearpump.streaming.dsl.{Stream, WindowStream}
-import org.apache.gearpump.streaming.javaapi.dsl.functions._
import org.apache.gearpump.streaming.task.Task
/**
@@ -31,23 +31,23 @@ import org.apache.gearpump.streaming.task.Task
class JavaStream[T](val stream: Stream[T]) {
- /** FlatMap on stream */
+ /**
+ * FlatMap on stream: applies `fn` to each element and flattens the
+ * java.util.Iterator it returns into the resulting stream.
+ *
+ * @param fn Java flatMap function
+ * @param description description message for this operation
+ */
- def flatMap[R](fn: FlatMapFunction[T, R], description: String): JavaStream[R] = {
- new JavaStream[R](stream.flatMap({ t: T => fn(t).asScala }, description))
+ def flatMap[R](fn: JFlatMapFunction[T, R], description: String): JavaStream[R] = {
+ // Forward the caller's description instead of a hard-coded "flatMap".
+ new JavaStream[R](stream.flatMap(FlatMapFunction(fn), description))
}
- /** Map on stream */
+ /**
+ * Map on stream: applies `fn` to each element.
+ *
+ * @param fn map function
+ * @param description description message for this operation
+ */
def map[R](fn: MapFunction[T, R], description: String): JavaStream[R] = {
- new JavaStream[R](stream.map({ t: T => fn(t) }, description))
+ // Delegate to the Scala API's MapFunction overload (as reduce does) so the
+ // Java and Scala paths cannot drift apart.
+ new JavaStream[R](stream.map(fn, description))
}
- /** Only keep the messages that FilterFunction returns true. */
+ /**
+ * Only keep the messages for which the FilterFunction returns true.
+ *
+ * @param fn filter function
+ * @param description description message for this operation
+ */
def filter(fn: FilterFunction[T], description: String): JavaStream[T] = {
- new JavaStream[T](stream.filter({ t: T => fn(t) }, description))
+ // Delegate to the Scala API's FilterFunction overload (as reduce does).
+ new JavaStream[T](stream.filter(fn, description))
}
/** Does aggregation on the stream */
def reduce(fn: ReduceFunction[T], description: String): JavaStream[T] = {
- new JavaStream[T](stream.reduce({ (t1: T, t2: T) => fn(t1, t2) }, description))
+ // The ReduceFunction is handed directly to the Scala Stream.reduce overload.
+ new JavaStream[T](stream.reduce(fn, description))
}
def log(): Unit = {
@@ -65,7 +65,7 @@ class JavaStream[T](val stream: Stream[T]) {
*/
def groupBy[GROUP](fn: GroupByFunction[T, GROUP],
parallelism: Int, description: String): JavaStream[T] = {
- new JavaStream[T](stream.groupBy((t: T) => fn, parallelism, description))
+ // fn.apply eta-expands to T => GROUP, so each element is keyed by fn(t);
+ // the removed lambda returned the function object itself for every element.
+ new JavaStream[T](stream.groupBy(fn.apply, parallelism, description))
}
def window(win: Window, description: String): JavaWindowStream[T] = {
@@ -84,6 +84,6 @@ class JavaWindowStream[T](stream: WindowStream[T]) {
def groupBy[GROUP](fn: GroupByFunction[T, GROUP], parallelism: Int,
description: String): JavaStream[T] = {
- new JavaStream[T](stream.groupBy((t: T) => fn, parallelism, description))
+ // Groups windowed elements by fn(t); fn.apply eta-expands to T => GROUP.
+ // The removed lambda grouped every element under the function object itself.
+ new JavaStream[T](stream.groupBy(fn.apply, parallelism, description))
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
index 0d841be..f5b2910 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
@@ -19,13 +19,14 @@
package org.apache.gearpump.streaming.dsl.javaapi
import java.util.Collection
-import scala.collection.JavaConverters._
import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.streaming.dsl.{CollectionDataSource, StreamApp}
+import org.apache.gearpump.cluster.client.{ClientContext, RunningApplication}
+import org.apache.gearpump.streaming.dsl.scalaapi.{CollectionDataSource, StreamApp}
import org.apache.gearpump.streaming.source.DataSource
+import scala.collection.JavaConverters._
+
class JavaStreamApp(name: String, context: ClientContext, userConfig: UserConfig) {
private val streamApp = StreamApp(name, context, userConfig)
@@ -41,7 +42,7 @@ class JavaStreamApp(name: String, context: ClientContext, userConfig: UserConfig
new JavaStream[T](streamApp.source(dataSource, parallelism, conf, description))
}
- def run(): Unit = {
+ // Submits the app through the client context. streamApp is presumably turned
+ // into a StreamApplication by the implicit StreamApp.streamAppToApplication.
+ // NOTE(review): renaming run() to submit() breaks existing Java callers of
+ // run(); consider keeping a deprecated run() that delegates here.
+ def submit(): RunningApplication = {
context.submit(streamApp)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
new file mode 100644
index 0000000..85d597d
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.javaapi.functions
+
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
+
+/**
+ * Transforms one input into zero or more outputs of possibly different types.
+ * This Java version of FlatMapFunction returns a java.util.Iterator.
+ *
+ * @tparam T Input value type
+ * @tparam R Output value type
+ */
+abstract class FlatMapFunction[T, R] extends SerializableFunction {
+
+ /**
+ * Applies the function to a single input.
+ *
+ * @param t input value
+ * @return iterator over the zero or more outputs produced for `t`
+ */
+ def apply(t: T): java.util.Iterator[R]
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala
new file mode 100644
index 0000000..7656cba
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.javaapi.functions
+
+import org.apache.gearpump.streaming.dsl.api.functions.MapFunction
+
+/**
+ * Assigns the input value into a group: `apply(t)` returns the group of `t`.
+ *
+ * @tparam T Input value type
+ * @tparam GROUP Group value type
+ */
+abstract class GroupByFunction[T, GROUP] extends MapFunction[T, GROUP]
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
index 2ec881b..efa7409 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
@@ -19,7 +19,7 @@
package org.apache.gearpump.streaming.dsl.partitioner
import org.apache.gearpump.Message
-import org.apache.gearpump.partitioner.UnicastPartitioner
+import org.apache.gearpump.streaming.partitioner.UnicastPartitioner
import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
/**
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
index b2c5506..5aaf2fa 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
@@ -22,11 +22,10 @@ import akka.actor.ActorSystem
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.Constants._
import org.apache.gearpump.streaming.Processor.DefaultProcessor
-import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
+import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, SingleInputFunction}
import org.apache.gearpump.streaming.{Constants, Processor}
import org.apache.gearpump.streaming.dsl.task.TransformTask
-import org.apache.gearpump.streaming.dsl.window.api.{CountWindow, GroupByFn}
-import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
+import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
import org.apache.gearpump.streaming.sink.{DataSink, DataSinkProcessor}
import org.apache.gearpump.streaming.source.{DataSource, DataSourceTask}
import org.apache.gearpump.streaming.task.Task
@@ -130,12 +129,11 @@ case class ChainableOp[IN, OUT](
override def description: String = fn.description
-
override def chain(other: Op)(implicit system: ActorSystem): Op = {
other match {
case op: ChainableOp[OUT, _] =>
// TODO: preserve type info
- ChainableOp(fn.andThen(op.fn))
+ ChainableOp(AndThen(fn, op.fn))
case _ =>
throw new OpChainException(this, other)
}
@@ -147,15 +145,6 @@ case class ChainableOp[IN, OUT](
}
}
-object GroupByOp {
-
- def apply[IN, GROUP](groupBy: IN => GROUP, parallelism: Int,
- description: String, userConfig: UserConfig): Op = {
- GroupByOp(GroupAlsoByWindow(groupBy, CountWindow.apply(1).accumulating), parallelism,
- description, userConfig)
- }
-}
-
/**
* This represents a Processor with window aggregation
*/
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
index 16d5c06..65f9cd2 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
@@ -20,7 +20,7 @@ package org.apache.gearpump.streaming.dsl.plan
import akka.actor.ActorSystem
-import org.apache.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner, Partitioner}
+import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, HashPartitioner, Partitioner}
import org.apache.gearpump.streaming.Processor
import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
import org.apache.gearpump.streaming.task.Task
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
index 609fbb0..687fd2e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
@@ -17,20 +17,37 @@
*/
package org.apache.gearpump.streaming.dsl.plan.functions
-trait SingleInputFunction[IN, OUT] extends Serializable {
+import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
+
+/**
+ * Internal function to process single input.
+ *
+ * The trait is sealed, so all implementations live in this file.
+ *
+ * @tparam IN input value type
+ * @tparam OUT output value type
+ */
+sealed trait SingleInputFunction[IN, OUT] extends java.io.Serializable {
+
+ /** Lifecycle hook, presumably run before processing starts; no-op by default. */
+ def setup(): Unit = {}
+
+ /** Processes one input value, producing zero or more outputs. */
def process(value: IN): TraversableOnce[OUT]
- def andThen[OUTER](other: SingleInputFunction[OUT, OUTER]): SingleInputFunction[IN, OUTER] = {
- new AndThen(this, other)
- }
+
+ /** Outputs to emit at the end of processing (caller not shown); none by default. */
def finish(): TraversableOnce[OUT] = None
- def clearState(): Unit = {}
+
+ /** Lifecycle hook, presumably run after processing ends; no-op by default. */
+ def teardown(): Unit = {}
+
+ /** Description of this function; ChainableOp reports it as the op description. */
def description: String
}
-class AndThen[IN, MIDDLE, OUT](
- first: SingleInputFunction[IN, MIDDLE], second: SingleInputFunction[MIDDLE, OUT])
+case class AndThen[IN, MIDDLE, OUT](first: SingleInputFunction[IN, MIDDLE],
+ second: SingleInputFunction[MIDDLE, OUT])
extends SingleInputFunction[IN, OUT] {
+ override def setup(): Unit = {
+ first.setup()
+ second.setup()
+ }
+
override def process(value: IN): TraversableOnce[OUT] = {
first.process(value).flatMap(second.process)
}
@@ -44,9 +61,9 @@ class AndThen[IN, MIDDLE, OUT](
}
}
- override def clearState(): Unit = {
- first.clearState()
- second.clearState()
+ override def teardown(): Unit = {
+ first.teardown()
+ second.teardown()
}
override def description: String = {
@@ -56,22 +73,31 @@ class AndThen[IN, MIDDLE, OUT](
}
}
-class FlatMapFunction[IN, OUT](fn: IN => TraversableOnce[OUT], descriptionMessage: String)
+// Adapts a user FlatMapFunction to SingleInputFunction: process() delegates to
+// fn, and the setup/teardown lifecycle hooks are forwarded to fn as well.
+class FlatMapper[IN, OUT](fn: FlatMapFunction[IN, OUT], val description: String)
extends SingleInputFunction[IN, OUT] {
+ override def setup(): Unit = {
+ fn.setup()
+ }
+
override def process(value: IN): TraversableOnce[OUT] = {
fn(value)
}
- override def description: String = descriptionMessage
+ override def teardown(): Unit = {
+ fn.teardown()
+ }
}
-
-class ReduceFunction[T](fn: (T, T) => T, descriptionMessage: String)
+class Reducer[T](fn: ReduceFunction[T], val description: String)
extends SingleInputFunction[T, T] {
private var state: Option[T] = None
+ override def setup(): Unit = {
+ fn.setup()
+ }
+
override def process(value: T): TraversableOnce[T] = {
if (state.isEmpty) {
state = Option(value)
@@ -85,23 +111,18 @@ class ReduceFunction[T](fn: (T, T) => T, descriptionMessage: String)
state
}
- override def clearState(): Unit = {
+ override def teardown(): Unit = {
state = None
+ fn.teardown()
}
-
- override def description: String = descriptionMessage
}
-class EmitFunction[T](emit: T => Unit) extends SingleInputFunction[T, Unit] {
+// Terminal function: passes each value to `emit` and produces no output.
+class Emit[T](emit: T => Unit) extends SingleInputFunction[T, Unit] {
override def process(value: T): TraversableOnce[Unit] = {
emit(value)
None
}
- override def andThen[R](other: SingleInputFunction[Unit, R]): SingleInputFunction[T, R] = {
- throw new UnsupportedOperationException("andThen is not supposed to be called on EmitFunction")
- }
-
+ // NOTE(review): the guard that rejected chaining after Emit is gone; AndThen is
+ // now a plain case class, so nothing here stops it — confirm callers never chain.
override def description: String = ""
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
new file mode 100644
index 0000000..430d795
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.scalaapi
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction, ReduceFunction}
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
+import org.apache.gearpump.streaming.dsl.plan._
+import org.apache.gearpump.streaming.dsl.plan.functions._
+import org.apache.gearpump.streaming.dsl.window.api._
+import org.apache.gearpump.streaming.dsl.window.impl.{Bucket, GroupAlsoByWindow}
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+import org.apache.gearpump.util.Graph
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.language.implicitConversions
+
+/**
+ * A DSL stream of elements of type T.
+ *
+ * Each operation adds an Op vertex to the shared `graph`, connects it from
+ * `thisNode`, and returns a new Stream wrapping that vertex.
+ *
+ * @param graph operator graph shared by the streams of one app
+ * @param thisNode the op producing this stream's elements
+ * @param edge edge type to force for the next connection; when None each
+ * operation picks its own default (Direct or Shuffle)
+ */
+class Stream[T](
+ private val graph: Graph[Op, OpEdge], private val thisNode: Op,
+ private val edge: Option[OpEdge] = None) {
+
+ /**
+ * Returns a new stream by applying a flatMap function to each element
+ * and flatten the results.
+ *
+ * @param fn flatMap function
+ * @param description The description message for this operation
+ * @return A new stream with type [R]
+ */
+ def flatMap[R](fn: T => TraversableOnce[R], description: String = "flatMap"): Stream[R] = {
+ this.flatMap(FlatMapFunction(fn), description)
+ }
+
+ /**
+ * Returns a new stream by applying a flatMap function to each element
+ * and flatten the results.
+ *
+ * @param fn flatMap function
+ * @param description The description message for this operation
+ * @return A new stream with type [R]
+ */
+ def flatMap[R](fn: FlatMapFunction[T, R], description: String): Stream[R] = {
+ transform(new FlatMapper[T, R](fn, description))
+ }
+
+ /**
+ * Returns a new stream by applying a map function to each element.
+ *
+ * @param fn map function
+ * @param description The description message for this operation
+ * @return A new stream with type [R]
+ */
+ def map[R](fn: T => R, description: String = "map"): Stream[R] = {
+ this.map(MapFunction(fn), description)
+ }
+
+ /**
+ * Returns a new stream by applying a map function to each element.
+ *
+ * @param fn map function
+ * @param description The description message for this operation
+ * @return A new stream with type [R]
+ */
+ def map[R](fn: MapFunction[T, R], description: String): Stream[R] = {
+ this.flatMap(FlatMapFunction(fn), description)
+ }
+
+ /**
+ * Returns a new Stream keeping the elements that satisfy the filter function.
+ *
+ * @param fn filter function
+ * @param description The description message for this operation
+ * @return a new stream after filter
+ */
+ def filter(fn: T => Boolean, description: String = "filter"): Stream[T] = {
+ this.filter(FilterFunction(fn), description)
+ }
+
+ /**
+ * Returns a new Stream keeping the elements that satisfy the filter function.
+ *
+ * @param fn filter function
+ * @param description The description message for this operation
+ * @return a new stream after filter
+ */
+ def filter(fn: FilterFunction[T], description: String): Stream[T] = {
+ this.flatMap(FlatMapFunction(fn), description)
+ }
+ /**
+ * Returns a new stream by applying a reduce function over all the elements.
+ *
+ * @param fn reduce function
+ * @param description description message for this operator
+ * @return a new stream after reduce
+ */
+ def reduce(fn: (T, T) => T, description: String = "reduce"): Stream[T] = {
+ reduce(ReduceFunction(fn), description)
+ }
+
+ /**
+ * Returns a new stream by applying a reduce function over all the elements.
+ *
+ * @param fn reduce function
+ * @param description description message for this operator
+ * @return a new stream after reduce
+ */
+ def reduce(fn: ReduceFunction[T], description: String): Stream[T] = {
+ transform(new Reducer[T](fn, description))
+ }
+
+ // Wraps fn in a ChainableOp; chainable ops connect with a Direct edge unless
+ // an edge type was forced on this stream.
+ private def transform[R](fn: SingleInputFunction[T, R]): Stream[R] = {
+ val op = ChainableOp(fn)
+ graph.addVertex(op)
+ graph.addEdge(thisNode, edge.getOrElse(Direct), op)
+ new Stream(graph, op)
+ }
+
+ /**
+ * Log to task log file
+ */
+ def log(): Unit = {
+ // The logger is looked up inside the function rather than captured, and the
+ // resulting stream is discarded: log() returns Unit, so nothing chains after it.
+ this.map(msg => {
+ LoggerFactory.getLogger("dsl").info(msg.toString)
+ msg
+ }, "log")
+ }
+
+ /**
+ * Merges data from two stream into one
+ *
+ * This stream connects with a Direct edge and `other` with a Shuffle edge,
+ * unless either has a forced edge type.
+ *
+ * @param other the other stream
+ * @param description The description message for this operation
+ * @return the merged stream
+ */
+ def merge(other: Stream[T], description: String = "merge"): Stream[T] = {
+ val mergeOp = MergeOp(description, UserConfig.empty)
+ graph.addVertex(mergeOp)
+ graph.addEdge(thisNode, edge.getOrElse(Direct), mergeOp)
+ graph.addEdge(other.thisNode, other.edge.getOrElse(Shuffle), mergeOp)
+ new Stream[T](graph, mergeOp)
+ }
+
+ /**
+ * Group by function (T => Group)
+ *
+ * For example, we have T type, People(name: String, gender: String, age: Int)
+ * groupBy(_.gender) will group the people by gender.
+ *
+ * You can append other combinators after groupBy
+ *
+ * For example,
+ * {{{
+ * Stream[People].groupBy(_.gender).flatMap(..).filter(..).reduce(..)
+ * }}}
+ *
+ * Implemented as an accumulating count window of size 1 followed by
+ * WindowStream.groupBy.
+ *
+ * @param fn Group by function
+ * @param parallelism Parallelism level
+ * @param description The description
+ * @return the grouped stream
+ */
+ def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1,
+ description: String = "groupBy"): Stream[T] = {
+ window(CountWindow.apply(1).accumulating)
+ .groupBy[GROUP](fn, parallelism, description)
+ }
+
+ /**
+ * Window function
+ *
+ * @param win window definition
+ * @param description window description
+ * @return [[WindowStream]] where groupBy could be applied
+ */
+ def window(win: Window, description: String = "window"): WindowStream[T] = {
+ new WindowStream[T](graph, edge, thisNode, win, description)
+ }
+
+ /**
+ * Connects with a low level Processor(TaskDescription)
+ *
+ * @param processor a user defined processor
+ * @param parallelism parallelism level
+ * @param conf user config for the processor
+ * @param description The description message for this operation
+ * @return new stream after processing with type [R]
+ */
+ def process[R](
+ processor: Class[_ <: Task], parallelism: Int, conf: UserConfig = UserConfig.empty,
+ description: String = "process"): Stream[R] = {
+ val processorOp = ProcessorOp(processor, parallelism, conf, description)
+ graph.addVertex(processorOp)
+ graph.addEdge(thisNode, edge.getOrElse(Shuffle), processorOp)
+ // Downstream of a processor is forced onto a Shuffle edge.
+ new Stream[R](graph, processorOp, Some(Shuffle))
+ }
+}
+
+/**
+ * A stream with a window definition attached; groupBy turns it back into a
+ * [[Stream]] grouped by key and window.
+ */
+class WindowStream[T](graph: Graph[Op, OpEdge], edge: Option[OpEdge], thisNode: Op,
+ window: Window, winDesc: String) {
+
+ /**
+ * Groups elements by `fn` within this window.
+ *
+ * @param fn maps an element to its group
+ * @param parallelism parallelism of the group-by op
+ * @param description appended to the window description, e.g. "window.groupBy"
+ * @return the grouped stream
+ */
+ def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1,
+ description: String = "groupBy"): Stream[T] = {
+ val opDescription = s"$winDesc.$description"
+ val windowedGroupFn: GroupByFn[T, (GROUP, List[Bucket])] = GroupAlsoByWindow(fn, window)
+ val windowedGroupOp =
+ GroupByOp[T, (GROUP, List[Bucket])](windowedGroupFn, parallelism, opDescription)
+ val upstreamEdge = edge.getOrElse(Shuffle)
+ graph.addVertex(windowedGroupOp)
+ graph.addEdge(thisNode, upstreamEdge, windowedGroupOp)
+ new Stream[T](graph, windowedGroupOp)
+ }
+}
+
+/**
+ * Operations on streams of (key, value) tuples, reached through the implicit
+ * Stream.streamToKVStream conversion.
+ */
+class KVStream[K, V](stream: Stream[Tuple2[K, V]]) {
+ /**
+ * GroupBy key
+ *
+ * Applies to Stream[Tuple2[K,V]]
+ *
+ * @param parallelism the parallelism for this operation
+ * @return the new KV stream
+ */
+ def groupByKey(parallelism: Int = 1): Stream[Tuple2[K, V]] = {
+ stream.groupBy(Stream.getTupleKey[K, V], parallelism, "groupByKey")
+ }
+
+ /**
+ * Sum the value of the tuples
+ *
+ * Apply to Stream[Tuple2[K,V]]; V must have an implicit Numeric[V] instance
+ *
+ * For input (key, value1), (key, value2), will generate (key, value1 + value2)
+ *
+ * NOTE(review): the reducer keeps the first tuple's key without comparing keys,
+ * so this is a per-key sum only when applied after groupByKey — confirm usage.
+ *
+ * @param numeric the numeric operations
+ * @return the sum stream
+ */
+ def sum(implicit numeric: Numeric[V]): Stream[(K, V)] = {
+ stream.reduce(Stream.sumByKey[K, V](numeric), "sum")
+ }
+}
+
+/** Factory, helper functions and implicit syntax for [[Stream]]. */
+object Stream {
+
+ /** Creates a stream rooted at `node` of `graph`, optionally forcing the next edge type. */
+ def apply[T](graph: Graph[Op, OpEdge], node: Op, edge: Option[OpEdge]): Stream[T] = {
+ new Stream[T](graph, node, edge)
+ }
+
+ /** Extracts the key of a (key, value) tuple; used as the groupByKey function. */
+ def getTupleKey[K, V](tuple: Tuple2[K, V]): K = tuple._1
+
+ /**
+ * Builds a reduce function that adds the values of two tuples and keeps the
+ * key of the first one (keys are not compared).
+ *
+ * @param numeric numeric operations used to add the values
+ */
+ def sumByKey[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, V]) => Tuple2[K, V]
+ = (tuple1, tuple2) => Tuple2(tuple1._1, numeric.plus(tuple1._2, tuple2._2))
+
+ /** Gives streams of tuples the [[KVStream]] operations. */
+ implicit def streamToKVStream[K, V](stream: Stream[Tuple2[K, V]]): KVStream[K, V] = {
+ new KVStream(stream)
+ }
+
+ /** Adds `sink` to any stream. */
+ implicit class Sink[T](stream: Stream[T]) extends java.io.Serializable {
+ def sink(dataSink: DataSink, parallelism: Int = 1,
+ conf: UserConfig = UserConfig.empty, description: String = "sink"): Stream[T] = {
+ // A plain val: the op is passed explicitly below, so making it implicit only
+ // risks it silently satisfying unrelated implicit parameters.
+ val sinkOp = DataSinkOp(dataSink, parallelism, conf, description)
+ stream.graph.addVertex(sinkOp)
+ stream.graph.addEdge(stream.thisNode, Shuffle, sinkOp)
+ new Stream[T](stream.graph, sinkOp)
+ }
+ }
+}
+
+/** DataSink that writes the payload of every message to the task logger. */
+class LoggerSink[T] extends DataSink {
+ // Set from the task context in open(); null until then.
+ var logger: Logger = _
+
+ override def open(context: TaskContext): Unit = {
+ this.logger = context.logger
+ }
+
+ override def write(message: Message): Unit = {
+ logger.info("logging message " + message.msg)
+ }
+
+ // Nothing to release. `= Unit` evaluated the Unit companion object and relied
+ // on value discarding; `{}` is the actual unit value.
+ override def close(): Unit = {}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
new file mode 100644
index 0000000..d6eed2e
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.scalaapi
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.streaming.StreamApplication
+import org.apache.gearpump.streaming.dsl.plan._
+import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.util.Graph
+
+import scala.language.implicitConversions
+
+/**
+ * Builds a streaming application with the DSL and plans it into a
+ * [[StreamApplication]].
+ *
+ * Example:
+ * {{{
+ * val app = StreamApp("wordCount", context)
+ * val data = "This is a good start, bingo!! bingo!!"
+ * app.source(data.lines.toList, 1, "source").
+ * // word => (word, count)
+ * flatMap(line => line.split("[\\s]+")).map((_, 1)).
+ * // (word, count1), (word, count2) => (word, count1 + count2)
+ * groupBy(kv => kv._1).reduce((kv1, kv2) => (kv1._1, kv1._2 + kv2._2))
+ *
+ * val runningApp = context.submit(app)
+ * context.close()
+ * }}}
+ *
+ * @param name name of app
+ * @param system actor system made implicitly available while planning
+ * @param userConfig configuration attached to the planned StreamApplication
+ * @param graph the operator graph that DSL operations add vertices to
+ */
+class StreamApp(
+ name: String, system: ActorSystem, userConfig: UserConfig,
+ private val graph: Graph[Op, OpEdge]) {
+
+ /** Creates an app with an empty operator graph. */
+ def this(name: String, system: ActorSystem, userConfig: UserConfig) = {
+ this(name, system, userConfig, Graph.empty[Op, OpEdge])
+ }
+
+ /** Translates the operator graph into a StreamApplication DAG with a [[Planner]]. */
+ def plan(): StreamApplication = {
+ // Presumably required implicitly by Planner.plan (op chaining takes an
+ // implicit ActorSystem).
+ implicit val actorSystem = system
+ val planner = new Planner
+ val dag = planner.plan(graph)
+ StreamApplication(name, dag, userConfig)
+ }
+}
+
+/** Factory and implicit syntax for [[StreamApp]]. */
+object StreamApp {
+ /** Creates a StreamApp that plans with the client context's actor system. */
+ def apply(name: String, context: ClientContext, userConfig: UserConfig = UserConfig.empty)
+ : StreamApp = {
+ new StreamApp(name, context.system, userConfig)
+ }
+
+ /** Lets a StreamApp be used where a StreamApplication is expected, by planning it. */
+ implicit def streamAppToApplication(streamApp: StreamApp): StreamApplication = {
+ streamApp.plan()
+ }
+
+ /** Adds `source` methods that start a new stream on a StreamApp. */
+ implicit class Source(app: StreamApp) extends java.io.Serializable {
+
+ /**
+ * Adds a data source vertex to the app's graph.
+ *
+ * @param dataSource the source to read messages from
+ * @param parallelism parallelism of the source
+ * @param conf user config for the source
+ * @param description description of the source
+ * @return a stream of the source's elements
+ */
+ def source[T](dataSource: DataSource, parallelism: Int = 1,
+ conf: UserConfig = UserConfig.empty, description: String = "source"): Stream[T] = {
+ // A plain val: the op is passed explicitly, so it need not be implicit.
+ val sourceOp = DataSourceOp(dataSource, parallelism, conf, description)
+ app.graph.addVertex(sourceOp)
+ new Stream[T](app.graph, sourceOp)
+ }
+
+ /** Adds a source that emits the elements of `seq` via a [[CollectionDataSource]]. */
+ def source[T](seq: Seq[T], parallelism: Int, description: String): Stream[T] = {
+ this.source(new CollectionDataSource[T](seq), parallelism, UserConfig.empty, description)
+ }
+ }
+}
+
+/**
+ * A test message source that emits each element of `seq` once, in order,
+ * timestamped with the wall-clock time at which it is read.
+ *
+ * The sequence is not replayed: once it is exhausted, read() returns null
+ * (presumably the DataSource signal for "no message" — confirm against the
+ * DataSource contract). The watermark is always the current wall-clock time.
+ */
+class CollectionDataSource[T](seq: Seq[T]) extends DataSource {
+ private lazy val iterator: Iterator[T] = seq.iterator
+
+ override def open(context: TaskContext, startTime: Instant): Unit = {}
+
+ override def read(): Message = {
+ if (iterator.hasNext) {
+ Message(iterator.next(), Instant.now().toEpochMilli)
+ } else {
+ null
+ }
+ }
+
+ override def close(): Unit = {}
+
+ override def getWatermark: Instant = Instant.now()
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
new file mode 100644
index 0000000..f10a3db
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.scalaapi.functions
+
+import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction}
+import org.apache.gearpump.streaming.dsl.javaapi.functions.{FlatMapFunction => JFlatMapFunction}
+
+import scala.collection.JavaConverters._
+
+object FlatMapFunction {
+
+  /**
+   * Wraps a Java-API [[JFlatMapFunction]] so it can be used as a Scala-API
+   * [[FlatMapFunction]]. The Java result is converted with `asScala`, and the
+   * setup/teardown hooks are forwarded to the wrapped function.
+   */
+  def apply[T, R](fn: JFlatMapFunction[T, R]): FlatMapFunction[T, R] =
+    new FlatMapFunction[T, R] {
+      override def setup(): Unit = fn.setup()
+
+      override def apply(t: T): TraversableOnce[R] = fn.apply(t).asScala
+
+      override def teardown(): Unit = fn.teardown()
+    }
+
+  /**
+   * Wraps a plain Scala function. The setup/teardown hooks keep the no-op defaults
+   * inherited from SerializableFunction.
+   */
+  def apply[T, R](fn: T => TraversableOnce[R]): FlatMapFunction[T, R] =
+    new FlatMapFunction[T, R] {
+      override def apply(t: T): TraversableOnce[R] = fn(t)
+    }
+
+  /**
+   * Lifts a one-to-one [[MapFunction]] into a flat map that emits its single result.
+   * The result is wrapped with `Option`, so a null result produces no output.
+   */
+  def apply[T, R](fn: MapFunction[T, R]): FlatMapFunction[T, R] =
+    new FlatMapFunction[T, R] {
+      override def setup(): Unit = fn.setup()
+
+      override def apply(t: T): TraversableOnce[R] = {
+        val mapped = fn(t)
+        Option(mapped)
+      }
+
+      override def teardown(): Unit = fn.teardown()
+    }
+
+  /**
+   * Lifts a [[FilterFunction]] into a flat map that emits the input when the predicate
+   * holds and nothing otherwise. The predicate is always evaluated first. A null input
+   * that passes is still dropped, because it is wrapped with `Option`.
+   */
+  def apply[T, R](fn: FilterFunction[T]): FlatMapFunction[T, T] =
+    new FlatMapFunction[T, T] {
+      override def setup(): Unit = fn.setup()
+
+      override def apply(t: T): TraversableOnce[T] =
+        if (!fn(t)) None else Option(t)
+
+      override def teardown(): Unit = fn.teardown()
+    }
+}
+
+/**
+ * Transforms one input into zero or more outputs of possibly different types.
+ * This Scala version of FlatMapFunction returns a TraversableOnce.
+ *
+ * @tparam T Input value type
+ * @tparam R Output value type
+ */
+abstract class FlatMapFunction[T, R] extends SerializableFunction {
+
+  /**
+   * Applies the function to a single input.
+   *
+   * @param t the input value
+   * @return zero or more output values
+   */
+  def apply(t: T): TraversableOnce[R]
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala
new file mode 100644
index 0000000..ab88bf1
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.scalaapi.functions
+
+/**
+ * Base class of every user-defined function interface in the Scala DSL.
+ * It extends java.io.Serializable so that function instances can be serialized, and it
+ * provides the `setup`/`teardown` lifecycle hooks, which do nothing unless overridden.
+ * Users should extend a subclass such as [[FlatMapFunction]] rather than this class.
+ */
+abstract class SerializableFunction extends java.io.Serializable {
+
+  /** Hook for acquiring resources before the function is used; a no-op by default. */
+  def setup(): Unit = ()
+
+  /** Hook for releasing resources after the function is used; a no-op by default. */
+  def teardown(): Unit = ()
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
index 4ee2fa8..06f2964 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
@@ -19,7 +19,6 @@ package org.apache.gearpump.streaming.dsl.task
import java.time.Instant
-import akka.actor.ActorSystem
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.Constants._
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala
index 4b7649f..0674339 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala
@@ -19,7 +19,6 @@ package org.apache.gearpump.streaming.dsl.task
import java.time.Instant
-import akka.actor.ActorSystem
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.Constants._
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala
index 980a54b..78ba762 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala
@@ -21,7 +21,6 @@ import java.time.Instant
import java.util.concurrent.TimeUnit
import akka.actor.Actor.Receive
-import akka.actor.ActorSystem
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.Constants._
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
index e35f085..f8fbefa 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
@@ -17,21 +17,26 @@
*/
package org.apache.gearpump.streaming.dsl.task
+import java.time.Instant
+
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.Constants._
import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
import org.apache.gearpump.streaming.task.{Task, TaskContext}
-class TransformTask[IN, OUT](
- operator: Option[SingleInputFunction[IN, OUT]], taskContext: TaskContext,
- userConf: UserConfig) extends Task(taskContext, userConf) {
+class TransformTask[IN, OUT](operator: Option[SingleInputFunction[IN, OUT]],
+ taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
def this(taskContext: TaskContext, userConf: UserConfig) = {
this(userConf.getValue[SingleInputFunction[IN, OUT]](
GEARPUMP_STREAMING_OPERATOR)(taskContext.system), taskContext, userConf)
}
+  /** Runs the operator's setup hook, if an operator is configured, when the task starts. */
+  override def onStart(startTime: Instant): Unit = {
+    for (op <- operator) op.setup()
+  }
+
override def onNext(msg: Message): Unit = {
val time = msg.timestamp
@@ -44,4 +49,8 @@ class TransformTask[IN, OUT](
taskContext.output(new Message(msg.msg, time))
}
}
+
+  /** Runs the operator's teardown hook, if an operator is configured, when the task stops. */
+  override def onStop(): Unit = {
+    for (op <- operator) op.teardown()
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
index 9af5e61..223a4af 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
@@ -22,12 +22,13 @@ import java.time.Instant
import akka.actor.ActorSystem
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.gs.collections.api.block.procedure.Procedure
-import org.apache.gearpump.gs.collections.impl.list.mutable.FastList
-import org.apache.gearpump.gs.collections.impl.map.mutable.UnifiedMap
-import org.apache.gearpump.gs.collections.impl.map.sorted.mutable.TreeSortedMap
+import com.gs.collections.api.block.procedure.Procedure
+import com.gs.collections.impl.list.mutable.FastList
+import com.gs.collections.impl.map.mutable.UnifiedMap
+import com.gs.collections.impl.map.sorted.mutable.TreeSortedMap
+import com.gs.collections.impl.set.mutable.UnifiedSet
import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.dsl.plan.functions.{EmitFunction, SingleInputFunction}
+import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, Emit, SingleInputFunction}
import org.apache.gearpump.streaming.dsl.window.api.Discarding
import org.apache.gearpump.streaming.task.TaskContext
import org.apache.gearpump.util.LogUtil
@@ -38,7 +39,6 @@ trait WindowRunner {
def process(message: Message): Unit
def trigger(time: Instant): Unit
-
}
object DefaultWindowRunner {
@@ -46,18 +46,6 @@ object DefaultWindowRunner {
private val LOG: Logger = LogUtil.getLogger(classOf[DefaultWindowRunner[_, _, _]])
case class WindowGroup[GROUP](bucket: Bucket, group: GROUP)
- extends Comparable[WindowGroup[GROUP]] {
- override def compareTo(o: WindowGroup[GROUP]): Int = {
- val ret = bucket.compareTo(o.bucket)
- if (ret != 0) {
- ret
- } else if (group.equals(o.group)) {
- 0
- } else {
- -1
- }
- }
- }
}
class DefaultWindowRunner[IN, GROUP, OUT](
@@ -66,20 +54,27 @@ class DefaultWindowRunner[IN, GROUP, OUT](
extends WindowRunner {
import org.apache.gearpump.streaming.dsl.window.impl.DefaultWindowRunner._
- private val windowGroups = new TreeSortedMap[WindowGroup[GROUP], FastList[IN]]
+ private val windows = new TreeSortedMap[Bucket, UnifiedSet[WindowGroup[GROUP]]]
+ private val windowGroups = new UnifiedMap[WindowGroup[GROUP], FastList[IN]]
private val groupFns = new UnifiedMap[GROUP, SingleInputFunction[IN, OUT]]
-
override def process(message: Message): Unit = {
val (group, buckets) = groupBy.groupBy(message)
buckets.foreach { bucket =>
val wg = WindowGroup(bucket, group)
+ val wgs = windows.getOrDefault(bucket, new UnifiedSet[WindowGroup[GROUP]](1))
+ wgs.add(wg)
+ windows.put(bucket, wgs)
+
val inputs = windowGroups.getOrDefault(wg, new FastList[IN](1))
inputs.add(message.msg.asInstanceOf[IN])
windowGroups.put(wg, inputs)
}
- groupFns.putIfAbsent(group,
- userConfig.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get)
+ if (!groupFns.containsKey(group)) {
+ val fn = userConfig.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get
+ fn.setup()
+ groupFns.put(group, fn)
+ }
}
override def trigger(time: Instant): Unit = {
@@ -87,21 +82,28 @@ class DefaultWindowRunner[IN, GROUP, OUT](
@annotation.tailrec
def onTrigger(): Unit = {
- if (windowGroups.notEmpty()) {
- val first = windowGroups.firstKey
- if (!time.isBefore(first.bucket.endTime)) {
- val inputs = windowGroups.remove(first)
- val reduceFn = groupFns.get(first.group)
- .andThen[Unit](new EmitFunction[OUT](emitResult(_, time)))
- inputs.forEach(new Procedure[IN] {
- override def value(t: IN): Unit = {
- reduceFn.process(t)
+ if (windows.notEmpty()) {
+ val first = windows.firstKey
+ if (!time.isBefore(first.endTime)) {
+ val wgs = windows.remove(first)
+ wgs.forEach(new Procedure[WindowGroup[GROUP]] {
+ override def value(each: WindowGroup[GROUP]): Unit = {
+ val inputs = windowGroups.remove(each)
+ val reduceFn = AndThen(groupFns.get(each.group), new Emit[OUT](emitResult(_, time)))
+ inputs.forEach(new Procedure[IN] {
+ override def value(t: IN): Unit = {
+ // .toList forces eager evaluation
+ reduceFn.process(t).toList
+ }
+ })
+ // .toList forces eager evaluation
+ reduceFn.finish().toList
+ if (groupBy.window.accumulationMode == Discarding) {
+ reduceFn.teardown()
+ }
}
})
- reduceFn.finish()
- if (groupBy.window.accumulationMode == Discarding) {
- reduceFn.clearState()
- }
+
onTrigger()
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala
index d045def..8f8b7ab 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala
@@ -20,12 +20,11 @@ package org.apache.gearpump.streaming.metrics
import java.util
+import com.google.common.collect.Iterators
import com.typesafe.config.Config
-
import org.apache.gearpump.TimeStamp
import org.apache.gearpump.cluster.ClientToMaster.ReadOption
import org.apache.gearpump.cluster.MasterToClient.HistoryMetricsItem
-import org.apache.gearpump.google.common.collect.Iterators
import org.apache.gearpump.metrics.Metrics.{Histogram, Meter}
import org.apache.gearpump.metrics.MetricsAggregator
import org.apache.gearpump.streaming.metrics.ProcessorAggregator._
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/BroadcastPartitioner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/BroadcastPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/BroadcastPartitioner.scala
new file mode 100644
index 0000000..9b63e04
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/BroadcastPartitioner.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.partitioner
+
+import org.apache.gearpump.Message
+
+/**
+ * Used by the storm module to broadcast every message to all downstream tasks.
+ *
+ * The array of all task indexes is cached and rebuilt only when `partitionNum` changes.
+ * The same array instance is therefore returned across calls, and callers should not
+ * mutate it.
+ */
+class BroadcastPartitioner extends MulticastPartitioner {
+  private var cachedPartitionNum = -1
+  private var allPartitions = Array.empty[Int]
+
+  override def getPartitions(
+      msg: Message, partitionNum: Int, currentPartitionId: Int): Array[Int] = {
+    if (cachedPartitionNum != partitionNum) {
+      allPartitions = Array.range(0, partitionNum)
+      cachedPartitionNum = partitionNum
+    }
+    allPartitions
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/CoLocationPartitioner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/CoLocationPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/CoLocationPartitioner.scala
new file mode 100644
index 0000000..4cb1bad
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/CoLocationPartitioner.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.partitioner
+
+import org.apache.gearpump.Message
+
+/**
+ * Sends each message to the downstream task whose index equals the sending task's index.
+ *
+ * Intended for a downstream processor with the same parallelism as the upstream one, so
+ * that each downstream task is paired (co-located) with one upstream task.
+ * NOTE(review): when called through the two-argument overload, the index is
+ * Partitioner.UNKNOWN_PARTITION_ID (-1), and that value is returned unchanged.
+ */
+class CoLocationPartitioner extends UnicastPartitioner {
+  override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int =
+    currentPartitionId
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/HashPartitioner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/HashPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/HashPartitioner.scala
new file mode 100644
index 0000000..6137705
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/HashPartitioner.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.partitioner
+
+import org.apache.gearpump.Message
+
+/**
+ * Routes a message by the hash code of its payload (`msg.msg`).
+ *
+ * This only makes sense when the payload type overrides hashCode(). Otherwise the
+ * identity-based Object.hashCode() is used, which does not return the same value after
+ * serialization and deserialization.
+ */
+class HashPartitioner extends UnicastPartitioner {
+  override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
+    // Clear the sign bit so negative hash codes still map into [0, partitionNum).
+    val nonNegativeHash = msg.msg.hashCode() & Int.MaxValue
+    nonNegativeHash % partitionNum
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/Partitioner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/Partitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/Partitioner.scala
new file mode 100644
index 0000000..f685cc9
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/Partitioner.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.partitioner
+
+import org.apache.commons.lang.SerializationUtils
+import org.apache.gearpump.Message
+
+import scala.reflect.ClassTag
+
+/**
+ * For processor chain: A (3 tasks) {@literal ->} B (3 tasks), a partitioner decides how ONE
+ * task of upstream processor A sends messages to the tasks of downstream processor B.
+ * The trait is sealed; concrete partitioners extend [[UnicastPartitioner]] or
+ * [[MulticastPartitioner]].
+ */
+sealed trait Partitioner extends Serializable
+
+/**
+ * For processor chain: A (3 tasks) {@literal ->} B (3 tasks), a UnicastPartitioner maps
+ * each message from ONE upstream task to exactly ONE downstream task.
+ */
+trait UnicastPartitioner extends Partitioner {
+
+  /**
+   * Picks the single downstream task index that should receive `msg`.
+   *
+   * @param msg message to route
+   * @param partitionNum number of tasks in the downstream processor
+   * @param upstreamTaskIndex index of the upstream task making this call
+   * @return index of ONE task of the downstream processor
+   */
+  def getPartition(msg: Message, partitionNum: Int, upstreamTaskIndex: Int): Int
+
+  /**
+   * Same as the three-argument overload, with [[Partitioner.UNKNOWN_PARTITION_ID]] passed
+   * as the upstream task index.
+   */
+  def getPartition(msg: Message, partitionNum: Int): Int =
+    getPartition(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID)
+}
+
+/**
+ * For processor chain: A (3 tasks) {@literal ->} B (3 tasks), a MulticastPartitioner maps
+ * each message from ONE upstream task to a set of downstream tasks.
+ */
+trait MulticastPartitioner extends Partitioner {
+
+  /**
+   * Gets the downstream processor task indexes that should receive `msg`.
+   *
+   * @param msg message to route
+   * @param partitionNum number of tasks in the downstream processor
+   * @param upstreamTaskIndex index of the sending (upstream) task
+   * @return indexes of the downstream tasks to send `msg` to
+   */
+  def getPartitions(msg: Message, partitionNum: Int, upstreamTaskIndex: Int): Array[Int]
+
+  /**
+   * Same as the three-argument overload, with [[Partitioner.UNKNOWN_PARTITION_ID]] passed
+   * as the upstream task index.
+   */
+  def getPartitions(msg: Message, partitionNum: Int): Array[Int] =
+    getPartitions(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID)
+}
+
+/**
+ * Describes how to obtain a [[Partitioner]]. The trait is sealed; its implementations in
+ * this file are [[PartitionerObject]] (stores an instance) and [[PartitionerByClassName]]
+ * (stores a class name).
+ */
+sealed trait PartitionerFactory {
+
+ /** Name identifying the partitioner; both implementations here return a class name. */
+ def name: String
+
+ /** Returns a Partitioner; both implementations here produce a new instance per call. */
+ def partitioner: Partitioner
+}
+
+/**
+ * Stores the Partitioner in an object. Each call to `partitioner` returns a deep copy made
+ * via Java serialization, so callers never share the stored instance or its mutable state.
+ *
+ * @param _partitioner the template partitioner; it is cloned, never handed out directly
+ */
+class PartitionerObject(private[this] val _partitioner: Partitioner)
+  extends PartitionerFactory with Serializable {
+
+  // Read the class name from the stored instance. A serialized clone has the same class,
+  // so cloning it only to read its name would be wasted work.
+  override def name: String = _partitioner.getClass.getName
+
+  override def partitioner: Partitioner = {
+    SerializationUtils.clone(_partitioner).asInstanceOf[Partitioner]
+  }
+}
+
+/**
+ * Stores only the partitioner's fully-qualified class name. Each call to `partitioner`
+ * loads that class and creates a fresh instance reflectively with its no-arg constructor.
+ *
+ * @param partitionerClass fully-qualified name of a [[Partitioner]] implementation
+ */
+class PartitionerByClassName(partitionerClass: String)
+  extends PartitionerFactory with Serializable {
+
+  override def name: String = partitionerClass
+
+  override def partitioner: Partitioner = {
+    val clazz = Class.forName(partitionerClass)
+    val instance = clazz.newInstance()
+    instance.asInstanceOf[Partitioner]
+  }
+}
+
+/**
+ * Describes a partitioner by wrapping the factory used to obtain it.
+ *
+ * @param partitionerFactory how a [[Partitioner]] instance is constructed.
+ */
+case class PartitionerDescription(partitionerFactory: PartitionerFactory)
+
+object Partitioner {
+  /** Passed as the upstream task index by the two-argument getPartition(s) overloads. */
+  val UNKNOWN_PARTITION_ID = -1
+
+  /**
+   * Builds a [[PartitionerDescription]] that refers to partitioner type `T` by its runtime
+   * class name, for example `Partitioner[HashPartitioner]`.
+   *
+   * @tparam T the partitioner implementation
+   */
+  def apply[T <: Partitioner](implicit clazz: ClassTag[T]): PartitionerDescription = {
+    val className = clazz.runtimeClass.getName
+    val factory = new PartitionerByClassName(className)
+    PartitionerDescription(factory)
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShuffleGroupingPartitioner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShuffleGroupingPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShuffleGroupingPartitioner.scala
new file mode 100644
index 0000000..1b223e0
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShuffleGroupingPartitioner.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.partitioner
+
+import org.apache.gearpump.Message
+
+import scala.util.Random
+
+/**
+ * The idea of ShuffleGroupingPartitioner is derived from Storm.
+ * Messages are randomly distributed across the downstream's tasks in a way that gives
+ * each task an equal number of messages. Every round of `partitionNum` consecutive calls
+ * visits each downstream task exactly once, in a freshly shuffled order.
+ *
+ * If `partitionNum` changes between calls (for example, when downstream parallelism
+ * changes), the permutation is rebuilt for the new size and a new round starts. The
+ * alternative, indexing a permutation of the old size, would return out-of-range task
+ * indexes or throw.
+ */
+class ShuffleGroupingPartitioner extends UnicastPartitioner {
+  private val random = new Random
+  // Position within the current round; -1 before the first call.
+  private var index = -1
+  // Shuffled permutation of 0 until partitionNum. Vector avoids List's O(n) indexing.
+  private var partitions = Vector.empty[Int]
+
+  override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
+    index += 1
+    if (partitions.length != partitionNum) {
+      // First call, or the downstream task count changed: build a new permutation.
+      partitions = random.shuffle(Vector.range(0, partitionNum))
+      index = 0
+    } else if (index >= partitionNum) {
+      // Round finished: reshuffle and start over.
+      index = 0
+      partitions = random.shuffle(partitions)
+    }
+    partitions(index)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShufflePartitioner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShufflePartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShufflePartitioner.scala
new file mode 100644
index 0000000..39d5e3b
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShufflePartitioner.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.partitioner
+
+import java.util.Random
+
+import org.apache.gearpump.Message
+
+/**
+ * Round-robin partitions data across downstream processor tasks.
+ *
+ * The rotation starts at a random offset, drawn lazily on the first call. Presumably this
+ * is so that different upstream tasks do not all start at downstream task 0.
+ */
+class ShufflePartitioner extends UnicastPartitioner {
+  private var seed = 0
+  private var count = 0
+
+  override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
+    // A seed of 0 doubles as the "not yet seeded" marker.
+    if (seed == 0) seed = newSeed()
+
+    // Clear the sign bit so the sum stays non-negative even after Int overflow.
+    val partition = ((seed + count) & Integer.MAX_VALUE) % partitionNum
+    count += 1
+    partition
+  }
+
+  /** Draws a fresh random seed. */
+  private def newSeed(): Int = new Random().nextInt()
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
index 535497c..450f2d6 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
@@ -69,6 +69,7 @@ class DataSourceTask[IN, OUT] private[source](
override def onStart(startTime: Instant): Unit = {
LOG.info(s"opening data source at $startTime")
source.open(context, startTime)
+ operator.foreach(_.setup())
self ! Watermark(source.getWatermark)
}
@@ -82,6 +83,7 @@ class DataSourceTask[IN, OUT] private[source](
}
override def onStop(): Unit = {
+ operator.foreach(_.teardown())
LOG.info("closing data source...")
source.close()
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializerResolver.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializerResolver.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializerResolver.scala
index 902c663..3b32163 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializerResolver.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializerResolver.scala
@@ -18,7 +18,7 @@
package org.apache.gearpump.streaming.task
-import org.apache.gearpump.esotericsoftware.kryo.util.{IntMap, ObjectMap}
+import com.esotericsoftware.kryo.util.{IntMap, ObjectMap}
import org.apache.gearpump.streaming.task.SerializerResolver.Registration
private[task] class SerializerResolver {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscriber.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscriber.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscriber.scala
index 692d7f9..5c99980 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscriber.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscriber.scala
@@ -18,7 +18,7 @@
package org.apache.gearpump.streaming.task
-import org.apache.gearpump.partitioner.PartitionerDescription
+import org.apache.gearpump.streaming.partitioner.PartitionerDescription
import org.apache.gearpump.streaming.{DAG, LifeTime}
/**
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
index d9fbc82..4193fbf 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
@@ -20,8 +20,8 @@ package org.apache.gearpump.streaming.task
import org.slf4j.Logger
-import org.apache.gearpump.google.common.primitives.Shorts
-import org.apache.gearpump.partitioner.{MulticastPartitioner, Partitioner, UnicastPartitioner}
+import com.google.common.primitives.Shorts
+import org.apache.gearpump.streaming.partitioner.{MulticastPartitioner, Partitioner, UnicastPartitioner}
import org.apache.gearpump.streaming.AppMasterToExecutor.MsgLostException
import org.apache.gearpump.streaming.LifeTime
import org.apache.gearpump.streaming.task.Subscription._
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
index f72e5b8..92f6672 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
@@ -23,17 +23,17 @@ import java.util
import java.util.concurrent.TimeUnit
import akka.actor._
+import com.gs.collections.impl.map.mutable.primitive.IntShortHashMap
+import org.apache.gearpump.streaming.source.Watermark
+import org.slf4j.Logger
import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.gs.collections.impl.map.mutable.primitive.IntShortHashMap
import org.apache.gearpump.metrics.Metrics
import org.apache.gearpump.serializer.SerializationFramework
import org.apache.gearpump.streaming.AppMasterToExecutor._
import org.apache.gearpump.streaming.ExecutorToAppMaster._
import org.apache.gearpump.streaming.ProcessorId
-import org.apache.gearpump.streaming.source.Watermark
import org.apache.gearpump.util.{LogUtil, TimeOutScheduler}
import org.apache.gearpump.{Message, TimeStamp}
-import org.slf4j.Logger
/**
*
@@ -52,9 +52,10 @@ class TaskActor(
def serializerPool: SerializationFramework = inputSerializerPool
+ import taskContextData._
+
import org.apache.gearpump.streaming.Constants._
import org.apache.gearpump.streaming.task.TaskActor._
- import taskContextData._
val config = context.system.settings.config
val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = executorId, task = taskId)
@@ -75,9 +76,9 @@ class TaskActor(
private var life = taskContextData.life
// Latency probe
- import context.dispatcher
-
import scala.concurrent.duration._
+
+ import context.dispatcher
final val LATENCY_PROBE_INTERVAL = FiniteDuration(1, TimeUnit.SECONDS)
// Clock report interval
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala
index 5f4faee..ccda8f0 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala
@@ -18,7 +18,7 @@
package org.apache.gearpump.streaming
-import org.apache.gearpump.partitioner.PartitionerDescription
+import org.apache.gearpump.streaming.partitioner.PartitionerDescription
import org.apache.gearpump.streaming.task.TaskId
import org.apache.gearpump.util.Graph
import org.apache.gearpump.util.Graph.Node
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
index e461ae8..29dfc57 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
@@ -34,7 +34,7 @@ import org.apache.gearpump.cluster.master.MasterProxy
import org.apache.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest}
import org.apache.gearpump.cluster.worker.WorkerId
import org.apache.gearpump.jarstore.FilePath
-import org.apache.gearpump.partitioner.HashPartitioner
+import org.apache.gearpump.streaming.partitioner.HashPartitioner
import org.apache.gearpump.streaming.AppMasterToExecutor.StopTask
import org.apache.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, UnRegisterTask}
import org.apache.gearpump.streaming.appmaster.AppMaster.{TaskActorRef, LookupTaskActorRef}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
index d42fe6f..46175a4 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
@@ -20,7 +20,7 @@ package org.apache.gearpump.streaming.appmaster
import akka.actor.{ActorSystem, Props}
import akka.testkit.{ImplicitSender, TestKit, TestProbe}
import org.apache.gearpump.cluster.{TestUtil, UserConfig}
-import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner, PartitionerDescription}
+import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner, PartitionerDescription}
import org.apache.gearpump.streaming.appmaster.ClockService.{ChangeToNewDAG, ChangeToNewDAGSuccess, HealthChecker, ProcessorClock}
import org.apache.gearpump.streaming.appmaster.ClockServiceSpec.Store
import org.apache.gearpump.streaming.storage.AppDataStore
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala
index be3b3b7..adde927 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala
@@ -22,7 +22,7 @@ package org.apache.gearpump.streaming.appmaster
import akka.actor.{ActorSystem, Props}
import akka.testkit.TestProbe
import org.apache.gearpump.cluster.{TestUtil, UserConfig}
-import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner}
+import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner}
import org.apache.gearpump.streaming.appmaster.DagManager.{DAGOperationFailed, DAGOperationSuccess, GetLatestDAG, GetTaskLaunchData, LatestDAG, NewDAGDeployed, ReplaceProcessor, TaskLaunchData, WatchChange}
import org.apache.gearpump.streaming.task.{Subscriber, TaskActor}
import org.apache.gearpump.streaming._
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/JarSchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/JarSchedulerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/JarSchedulerSpec.scala
index 5f6dd04..def9d44 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/JarSchedulerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/JarSchedulerSpec.scala
@@ -22,7 +22,7 @@ import org.apache.gearpump.cluster.scheduler.{Resource, ResourceRequest}
import org.apache.gearpump.cluster.worker.WorkerId
import org.apache.gearpump.cluster.{AppJar, TestUtil}
import org.apache.gearpump.jarstore.FilePath
-import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner}
+import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner}
import org.apache.gearpump.streaming.appmaster.TaskSchedulerSpec.{TestTask1, TestTask2}
import org.apache.gearpump.streaming.task.TaskId
import org.apache.gearpump.streaming.{DAG, ProcessorDescription, _}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
index 54ecde1..bcf96e4 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
@@ -25,7 +25,7 @@ import org.apache.gearpump.cluster.scheduler.{Resource, ResourceRequest}
import org.apache.gearpump.cluster.worker.WorkerId
import org.apache.gearpump.cluster.{AppJar, TestUtil, UserConfig}
import org.apache.gearpump.jarstore.FilePath
-import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner, PartitionerDescription}
+import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner, PartitionerDescription}
import org.apache.gearpump.streaming.AppMasterToExecutor.{LaunchTasks, StartAllTasks, StartDynamicDag, TaskLocationsReady, TaskLocationsReceived, TaskRegistered}
import org.apache.gearpump.streaming.ExecutorToAppMaster.RegisterTask
import org.apache.gearpump.streaming.appmaster.AppMaster.AllocateResourceTimeOut
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
index 864aa93..1bfde94 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
@@ -21,7 +21,7 @@ import com.typesafe.config.ConfigFactory
import org.apache.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceRequest}
import org.apache.gearpump.cluster.worker.WorkerId
import org.apache.gearpump.cluster.{TestUtil, UserConfig}
-import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner}
+import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner}
import org.apache.gearpump.streaming.appmaster.TaskLocator.Localities
import org.apache.gearpump.streaming.appmaster.TaskSchedulerSpec.{TestTask1, TestTask2}
import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskId}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
deleted file mode 100644
index e0407ec..0000000
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.dsl
-
-import akka.actor.ActorSystem
-import org.apache.gearpump.cluster.TestUtil
-import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.partitioner.PartitionerDescription
-import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication}
-import org.apache.gearpump.streaming.source.DataSourceTask
-import org.apache.gearpump.util.Graph
-import org.mockito.Mockito.when
-import org.scalatest._
-import org.scalatest.mock.MockitoSugar
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar {
-
- implicit var system: ActorSystem = _
-
- override def beforeAll(): Unit = {
- system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
- }
-
- override def afterAll(): Unit = {
- system.terminate()
- Await.result(system.whenTerminated, Duration.Inf)
- }
-
- it should "be able to generate multiple new streams" in {
- val context: ClientContext = mock[ClientContext]
- when(context.system).thenReturn(system)
-
- val dsl = StreamApp("dsl", context)
- dsl.source(List("A"), 2, "A") shouldBe a [Stream[_]]
- dsl.source(List("B"), 3, "B") shouldBe a [Stream[_]]
-
- val application = dsl.plan()
- application shouldBe a [StreamApplication]
- application.name shouldBe "dsl"
- val dag = application.userConfig
- .getValue[Graph[ProcessorDescription, PartitionerDescription]](StreamApplication.DAG).get
- dag.vertices.size shouldBe 2
- dag.vertices.foreach { processor =>
- processor.taskClass shouldBe classOf[DataSourceTask[_, _]].getName
- if (processor.description == "A") {
- processor.parallelism shouldBe 2
- } else if (processor.description == "B") {
- processor.parallelism shouldBe 3
- } else {
- fail(s"undefined source ${processor.description}")
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
deleted file mode 100644
index fdc721b..0000000
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.dsl
-
-import akka.actor._
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.cluster.{TestUtil, UserConfig}
-import org.apache.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner, PartitionerDescription}
-import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication}
-import org.apache.gearpump.streaming.dsl.StreamSpec.Join
-import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
-import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask}
-import org.apache.gearpump.streaming.source.DataSourceTask
-import org.apache.gearpump.streaming.task.{Task, TaskContext}
-import org.apache.gearpump.util.Graph
-import org.apache.gearpump.util.Graph._
-import org.mockito.Mockito.when
-import org.scalatest._
-import org.scalatest.mock.MockitoSugar
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-import scala.util.{Either, Left, Right}
-
-class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar {
-
- implicit var system: ActorSystem = _
-
- override def beforeAll(): Unit = {
- system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
- }
-
- override def afterAll(): Unit = {
- system.terminate()
- Await.result(system.whenTerminated, Duration.Inf)
- }
-
- it should "translate the DSL to a DAG" in {
- val context: ClientContext = mock[ClientContext]
- when(context.system).thenReturn(system)
-
- val dsl = StreamApp("dsl", context)
-
- val data =
- """
- five four three two one
- five four three two
- five four three
- five four
- five
- """
- val stream = dsl.source(data.lines.toList, 1, "").
- flatMap(line => line.split("[\\s]+")).filter(_.nonEmpty).
- map(word => (word, 1)).
- groupBy(_._1, parallelism = 2).
- reduce((left, right) => (left._1, left._2 + right._2)).
- map[Either[(String, Int), String]](Left(_))
-
- val query = dsl.source(List("two"), 1, "").map[Either[(String, Int), String]](Right(_))
- stream.merge(query).process[(String, Int)](classOf[Join], 1)
-
- val app: StreamApplication = dsl.plan()
- val dag = app.userConfig
- .getValue[Graph[ProcessorDescription, PartitionerDescription]](StreamApplication.DAG).get
-
- val dagTopology = dag.mapVertex(_.taskClass).mapEdge { (node1, edge, node2) =>
- edge.partitionerFactory.partitioner.getClass.getName
- }
- val expectedDagTopology = getExpectedDagTopology
-
- dagTopology.vertices.toSet should contain theSameElementsAs expectedDagTopology.vertices.toSet
- dagTopology.edges.toSet should contain theSameElementsAs expectedDagTopology.edges.toSet
- }
-
- private def getExpectedDagTopology: Graph[String, String] = {
- val source = classOf[DataSourceTask[_, _]].getName
- val group = classOf[CountTriggerTask[_, _]].getName
- val merge = classOf[TransformTask[_, _]].getName
- val join = classOf[Join].getName
-
- val hash = classOf[HashPartitioner].getName
- val groupBy = classOf[GroupByPartitioner[_, _]].getName
- val colocation = classOf[CoLocationPartitioner].getName
-
- val expectedDagTopology = Graph(
- source ~ groupBy ~> group ~ colocation ~> merge ~ hash ~> join,
- source ~ hash ~> merge
- )
- expectedDagTopology
- }
-}
-
-object StreamSpec {
-
- class Join(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
-
- var query: String = _
-
- override def onNext(msg: Message): Unit = {
- msg.msg match {
- case Left(wordCount: (String @unchecked, Int @unchecked)) =>
- if (query != null && wordCount._1 == query) {
- taskContext.output(new Message(wordCount))
- }
-
- case Right(query: String) =>
- this.query = query
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
index bf52abc..f0920de 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
@@ -25,7 +25,8 @@ import org.apache.gearpump.cluster.{TestUtil, UserConfig}
import org.apache.gearpump.streaming.Processor
import org.apache.gearpump.streaming.Processor.DefaultProcessor
import org.apache.gearpump.streaming.dsl.plan.OpSpec.{AnySink, AnySource, AnyTask}
-import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
+import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapper, SingleInputFunction}
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
import org.apache.gearpump.streaming.sink.DataSink
import org.apache.gearpump.streaming.source.DataSource
@@ -145,7 +146,6 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS
val chainedOp = chainableOp1.chain(chainableOp2)
- verify(fn1).andThen(fn2)
chainedOp shouldBe a[ChainableOp[_, _]]
unchainableOps.foreach { op =>
@@ -155,12 +155,14 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS
}
}
- "throw exception on getProcessor" in {
- val fn1 = mock[SingleInputFunction[Any, Any]]
- val chainableOp1 = ChainableOp[Any, Any](fn1)
- intercept[UnsupportedOperationException] {
- chainableOp1.getProcessor
- }
+ "get Processor" in {
+ val fn = mock[FlatMapFunction[Any, Any]]
+ val flatMapper = new FlatMapper(fn, "flatMap")
+ val chainableOp = ChainableOp[Any, Any](flatMapper)
+
+ val processor = chainableOp.getProcessor
+ processor shouldBe a[Processor[_]]
+ processor.parallelism shouldBe 1
}
}