You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2017/08/06 22:03:09 UTC
incubator-gearpump git commit: [GEARPUMP-339] Add ScalaDoc to
Streaming DSL
Repository: incubator-gearpump
Updated Branches:
refs/heads/master f96aca995 -> e1228a314
[GEARPUMP-339] Add ScalaDoc to Streaming DSL
Author: manuzhang <ow...@gmail.com>
Closes #212 from manuzhang/plan_doc.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/e1228a31
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/e1228a31
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/e1228a31
Branch: refs/heads/master
Commit: e1228a31445418e6aca77ad3cf4c2c18fb6748ee
Parents: f96aca9
Author: manuzhang <ow...@gmail.com>
Authored: Mon Aug 7 06:02:29 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Mon Aug 7 06:02:47 2017 +0800
----------------------------------------------------------------------
.../topology/GearpumpStormComponentSpec.scala | 5 +-
.../storm/util/StormOutputCollectorSpec.scala | 9 +-
.../dsl/api/functions/FilterFunction.scala | 2 -
.../dsl/api/functions/FoldFunction.scala | 2 -
.../dsl/api/functions/MapFunction.scala | 2 -
.../api/functions/SerializableFunction.scala | 32 ++++
.../dsl/javaapi/functions/FlatMapFunction.scala | 2 +-
.../apache/gearpump/streaming/dsl/package.scala | 48 ++++++
.../apache/gearpump/streaming/dsl/plan/OP.scala | 171 ++++++++++++-------
.../gearpump/streaming/dsl/plan/Planner.scala | 20 ++-
.../streaming/dsl/scalaapi/Stream.scala | 1 +
.../streaming/dsl/scalaapi/StreamApp.scala | 25 ++-
.../scalaapi/functions/FlatMapFunction.scala | 2 +-
.../functions/SerializableFunction.scala | 32 ----
14 files changed, 229 insertions(+), 124 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
index 0891070..50204ca 100644
--- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
@@ -24,12 +24,13 @@ import akka.actor.ActorRef
import backtype.storm.spout.{ISpout, SpoutOutputCollector}
import backtype.storm.task.{GeneralTopologyContext, IBolt, OutputCollector, TopologyContext}
import backtype.storm.tuple.Tuple
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.experiments.storm.producer.StormSpoutOutputCollector
import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, GearpumpSpout}
import org.apache.gearpump.experiments.storm.util.StormOutputCollector
import org.apache.gearpump.streaming.task.{TaskContext, TaskId}
import org.apache.gearpump.streaming.{DAG, MockUtil}
-import org.apache.gearpump.{Message, TimeStamp}
+import org.apache.gearpump.Message
import org.mockito.Matchers.{anyObject, eq => mockitoEq}
import org.mockito.Mockito._
import org.scalacheck.Gen
@@ -75,7 +76,7 @@ class GearpumpStormComponentSpec
property("GearpumpBolt lifecycle") {
val timestampGen = Gen.chooseNum[Long](0L, 1000L)
val freqGen = Gen.chooseNum[Int](1, 100)
- forAll(timestampGen, freqGen) { (timestamp: TimeStamp, freq: Int) =>
+ forAll(timestampGen, freqGen) { (timestamp: MilliSeconds, freq: Int) =>
val config = mock[JMap[AnyRef, AnyRef]]
val bolt = mock[IBolt]
val taskContext = MockUtil.mockTaskContext
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
index 05627c9..7fab2cc 100644
--- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
@@ -27,7 +27,8 @@ import org.scalacheck.Gen
import org.scalatest.mock.MockitoSugar
import org.scalatest.prop.PropertyChecks
import org.scalatest.{Matchers, PropSpec}
-import org.apache.gearpump.{MIN_TIME_MILLIS, Message, TimeStamp}
+import org.apache.gearpump.{Message, Time}
+import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.experiments.storm.topology.GearpumpTuple
import org.apache.gearpump.streaming.MockUtil
@@ -41,7 +42,7 @@ class StormOutputCollectorSpec
property("StormOutputCollector emits tuple values into a stream") {
forAll(timestampGen, streamIdGen, valuesGen) {
- (timestamp: TimeStamp, streamId: String, values: JList[AnyRef]) =>
+ (timestamp: MilliSeconds, streamId: String, values: JList[AnyRef]) =>
val targets = mock[JMap[String, JMap[String, Grouping]]]
val taskToComponent = mock[JMap[Integer, String]]
val getTargetPartitionsFn = mock[(String, JList[AnyRef]) =>
@@ -52,7 +53,7 @@ class StormOutputCollectorSpec
targetStormTaskIds))
val taskContext = MockUtil.mockTaskContext
val stormOutputCollector = new StormOutputCollector(stormTaskId, taskToComponent,
- targets, getTargetPartitionsFn, taskContext, MIN_TIME_MILLIS)
+ targets, getTargetPartitionsFn, taskContext, Time.MIN_TIME_MILLIS)
when(targets.containsKey(streamId)).thenReturn(false)
stormOutputCollector.emit(streamId, values) shouldBe StormOutputCollector.EMPTY_LIST
@@ -85,7 +86,7 @@ class StormOutputCollectorSpec
targetStormTaskIds))
val taskContext = MockUtil.mockTaskContext
val stormOutputCollector = new StormOutputCollector(stormTaskId, taskToComponent,
- targets, getTargetPartitionsFn, taskContext, MIN_TIME_MILLIS)
+ targets, getTargetPartitionsFn, taskContext, Time.MIN_TIME_MILLIS)
when(targets.containsKey(streamId)).thenReturn(false)
verify(taskContext, times(0)).output(anyObject[Message])
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala
index 25a0929..8d3ffb3 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala
@@ -17,8 +17,6 @@
*/
package org.apache.gearpump.streaming.dsl.api.functions
-import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
-
object FilterFunction {
def apply[T](fn: T => Boolean): FilterFunction[T] = {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala
index 9ff44a8..1525d6e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala
@@ -18,8 +18,6 @@
package org.apache.gearpump.streaming.dsl.api.functions
-import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
-
/**
* Combines input into an accumulator.
*
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala
index a4fdca6..7880c2f 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala
@@ -17,8 +17,6 @@
*/
package org.apache.gearpump.streaming.dsl.api.functions
-import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
-
object MapFunction {
def apply[T, R](fn: T => R): MapFunction[T, R] = {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/SerializableFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/SerializableFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/SerializableFunction.scala
new file mode 100644
index 0000000..b90ba28
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/SerializableFunction.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.api.functions
+
+/**
+ * Superclass for all user defined function interfaces.
+ * This ensures all functions are serializable and provides common methods
+ * like setup and teardown. Users should not extend this class directly
+ * but subclasses like [[org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction]].
+ */
+abstract class SerializableFunction extends java.io.Serializable {
+
+ def setup(): Unit = {}
+
+ def teardown(): Unit = {}
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
index 11e2416..adad878 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
@@ -17,7 +17,7 @@
*/
package org.apache.gearpump.streaming.dsl.javaapi.functions
-import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
+import org.apache.gearpump.streaming.dsl.api.functions.SerializableFunction
/**
* Transforms one input into zero or more outputs of possibly different types.
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala
new file mode 100644
index 0000000..6d43f16
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming
+
+
+// scalastyle:off line.size.limit
+/**
+ *
+ * The architecture of Gearpump Streaming DSL consists of several layers:
+ *
+ * * User facing [[org.apache.gearpump.streaming.dsl.scalaapi.Stream]] DSL. Stream is created by [[org.apache.gearpump.streaming.dsl.scalaapi.StreamApp]]
+ * from an input source like Kafka or by applying high level operations (e.g. flatMap, window, groupBy) to user defined functions (UDFs). UDFs are subclasses
+ * of [[org.apache.gearpump.streaming.dsl.api.functions.SerializableFunction]], represented by [[org.apache.gearpump.streaming.dsl.plan.Op]]
+ * in the underlying [[org.apache.gearpump.util.Graph]].
+ * * [[org.apache.gearpump.streaming.dsl.plan.Planner]], responsible for interpreting the Op Graph, optimizing it and building a low level Graph of
+ * [[org.apache.gearpump.streaming.Processor]]. Finally, it creates a runnable Graph of [[org.apache.gearpump.streaming.task.Task]].
+ * * The execution layer is usually composed of the following four tasks.
+ *
+ * * [[org.apache.gearpump.streaming.source.DataSourceTask]] for [[org.apache.gearpump.streaming.source.DataSource]] to ingest data into Gearpump
+ * * [[org.apache.gearpump.streaming.sink.DataSinkTask]] for [[org.apache.gearpump.streaming.sink.DataSink]] to write data out.
+ * * [[org.apache.gearpump.streaming.dsl.task.GroupByTask]] to execute Ops followed by [[org.apache.gearpump.streaming.dsl.plan.GroupByOp]]
+ * * [[org.apache.gearpump.streaming.dsl.task.TransformTask]] to execute all other Ops.
+ *
+ * All but [[org.apache.gearpump.streaming.sink.DataSinkTask]] delegate execution to [[org.apache.gearpump.streaming.dsl.window.impl.WindowRunner]], which internally
+ * runs a chain of [[org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner]] grouped by windows. Window assignments are either explicitly defined with
+ * [[org.apache.gearpump.streaming.dsl.window.api.Windows]] API or implicitly in [[org.apache.gearpump.streaming.dsl.window.api.GlobalWindows]]. UDFs are eventually
+ * executed by [[org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner]].
+ *
+ */
+// scalastyle:on line.size.limit
+package object dsl {
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
index 2a45a8f..c37ced6 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
@@ -35,16 +35,29 @@ import scala.reflect.ClassTag
object Op {
+ /**
+ * Concatenates two descriptions with "." or returns one if the other is empty.
+ */
def concatenate(desc1: String, desc2: String): String = {
if (desc1 == null || desc1.isEmpty) desc2
else if (desc2 == null || desc2.isEmpty) desc1
else desc1 + "." + desc2
}
+ /**
+ * Concatenates two configs according to the following rules:
+ * 1. The first config cannot be null.
+ * 2. The first config is returned if the second config is null.
+ * 3. The second config takes precedence for overlapping config keys.
+ */
def concatenate(config1: UserConfig, config2: UserConfig): UserConfig = {
config1.withConfig(config2)
}
+ /**
+ * This adds a [[org.apache.gearpump.streaming.dsl.plan.functions.DummyRunner]] in
+ * [[GlobalWindows]] if a targeting [[Task]] has no executable UDF.
+ */
def withGlobalWindowsDummyRunner(op: Op, userConfig: UserConfig,
processor: Processor[_ <: Task])(implicit system: ActorSystem): Processor[_ <: Task] = {
if (userConfig.getValue(Constants.GEARPUMP_STREAMING_OPERATOR).isEmpty) {
@@ -59,22 +72,37 @@ object Op {
}
/**
- * This is a vertex on the logical plan.
+ * This is a vertex on the logical Graph, representing user defined functions in
+ * [[org.apache.gearpump.streaming.dsl.scalaapi.Stream]] DSL.
*/
sealed trait Op {
+ /**
+ * This comes from user function description and is used to display it on front end.
+ */
def description: String
+ /**
+ * This will ship user function to [[org.apache.gearpump.streaming.task.Task]] to be executed.
+ */
def userConfig: UserConfig
+ /**
+ * This creates a new Op by merging their user functions, user configs and descriptions.
+ */
def chain(op: Op)(implicit system: ActorSystem): Op
+ /**
+ * This creates a Processor after chaining.
+ */
def toProcessor(implicit system: ActorSystem): Processor[_ <: Task]
}
/**
- * This represents a low level Processor.
+ * This represents a low level Processor. It is deprecated since it
+ * doesn't work with other Ops.
*/
+@deprecated
case class ProcessorOp[T <: Task](
processor: Class[T],
parallelism: Int,
@@ -99,7 +127,8 @@ case class ProcessorOp[T <: Task](
}
/**
- * This represents a DataSource.
+ * This represents a DataSource and creates a
+ * [[org.apache.gearpump.streaming.source.DataSourceTask]]
*/
case class DataSourceOp(
dataSource: DataSource,
@@ -142,7 +171,7 @@ case class DataSourceOp(
}
/**
- * This represents a DataSink.
+ * This represents a DataSink and creates a [[org.apache.gearpump.streaming.sink.DataSinkTask]].
*/
case class DataSinkOp(
dataSink: DataSink,
@@ -163,7 +192,7 @@ case class DataSinkOp(
/**
* This represents operations that can be chained together
* (e.g. flatMap, map, filter, reduce) and further chained
- * to another Op to be used
+ * to another Op to be executed
*/
case class TransformOp[IN, OUT](
fn: FunctionRunner[IN, OUT],
@@ -201,61 +230,7 @@ case class TransformOp[IN, OUT](
}
}
-/**
- * This is an intermediate operation, produced by chaining WindowOp and TransformOp.
- * Usually, it will be chained to a DataSourceOp, GroupByOp or MergeOp.
- * Otherwise, it will be translated to a Processor of TransformTask.
- */
-case class WindowTransformOp[IN, OUT](
- windowRunner: WindowRunner[IN, OUT],
- description: String,
- userConfig: UserConfig) extends Op {
-
- override def chain(other: Op)(implicit system: ActorSystem): Op = {
- other match {
- case op: WindowTransformOp[OUT, _] =>
- WindowTransformOp(
- WindowRunnerAT(windowRunner, op.windowRunner),
- Op.concatenate(description, op.description),
- Op.concatenate(userConfig, op.userConfig)
- )
- case _ =>
- throw new OpChainException(this, other)
- }
- }
-
- override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
- // TODO: this should be chained to DataSourceOp / GroupByOp / MergeOp
- Processor[TransformTask[Any, Any]](1, description, userConfig.withValue(
- Constants.GEARPUMP_STREAMING_OPERATOR, windowRunner))
- }
-}
-
-/**
- * This is an intermediate operation, produced by chaining TransformOp and WindowOp.
- * It will later be chained to a WindowOp, which results in two WindowTransformOps.
- * Finally, they will be chained to a single WindowTransformOp.
- */
-case class TransformWindowTransformOp[IN, MIDDLE, OUT](
- transformOp: TransformOp[IN, MIDDLE],
- windowTransformOp: WindowTransformOp[MIDDLE, OUT]) extends Op {
-
- override def description: String = {
- throw new UnsupportedOperationException(s"description is not supported on $this")
- }
-
- override def userConfig: UserConfig = {
- throw new UnsupportedOperationException(s"userConfig is not supported on $this")
- }
-
- override def chain(op: Op)(implicit system: ActorSystem): Op = {
- throw new UnsupportedOperationException(s"chain is not supported on $this")
- }
- override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
- WindowOp(GlobalWindows()).chain(this).toProcessor
- }
-}
/**
* This represents a window aggregation, together with a following TransformOp
@@ -290,7 +265,14 @@ case class WindowOp(
}
/**
- * This represents a Processor with groupBy and window aggregation
+ * This represents an operation with groupBy followed by window aggregation.
+ *
+ * It can only be chained with [[WindowTransformOp]] to be executed in
+ * [[org.apache.gearpump.streaming.dsl.task.GroupByTask]].
+ * However, it's possible a window function has no following aggregations. In that case,
+ * we manually tail a [[WindowOp]] with [[TransformOp]] of
+ * [[org.apache.gearpump.streaming.dsl.plan.functions.DummyRunner]] to create a
+ * [[WindowTransformOp]].
*/
case class GroupByOp[IN, GROUP] private(
groupBy: IN => GROUP,
@@ -325,7 +307,14 @@ case class GroupByOp[IN, GROUP] private(
}
/**
- * This represents a Processor transforming merged streams
+ * This represents an operation with merge followed by window aggregation.
+ *
+ * It can only be chained with [[WindowTransformOp]] to be executed in
+ * [[org.apache.gearpump.streaming.dsl.task.TransformTask]].
+ * However, it's possible a merge function has no following aggregations. In that case,
+ * we manually tail a [[WindowOp]] with [[TransformOp]] of
+ * [[org.apache.gearpump.streaming.dsl.plan.functions.DummyRunner]] to create a
+ * [[WindowTransformOp]].
*/
case class MergeOp(
parallelism: Int = 1,
@@ -357,7 +346,65 @@ case class MergeOp(
}
/**
- * This is an edge on the logical plan.
+ * This is an intermediate operation, produced by chaining [[WindowOp]] and [[TransformOp]].
+ * Usually, it will be chained to a [[DataSourceOp]], [[GroupByOp]] or [[MergeOp]]. Nonetheless,
+ * an Op with more than one outgoing or incoming edge cannot be chained. In that case,
+ * it will be translated to a [[org.apache.gearpump.streaming.dsl.task.TransformTask]].
+ */
+private case class WindowTransformOp[IN, OUT](
+ windowRunner: WindowRunner[IN, OUT],
+ description: String,
+ userConfig: UserConfig) extends Op {
+
+ override def chain(other: Op)(implicit system: ActorSystem): Op = {
+ other match {
+ case op: WindowTransformOp[OUT, _] =>
+ WindowTransformOp(
+ WindowRunnerAT(windowRunner, op.windowRunner),
+ Op.concatenate(description, op.description),
+ Op.concatenate(userConfig, op.userConfig)
+ )
+ case _ =>
+ throw new OpChainException(this, other)
+ }
+ }
+
+ override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+ // TODO: this should be chained to DataSourceOp / GroupByOp / MergeOp
+ Processor[TransformTask[Any, Any]](1, description, userConfig.withValue(
+ Constants.GEARPUMP_STREAMING_OPERATOR, windowRunner))
+ }
+}
+
+/**
+ * This is an intermediate operation, produced by chaining [[TransformOp]] and
+ * [[WindowTransformOp]]. It will later be chained to a [[WindowOp]], which results in
+ * two [[WindowTransformOp]]s. Finally, they will be chained to a single WindowTransformOp.
+ */
+private case class TransformWindowTransformOp[IN, MIDDLE, OUT](
+ transformOp: TransformOp[IN, MIDDLE],
+ windowTransformOp: WindowTransformOp[MIDDLE, OUT]) extends Op {
+
+ override def description: String = {
+ throw new UnsupportedOperationException(s"description is not supported on $this")
+ }
+
+ override def userConfig: UserConfig = {
+ throw new UnsupportedOperationException(s"userConfig is not supported on $this")
+ }
+
+ override def chain(op: Op)(implicit system: ActorSystem): Op = {
+ throw new UnsupportedOperationException(s"chain is not supported on $this")
+ }
+
+ override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+ WindowOp(GlobalWindows()).chain(this).toProcessor
+ }
+}
+
+/**
+ * This is an edge on the logical plan. It defines whether data should be transported locally
+ * or shuffled remotely between [[Op]].
*/
trait OpEdge
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
index b1b39c9..04b5337 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
@@ -24,11 +24,26 @@ import org.apache.gearpump.streaming.Processor
import org.apache.gearpump.streaming.task.Task
import org.apache.gearpump.util.Graph
+/**
+ * This class is responsible for turning the high level
+ * [[org.apache.gearpump.streaming.dsl.scalaapi.Stream]] DSL into low level
+ * [[org.apache.gearpump.streaming.Processor]] API.
+ */
class Planner {
/**
- * Converts Dag of Op to Dag of TaskDescription. TaskDescription is part of the low
- * level Graph API.
+ * This method interprets a Graph of [[Op]] and creates a Graph of
+ * [[org.apache.gearpump.streaming.Processor]].
+ *
+ * It first traverses the Graph in reverse from a leaf Op and merges it with
+ * its downstream Op according to the following rules.
+ *
+ * 1. The Op has only one outgoing edge and the downstream Op has only one incoming edge
+ * 2. Neither Op is [[ProcessorOp]]
+ * 3. The edge is [[Direct]]
+ *
+ * Finally the vertices of the optimized Graph are translated to Processors
+ * and the edges to Partitioners.
*/
def plan(dag: Graph[Op, OpEdge])
(implicit system: ActorSystem): Graph[Processor[_ <: Task], _ <: Partitioner] = {
@@ -43,6 +58,7 @@ class Planner {
case _ => new HashPartitioner
}
case Direct =>
+ // FIXME: This is never used
new CoLocationPartitioner
}
}.mapVertex(_.toProcessor)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
index ef2753e..d0c733e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
@@ -210,6 +210,7 @@ class Stream[T](
* @param parallelism parallelism level
* @return new stream after processing with type [R]
*/
+ @deprecated
def process[R](
processor: Class[_ <: Task], parallelism: Int, conf: UserConfig = UserConfig.empty,
description: String = "process"): Stream[R] = {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
index bce8c0c..17d77bc 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
@@ -62,6 +62,17 @@ class StreamApp(
val dag = planner.plan(graph)
StreamApplication(name, dag, userConfig)
}
+
+ def source[T](dataSource: DataSource, parallelism: Int = 1,
+ conf: UserConfig = UserConfig.empty, description: String = "source"): Stream[T] = {
+ val sourceOp = DataSourceOp(dataSource, parallelism, description, conf)
+ graph.addVertex(sourceOp)
+ new Stream[T](graph, sourceOp)
+ }
+
+ def source[T](seq: Seq[T], parallelism: Int, description: String): Stream[T] = {
+ this.source(new CollectionDataSource[T](seq), parallelism, UserConfig.empty, description)
+ }
}
object StreamApp {
@@ -73,20 +84,6 @@ object StreamApp {
implicit def streamAppToApplication(streamApp: StreamApp): StreamApplication = {
streamApp.plan()
}
-
- implicit class Source(app: StreamApp) extends java.io.Serializable {
-
- def source[T](dataSource: DataSource, parallelism: Int = 1,
- conf: UserConfig = UserConfig.empty, description: String = "source"): Stream[T] = {
- implicit val sourceOp = DataSourceOp(dataSource, parallelism, description, conf)
- app.graph.addVertex(sourceOp)
- new Stream[T](app.graph, sourceOp)
- }
-
- def source[T](seq: Seq[T], parallelism: Int, description: String): Stream[T] = {
- this.source(new CollectionDataSource[T](seq), parallelism, UserConfig.empty, description)
- }
- }
}
/** A test message source which generated message sequence repeatedly. */
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
index 252b5bd..2d26df6 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
@@ -17,7 +17,7 @@
*/
package org.apache.gearpump.streaming.dsl.scalaapi.functions
-import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction}
+import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction, SerializableFunction}
import org.apache.gearpump.streaming.dsl.javaapi.functions.{FlatMapFunction => JFlatMapFunction}
import scala.collection.JavaConverters._
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala
deleted file mode 100644
index ab88bf1..0000000
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.dsl.scalaapi.functions
-
-/**
- * Superclass for all user defined function interfaces.
- * This ensures all functions are serializable and provides common methods
- * like setup and teardown. Users should not extend this class directly
- * but subclasses like [[FlatMapFunction]].
- */
-abstract class SerializableFunction extends java.io.Serializable {
-
- def setup(): Unit = {}
-
- def teardown(): Unit = {}
-
-}