You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/10/11 03:58:01 UTC
[1/2] incubator-gearpump git commit: Merge branch 'master' into akka-streams
Repository: incubator-gearpump
Updated Branches:
refs/heads/akka-streams 4fe5458f4 -> bc3940352
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala
new file mode 100644
index 0000000..4b7649f
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.task
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, GroupAlsoByWindow, WindowRunner}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+
+/**
+ * Windowed task driven by event time: every incoming message is handed to the
+ * [[WindowRunner]], and every watermark advance asks the runner to trigger
+ * output up to the new watermark.
+ *
+ * @param groupBy      grouping/windowing definition for this task
+ * @param windowRunner buffers messages and emits window results when triggered
+ */
+class EventTimeTriggerTask[IN, GROUP](
+    groupBy: GroupAlsoByWindow[IN, GROUP],
+    windowRunner: WindowRunner,
+    taskContext: TaskContext,
+    userConfig: UserConfig)
+  extends Task(taskContext, userConfig) {
+
+  /** Uses a [[DefaultWindowRunner]] built from the given grouping. */
+  def this(groupBy: GroupAlsoByWindow[IN, GROUP],
+      taskContext: TaskContext, userConfig: UserConfig) = {
+    this(groupBy, new DefaultWindowRunner(taskContext, userConfig, groupBy)(taskContext.system),
+      taskContext, userConfig)
+  }
+
+  /** Reads the grouping from `userConfig` (GEARPUMP_STREAMING_GROUPBY_FUNCTION). */
+  def this(taskContext: TaskContext, userConfig: UserConfig) = {
+    this(userConfig.getValue[GroupAlsoByWindow[IN, GROUP]](
+      GEARPUMP_STREAMING_GROUPBY_FUNCTION)(taskContext.system).get,
+      taskContext, userConfig)
+  }
+
+  override def onNext(message: Message): Unit = windowRunner.process(message)
+
+  override def onWatermarkProgress(watermark: Instant): Unit = windowRunner.trigger(watermark)
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala
new file mode 100644
index 0000000..980a54b
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.task
+
+import java.time.Instant
+import java.util.concurrent.TimeUnit
+
+import akka.actor.Actor.Receive
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.dsl.task.ProcessingTimeTriggerTask.Triggering
+import org.apache.gearpump.streaming.dsl.window.api.SlidingWindowFn
+import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, GroupAlsoByWindow, WindowRunner}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+
+import scala.concurrent.duration.FiniteDuration
+
+object ProcessingTimeTriggerTask {
+
+  /** Message the task schedules to itself to trigger windows at the current wall-clock time. */
+  case object Triggering
+}
+
+/**
+ * This task triggers output on scheduled system (wall-clock) time.
+ *
+ * Messages are buffered by the [[WindowRunner]]; a self-scheduled [[Triggering]]
+ * message fires the runner at every window-end boundary of the sliding window.
+ */
+class ProcessingTimeTriggerTask[IN, GROUP](
+    groupBy: GroupAlsoByWindow[IN, GROUP],
+    windowRunner: WindowRunner,
+    taskContext: TaskContext,
+    userConfig: UserConfig)
+  extends Task(taskContext, userConfig) {
+
+  def this(groupBy: GroupAlsoByWindow[IN, GROUP],
+      taskContext: TaskContext, userConfig: UserConfig) = {
+    this(groupBy, new DefaultWindowRunner(taskContext, userConfig, groupBy)(taskContext.system),
+      taskContext, userConfig)
+  }
+
+  def this(taskContext: TaskContext, userConfig: UserConfig) = {
+    this(userConfig.getValue[GroupAlsoByWindow[IN, GROUP]](
+      GEARPUMP_STREAMING_GROUPBY_FUNCTION)(taskContext.system).get,
+      taskContext, userConfig)
+  }
+
+  // Processing-time triggering is only supported for sliding (incl. fixed) windows;
+  // any other WindowFn fails here with a ClassCastException.
+  private val windowFn = groupBy.window.windowFn.asInstanceOf[SlidingWindowFn]
+  private val windowSizeMs = windowFn.size.toMillis
+  private val windowStepMs = windowFn.step.toMillis
+
+  override def onStart(startTime: Instant): Unit = {
+    scheduleNextTrigger()
+  }
+
+  override def onNext(message: Message): Unit = {
+    windowRunner.process(message)
+  }
+
+  override def receiveUnManagedMessage: Receive = {
+    case Triggering =>
+      windowRunner.trigger(Instant.now)
+      scheduleNextTrigger()
+  }
+
+  /**
+   * Schedules a [[Triggering]] self-message for the next window end.
+   *
+   * Sliding windows start at multiples of the step, so they end at times `T` with
+   * `(T - size) mod step == 0`. The delay is recomputed from the wall clock on every
+   * round; a fixed `step` delay after each trigger would accumulate scheduling and
+   * processing latency and drift away from the window ends.
+   */
+  private def scheduleNextTrigger(): Unit = {
+    val nowMs = Instant.now.toEpochMilli
+    val delayMs = windowStepMs - Math.floorMod(nowMs - windowSizeMs, windowStepMs)
+    taskContext.scheduleOnce(
+      new FiniteDuration(delayMs, TimeUnit.MILLISECONDS))(self ! Triggering)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
new file mode 100644
index 0000000..e35f085
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.task
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+
+/**
+ * Applies an optional single-input operator to every incoming message while keeping
+ * the message timestamp. Without an operator the payload is forwarded unchanged.
+ *
+ * @param operator function applied to each payload; `None` means pass-through
+ */
+class TransformTask[IN, OUT](
+    operator: Option[SingleInputFunction[IN, OUT]], taskContext: TaskContext,
+    userConf: UserConfig) extends Task(taskContext, userConf) {
+
+  /** Reads the operator from `userConf` (GEARPUMP_STREAMING_OPERATOR), if present. */
+  def this(taskContext: TaskContext, userConf: UserConfig) = {
+    this(userConf.getValue[SingleInputFunction[IN, OUT]](
+      GEARPUMP_STREAMING_OPERATOR)(taskContext.system), taskContext, userConf)
+  }
+
+  override def onNext(msg: Message): Unit = {
+    val timestamp = msg.timestamp
+    operator.fold[Unit](taskContext.output(new Message(msg.msg, timestamp))) { op =>
+      op.process(msg.msg.asInstanceOf[IN]).foreach { out =>
+        taskContext.output(new Message(out, timestamp))
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala
new file mode 100644
index 0000000..a4524a8
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.window.api
+
+/** What happens to per-group reduce state after a window fires. */
+sealed trait AccumulationMode
+
+/** Keep the reduce function's state across firings. */
+case object Accumulating extends AccumulationMode
+
+/** Clear the reduce function's state after each firing. */
+case object Discarding extends AccumulationMode
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/GroupByFn.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/GroupByFn.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/GroupByFn.scala
new file mode 100644
index 0000000..30e68ba
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/GroupByFn.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.window.api
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Processor
+import org.apache.gearpump.streaming.task.Task
+
+/**
+ * Divides messages into groups according to their payload and timestamp.
+ * See [[org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow]]
+ * for the default implementation.
+ *
+ * @tparam T     message payload type
+ * @tparam GROUP group key type
+ */
+trait GroupByFn[T, GROUP] {
+
+  /**
+   * Computes the group a message belongs to. Used by
+   *  1. GroupByPartitioner, to shuffle messages;
+   *  2. WindowRunner, to group messages for time-based aggregation.
+   */
+  def groupBy(message: Message): GROUP
+
+  /**
+   * Builds, during planning, the Processor that runs this grouping; the concrete
+   * task type depends on the window trigger.
+   */
+  def getProcessor(
+      parallelism: Int,
+      description: String,
+      userConfig: UserConfig)(implicit system: ActorSystem): Processor[_ <: Task]
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala
new file mode 100644
index 0000000..9865e18
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.window.api
+
+/** Selects what drives a window to emit its results. */
+sealed trait Trigger
+
+/** Fire on watermark (event-time) progress. */
+case object EventTimeTrigger extends Trigger
+
+/** Fire on a wall-clock schedule. */
+case object ProcessingTimeTrigger extends Trigger
+
+/** Fire based on element count (handled by CountTriggerTask). */
+case object CountTrigger extends Trigger
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Window.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Window.scala
new file mode 100644
index 0000000..4b94879
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Window.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.window.api
+
+import java.time.Duration
+
+/**
+ * Describes how a stream is windowed.
+ *
+ * @param windowFn         assigns each message timestamp to one or more buckets
+ * @param trigger          selects what fires the windows (event time by default)
+ * @param accumulationMode whether per-group reduce state is kept ([[Accumulating]])
+ *                         or cleared ([[Discarding]], the default) after each firing
+ */
+case class Window(
+    windowFn: WindowFn,
+    trigger: Trigger = EventTimeTrigger,
+    accumulationMode: AccumulationMode = Discarding) {
+
+  /**
+   * Returns a copy that uses `trigger`. The accumulation mode is preserved
+   * (rebuilding via the constructor would silently reset it to the default).
+   */
+  def triggering(trigger: Trigger): Window = copy(trigger = trigger)
+
+  /** Returns a copy that keeps reduce state across firings. */
+  def accumulating: Window = copy(accumulationMode = Accumulating)
+
+  /** Returns a copy that clears reduce state after each firing. */
+  def discarding: Window = copy(accumulationMode = Discarding)
+}
+
+/** Factory for count-based windows, which fire on [[CountTrigger]]. */
+object CountWindow {
+
+  /**
+   * Defines a CountWindow.
+   * @param size size passed to [[CountWindowFn]] (presumably messages per window)
+   * @return a Window definition
+   */
+  def apply(size: Int): Window =
+    Window(windowFn = CountWindowFn(size), trigger = CountTrigger)
+}
+
+/** Factory for fixed (tumbling) windows: sliding windows whose step equals their size. */
+object FixedWindow {
+
+  /**
+   * Defines a FixedWindow.
+   * @param size window size, also used as the step
+   * @return a Window definition
+   */
+  def apply(size: Duration): Window =
+    Window(windowFn = SlidingWindowFn(size = size, step = size))
+}
+
+/** Factory for sliding windows. */
+object SlidingWindow {
+
+  /**
+   * Defines a SlidingWindow.
+   * @param size window size
+   * @param step distance the window slides forward between consecutive windows
+   * @return a Window definition
+   */
+  def apply(size: Duration, step: Duration): Window =
+    Window(windowFn = SlidingWindowFn(size = size, step = step))
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFn.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFn.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFn.scala
new file mode 100644
index 0000000..0768730
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFn.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.window.api
+
+import java.time.{Duration, Instant}
+
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.streaming.dsl.window.impl.Bucket
+
+import scala.collection.mutable.ArrayBuffer
+
+/** Assigns a timestamp to the bucket(s) it belongs to. */
+sealed trait WindowFn {
+
+  /** Returns the buckets for `timestamp`. */
+  def apply(timestamp: Instant): List[Bucket]
+}
+
+/**
+ * Assigns a timestamp to every sliding window that contains it. Windows are `size`
+ * long and start at multiples of `step` (in epoch millis); a fixed (tumbling) window
+ * is the special case `size == step`.
+ *
+ * @param size window length; must be at least one millisecond
+ * @param step distance between consecutive window starts; must be at least one millisecond
+ */
+case class SlidingWindowFn(size: Duration, step: Duration)
+  extends WindowFn {
+
+  // Sub-millisecond values would truncate to 0 ms and break the modulo arithmetic below.
+  require(size.toMillis > 0, s"window size must be at least 1 ms, got $size")
+  require(step.toMillis > 0, s"window step must be at least 1 ms, got $step")
+
+  def this(size: Duration) = {
+    this(size, size)
+  }
+
+  /**
+   * Returns every window `[start, start + size)` containing `timestamp`, latest start
+   * first. When `step > size` the timestamp may fall between windows, in which case
+   * the result is empty.
+   */
+  override def apply(timestamp: Instant): List[Bucket] = {
+    val sizeMillis = size.toMillis
+    val stepMillis = step.toMillis
+    val timeMillis = timestamp.toEpochMilli
+    // Walk backwards from the latest window start <= timestamp while the window's
+    // exclusive end is still after the timestamp.
+    Iterator.iterate(lastStartFor(timeMillis, stepMillis))(_ - stepMillis)
+      .takeWhile(start => start + sizeMillis > timeMillis)
+      .map(start => Bucket.ofEpochMilli(start, start + sizeMillis))
+      .toList
+  }
+
+  /** Latest multiple of `windowStep` that is <= `timestamp`; correct for negative times too. */
+  private def lastStartFor(timestamp: TimeStamp, windowStep: Long): TimeStamp = {
+    timestamp - Math.floorMod(timestamp, windowStep)
+  }
+}
+
+/**
+ * Window function for count windows: every timestamp maps to the same single bucket
+ * spanning epoch millis `[0, size)`; the timestamp itself is ignored.
+ */
+case class CountWindowFn(size: Int) extends WindowFn {
+
+  override def apply(timestamp: Instant): List[Bucket] =
+    Bucket.ofEpochMilli(0, size) :: Nil
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala
new file mode 100644
index 0000000..e978983
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.window.impl
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.streaming.dsl.window.api.Trigger
+
+/** Runs reduce functions over messages and reacts to triggers. */
+trait ReduceFnRunner {
+
+  /** Feeds one message to the runner. */
+  def process(message: Message): Unit
+
+  /** Notifies the runner that `trigger` has fired. */
+  def onTrigger(trigger: Trigger): Unit
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
new file mode 100644
index 0000000..53cf5d0
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.window.impl
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.Processor
+import org.apache.gearpump.{Message, TimeStamp}
+import org.apache.gearpump.streaming.dsl.window.api._
+import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, EventTimeTriggerTask, ProcessingTimeTriggerTask}
+import org.apache.gearpump.streaming.task.Task
+
+object Bucket {
+
+  /** Builds a bucket from start/end times given in epoch milliseconds. */
+  def ofEpochMilli(startTime: TimeStamp, endTime: TimeStamp): Bucket =
+    Bucket(startTime = Instant.ofEpochMilli(startTime), endTime = Instant.ofEpochMilli(endTime))
+}
+
+/**
+ * A window unit including startTime and excluding endTime.
+ * Buckets are ordered by start time, then by end time.
+ */
+case class Bucket(startTime: Instant, endTime: Instant) extends Comparable[Bucket] {
+
+  override def compareTo(o: Bucket): Int =
+    startTime.compareTo(o.startTime) match {
+      case 0 => endTime.compareTo(o.endTime)
+      case byStart => byStart
+    }
+}
+
+/**
+ * Groups messages by a user-supplied key and by the window buckets their timestamp
+ * falls in, and chooses the task implementation that matches the window's trigger.
+ *
+ * @param groupByFn extracts the group key from a message payload
+ * @param window    supplies the bucket assignment and the trigger
+ */
+case class GroupAlsoByWindow[T, GROUP](groupByFn: T => GROUP, window: Window)
+  extends GroupByFn[T, (GROUP, List[Bucket])] {
+
+  override def groupBy(message: Message): (GROUP, List[Bucket]) = {
+    val key = groupByFn(message.msg.asInstanceOf[T])
+    val eventTime = Instant.ofEpochMilli(message.timestamp)
+    (key, window.windowFn(eventTime))
+  }
+
+  override def getProcessor(parallelism: Int, description: String,
+      userConfig: UserConfig)(implicit system: ActorSystem): Processor[_ <: Task] = {
+    // The task reads this grouping back from its config.
+    val config = userConfig.withValue(GEARPUMP_STREAMING_GROUPBY_FUNCTION, this)
+    window.trigger match {
+      case EventTimeTrigger =>
+        Processor[EventTimeTriggerTask[T, GROUP]](parallelism, description, config)
+      case ProcessingTimeTrigger =>
+        Processor[ProcessingTimeTriggerTask[T, GROUP]](parallelism, description, config)
+      case CountTrigger =>
+        Processor[CountTriggerTask[T, GROUP]](parallelism, description, config)
+    }
+  }
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
new file mode 100644
index 0000000..9af5e61
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.window.impl
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.gs.collections.api.block.procedure.Procedure
+import org.apache.gearpump.gs.collections.impl.list.mutable.FastList
+import org.apache.gearpump.gs.collections.impl.map.mutable.UnifiedMap
+import org.apache.gearpump.gs.collections.impl.map.sorted.mutable.TreeSortedMap
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.dsl.plan.functions.{EmitFunction, SingleInputFunction}
+import org.apache.gearpump.streaming.dsl.window.api.Discarding
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.util.LogUtil
+import org.slf4j.Logger
+
+/** Buffers messages into windows and emits window results when triggered. */
+trait WindowRunner {
+
+  /** Buffers one incoming message. */
+  def process(message: Message): Unit
+
+  /** Emits results for windows that are due at `time`. */
+  def trigger(time: Instant): Unit
+}
+
+object DefaultWindowRunner {
+
+  private val LOG: Logger = LogUtil.getLogger(classOf[DefaultWindowRunner[_, _, _]])
+
+  /**
+   * Key for buffered window state: one entry per (bucket, group) pair, ordered by
+   * bucket first so the sorted map can be drained in bucket order.
+   */
+  case class WindowGroup[GROUP](bucket: Bucket, group: GROUP)
+    extends Comparable[WindowGroup[GROUP]] {
+
+    override def compareTo(o: WindowGroup[GROUP]): Int = {
+      val byBucket = bucket.compareTo(o.bucket)
+      if (byBucket != 0) {
+        byBucket
+      } else if (group == o.group) {
+        0
+      } else {
+        // Groups have no natural order, so derive a deterministic one. Answering -1
+        // for every unequal pair would not be antisymmetric (a < b and b < a), which
+        // breaks sorted-map lookups and can split one window group into several entries.
+        // `==`/`##` are null-safe and consistent with each other.
+        // NOTE(review): distinct groups equal in both `##` and toString still compare as 0.
+        val byHash = Integer.compare(group.##, o.group.##)
+        if (byHash != 0) byHash
+        else String.valueOf(group).compareTo(String.valueOf(o.group))
+      }
+    }
+  }
+}
+
+/**
+ * Default [[WindowRunner]]: buffers payloads per (bucket, group) and, when triggered,
+ * runs each group's operator (read from GEARPUMP_STREAMING_OPERATOR in `userConfig`)
+ * over every bucket whose end time has been reached, emitting results at the trigger time.
+ */
+class DefaultWindowRunner[IN, GROUP, OUT](
+    taskContext: TaskContext, userConfig: UserConfig,
+    groupBy: GroupAlsoByWindow[IN, GROUP])(implicit system: ActorSystem)
+  extends WindowRunner {
+  import org.apache.gearpump.streaming.dsl.window.impl.DefaultWindowRunner._
+
+  // Buffered payloads per (bucket, group), sorted by bucket.
+  private val windowGroups = new TreeSortedMap[WindowGroup[GROUP], FastList[IN]]
+  // One operator instance per group; its state is cleared after firing only in Discarding mode.
+  private val groupFns = new UnifiedMap[GROUP, SingleInputFunction[IN, OUT]]
+
+  override def process(message: Message): Unit = {
+    val (group, buckets) = groupBy.groupBy(message)
+    buckets.foreach { bucket =>
+      val wg = WindowGroup(bucket, group)
+      val inputs = windowGroups.getOrDefault(wg, new FastList[IN](1))
+      inputs.add(message.msg.asInstanceOf[IN])
+      windowGroups.put(wg, inputs)
+    }
+    // Deserialize a fresh operator only for a group seen for the first time; building
+    // it eagerly (as a putIfAbsent argument) would deserialize it on every message.
+    if (!groupFns.containsKey(group)) {
+      groupFns.put(group,
+        userConfig.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get)
+    }
+  }
+
+  override def trigger(time: Instant): Unit = {
+    // Repeatedly fire the first (earliest) buffered bucket while its end is <= time.
+    @annotation.tailrec
+    def onTrigger(): Unit = {
+      if (windowGroups.notEmpty()) {
+        val first = windowGroups.firstKey
+        if (!time.isBefore(first.bucket.endTime)) {
+          val inputs = windowGroups.remove(first)
+          val reduceFn = groupFns.get(first.group)
+            .andThen[Unit](new EmitFunction[OUT](emitResult(_, time)))
+          inputs.forEach(new Procedure[IN] {
+            override def value(t: IN): Unit = {
+              reduceFn.process(t)
+            }
+          })
+          reduceFn.finish()
+          if (groupBy.window.accumulationMode == Discarding) {
+            reduceFn.clearState()
+          }
+          onTrigger()
+        }
+      }
+    }
+
+    onTrigger()
+  }
+
+  /** Outputs one result stamped with the trigger time. */
+  private def emitResult(result: OUT, time: Instant): Unit = {
+    taskContext.output(Message(result, time.toEpochMilli))
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
index fb2d898..535497c 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
@@ -23,7 +23,7 @@ import java.time.Instant
import org.apache.gearpump._
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.dsl.plan.OpTranslator.{DummyInputFunction, SingleInputFunction}
+import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
import org.apache.gearpump.streaming.task.{Task, TaskContext}
/**
@@ -57,15 +57,10 @@ class DataSourceTask[IN, OUT] private[source](
private val processMessage: Message => Unit =
operator match {
case Some(op) =>
- op match {
- case bad: DummyInputFunction[IN] =>
- (message: Message) => context.output(message)
- case _ =>
- (message: Message) => {
- op.process(message.msg.asInstanceOf[IN]).foreach { m: OUT =>
- context.output(Message(m, message.timestamp))
- }
- }
+ (message: Message) => {
+ op.process(message.msg.asInstanceOf[IN]).foreach { m: OUT =>
+ context.output(Message(m, message.timestamp))
+ }
}
case None =>
(message: Message) => context.output(message)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
index eb52700..f72e5b8 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
@@ -60,7 +60,7 @@ class TaskActor(
val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = executorId, task = taskId)
// Metrics
- private val metricName = s"app$appId.processor${taskId.processorId}.task${taskId.index}"
+ private val metricName = s"app${appId}.processor${taskId.processorId}.task${taskId.index}"
private val receiveLatency = Metrics(context.system).histogram(
s"$metricName:receiveLatency", sampleRate = 1)
private val processTime = Metrics(context.system).histogram(s"$metricName:processTime")
@@ -307,9 +307,9 @@ class TaskActor(
private def updateUpstreamMinClock(upstreamClock: TimeStamp): Unit = {
if (upstreamClock > this.upstreamMinClock) {
+ this.upstreamMinClock = upstreamClock
task.onWatermarkProgress(Instant.ofEpochMilli(this.upstreamMinClock))
}
- this.upstreamMinClock = upstreamClock
val subMinClock = subscriptions.foldLeft(Long.MaxValue) { (min, sub) =>
val subMin = sub._2.minClock
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
index e919a34..e0407ec 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
@@ -21,7 +21,10 @@ package org.apache.gearpump.streaming.dsl
import akka.actor.ActorSystem
import org.apache.gearpump.cluster.TestUtil
import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.partitioner.PartitionerDescription
+import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication}
import org.apache.gearpump.streaming.source.DataSourceTask
+import org.apache.gearpump.util.Graph
import org.mockito.Mockito.when
import org.scalatest._
import org.scalatest.mock.MockitoSugar
@@ -30,7 +33,7 @@ import scala.concurrent.Await
import scala.concurrent.duration.Duration
class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar {
- implicit var system: ActorSystem = null
+ implicit var system: ActorSystem = _
override def beforeAll(): Unit = {
system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
@@ -45,49 +48,25 @@ class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with M
val context: ClientContext = mock[ClientContext]
when(context.system).thenReturn(system)
- val app = StreamApp("dsl", context)
- app.source(List("A"), 1, "")
- app.source(List("B"), 1, "")
+ val dsl = StreamApp("dsl", context)
+ dsl.source(List("A"), 2, "A") shouldBe a [Stream[_]]
+ dsl.source(List("B"), 3, "B") shouldBe a [Stream[_]]
- assert(app.graph.vertices.size == 2)
- }
-
- it should "plan the dsl to Processsor(TaskDescription) DAG" in {
- val context: ClientContext = mock[ClientContext]
- when(context.system).thenReturn(system)
-
- val app = StreamApp("dsl", context)
- val parallism = 3
- app.source(List("A", "B", "C"), parallism, "").flatMap(Array(_)).reduce(_ + _)
- val task = app.plan.dag.vertices.iterator.next()
- assert(task.taskClass == classOf[DataSourceTask[_, _]].getName)
- assert(task.parallelism == parallism)
- }
-
- it should "produce 3 messages" in {
- val context: ClientContext = mock[ClientContext]
- when(context.system).thenReturn(system)
- val app = StreamApp("dsl", context)
- val list = List[String](
- "0",
- "1",
- "2"
- )
- val producer = app.source(list, 1, "producer").flatMap(Array(_)).reduce(_ + _)
- val task = app.plan.dag.vertices.iterator.next()
- /*
- val task = app.plan.dag.vertices.iterator.map(desc => {
- LOG.info(s"${desc.taskClass}")
- })
- val sum = producer.flatMap(msg => {
- LOG.info("in flatMap")
- assert(msg.msg.isInstanceOf[String])
- val num = msg.msg.asInstanceOf[String].toInt
- Array(num)
- }).reduce(_+_)
- val task = app.plan.dag.vertices.iterator.map(desc => {
- LOG.info(s"${desc.taskClass}")
- })
- */
+ val application = dsl.plan()
+ application shouldBe a [StreamApplication]
+ application.name shouldBe "dsl"
+ val dag = application.userConfig
+ .getValue[Graph[ProcessorDescription, PartitionerDescription]](StreamApplication.DAG).get
+ dag.vertices.size shouldBe 2
+ dag.vertices.foreach { processor =>
+ processor.taskClass shouldBe classOf[DataSourceTask[_, _]].getName
+ if (processor.description == "A") {
+ processor.parallelism shouldBe 2
+ } else if (processor.description == "B") {
+ processor.parallelism shouldBe 3
+ } else {
+ fail(s"undefined source ${processor.description}")
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
index 816feef..fdc721b 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
@@ -22,10 +22,11 @@ import akka.actor._
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.client.ClientContext
import org.apache.gearpump.cluster.{TestUtil, UserConfig}
-import org.apache.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner}
+import org.apache.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner, PartitionerDescription}
+import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication}
import org.apache.gearpump.streaming.dsl.StreamSpec.Join
import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
-import org.apache.gearpump.streaming.dsl.plan.OpTranslator._
+import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask}
import org.apache.gearpump.streaming.source.DataSourceTask
import org.apache.gearpump.streaming.task.{Task, TaskContext}
import org.apache.gearpump.util.Graph
@@ -40,7 +41,6 @@ import scala.util.{Either, Left, Right}
class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar {
-
implicit var system: ActorSystem = _
override def beforeAll(): Unit = {
@@ -56,7 +56,7 @@ class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Mock
val context: ClientContext = mock[ClientContext]
when(context.system).thenReturn(system)
- val app = StreamApp("dsl", context)
+ val dsl = StreamApp("dsl", context)
val data =
"""
@@ -66,30 +66,32 @@ class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Mock
five four
five
"""
- val stream = app.source(data.lines.toList, 1, "").
+ val stream = dsl.source(data.lines.toList, 1, "").
flatMap(line => line.split("[\\s]+")).filter(_.nonEmpty).
map(word => (word, 1)).
groupBy(_._1, parallelism = 2).
reduce((left, right) => (left._1, left._2 + right._2)).
map[Either[(String, Int), String]](Left(_))
- val query = app.source(List("two"), 1, "").map[Either[(String, Int), String]](Right(_))
+ val query = dsl.source(List("two"), 1, "").map[Either[(String, Int), String]](Right(_))
stream.merge(query).process[(String, Int)](classOf[Join], 1)
- val appDescription = app.plan()
+ val app: StreamApplication = dsl.plan()
+ val dag = app.userConfig
+ .getValue[Graph[ProcessorDescription, PartitionerDescription]](StreamApplication.DAG).get
- val dagTopology = appDescription.dag.mapVertex(_.taskClass).mapEdge { (node1, edge, node2) =>
+ val dagTopology = dag.mapVertex(_.taskClass).mapEdge { (node1, edge, node2) =>
edge.partitionerFactory.partitioner.getClass.getName
}
val expectedDagTopology = getExpectedDagTopology
- assert(dagTopology.vertices.toSet.equals(expectedDagTopology.vertices.toSet))
- assert(dagTopology.edges.toSet.equals(expectedDagTopology.edges.toSet))
+ dagTopology.vertices.toSet should contain theSameElementsAs expectedDagTopology.vertices.toSet
+ dagTopology.edges.toSet should contain theSameElementsAs expectedDagTopology.edges.toSet
}
private def getExpectedDagTopology: Graph[String, String] = {
val source = classOf[DataSourceTask[_, _]].getName
- val group = classOf[GroupByTask[_, _, _]].getName
+ val group = classOf[CountTriggerTask[_, _]].getName
val merge = classOf[TransformTask[_, _]].getName
val join = classOf[Join].getName
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
index fcc646d..f49eb04 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
@@ -18,24 +18,33 @@
package org.apache.gearpump.streaming.dsl.partitioner
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+import java.time.Duration
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
import org.apache.gearpump.Message
import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitionerSpec.People
+import org.apache.gearpump.streaming.dsl.window.api.{FixedWindow, GroupByFn}
+import org.apache.gearpump.streaming.dsl.window.impl.{Bucket, GroupAlsoByWindow}
class GroupByPartitionerSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
- it should "use the outpout of groupBy function to do partition" in {
+
+ it should "group by message payload and window" in {
val mark = People("Mark", "male")
val tom = People("Tom", "male")
val michelle = People("Michelle", "female")
val partitionNum = 10
- val groupBy = new GroupByPartitioner[People, String](_.gender)
- assert(groupBy.getPartition(Message(mark), partitionNum)
- == groupBy.getPartition(Message(tom), partitionNum))
+ val groupByFn: GroupByFn[People, (String, List[Bucket])] =
+ GroupAlsoByWindow[People, String](_.gender, FixedWindow.apply(Duration.ofMillis(5)))
+ val groupBy = new GroupByPartitioner[People, (String, List[Bucket])](groupByFn)
+ groupBy.getPartition(Message(mark, 1L), partitionNum) shouldBe
+ groupBy.getPartition(Message(tom, 2L), partitionNum)
+
+ groupBy.getPartition(Message(mark, 1L), partitionNum) should not be
+ groupBy.getPartition(Message(tom, 6L), partitionNum)
- assert(groupBy.getPartition(Message(mark), partitionNum)
- != groupBy.getPartition(Message(michelle), partitionNum))
+ groupBy.getPartition(Message(mark, 2L), partitionNum) should not be
+ groupBy.getPartition(Message(michelle, 3L), partitionNum)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
new file mode 100644
index 0000000..bf52abc
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.plan
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.{TestUtil, UserConfig}
+import org.apache.gearpump.streaming.Processor
+import org.apache.gearpump.streaming.Processor.DefaultProcessor
+import org.apache.gearpump.streaming.dsl.plan.OpSpec.{AnySink, AnySource, AnyTask}
+import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
+import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec}
+import org.scalatest.mock.MockitoSugar
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoSugar {
+
+ private val unchainableOps: List[Op] = List(
+ mock[DataSourceOp],
+ mock[DataSinkOp],
+ mock[GroupByOp[Any, Any]],
+ mock[MergeOp],
+ mock[ProcessorOp[AnyTask]])
+
+ implicit var system: ActorSystem = _
+
+ override def beforeAll(): Unit = {
+ system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
+ }
+
+ override def afterAll(): Unit = {
+ system.terminate()
+ Await.result(system.whenTerminated, Duration.Inf)
+ }
+
+ "DataSourceOp" should {
+
+ "chain ChainableOp" in {
+ val dataSource = new AnySource
+ val dataSourceOp = DataSourceOp(dataSource)
+ val chainableOp = mock[ChainableOp[Any, Any]]
+ val fn = mock[SingleInputFunction[Any, Any]]
+
+ val chainedOp = dataSourceOp.chain(chainableOp)
+
+ chainedOp shouldBe a[DataSourceOp]
+ verify(chainableOp).fn
+
+ unchainableOps.foreach { op =>
+ intercept[OpChainException] {
+ dataSourceOp.chain(op)
+ }
+ }
+ }
+
+ "get Processor of DataSource" in {
+ val dataSource = new AnySource
+ val dataSourceOp = DataSourceOp(dataSource)
+ val processor = dataSourceOp.getProcessor
+ processor shouldBe a[Processor[_]]
+ processor.parallelism shouldBe dataSourceOp.parallelism
+ processor.description shouldBe dataSourceOp.description
+ }
+ }
+
+ "DataSinkOp" should {
+
+ "not chain any Op" in {
+ val dataSink = new AnySink
+ val dataSinkOp = DataSinkOp(dataSink)
+ val chainableOp = mock[ChainableOp[Any, Any]]
+ val ops = chainableOp +: unchainableOps
+ ops.foreach { op =>
+ intercept[OpChainException] {
+ dataSinkOp.chain(op)
+ }
+ }
+ }
+
+ "get Processor of DataSink" in {
+ val dataSink = new AnySink
+ val dataSinkOp = DataSinkOp(dataSink)
+ val processor = dataSinkOp.getProcessor
+ processor shouldBe a[Processor[_]]
+ processor.parallelism shouldBe dataSinkOp.parallelism
+ processor.description shouldBe dataSinkOp.description
+ }
+ }
+
+ "ProcessorOp" should {
+
+ "not chain any Op" in {
+ val processorOp = new ProcessorOp[AnyTask]
+ val chainableOp = mock[ChainableOp[Any, Any]]
+ val ops = chainableOp +: unchainableOps
+ ops.foreach { op =>
+ intercept[OpChainException] {
+ processorOp.chain(op)
+ }
+ }
+ }
+
+ "get Processor" in {
+ val processorOp = new ProcessorOp[AnyTask]
+ val processor = processorOp.getProcessor
+ processor shouldBe a [DefaultProcessor[_]]
+ processor.parallelism shouldBe processorOp.parallelism
+ processor.description shouldBe processorOp.description
+ }
+ }
+
+ "ChainableOp" should {
+
+ "chain ChainableOp" in {
+ val fn1 = mock[SingleInputFunction[Any, Any]]
+ val chainableOp1 = ChainableOp[Any, Any](fn1)
+
+ val fn2 = mock[SingleInputFunction[Any, Any]]
+ val chainableOp2 = ChainableOp[Any, Any](fn2)
+
+ val chainedOp = chainableOp1.chain(chainableOp2)
+
+ verify(fn1).andThen(fn2)
+ chainedOp shouldBe a[ChainableOp[_, _]]
+
+ unchainableOps.foreach { op =>
+ intercept[OpChainException] {
+ chainableOp1.chain(op)
+ }
+ }
+ }
+
+ "throw exception on getProcessor" in {
+ val fn1 = mock[SingleInputFunction[Any, Any]]
+ val chainableOp1 = ChainableOp[Any, Any](fn1)
+ intercept[UnsupportedOperationException] {
+ chainableOp1.getProcessor
+ }
+ }
+ }
+
+ "GroupByOp" should {
+
+ "chain ChainableOp" in {
+ val groupByFn = mock[GroupByFn[Any, Any]]
+ val groupByOp = GroupByOp[Any, Any](groupByFn)
+ val fn = mock[SingleInputFunction[Any, Any]]
+ val chainableOp = mock[ChainableOp[Any, Any]]
+ when(chainableOp.fn).thenReturn(fn)
+
+ val chainedOp = groupByOp.chain(chainableOp)
+ chainedOp shouldBe a[GroupByOp[_, _]]
+
+ unchainableOps.foreach { op =>
+ intercept[OpChainException] {
+ groupByOp.chain(op)
+ }
+ }
+ }
+
+ "delegate to groupByFn on getProcessor" in {
+ val groupByFn = mock[GroupByFn[Any, Any]]
+ val groupByOp = GroupByOp[Any, Any](groupByFn)
+
+ groupByOp.getProcessor
+ verify(groupByFn).getProcessor(anyInt, anyString, any[UserConfig])(any[ActorSystem])
+ }
+ }
+
+ "MergeOp" should {
+
+ val mergeOp = MergeOp("merge")
+
+ "chain ChainableOp" in {
+ val fn = mock[SingleInputFunction[Any, Any]]
+ val chainableOp = mock[ChainableOp[Any, Any]]
+ when(chainableOp.fn).thenReturn(fn)
+
+ val chainedOp = mergeOp.chain(chainableOp)
+ chainedOp shouldBe a [MergeOp]
+
+ unchainableOps.foreach { op =>
+ intercept[OpChainException] {
+ mergeOp.chain(op)
+ }
+ }
+ }
+
+ "get Processor" in {
+ val processor = mergeOp.getProcessor
+ processor shouldBe a[Processor[_]]
+ processor.parallelism shouldBe 1
+ }
+ }
+}
+
+object OpSpec {
+ class AnyTask(context: TaskContext, config: UserConfig) extends Task(context, config)
+
+ class AnySource extends DataSource {
+
+ override def open(context: TaskContext, startTime: Instant): Unit = {}
+
+ override def read(): Message = Message("any")
+
+ override def close(): Unit = {}
+
+ override def getWatermark: Instant = Instant.now()
+ }
+
+ class AnySink extends DataSink {
+
+ override def open(context: TaskContext): Unit = {}
+
+ override def write(message: Message): Unit = {}
+
+ override def close(): Unit = {}
+}
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
deleted file mode 100644
index 2112fd0..0000000
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.dsl.plan
-
-import java.time.Instant
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-import akka.actor.ActorSystem
-import org.mockito.ArgumentCaptor
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalatest._
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.{TestUtil, UserConfig}
-import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.dsl.CollectionDataSource
-import org.apache.gearpump.streaming.dsl.plan.OpTranslator._
-import org.apache.gearpump.streaming.source.DataSourceTask
-
-class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
-
-
- "andThen" should "chain multiple single input function" in {
- val dummy = new DummyInputFunction[String]
- val split = new FlatMapFunction[String, String](line => line.split("\\s"), "split")
-
- val filter = new FlatMapFunction[String, String](word =>
- if (word.isEmpty) None else Some(word), "filter")
-
- val map = new FlatMapFunction[String, Int](word => Some(1), "map")
-
- val sum = new ReduceFunction[Int]({ (left, right) => left + right }, "sum")
-
- val all = dummy.andThen(split).andThen(filter).andThen(map).andThen(sum)
-
- assert(all.description == "split.filter.map.sum")
-
- val data =
- """
- five four three two one
- five four three two
- five four three
- five four
- five
- """
- val count = all.process(data).toList.last
- assert(count == 15)
- }
-
- "Source" should "iterate over input source and apply attached operator" in {
-
- val taskContext = MockUtil.mockTaskContext
- implicit val actorSystem = MockUtil.system
-
- val data = "one two three".split("\\s")
- val dataSource = new CollectionDataSource[String](data)
- val conf = UserConfig.empty.withValue(GEARPUMP_STREAMING_SOURCE, dataSource)
-
- // Source with no transformer
- val source = new DataSourceTask[String, String](
- taskContext, conf)
- source.onStart(Instant.EPOCH)
- source.onNext(Message("next"))
- data.foreach { s =>
- verify(taskContext, times(1)).output(Message(s))
- }
-
- // Source with transformer
- val anotherTaskContext = MockUtil.mockTaskContext
- val double = new FlatMapFunction[String, String](word => List(word, word), "double")
- val another = new DataSourceTask(anotherTaskContext,
- conf.withValue(GEARPUMP_STREAMING_OPERATOR, double))
- another.onStart(Instant.EPOCH)
- another.onNext(Message("next"))
- data.foreach { s =>
- verify(anotherTaskContext, times(2)).output(Message(s))
- }
- }
-
- "GroupByTask" should "group input by groupBy Function and " +
- "apply attached operator for each group" in {
-
- val data = "1 2 2 3 3 3"
-
- val concat = new ReduceFunction[String]({ (left, right) =>
- left + right
- }, "concat")
-
- implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
- val config = UserConfig.empty.withValue[SingleInputFunction[String, String]](
- GEARPUMP_STREAMING_OPERATOR, concat)
-
- val taskContext = MockUtil.mockTaskContext
-
- val task = new GroupByTask[String, String, String](input => input, taskContext, config)
- task.onStart(Instant.EPOCH)
-
- val peopleCaptor = ArgumentCaptor.forClass(classOf[Message])
-
- data.split("\\s+").foreach { word =>
- task.onNext(Message(word))
- }
- verify(taskContext, times(6)).output(peopleCaptor.capture())
-
- import scala.collection.JavaConverters._
-
- val values = peopleCaptor.getAllValues.asScala.map(input => input.msg.asInstanceOf[String])
- assert(values.mkString(",") == "1,2,22,3,33,333")
- system.terminate()
- Await.result(system.whenTerminated, Duration.Inf)
- }
-
- "MergeTask" should "accept two stream and apply the attached operator" in {
-
- // Source with transformer
- val taskContext = MockUtil.mockTaskContext
- val conf = UserConfig.empty
- val double = new FlatMapFunction[String, String](word => List(word, word), "double")
- val task = new TransformTask[String, String](Some(double), taskContext, conf)
- task.onStart(Instant.EPOCH)
-
- val data = "1 2 2 3 3 3".split("\\s+")
-
- data.foreach { input =>
- task.onNext(Message(input))
- }
-
- verify(taskContext, times(data.length * 2)).output(anyObject())
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
new file mode 100644
index 0000000..f8666ba
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.plan
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.{TestUtil, UserConfig}
+import org.apache.gearpump.partitioner.CoLocationPartitioner
+import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
+import org.apache.gearpump.streaming.dsl.plan.PlannerSpec._
+import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapFunction, ReduceFunction}
+import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.{MockUtil, Processor}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+import org.apache.gearpump.util.Graph
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+class PlannerSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar {
+
+ implicit var system: ActorSystem = _
+
+ override def beforeAll(): Unit = {
+ system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
+ }
+
+ override def afterAll(): Unit = {
+ system.terminate()
+ Await.result(system.whenTerminated, Duration.Inf)
+ }
+
+ "Planner" should "chain operations" in {
+ val graph = Graph.empty[Op, OpEdge]
+ val sourceOp = DataSourceOp(new AnySource)
+ val groupByOp = GroupByOp(new AnyGroupByFn)
+ val flatMapOp = ChainableOp[Any, Any](anyFlatMapFunction)
+ val reduceOp = ChainableOp[Any, Any](anyReduceFunction)
+ val processorOp = new ProcessorOp[AnyTask]
+ val sinkOp = DataSinkOp(new AnySink)
+ val directEdge = Direct
+ val shuffleEdge = Shuffle
+
+ graph.addVertex(sourceOp)
+ graph.addVertex(groupByOp)
+ graph.addEdge(sourceOp, shuffleEdge, groupByOp)
+ graph.addVertex(flatMapOp)
+ graph.addEdge(groupByOp, directEdge, flatMapOp)
+ graph.addVertex(reduceOp)
+ graph.addEdge(flatMapOp, directEdge, reduceOp)
+ graph.addVertex(processorOp)
+ graph.addEdge(reduceOp, directEdge, processorOp)
+ graph.addVertex(sinkOp)
+ graph.addEdge(processorOp, directEdge, sinkOp)
+
+ implicit val system = MockUtil.system
+
+ val planner = new Planner
+ val plan = planner.plan(graph)
+ .mapVertex(_.description)
+
+ plan.vertices.toSet should contain theSameElementsAs
+ Set("source", "groupBy", "processor", "sink")
+ plan.outgoingEdgesOf("source").iterator.next()._2 shouldBe a[GroupByPartitioner[_, _]]
+ plan.outgoingEdgesOf("groupBy").iterator.next()._2 shouldBe a[CoLocationPartitioner]
+ plan.outgoingEdgesOf("processor").iterator.next()._2 shouldBe a[CoLocationPartitioner]
+ }
+}
+
+object PlannerSpec {
+
+ private val anyParallelism = 1
+ private val anyFlatMapFunction = new FlatMapFunction[Any, Any](Option(_), "flatMap")
+ private val anyReduceFunction = new ReduceFunction[Any](
+ (left: Any, right: Any) => (left, right), "reduce")
+
+ class AnyTask(context: TaskContext, config: UserConfig) extends Task(context, config)
+
+ class AnySource extends DataSource {
+
+ override def open(context: TaskContext, startTime: Instant): Unit = {}
+
+ override def read(): Message = Message("any")
+
+ override def close(): Unit = {}
+
+ override def getWatermark: Instant = Instant.now()
+ }
+
+ class AnySink extends DataSink {
+
+ override def open(context: TaskContext): Unit = {}
+
+ override def write(message: Message): Unit = {}
+
+ override def close(): Unit = {}
+ }
+
+ class AnyGroupByFn extends GroupByFn[Any, Any] {
+
+ override def groupBy(message: Message): Any = message.msg
+
+ override def getProcessor(
+ parallelism: Int,
+ description: String,
+ userConfig: UserConfig)(implicit system: ActorSystem): Processor[_ <: Task] = {
+ Processor[AnyTask](anyParallelism, description)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala
new file mode 100644
index 0000000..94feae4
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.plan.functions
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.{TestUtil, UserConfig}
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.dsl.CollectionDataSource
+import org.apache.gearpump.streaming.source.DataSourceTask
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask}
+import org.apache.gearpump.streaming.dsl.window.api.CountWindow
+import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
+import org.mockito.ArgumentCaptor
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.{Matchers, WordSpec}
+import org.scalatest.mock.MockitoSugar
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+/**
+ * Tests for the single-input DSL functions (`AndThen`, `FlatMapFunction`, `ReduceFunction`,
+ * `EmitFunction`) and for the tasks that run them (`DataSourceTask`, `CountTriggerTask`,
+ * `TransformTask`).
+ *
+ * Vals declared directly inside a `should` block are evaluated once, when the tests are
+ * registered, so the mocks in such a block are shared by all of that block's tests.
+ */
+class SingleInputFunctionSpec extends WordSpec with Matchers with MockitoSugar {
+ import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunctionSpec._
+
+ "AndThen" should {
+
+ val first = mock[SingleInputFunction[R, S]]
+ val second = mock[SingleInputFunction[S, T]]
+ val andThen = new AndThen(first, second)
+
+ "chain first and second functions when processing input value" in {
+ val input = mock[R]
+ val firstOutput = mock[S]
+ val secondOutput = mock[T]
+ when(first.process(input)).thenReturn(Some(firstOutput))
+ when(second.process(firstOutput)).thenReturn(Some(secondOutput))
+
+ andThen.process(input).toList shouldBe List(secondOutput)
+ }
+
+ "return chained description" in {
+ when(first.description).thenReturn("first")
+ when(second.description).thenReturn("second")
+ andThen.description shouldBe "first.second"
+ }
+
+ "return either first result or second on finish" in {
+ val firstResult = mock[S]
+ val processedFirst = mock[T]
+ val secondResult = mock[T]
+
+ // A pending result from `first` is expected to be fed through `second.process`.
+ when(first.finish()).thenReturn(Some(firstResult))
+ when(second.process(firstResult)).thenReturn(Some(processedFirst))
+ andThen.finish().toList shouldBe List(processedFirst)
+
+ // With nothing pending in `first`, the result of `second.finish()` is expected.
+ when(first.finish()).thenReturn(None)
+ when(second.finish()).thenReturn(Some(secondResult))
+ andThen.finish().toList shouldBe List(secondResult)
+ }
+
+ "clear both states on clearState" in {
+ andThen.clearState()
+
+ verify(first).clearState()
+ verify(second).clearState()
+ }
+
+ "return AndThen on andThen" in {
+ val third = mock[SingleInputFunction[T, Any]]
+ andThen.andThen[Any](third) shouldBe an [AndThen[_, _, _]]
+ }
+ }
+
+ "FlatMapFunction" should {
+
+ val flatMap = mock[R => TraversableOnce[S]]
+ val flatMapFunction = new FlatMapFunction[R, S](flatMap, "flatMap")
+
+ "call flatMap function when processing input value" in {
+ val input = mock[R]
+ flatMapFunction.process(input)
+ verify(flatMap).apply(input)
+ }
+
+ "return passed in description" in {
+ flatMapFunction.description shouldBe "flatMap"
+ }
+
+ "return an empty result on finish" in {
+ flatMapFunction.finish() shouldBe List.empty[S]
+ }
+
+ "do nothing on clearState" in {
+ flatMapFunction.clearState()
+ // NOTE(review): `flatMap` is shared with the first test, which already invoked it.
+ // This passes because `verifyZeroInteractions` behaves like `verifyNoMoreInteractions`
+ // and skips the call that was verified there.
+ verifyZeroInteractions(flatMap)
+ }
+
+ "return AndThen on andThen" in {
+ val other = mock[SingleInputFunction[S, T]]
+ flatMapFunction.andThen[T](other) shouldBe an [AndThen[_, _, _]]
+ }
+ }
+
+ "ReduceFunction" should {
+
+ "call reduce function when processing input value" in {
+ val reduce = mock[(T, T) => T]
+ val reduceFunction = new ReduceFunction[T](reduce, "reduce")
+ val input1 = mock[T]
+ val input2 = mock[T]
+ val output = mock[T]
+
+ when(reduce.apply(input1, input2)).thenReturn(output, output)
+
+ // `process` only accumulates; the reduced value is emitted by `finish`.
+ reduceFunction.process(input1) shouldBe List.empty[T]
+ reduceFunction.process(input2) shouldBe List.empty[T]
+ reduceFunction.finish() shouldBe List(output)
+
+ // `clearState` restarts the accumulator, so a lone value is emitted unchanged.
+ reduceFunction.clearState()
+ reduceFunction.process(input1) shouldBe List.empty[T]
+ reduceFunction.clearState()
+ reduceFunction.process(input2) shouldBe List.empty[T]
+ reduceFunction.finish() shouldBe List(input2)
+ }
+
+ "return passed in description" in {
+ val reduce = mock[(T, T) => T]
+ val reduceFunction = new ReduceFunction[T](reduce, "reduce")
+ reduceFunction.description shouldBe "reduce"
+ }
+
+ "return an empty result on finish" in {
+ val reduce = mock[(T, T) => T]
+ val reduceFunction = new ReduceFunction[T](reduce, "reduce")
+ reduceFunction.finish() shouldBe List.empty[T]
+ }
+
+ "do nothing on clearState" in {
+ val reduce = mock[(T, T) => T]
+ val reduceFunction = new ReduceFunction[T](reduce, "reduce")
+ reduceFunction.clearState()
+ verifyZeroInteractions(reduce)
+ }
+
+ "return AndThen on andThen" in {
+ val reduce = mock[(T, T) => T]
+ val reduceFunction = new ReduceFunction[T](reduce, "reduce")
+ val other = mock[SingleInputFunction[T, Any]]
+ reduceFunction.andThen[Any](other) shouldBe an[AndThen[_, _, _]]
+ }
+ }
+
+ "EmitFunction" should {
+
+ val emit = mock[T => Unit]
+ val emitFunction = new EmitFunction[T](emit)
+
+ "emit input value when processing input value" in {
+ val input = mock[T]
+
+ emitFunction.process(input) shouldBe List.empty[Unit]
+
+ verify(emit).apply(input)
+ }
+
+ "return empty description" in {
+ emitFunction.description shouldBe ""
+ }
+
+ "return an empty result on finish" in {
+ emitFunction.finish() shouldBe List.empty[Unit]
+ }
+
+ "do nothing on clearState" in {
+ emitFunction.clearState()
+ // NOTE(review): `emit` is shared with the first test; its verified call is skipped by
+ // `verifyZeroInteractions` (same semantics as `verifyNoMoreInteractions`).
+ verifyZeroInteractions(emit)
+ }
+
+ "throw exception on andThen" in {
+ val other = mock[SingleInputFunction[Unit, Any]]
+ intercept[UnsupportedOperationException] {
+ emitFunction.andThen(other)
+ }
+ }
+ }
+
+ "andThen" should {
+ "chain multiple single input function" in {
+ val split = new FlatMapFunction[String, String](line => line.split("\\s"), "split")
+
+ val filter = new FlatMapFunction[String, String](word =>
+ if (word.isEmpty) None else Some(word), "filter")
+
+ val map = new FlatMapFunction[String, Int](_ => Some(1), "map")
+
+ val sum = new ReduceFunction[Int]({ (left, right) => left + right }, "sum")
+
+ val all = split.andThen(filter).andThen(map).andThen(sum)
+
+ assert(all.description == "split.filter.map.sum")
+
+ val data =
+ """
+ five four three two one
+ five four three two
+ five four three
+ five four
+ five
+ """
+ // force eager evaluation
+ all.process(data).toList
+ val result = all.finish().toList
+ assert(result.nonEmpty)
+ // 5 + 4 + 3 + 2 + 1 = 15 non-empty words, each mapped to 1 and then summed.
+ assert(result.last == 15)
+ }
+ }
+
+ "Source" should {
+ "iterate over input source and apply attached operator" in {
+
+ val taskContext = MockUtil.mockTaskContext
+ implicit val actorSystem = MockUtil.system
+
+ val data = "one two three".split("\\s")
+ val dataSource = new CollectionDataSource[String](data)
+ val conf = UserConfig.empty.withValue(GEARPUMP_STREAMING_SOURCE, dataSource)
+
+ // Source with no transformer: each element is output exactly once.
+ val source = new DataSourceTask[String, String](
+ taskContext, conf)
+ source.onStart(Instant.EPOCH)
+ source.onNext(Message("next"))
+ data.foreach { s =>
+ verify(taskContext, times(1)).output(MockUtil.argMatch[Message](
+ message => message.msg == s))
+ }
+
+ // Source with a transformer that duplicates each element: each is output twice.
+ val anotherTaskContext = MockUtil.mockTaskContext
+ val double = new FlatMapFunction[String, String](word => List(word, word), "double")
+ val another = new DataSourceTask(anotherTaskContext,
+ conf.withValue(GEARPUMP_STREAMING_OPERATOR, double))
+ another.onStart(Instant.EPOCH)
+ another.onNext(Message("next"))
+ data.foreach { s =>
+ verify(anotherTaskContext, times(2)).output(MockUtil.argMatch[Message](
+ message => message.msg == s))
+ }
+ }
+ }
+
+ "CountTriggerTask" should {
+ "group input by groupBy Function and " +
+ "apply attached operator for each group" in {
+
+ val data = "1 2 2 3 3 3"
+
+ val concat = new ReduceFunction[String]({ (left, right) =>
+ left + right
+ }, "concat")
+
+ implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
+ try {
+ val config = UserConfig.empty.withValue[SingleInputFunction[String, String]](
+ GEARPUMP_STREAMING_OPERATOR, concat)
+
+ val taskContext = MockUtil.mockTaskContext
+
+ // Count window of size 1 in accumulating mode: every message fires its word's
+ // window, which re-emits everything accumulated for that word so far.
+ val groupBy = GroupAlsoByWindow((input: String) => input, CountWindow.apply(1).accumulating)
+ val task = new CountTriggerTask[String, String](groupBy, taskContext, config)
+ task.onStart(Instant.EPOCH)
+
+ val messageCaptor = ArgumentCaptor.forClass(classOf[Message])
+
+ data.split("\\s+").foreach { word =>
+ task.onNext(Message(word))
+ }
+ verify(taskContext, times(6)).output(messageCaptor.capture())
+
+ import scala.collection.JavaConverters._
+
+ val values = messageCaptor.getAllValues.asScala.map(input => input.msg.asInstanceOf[String])
+ assert(values.mkString(",") == "1,2,22,3,33,333")
+ } finally {
+ // Shut the actor system down even when an assertion above fails, so a failing
+ // run does not leak it.
+ system.terminate()
+ Await.result(system.whenTerminated, Duration.Inf)
+ }
+ }
+ }
+
+ "TransformTask" should {
+ "apply the attached operator to every input message" in {
+
+ // TransformTask wrapping a flatMap that emits every word twice
+ val taskContext = MockUtil.mockTaskContext
+ val conf = UserConfig.empty
+ val double = new FlatMapFunction[String, String](word => List(word, word), "double")
+ val task = new TransformTask[String, String](Some(double), taskContext, conf)
+ task.onStart(Instant.EPOCH)
+
+ val data = "1 2 2 3 3 3".split("\\s+")
+
+ data.foreach { input =>
+ task.onNext(Message(input))
+ }
+
+ verify(taskContext, times(data.length * 2)).output(anyObject())
+ }
+ }
+}
+
+/** Type aliases used by `SingleInputFunctionSpec`'s mocked functions. */
+object SingleInputFunctionSpec {
+ // Stand-ins for a pipeline's input (R), intermediate (S) and output (T) types. They are
+ // AnyRef so the spec can obtain opaque values via `mock[R]`, `mock[S]` and `mock[T]`.
+ type R = AnyRef
+ type S = AnyRef
+ type T = AnyRef
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala
new file mode 100644
index 0000000..871d751
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.task
+
+import java.time.Instant
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.dsl.window.api.CountWindow
+import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, WindowRunner}
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+/**
+ * Property-based check that `CountTriggerTask` forwards every message to its window
+ * runner and fires one trigger per `windowSize` messages received.
+ */
+class CountTriggerTaskSpec extends PropSpec with PropertyChecks
+ with Matchers with MockitoSugar {
+
+ property("CountTriggerTask should trigger output by number of messages in a window") {
+
+ implicit val actorSystem = MockUtil.system
+
+ // Both the window size and the number of messages are drawn from [1, 1000].
+ val sizeGen = Gen.chooseNum[Int](1, 1000)
+
+ forAll(sizeGen, sizeGen) { (windowSize: Int, msgNum: Int) =>
+
+ val countWindow = CountWindow(windowSize)
+ val groupBy = mock[GroupAlsoByWindow[Any, Any]]
+ when(groupBy.window).thenReturn(countWindow)
+
+ val windowRunner = mock[WindowRunner]
+ val task = new CountTriggerTask[Any, Any](groupBy, windowRunner,
+ MockUtil.mockTaskContext, UserConfig.empty)
+
+ val message = mock[Message]
+ (1 to msgNum).foreach(_ => task.onNext(message))
+
+ // Every message reaches the runner; one trigger is expected per full window.
+ val fullWindows = msgNum / windowSize
+ verify(windowRunner, times(msgNum)).process(message)
+ verify(windowRunner, times(fullWindows)).trigger(Instant.ofEpochMilli(windowSize))
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala
new file mode 100644
index 0000000..a69abe6
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.task
+
+import java.time.{Duration, Instant}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, SlidingWindow}
+import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, WindowRunner}
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.{Matchers, PropSpec}
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+
+/**
+ * Property-based check that `EventTimeTriggerTask` hands each message to its window
+ * runner and calls `trigger` once the watermark progresses.
+ */
+class EventTimeTriggerTaskSpec extends PropSpec with PropertyChecks
+ with Matchers with MockitoSugar {
+
+ property("EventTimeTriggerTask should trigger on watermark") {
+ // Window size, window step and watermark (epoch millis) are all drawn from [1, 1000].
+ val millisGen = Gen.chooseNum[Long](1L, 1000L)
+
+ forAll(millisGen, millisGen, millisGen.map(Instant.ofEpochMilli)) {
+ (windowSize: Long, windowStep: Long, watermark: Instant) =>
+
+ val sliding = SlidingWindow(Duration.ofMillis(windowSize), Duration.ofMillis(windowStep))
+ val eventTimeWindow = sliding.triggering(EventTimeTrigger)
+ val groupBy = mock[GroupAlsoByWindow[Any, Any]]
+ when(groupBy.window).thenReturn(eventTimeWindow)
+
+ val windowRunner = mock[WindowRunner]
+ val task = new EventTimeTriggerTask[Any, Any](groupBy, windowRunner,
+ MockUtil.mockTaskContext, UserConfig.empty)
+
+ val message = mock[Message]
+ task.onNext(message)
+ verify(windowRunner).process(message)
+
+ task.onWatermarkProgress(watermark)
+ verify(windowRunner).trigger(any[Instant])
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala
new file mode 100644
index 0000000..39e1b4c
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.task
+
+import java.time.{Duration, Instant}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.dsl.task.ProcessingTimeTriggerTask.Triggering
+import org.apache.gearpump.streaming.dsl.window.api.{ProcessingTimeTrigger, SlidingWindow}
+import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, WindowRunner}
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.{Matchers, PropSpec}
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+
+/**
+ * Property-based check that `ProcessingTimeTriggerTask` hands each message to its window
+ * runner and calls `trigger` when it receives the `Triggering` message.
+ */
+class ProcessingTimeTriggerTaskSpec extends PropSpec with PropertyChecks
+ with Matchers with MockitoSugar {
+
+ property("ProcessingTimeTriggerTask should trigger on system time interval") {
+ // Window size, window step and start time (epoch millis) are all drawn from [1, 1000].
+ val millisGen = Gen.chooseNum[Long](1L, 1000L)
+
+ forAll(millisGen, millisGen, millisGen.map(Instant.ofEpochMilli)) {
+ (windowSize: Long, windowStep: Long, startTime: Instant) =>
+
+ val sliding = SlidingWindow(Duration.ofMillis(windowSize), Duration.ofMillis(windowStep))
+ val processingTimeWindow = sliding.triggering(ProcessingTimeTrigger)
+ val groupBy = mock[GroupAlsoByWindow[Any, Any]]
+ when(groupBy.window).thenReturn(processingTimeWindow)
+
+ val windowRunner = mock[WindowRunner]
+ val task = new ProcessingTimeTriggerTask[Any, Any](groupBy, windowRunner,
+ MockUtil.mockTaskContext, UserConfig.empty)
+
+ task.onStart(startTime)
+
+ val message = mock[Message]
+ task.onNext(message)
+ verify(windowRunner).process(message)
+
+ // Simulate the periodic processing-time tick.
+ task.receiveUnManagedMessage(Triggering)
+ verify(windowRunner).trigger(any[Instant])
+ }
+ }
+
+}
[2/2] incubator-gearpump git commit: Merge branch 'master' into
akka-streams
Posted by ma...@apache.org.
Merge branch 'master' into akka-streams
Author: manuzhang <ow...@gmail.com>
Author: darionyaphet <da...@gmail.com>
Closes #95 from manuzhang/akka-streams.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/bc394035
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/bc394035
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/bc394035
Branch: refs/heads/akka-streams
Commit: bc39403525b4065360acf82386fac21a588b59e6
Parents: 4fe5458
Author: manuzhang <ow...@gmail.com>
Authored: Tue Oct 11 11:57:48 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Tue Oct 11 11:57:48 2016 +0800
----------------------------------------------------------------------
.../wordcount/dsl/WindowedWordCount.scala | 87 ++++
.../apache/gearpump/redis/RedisMessage.scala | 456 +++++++++++++++++++
.../org/apache/gearpump/redis/RedisSink.scala | 119 +++++
project/Build.scala | 62 ++-
project/BuildShaded.scala | 127 +++---
.../apache/gearpump/streaming/Constants.scala | 1 +
.../gearpump/streaming/StreamApplication.scala | 2 +-
.../apache/gearpump/streaming/dsl/Stream.scala | 106 +++--
.../gearpump/streaming/dsl/StreamApp.scala | 34 +-
.../streaming/dsl/javaapi/JavaStream.scala | 22 +-
.../apache/gearpump/streaming/dsl/op/OP.scala | 109 -----
.../dsl/partitioner/GroupByPartitioner.scala | 49 ++
.../dsl/partitioner/GroupbyPartitioner.scala | 46 --
.../apache/gearpump/streaming/dsl/plan/OP.scala | 214 +++++++++
.../streaming/dsl/plan/OpTranslator.scala | 222 ---------
.../gearpump/streaming/dsl/plan/Planner.scala | 65 ++-
.../plan/functions/SingleInputFunction.scala | 107 +++++
.../streaming/dsl/task/CountTriggerTask.scala | 63 +++
.../dsl/task/EventTimeTriggerTask.scala | 59 +++
.../dsl/task/ProcessingTimeTriggerTask.scala | 82 ++++
.../streaming/dsl/task/TransformTask.scala | 47 ++
.../dsl/window/api/AccumulationMode.scala | 24 +
.../streaming/dsl/window/api/GroupByFn.scala | 47 ++
.../streaming/dsl/window/api/Trigger.scala | 27 ++
.../streaming/dsl/window/api/Window.scala | 77 ++++
.../streaming/dsl/window/api/WindowFn.scala | 63 +++
.../dsl/window/impl/ReduceFnRunner.scala | 29 ++
.../streaming/dsl/window/impl/Window.scala | 75 +++
.../dsl/window/impl/WindowRunner.scala | 114 +++++
.../streaming/source/DataSourceTask.scala | 15 +-
.../gearpump/streaming/task/TaskActor.scala | 4 +-
.../gearpump/streaming/dsl/StreamAppSpec.scala | 67 +--
.../gearpump/streaming/dsl/StreamSpec.scala | 24 +-
.../partitioner/GroupByPartitionerSpec.scala | 23 +-
.../gearpump/streaming/dsl/plan/OpSpec.scala | 244 ++++++++++
.../streaming/dsl/plan/OpTranslatorSpec.scala | 148 ------
.../streaming/dsl/plan/PlannerSpec.scala | 132 ++++++
.../functions/SingleInputFunctionSpec.scala | 333 ++++++++++++++
.../dsl/task/CountTriggerTaskSpec.scala | 61 +++
.../dsl/task/EventTimeTriggerTaskSpec.scala | 66 +++
.../task/ProcessingTimeTriggerTaskSpec.scala | 69 +++
41 files changed, 2937 insertions(+), 784 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
new file mode 100644
index 0000000..4f43fd4
--- /dev/null
+++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.wordcount.dsl
+
+import java.time.{Duration, Instant}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.streaming.dsl.{LoggerSink, StreamApp}
+import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, FixedWindow}
+import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.util.AkkaApp
+
+/**
+ * Example DSL application that counts words per fixed 5 ms event-time window.
+ *
+ * Lines come from the in-memory `TimedDataSource` (messages carry explicit timestamps),
+ * are split into words, grouped by word within each window and summed; the results are
+ * written to a `LoggerSink`.
+ */
+object WindowedWordCount extends AkkaApp with ArgumentsParser {
+
+ // This example takes no command-line options.
+ override val options: Array[(String, CLIOption[Any])] = Array.empty
+
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+ val context = ClientContext(akkaConf)
+ try {
+ val app = StreamApp("dsl", context)
+ app.source[String](new TimedDataSource).
+ // line => words, word => (word, 1)
+ flatMap(line => line.split("[\\s]+")).map((_, 1)).
+ // fixed 5 ms windows, fired by event-time (watermark) progress
+ window(FixedWindow.apply(Duration.ofMillis(5L))
+ .triggering(EventTimeTrigger)).
+ // (word, count1), (word, count2) => (word, count1 + count2)
+ groupBy(_._1).
+ sum.sink(new LoggerSink)
+
+ context.submit(app)
+ } finally {
+ // Release the client even if building or submitting the application fails.
+ context.close()
+ }
+ }
+
+ /**
+ * In-memory source that emits six timestamped words and then no more messages.
+ *
+ * The watermark tracks the timestamp of the last emitted message. Once the data is
+ * exhausted, every `getWatermark` call advances it by 1 ms (presumably so that pending
+ * event-time windows can still fire).
+ */
+ private class TimedDataSource extends DataSource {
+
+ private var data = List(
+ Message("foo", 1L),
+ Message("bar", 2L),
+ Message("foo", 3L),
+ Message("foo", 5L),
+ Message("bar", 7L),
+ Message("bar", 8L)
+ )
+
+ private var watermark: Instant = Instant.ofEpochMilli(0)
+
+ override def read(): Message = data match {
+ case msg :: rest =>
+ data = rest
+ watermark = Instant.ofEpochMilli(msg.timestamp)
+ msg
+ case Nil =>
+ // No data left. NOTE(review): relies on the caller treating null as "no message".
+ null
+ }
+
+ override def open(context: TaskContext, startTime: Instant): Unit = {}
+
+ override def close(): Unit = {}
+
+ override def getWatermark: Instant = {
+ if (data.isEmpty) {
+ watermark = watermark.plusMillis(1)
+ }
+ watermark
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala
----------------------------------------------------------------------
diff --git a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala
new file mode 100644
index 0000000..84dec70
--- /dev/null
+++ b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.redis
+
+import java.nio.charset.Charset
+import java.nio.charset.StandardCharsets
+
+object RedisMessage {
+
+ /** UTF-8 encodes each string, preserving the order of the list. */
+ private def toBytes(strings: List[String]): List[Array[Byte]] =
+ strings.map(string => toBytes(string))
+
+ /**
+ * UTF-8 encodes a single string.
+ *
+ * Uses the `StandardCharsets.UTF_8` constant rather than looking the charset up by name
+ * on every call.
+ */
+ private def toBytes(string: String): Array[Byte] =
+ string.getBytes(StandardCharsets.UTF_8)
+
+ /** Messages for Redis connection commands. */
+ object Connection {
+
+ /**
+ * Change the selected database for the current connection (Redis SELECT).
+ *
+ * @param index numeric index of the database to select
+ */
+ case class SELECT(index: Int)
+
+ }
+
+ /** Messages for Redis geospatial commands. */
+ object Geo {
+
+ /**
+ * Add one geospatial item in the geospatial index represented using a sorted set
+ * (Redis GEOADD).
+ *
+ * @param key key of the geospatial index
+ * @param longitude longitude of the item
+ * @param latitude latitude of the item
+ * @param member name of the item, already encoded as bytes
+ */
+ case class GEOADD(key: Array[Byte], longitude: Double,
+ latitude: Double, member: Array[Byte]) {
+ // Convenience constructor: encodes `key` and `member` with `toBytes` (UTF-8).
+ def this(key: String, longitude: Double,
+ latitude: Double, member: String) =
+ this(toBytes(key), longitude, latitude, toBytes(member))
+ }
+
+ }
+
+ /**
+ * Messages for Redis hash commands. Each message has a primary constructor taking
+ * bytes and an auxiliary constructor taking Strings, which are encoded with `toBytes`.
+ */
+ object Hashes {
+
+ /**
+ * Delete a hash field (Redis HDEL).
+ *
+ * @param key key of the hash
+ * @param field field to delete
+ */
+ case class HDEL(key: Array[Byte], field: Array[Byte]) {
+ def this(key: String, field: String) = this(toBytes(key), toBytes(field))
+ }
+
+ /**
+ * Increment the integer value of a hash field by the given number (Redis HINCRBY).
+ *
+ * @param key key of the hash
+ * @param field field whose value is incremented
+ * @param increment amount to add
+ */
+ case class HINCRBY(key: Array[Byte], field: Array[Byte], increment: Long) {
+ def this(key: String, field: String, increment: Long) =
+ this(toBytes(key), toBytes(field), increment)
+ }
+
+ /**
+ * Increment the float value of a hash field by the given amount (Redis HINCRBYFLOAT).
+ *
+ * NOTE(review): `increment` is a 32-bit Float, while Redis HINCRBYFLOAT accepts
+ * double-precision values, so fine-grained increments lose precision. Changing the
+ * field type would break existing callers, so it is left as is.
+ *
+ * @param key key of the hash
+ * @param field field whose value is incremented
+ * @param increment amount to add
+ */
+ case class HINCRBYFLOAT(key: Array[Byte], field: Array[Byte], increment: Float) {
+ def this(key: String, field: String, increment: Float) =
+ this(toBytes(key), toBytes(field), increment)
+ }
+
+
+ /**
+ * Set the string value of a hash field (Redis HSET).
+ *
+ * @param key key of the hash
+ * @param field field to set
+ * @param value value to store
+ */
+ case class HSET(key: Array[Byte], field: Array[Byte], value: Array[Byte]) {
+ def this(key: String, field: String, value: String) =
+ this(toBytes(key), toBytes(field), toBytes(value))
+ }
+
+ /**
+ * Set the value of a hash field, only if the field does not exist (Redis HSETNX).
+ *
+ * @param key key of the hash
+ * @param field field to set
+ * @param value value to store
+ */
+ case class HSETNX(key: Array[Byte], field: Array[Byte], value: Array[Byte]) {
+ def this(key: String, field: String, value: String) =
+ this(toBytes(key), toBytes(field), toBytes(value))
+ }
+
+ }
+
+ /** Messages for Redis HyperLogLog commands. */
+ object HyperLogLog {
+
+ /**
+ * Adds the specified element to the specified HyperLogLog (Redis PFADD).
+ *
+ * NOTE(review): unlike the other messages in this file, PFADD holds plain Strings and
+ * has no Array[Byte] form; how it is encoded is up to its consumer — confirm there.
+ *
+ * @param key key of the HyperLogLog
+ * @param element element to add
+ */
+ case class PFADD(key: String, element: String)
+
+ }
+
+ /**
+ * Messages for Redis list commands. Each message has a primary constructor taking
+ * bytes and an auxiliary constructor taking Strings, which are encoded with `toBytes`.
+ */
+ object Lists {
+
+ /**
+ * Prepend a value to a list (Redis LPUSH). This message carries a single value.
+ *
+ * @param key key of the list
+ * @param value value to prepend
+ */
+ case class LPUSH(key: Array[Byte], value: Array[Byte]) {
+ // The primary constructor takes bytes for `key` too, so it must be encoded as well.
+ def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+ }
+
+ /**
+ * Prepend a value to a list, only if the list exists (Redis LPUSHX).
+ *
+ * @param key key of the list
+ * @param value value to prepend
+ */
+ case class LPUSHX(key: Array[Byte], value: Array[Byte]) {
+ def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+ }
+
+ /**
+ * Set the value of an element in a list by its index (Redis LSET).
+ *
+ * @param key key of the list
+ * @param index index of the element to overwrite
+ * @param value new value of the element
+ */
+ case class LSET(key: Array[Byte], index: Long, value: Array[Byte]) {
+ def this(key: String, index: Long, value: String) = this(toBytes(key), index, toBytes(value))
+ }
+
+ /**
+ * Append a value to a list (Redis RPUSH). This message carries a single value.
+ *
+ * @param key key of the list
+ * @param value value to append
+ */
+ case class RPUSH(key: Array[Byte], value: Array[Byte]) {
+ // The primary constructor takes bytes for `key` too, so it must be encoded as well.
+ def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+ }
+
+ /**
+ * Append a value to a list, only if the list exists (Redis RPUSHX).
+ *
+ * @param key key of the list
+ * @param value value to append
+ */
+ case class RPUSHX(key: Array[Byte], value: Array[Byte]) {
+ def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+ }
+
+ }
+
+ /**
+ * Messages for Redis key commands. Each message has a primary constructor taking
+ * bytes and an auxiliary constructor taking Strings, which are encoded with `toBytes`.
+ */
+ object Keys {
+
+ /**
+ * Delete a key (Redis DEL).
+ *
+ * @param message the key to delete
+ */
+ case class DEL(message: Array[Byte]) {
+
+ def this(message: String) = this(toBytes(message))
+ }
+
+ /**
+ * Set a key's time to live in seconds (Redis EXPIRE).
+ *
+ * @param key key to expire
+ * @param seconds time to live, in seconds
+ */
+ case class EXPIRE(key: Array[Byte], seconds: Int) {
+ def this(key: String, seconds: Int) = this(toBytes(key), seconds)
+ }
+
+ /**
+ * Set the expiration for a key as a UNIX timestamp (Redis EXPIREAT; seconds, in
+ * contrast to PEXPIREAT's milliseconds).
+ *
+ * @param key key to expire
+ * @param timestamp expiration time as a UNIX timestamp
+ */
+ case class EXPIREAT(key: Array[Byte], timestamp: Long) {
+ def this(key: String, timestamp: Long) = this(toBytes(key), timestamp)
+ }
+
+ /**
+ * Atomically transfer a key from a Redis instance to another one (Redis MIGRATE).
+ *
+ * @param host host of the destination instance
+ * @param port port of the destination instance
+ * @param key key to transfer
+ * @param database destination database index
+ * @param timeout timeout passed to MIGRATE (Redis specifies it in milliseconds)
+ */
+ case class MIGRATE(host: Array[Byte], port: Int, key: Array[Byte], database: Int, timeout: Int) {
+ def this(host: String, port: Int, key: String, database: Int, timeout: Int) =
+ this(toBytes(host), port, toBytes(key), database, timeout)
+ }
+
+ /**
+ * Move a key to another database (Redis MOVE).
+ *
+ * @param key key to move
+ * @param db destination database index
+ */
+ case class MOVE(key: Array[Byte], db: Int) {
+ def this(key: String, db: Int) = this(toBytes(key), db)
+ }
+
+ /**
+ * Remove the expiration from a key (Redis PERSIST).
+ *
+ * @param key key whose expiration is removed
+ */
+ case class PERSIST(key: Array[Byte]) {
+ def this(key: String) = this(toBytes(key))
+ }
+
+ /**
+ * Set a key's time to live in milliseconds (Redis PEXPIRE).
+ *
+ * @param key key to expire
+ * @param milliseconds time to live, in milliseconds
+ */
+ case class PEXPIRE(key: Array[Byte], milliseconds: Long) {
+ def this(key: String, milliseconds: Long) = this(toBytes(key), milliseconds)
+ }
+
+ /**
+ * Set the expiration for a key as a UNIX timestamp specified in milliseconds
+ * (Redis PEXPIREAT).
+ *
+ * @param key key to expire
+ * @param timestamp expiration time as a UNIX timestamp in milliseconds
+ */
+ case class PEXPIREAT(key: Array[Byte], timestamp: Long) {
+ // NOTE(review): this parameter is named `milliseconds`, but it is the same
+ // millisecond UNIX timestamp that is stored in `timestamp`.
+ def this(key: String, milliseconds: Long) = this(toBytes(key), milliseconds)
+ }
+
+ /**
+ * Rename a key (Redis RENAME).
+ *
+ * @param key current key
+ * @param newKey new name for the key
+ */
+ case class RENAME(key: Array[Byte], newKey: Array[Byte]) {
+ def this(key: String, newKey: String) = this(toBytes(key), toBytes(newKey))
+ }
+
+ /**
+ * Rename a key, only if the new key does not exist (Redis RENAMENX).
+ *
+ * @param key current key
+ * @param newKey new name for the key
+ */
+ case class RENAMENX(key: Array[Byte], newKey: Array[Byte]) {
+ def this(key: String, newKey: String) = this(toBytes(key), toBytes(newKey))
+ }
+
+ }
+
+
+  object Sets {
+
+    /**
+     * Add a member to a set (Redis SADD). Each message carries a single member.
+     *
+     * @param key     the set key, as raw bytes
+     * @param members the member to add, as raw bytes
+     */
+    case class SADD(key: Array[Byte], members: Array[Byte]) {
+      // Encode the key with toBytes, like SMOVE and the rest of this file, instead of
+      // passing the String where an Array[Byte] is expected.
+      def this(key: String, members: String) = this(toBytes(key), toBytes(members))
+    }
+
+    /**
+     * Move a member from one set to another (Redis SMOVE).
+     *
+     * @param source      key of the source set, as raw bytes
+     * @param destination key of the destination set, as raw bytes
+     * @param member      the member to move, as raw bytes
+     */
+    case class SMOVE(source: Array[Byte], destination: Array[Byte], member: Array[Byte]) {
+      def this(source: String, destination: String, member: String) =
+        this(toBytes(source), toBytes(destination), toBytes(member))
+    }
+
+    /**
+     * Remove a member from a set (Redis SREM). Each message carries a single member.
+     *
+     * @param key    the set key, as raw bytes
+     * @param member the member to remove, as raw bytes
+     */
+    case class SREM(key: Array[Byte], member: Array[Byte]) {
+      // Same fix as SADD: the key must be converted to bytes too.
+      def this(key: String, member: String) = this(toBytes(key), toBytes(member))
+    }
+
+  }
+
+  object String {
+
+    /**
+     * Append a value to a key (Redis APPEND).
+     *
+     * @param key   the key, as raw bytes
+     * @param value the value to append, as raw bytes
+     */
+    case class APPEND(key: Array[Byte], value: Array[Byte]) {
+      def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+    }
+
+    /**
+     * Decrement the integer value of a key by one (Redis DECR).
+     *
+     * @param key the key, as raw bytes
+     */
+    case class DECR(key: Array[Byte]) {
+      def this(key: String) = this(toBytes(key))
+    }
+
+    /**
+     * Decrement the integer value of a key by the given number (Redis DECRBY).
+     *
+     * @param key       the key, as raw bytes
+     * @param decrement amount to subtract
+     */
+    case class DECRBY(key: Array[Byte], decrement: Int) {
+      def this(key: String, decrement: Int) = this(toBytes(key), decrement)
+    }
+
+    /**
+     * Increment the integer value of a key by one (Redis INCR).
+     *
+     * @param key the key, as raw bytes
+     */
+    case class INCR(key: Array[Byte]) {
+      def this(key: String) = this(toBytes(key))
+    }
+
+    /**
+     * Increment the integer value of a key by the given amount (Redis INCRBY).
+     *
+     * @param key       the key, as raw bytes
+     * @param increment amount to add
+     */
+    case class INCRBY(key: Array[Byte], increment: Int) {
+      def this(key: String, increment: Int) = this(toBytes(key), increment)
+    }
+
+    /**
+     * Increment the float value of a key by the given amount (Redis INCRBYFLOAT).
+     *
+     * @param key       the key, as raw bytes
+     * @param increment amount to add
+     */
+    case class INCRBYFLOAT(key: Array[Byte], increment: Double) {
+      // Predef has no implicit java.lang.Number => Double conversion, so unbox explicitly.
+      def this(key: String, increment: Number) = this(toBytes(key), increment.doubleValue())
+    }
+
+    /**
+     * Set the string value of a key (Redis SET).
+     *
+     * @param key   the key, as raw bytes
+     * @param value the value, as raw bytes
+     */
+    case class SET(key: Array[Byte], value: Array[Byte]) {
+      def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+    }
+
+    /**
+     * Set or clear the bit at offset in the string value stored at key (Redis SETBIT).
+     *
+     * @param key    the key, as raw bytes
+     * @param offset bit offset
+     * @param value  the bit value, as raw bytes
+     */
+    case class SETBIT(key: Array[Byte], offset: Long, value: Array[Byte]) {
+      def this(key: String, offset: Long, value: String) = this(toBytes(key), offset, toBytes(value))
+    }
+
+    /**
+     * Set the value and expiration of a key (Redis SETEX).
+     *
+     * @param key     the key, as raw bytes
+     * @param seconds time to live, in seconds
+     * @param value   the value, as raw bytes
+     */
+    case class SETEX(key: Array[Byte], seconds: Int, value: Array[Byte]) {
+      def this(key: String, seconds: Int, value: String) = this(toBytes(key), seconds, toBytes(value))
+    }
+
+    /**
+     * Set the value of a key, only if the key does not exist (Redis SETNX).
+     *
+     * @param key   the key, as raw bytes
+     * @param value the value, as raw bytes
+     */
+    case class SETNX(key: Array[Byte], value: Array[Byte]) {
+      def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+    }
+
+    /**
+     * Overwrite part of the string at key, starting at the given offset (Redis SETRANGE).
+     *
+     * @param key    the key, as raw bytes
+     * @param offset byte offset where overwriting starts
+     * @param value  the replacement bytes
+     */
+    case class SETRANGE(key: Array[Byte], offset: Int, value: Array[Byte]) {
+      def this(key: String, offset: Int, value: String) = this(toBytes(key), offset, toBytes(value))
+    }
+
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala
----------------------------------------------------------------------
diff --git a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala
new file mode 100644
index 0000000..3f75949
--- /dev/null
+++ b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.redis
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.redis.RedisMessage.Geo.GEOADD
+import org.apache.gearpump.redis.RedisMessage.Hashes._
+import org.apache.gearpump.redis.RedisMessage.HyperLogLog._
+import org.apache.gearpump.redis.RedisMessage.Keys._
+import org.apache.gearpump.redis.RedisMessage.Lists._
+import org.apache.gearpump.redis.RedisMessage.Sets._
+import org.apache.gearpump.redis.RedisMessage.String._
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.util.LogUtil
+import redis.clients.jedis.Jedis
+import redis.clients.jedis.Protocol.{DEFAULT_DATABASE, DEFAULT_HOST, DEFAULT_PORT, DEFAULT_TIMEOUT}
+
+/**
+ * A [[DataSink]] that writes messages to a Redis instance through a Jedis client.
+ *
+ * Each message payload is matched against the command case classes of [[RedisMessage]]
+ * and translated into the corresponding Jedis call. Payloads of any other type are
+ * logged and skipped.
+ *
+ * @param host     Redis server host
+ * @param port     Redis server port
+ * @param timeout  timeout passed to the Jedis constructor
+ * @param database index of the database selected when the sink is opened
+ * @param password password sent with AUTH on open; null or empty means no AUTH
+ */
+class RedisSink(
+    host: String = DEFAULT_HOST,
+    port: Int = DEFAULT_PORT,
+    timeout: Int = DEFAULT_TIMEOUT,
+    database: Int = DEFAULT_DATABASE,
+    password: String = "") extends DataSink {
+
+  private val LOG = LogUtil.getLogger(getClass)
+  @transient private lazy val client = new Jedis(host, port, timeout)
+
+  override def open(context: TaskContext): Unit = {
+    // AUTH must come before SELECT: a password-protected server answers every other
+    // command (SELECT included) with NOAUTH until the connection is authenticated.
+    if (password != null && password.nonEmpty) {
+      client.auth(password)
+    }
+    client.select(database)
+  }
+
+  override def write(message: Message): Unit = {
+
+    message.msg match {
+      // GEO
+      case msg: GEOADD => client.geoadd(msg.key, msg.longitude, msg.latitude, msg.member)
+
+      // Hashes
+      case msg: HDEL => client.hdel(msg.key, msg.field)
+      case msg: HINCRBY => client.hincrBy(msg.key, msg.field, msg.increment)
+      case msg: HINCRBYFLOAT => client.hincrByFloat(msg.key, msg.field, msg.increment)
+      case msg: HSET => client.hset(msg.key, msg.field, msg.value)
+      case msg: HSETNX => client.hsetnx(msg.key, msg.field, msg.value)
+
+      // HyperLogLog
+      case msg: PFADD => client.pfadd(msg.key, msg.element)
+
+      // Lists
+      case msg: LPUSH => client.lpush(msg.key, msg.value)
+      case msg: LPUSHX => client.lpushx(msg.key, msg.value)
+      case msg: LSET => client.lset(msg.key, msg.index, msg.value)
+      case msg: RPUSH => client.rpush(msg.key, msg.value)
+      case msg: RPUSHX => client.rpushx(msg.key, msg.value)
+
+      // Keys
+      case msg: DEL => client.del(msg.message)
+      case msg: EXPIRE => client.expire(msg.key, msg.seconds)
+      case msg: EXPIREAT => client.expireAt(msg.key, msg.timestamp)
+      case msg: MIGRATE => client.migrate(msg.host, msg.port, msg.key, msg.database, msg.timeout)
+      case msg: MOVE => client.move(msg.key, msg.db)
+      case msg: PERSIST => client.persist(msg.key)
+      case msg: PEXPIRE => client.pexpire(msg.key, msg.milliseconds)
+      case msg: PEXPIREAT => client.pexpireAt(msg.key, msg.timestamp)
+      case msg: RENAME => client.rename(msg.key, msg.newKey)
+      case msg: RENAMENX => client.renamenx(msg.key, msg.newKey)
+
+      // Sets
+      case msg: SADD => client.sadd(msg.key, msg.members)
+      case msg: SMOVE => client.smove(msg.source, msg.destination, msg.member)
+      case msg: SREM => client.srem(msg.key, msg.member)
+
+      // String
+      case msg: APPEND => client.append(msg.key, msg.value)
+      case msg: DECR => client.decr(msg.key)
+      case msg: DECRBY => client.decrBy(msg.key, msg.decrement)
+      case msg: INCR => client.incr(msg.key)
+      case msg: INCRBY => client.incrBy(msg.key, msg.increment)
+      case msg: INCRBYFLOAT => client.incrByFloat(msg.key, msg.increment)
+      case msg: SET => client.set(msg.key, msg.value)
+      case msg: SETBIT => client.setbit(msg.key, msg.offset, msg.value)
+      case msg: SETEX => client.setex(msg.key, msg.seconds, msg.value)
+      case msg: SETNX => client.setnx(msg.key, msg.value)
+      case msg: SETRANGE => client.setrange(msg.key, msg.offset, msg.value)
+
+      // Without this case an unrecognized payload would fail the task with a MatchError.
+      case unsupported =>
+        val typeName = Option(unsupported).map(_.getClass.getName).getOrElse("null")
+        LOG.warn(s"RedisSink: unsupported message type $typeName, message skipped")
+    }
+  }
+
+  override def close(): Unit = {
+    client.close()
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/project/Build.scala
----------------------------------------------------------------------
diff --git a/project/Build.scala b/project/Build.scala
index 0b1628e..f1e0443 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -154,12 +154,6 @@ object Build extends sbt.Build {
dependencyOverrides += "com.typesafe.akka" %% "akka-stream" % akkaStreamVersion
)
- val streamingDependencies = Seq(
- unmanagedJars in Compile ++= Seq(
- getShadedJarFile("gs-collections", version.value)
- )
- )
-
val coreDependencies = Seq(
libraryDependencies ++= Seq(
"org.slf4j" % "slf4j-api" % slf4jVersion,
@@ -199,9 +193,9 @@ object Build extends sbt.Build {
),
unmanagedJars in Compile ++= Seq(
- getShadedJarFile("metrics-graphite", version.value),
- getShadedJarFile("guava", version.value),
- getShadedJarFile("akka-kryo", version.value)
+ getShadedJarFile(shaded_metrics_graphite.id, version.value),
+ getShadedJarFile(shaded_guava.id, version.value),
+ getShadedJarFile(shaded_akka_kryo.id, version.value)
)
)
@@ -250,6 +244,20 @@ object Build extends sbt.Build {
.map(_.filterNot(_.getCanonicalPath.contains("akka")))
}
+  /**
+   * Recursively walks a POM XML tree and appends `deps` to the children of every
+   * `dependencies` element found. Elements are rebuilt with minimizeEmpty = false;
+   * non-element nodes are returned unchanged.
+   *
+   * NOTE(review): this matches `dependencies` at any depth (e.g. inside plugin or
+   * dependencyManagement sections too, if present) — confirm the generated POM only
+   * has the top-level one.
+   */
+  private def addShadedDeps(deps: Seq[xml.Node], node: xml.Node): xml.Node = {
+    def rebuild(elem: xml.Elem, children: Seq[xml.Node]): xml.Elem =
+      xml.Elem(elem.prefix, elem.label, elem.attributes, elem.scope, false, children: _*)
+
+    node match {
+      case deps0: xml.Elem if deps0.label == "dependencies" =>
+        rebuild(deps0, deps0.child ++ deps)
+      case other: xml.Elem =>
+        rebuild(other, other.child.map(kid => addShadedDeps(deps, kid)))
+      case leaf =>
+        leaf
+    }
+  }
+
lazy val root = Project(
id = "gearpump",
base = file("."),
@@ -262,7 +270,14 @@ object Build extends sbt.Build {
lazy val core = Project(
id = "gearpump-core",
base = file("core"),
- settings = commonSettings ++ javadocSettings ++ coreDependencies)
+ settings = commonSettings ++ javadocSettings ++ coreDependencies ++ Seq(
+ pomPostProcess := {
+ (node: xml.Node) => addShadedDeps(List(
+ getShadedDepXML(organization.value, shaded_akka_kryo.id, version.value),
+ getShadedDepXML(organization.value, shaded_guava.id, version.value),
+ getShadedDepXML(organization.value, shaded_metrics_graphite.id, version.value)), node)
+ }
+ ))
.disablePlugins(sbtassembly.AssemblyPlugin)
lazy val daemon = Project(
@@ -282,9 +297,18 @@ object Build extends sbt.Build {
lazy val streaming = Project(
id = "gearpump-streaming",
base = file("streaming"),
- settings = commonSettings ++ javadocSettings ++ streamingDependencies)
- .dependsOn(core % "test->test; compile->compile", daemon % "test->test")
- .disablePlugins(sbtassembly.AssemblyPlugin)
+ settings = commonSettings ++ javadocSettings ++ Seq(
+ unmanagedJars in Compile ++= Seq(
+ getShadedJarFile(shaded_gs_collections.id, version.value)
+ ),
+
+ pomPostProcess := {
+ (node: xml.Node) => addShadedDeps(List(
+ getShadedDepXML(organization.value, shaded_gs_collections.id, version.value)), node)
+ }
+ ))
+ .dependsOn(core % "test->test; compile->compile", shaded_gs_collections, daemon % "test->test")
+ .disablePlugins(sbtassembly.AssemblyPlugin)
lazy val external_kafka = Project(
id = "gearpump-external-kafka",
@@ -402,6 +426,18 @@ object Build extends sbt.Build {
.dependsOn (services % "test->test; compile->compile", daemon % "test->test; compile->compile")
.disablePlugins(sbtassembly.AssemblyPlugin)
+ // Experimental Redis connector project (experiments/redis); excluded from publishing via noPublish
+ // and packaged with myAssemblySettings. Depends on the Jedis client.
+ lazy val redis = Project(
+ id = "gearpump-experiments-redis",
+ base = file("experiments/redis"),
+ settings = commonSettings ++ noPublish ++ myAssemblySettings ++
+ Seq(
+ libraryDependencies ++= Seq(
+ "redis.clients" % "jedis" % "2.9.0"
+ ),
+ // NOTE(review): no org.apache.gearpump.example.Test class is visible among the redis
+ // sources in this diff (RedisSink, RedisMessage) — confirm it exists or drop this setting.
+ mainClass in(Compile, packageBin) := Some("org.apache.gearpump.example.Test")
+ ))
+ // streaming and daemon use the "provided" configuration — presumably so they are not
+ // bundled into the assembly jar; verify against myAssemblySettings.
+ .dependsOn(streaming % "test->test; provided", daemon % "test->test; provided")
+
lazy val storm = Project(
id = "gearpump-experiments-storm",
base = file("experiments/storm"),
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/project/BuildShaded.scala
----------------------------------------------------------------------
diff --git a/project/BuildShaded.scala b/project/BuildShaded.scala
index 1f59bfd..a43587c 100644
--- a/project/BuildShaded.scala
+++ b/project/BuildShaded.scala
@@ -35,7 +35,7 @@ object BuildShaded extends sbt.Build {
_.copy(includeScala = false)
},
assemblyJarName in assembly := {
- s"${name.value}-$scalaVersionMajor-${version.value}-assembly.jar"
+ s"${name.value}_$scalaVersionMajor-${version.value}.jar"
},
target in assembly := baseDirectory.value.getParentFile / "target" / scalaVersionMajor
)
@@ -44,92 +44,99 @@ object BuildShaded extends sbt.Build {
id = "gearpump-shaded",
base = file("shaded")
).aggregate(shaded_akka_kryo, shaded_gs_collections, shaded_guava, shaded_metrics_graphite)
- .disablePlugins(sbtassembly.AssemblyPlugin)
-
+ .disablePlugins(sbtassembly.AssemblyPlugin)
lazy val shaded_akka_kryo = Project(
id = "gearpump-shaded-akka-kryo",
base = file("shaded/akka-kryo"),
- settings = shadeAssemblySettings ++ addArtifact(Artifact("gearpump-shaded-akka-kryo",
- "assembly"), sbtassembly.AssemblyKeys.assembly) ++
- Seq(
- assemblyShadeRules in assembly := Seq(
- ShadeRule.zap("com.google.protobuf.**").inAll,
- ShadeRule.zap("com.typesafe.config.**").inAll,
- ShadeRule.zap("akka.**").inAll,
- ShadeRule.zap("org.jboss.netty.**").inAll,
- ShadeRule.zap("net.jpountz.lz4.**").inAll,
- ShadeRule.zap("org.uncommons.maths.**").inAll,
- ShadeRule.rename("com.romix.**" -> "org.apache.gearpump.romix.@1").inAll,
- ShadeRule.rename("com.esotericsoftware.**" ->
- "org.apache.gearpump.esotericsoftware.@1").inAll,
- ShadeRule.rename("org.objenesis.**" -> "org.apache.gearpump.objenesis.@1").inAll
- )
- ) ++
- Seq(
- libraryDependencies ++= Seq(
- "com.github.romix.akka" %% "akka-kryo-serialization" % kryoVersion
- )
+ settings = shadeAssemblySettings ++ addArtifact(Artifact("gearpump-shaded-akka-kryo"),
+ sbtassembly.AssemblyKeys.assembly) ++
+ Seq(
+ assemblyShadeRules in assembly := Seq(
+ ShadeRule.zap("com.google.protobuf.**").inAll,
+ ShadeRule.zap("com.typesafe.config.**").inAll,
+ ShadeRule.zap("akka.**").inAll,
+ ShadeRule.zap("org.jboss.netty.**").inAll,
+ ShadeRule.zap("net.jpountz.lz4.**").inAll,
+ ShadeRule.zap("org.uncommons.maths.**").inAll,
+ ShadeRule.rename("com.romix.**" -> "org.apache.gearpump.romix.@1").inAll,
+ ShadeRule.rename("com.esotericsoftware.**" ->
+ "org.apache.gearpump.esotericsoftware.@1").inAll,
+ ShadeRule.rename("org.objenesis.**" -> "org.apache.gearpump.objenesis.@1").inAll
+ )
+ ) ++
+ Seq(
+ libraryDependencies ++= Seq(
+ "com.github.romix.akka" %% "akka-kryo-serialization" % kryoVersion
)
+ )
)
lazy val shaded_gs_collections = Project(
id = "gearpump-shaded-gs-collections",
base = file("shaded/gs-collections"),
- settings = shadeAssemblySettings ++ addArtifact(Artifact("gearpump-shaded-gs-collections",
- "assembly"), sbtassembly.AssemblyKeys.assembly) ++
- Seq(
- assemblyShadeRules in assembly := Seq(
- ShadeRule.rename("com.gs.collections.**" ->
- "org.apache.gearpump.gs.collections.@1").inAll
- )
- ) ++
- Seq(
- libraryDependencies ++= Seq(
- "com.goldmansachs" % "gs-collections" % gsCollectionsVersion
- )
+ settings = shadeAssemblySettings ++ addArtifact(Artifact("gearpump-shaded-gs-collections"),
+ sbtassembly.AssemblyKeys.assembly) ++
+ Seq(
+ assemblyShadeRules in assembly := Seq(
+ ShadeRule.rename("com.gs.collections.**" ->
+ "org.apache.gearpump.gs.collections.@1").inAll
)
+ ) ++
+ Seq(
+ libraryDependencies ++= Seq(
+ "com.goldmansachs" % "gs-collections" % gsCollectionsVersion
+ )
+ )
)
lazy val shaded_guava = Project(
id = "gearpump-shaded-guava",
base = file("shaded/guava"),
- settings = shadeAssemblySettings ++ addArtifact(Artifact("gearpump-shaded-guava",
- "assembly"), sbtassembly.AssemblyKeys.assembly) ++
- Seq(
- assemblyShadeRules in assembly := Seq(
- ShadeRule.rename("com.google.**" -> "org.apache.gearpump.google.@1").inAll
- )
- ) ++
- Seq(
- libraryDependencies ++= Seq(
- "com.google.guava" % "guava" % guavaVersion
- )
+ settings = shadeAssemblySettings ++ addArtifact(Artifact("gearpump-shaded-guava"),
+ sbtassembly.AssemblyKeys.assembly) ++
+ Seq(
+ assemblyShadeRules in assembly := Seq(
+ ShadeRule.rename("com.google.**" -> "org.apache.gearpump.google.@1").inAll
+ )
+ ) ++
+ Seq(
+ libraryDependencies ++= Seq(
+ "com.google.guava" % "guava" % guavaVersion
)
+ )
)
lazy val shaded_metrics_graphite = Project(
id = "gearpump-shaded-metrics-graphite",
base = file("shaded/metrics-graphite"),
- settings = shadeAssemblySettings ++ addArtifact(Artifact("gearpump-shaded-metrics-graphite",
- "assembly"), sbtassembly.AssemblyKeys.assembly) ++
- Seq(
- assemblyShadeRules in assembly := Seq(
- ShadeRule.rename("com.codahale.metrics.**" ->
- "org.apache.gearpump.codahale.metrics.@1").inAll
- )
- ) ++
- Seq(
- libraryDependencies ++= Seq(
- "com.codahale.metrics" % "metrics-graphite" % codahaleVersion,
- "com.codahale.metrics" % "metrics-jvm" % codahaleVersion
- )
+ settings = shadeAssemblySettings ++ addArtifact(Artifact("gearpump-shaded-metrics-graphite"),
+ sbtassembly.AssemblyKeys.assembly) ++
+ Seq(
+ assemblyShadeRules in assembly := Seq(
+ ShadeRule.rename("com.codahale.metrics.**" ->
+ "org.apache.gearpump.codahale.metrics.@1").inAll
)
+ ) ++
+ Seq(
+ libraryDependencies ++= Seq(
+ "com.codahale.metrics" % "metrics-graphite" % codahaleVersion,
+ "com.codahale.metrics" % "metrics-jvm" % codahaleVersion
+ )
+ )
)
def getShadedJarFile(name: String, gearpumpVersion: String): File = {
shaded.base / "target" / scalaVersionMajor /
- s"gearpump-shaded-$name-$scalaVersionMajor-$gearpumpVersion-assembly.jar"
+ s"${name}_$scalaVersionMajor-$gearpumpVersion.jar"
+ }
+
+ /**
+ * Builds a Maven POM `<dependency>` element for a shaded artifact; used by the
+ * pomPostProcess settings to declare the unmanaged shaded jars in published POMs.
+ *
+ * @param groupId    Maven groupId of the shaded artifact
+ * @param artifactId Maven artifactId (callers pass the shaded project's id)
+ * @param version    artifact version
+ * @return the `<dependency>` XML node
+ *
+ * NOTE(review): the shaded jars are named `${name}_$scalaVersionMajor-...`, but the
+ * artifactId here carries no Scala-version suffix — confirm it matches how the
+ * shaded projects are actually published.
+ */
+ def getShadedDepXML(groupId: String, artifactId: String, version: String): scala.xml.Node = {
+ <dependency>
+ <groupId>{groupId}</groupId>
+ <artifactId>{artifactId}</artifactId>
+ <version>{version}</version>
+ </dependency>
 }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
index cd33b50..f99a436 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
@@ -22,6 +22,7 @@ object Constants {
val GEARPUMP_STREAMING_OPERATOR = "gearpump.streaming.dsl.operator"
val GEARPUMP_STREAMING_SOURCE = "gearpump.streaming.source"
val GEARPUMP_STREAMING_GROUPBY_FUNCTION = "gearpump.streaming.dsl.groupby-function"
+ val GEARPUMP_STREAMING_WINDOW_FUNCTION = "gearpump.streaming.dsl.window-function"
val GEARPUMP_STREAMING_LOCALITIES = "gearpump.streaming.localities"
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
index 66ec873..a6588a1 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
@@ -123,7 +123,7 @@ object LifeTime {
*/
class StreamApplication(
override val name: String, val inputUserConfig: UserConfig,
- val dag: Graph[ProcessorDescription, PartitionerDescription])
+ dag: Graph[ProcessorDescription, PartitionerDescription])
extends Application {
require(!dag.hasDuplicatedEdge(), "Graph should not have duplicated edges")
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala
index 786d496..440a45e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala
@@ -20,7 +20,10 @@ package org.apache.gearpump.streaming.dsl
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.dsl.op._
+import org.apache.gearpump.streaming.dsl.plan._
+import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapFunction, ReduceFunction}
+import org.apache.gearpump.streaming.dsl.window.api._
+import org.apache.gearpump.streaming.dsl.window.impl._
import org.apache.gearpump.streaming.sink.DataSink
import org.apache.gearpump.streaming.task.{Task, TaskContext}
import org.apache.gearpump.util.Graph
@@ -35,12 +38,12 @@ class Stream[T](
/**
* converts a value[T] to a list of value[R]
*
- * @param fun FlatMap function
+ * @param fn FlatMap function
* @param description The description message for this operation
* @return A new stream with type [R]
*/
- def flatMap[R](fun: T => TraversableOnce[R], description: String = null): Stream[R] = {
- val flatMapOp = FlatMapOp(fun, Option(description).getOrElse("flatmap"))
+ def flatMap[R](fn: T => TraversableOnce[R], description: String = "flatMap"): Stream[R] = {
+ val flatMapOp = ChainableOp(new FlatMapFunction[T, R](fn, description))
graph.addVertex(flatMapOp)
graph.addEdge(thisNode, edge.getOrElse(Direct), flatMapOp)
new Stream[R](graph, flatMapOp)
@@ -49,36 +52,36 @@ class Stream[T](
/**
* Maps message of type T message of type R
*
- * @param fun Function
+ * @param fn Function
* @return A new stream with type [R]
*/
- def map[R](fun: T => R, description: String = null): Stream[R] = {
+ def map[R](fn: T => R, description: String = "map"): Stream[R] = {
this.flatMap({ data =>
- Option(fun(data))
- }, Option(description).getOrElse("map"))
+ Option(fn(data))
+ }, description)
}
/**
* Keeps records when fun(T) == true
*
- * @param fun the filter
+ * @param fn the filter
* @return a new stream after filter
*/
- def filter(fun: T => Boolean, description: String = null): Stream[T] = {
+ def filter(fn: T => Boolean, description: String = "filter"): Stream[T] = {
this.flatMap({ data =>
- if (fun(data)) Option(data) else None
- }, Option(description).getOrElse("filter"))
+ if (fn(data)) Option(data) else None
+ }, description)
}
/**
* Reduces operations.
*
- * @param fun reduction function
+ * @param fn reduction function
* @param description description message for this operator
* @return a new stream after reduction
*/
- def reduce(fun: (T, T) => T, description: String = null): Stream[T] = {
- val reduceOp = ReduceOp(fun, Option(description).getOrElse("reduce"))
+ def reduce(fn: (T, T) => T, description: String = "reduce"): Stream[T] = {
+ val reduceOp = ChainableOp(new ReduceFunction(fn, description))
graph.addVertex(reduceOp)
graph.addEdge(thisNode, edge.getOrElse(Direct), reduceOp)
new Stream(graph, reduceOp)
@@ -88,7 +91,10 @@ class Stream[T](
* Log to task log file
*/
def log(): Unit = {
- this.map(msg => LoggerFactory.getLogger("dsl").info(msg.toString), "log")
+ this.map(msg => {
+ LoggerFactory.getLogger("dsl").info(msg.toString)
+ msg
+ }, "log")
}
/**
@@ -97,8 +103,8 @@ class Stream[T](
* @param other the other stream
* @return the merged stream
*/
- def merge(other: Stream[T], description: String = null): Stream[T] = {
- val mergeOp = MergeOp(Option(description).getOrElse("merge"))
+ def merge(other: Stream[T], description: String = "merge"): Stream[T] = {
+ val mergeOp = MergeOp(description, UserConfig.empty)
graph.addVertex(mergeOp)
graph.addEdge(thisNode, edge.getOrElse(Direct), mergeOp)
graph.addEdge(other.thisNode, other.edge.getOrElse(Shuffle), mergeOp)
@@ -115,20 +121,29 @@ class Stream[T](
*
* For example,
* {{{
- * Stream[People].groupBy(_.gender).flatmap(..).filter.(..).reduce(..)
+ * Stream[People].groupBy(_.gender).flatMap(..).filter(..).reduce(..)
* }}}
*
- * @param fun Group by function
+ * @param fn Group by function
* @param parallelism Parallelism level
* @param description The description
* @return the grouped stream
*/
- def groupBy[Group](fun: T => Group, parallelism: Int = 1, description: String = null)
- : Stream[T] = {
- val groupOp = GroupByOp(fun, parallelism, Option(description).getOrElse("groupBy"))
- graph.addVertex(groupOp)
- graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp)
- new Stream[T](graph, groupOp)
+  def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1,
+      description: String = "groupBy"): Stream[T] = {
+    // A plain groupBy is expressed as a window groupBy over an accumulating CountWindow(1).
+    val perElementWindow = CountWindow(1).accumulating
+    val windowed: WindowStream[T] = window(perElementWindow)
+    windowed.groupBy[GROUP](fn, parallelism, description)
+  }
+
+ /**
+ * Window function
+ *
+ * @param win window definition
+ * @param description window description
+ * @return [[WindowStream]] where groupBy could be applied
+ */
+ def window(win: Window, description: String = "window"): WindowStream[T] = {
+ new WindowStream[T](graph, edge, thisNode, win, description)
}
/**
@@ -140,15 +155,28 @@ class Stream[T](
*/
def process[R](
processor: Class[_ <: Task], parallelism: Int, conf: UserConfig = UserConfig.empty,
- description: String = null): Stream[R] = {
- val processorOp = ProcessorOp(processor, parallelism, conf,
- Option(description).getOrElse("process"))
+ description: String = "process"): Stream[R] = {
+ val processorOp = ProcessorOp(processor, parallelism, conf, description)
graph.addVertex(processorOp)
graph.addEdge(thisNode, edge.getOrElse(Shuffle), processorOp)
new Stream[R](graph, processorOp, Some(Shuffle))
}
}
+/**
+ * Intermediate result of `Stream.window`: a stream with a window definition attached,
+ * on which `groupBy` adds a group-by-window operator to the DSL graph.
+ *
+ * @param graph    DSL graph the new operator is added to
+ * @param edge     edge used from `thisNode`; Shuffle when absent
+ * @param thisNode upstream operator
+ * @param window   window definition applied by the group-by
+ * @param winDesc  prefix for the operator description
+ */
+class WindowStream[T](graph: Graph[Op, OpEdge], edge: Option[OpEdge], thisNode: Op,
+    window: Window, winDesc: String) {
+
+  /**
+   * Groups elements by `fn` within `window`.
+   *
+   * @param fn          key extractor
+   * @param parallelism parallelism of the group-by operator
+   * @param description operator description, prefixed with `winDesc`
+   * @return a stream rooted at the new group-by operator
+   */
+  def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1,
+      description: String = "groupBy"): Stream[T] = {
+    val windowedGroupBy: GroupByFn[T, (GROUP, List[Bucket])] = GroupAlsoByWindow(fn, window)
+    val opDescription = s"$winDesc.$description"
+    val groupOp = GroupByOp[T, (GROUP, List[Bucket])](windowedGroupBy, parallelism, opDescription)
+    val upstreamEdge = edge.getOrElse(Shuffle)
+
+    graph.addVertex(groupOp)
+    graph.addEdge(thisNode, upstreamEdge, groupOp)
+    new Stream[T](graph, groupOp)
+  }
+}
+
class KVStream[K, V](stream: Stream[Tuple2[K, V]]) {
/**
* GroupBy key
@@ -192,30 +220,18 @@ object Stream {
}
implicit class Sink[T](stream: Stream[T]) extends java.io.Serializable {
- def sink[T](dataSink: DataSink, parallism: Int, conf: UserConfig, description: String)
- : Stream[T] = {
- implicit val sink = DataSinkOp[T](dataSink, parallism, conf,
- Some(description).getOrElse("traversable"))
+ def sink(dataSink: DataSink, parallelism: Int = 1,
+ conf: UserConfig = UserConfig.empty, description: String = "sink"): Stream[T] = {
+ implicit val sink = DataSinkOp(dataSink, parallelism, conf, description)
stream.graph.addVertex(sink)
stream.graph.addEdge(stream.thisNode, Shuffle, sink)
new Stream[T](stream.graph, sink)
}
-
- def sink[T](
- sink: Class[_ <: Task], parallism: Int, conf: UserConfig = UserConfig.empty,
- description: String = null): Stream[T] = {
- val sinkOp = ProcessorOp(sink, parallism, conf, Option(description).getOrElse("source"))
- stream.graph.addVertex(sinkOp)
- stream.graph.addEdge(stream.thisNode, Shuffle, sinkOp)
- new Stream[T](stream.graph, sinkOp)
- }
}
}
class LoggerSink[T] extends DataSink {
- var logger: Logger = null
-
- private var context: TaskContext = null
+ var logger: Logger = _
override def open(context: TaskContext): Unit = {
this.logger = context.logger
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
index d45737b..8116146 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
@@ -24,10 +24,9 @@ import akka.actor.ActorSystem
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.cluster.client.ClientContext
import org.apache.gearpump.streaming.StreamApplication
-import org.apache.gearpump.streaming.dsl.op.{DataSourceOp, Op, OpEdge, ProcessorOp}
-import org.apache.gearpump.streaming.dsl.plan.Planner
+import org.apache.gearpump.streaming.dsl.plan._
import org.apache.gearpump.streaming.source.DataSource
-import org.apache.gearpump.streaming.task.{Task, TaskContext}
+import org.apache.gearpump.streaming.task.TaskContext
import org.apache.gearpump.util.Graph
import org.apache.gearpump.Message
@@ -50,7 +49,8 @@ import scala.language.implicitConversions
* @param name name of app
*/
class StreamApp(
- val name: String, system: ActorSystem, userConfig: UserConfig, val graph: Graph[Op, OpEdge]) {
+ name: String, system: ActorSystem, userConfig: UserConfig,
+ private val graph: Graph[Op, OpEdge]) {
def this(name: String, system: ActorSystem, userConfig: UserConfig) = {
this(name, system, userConfig, Graph.empty[Op, OpEdge])
@@ -76,34 +76,16 @@ object StreamApp {
implicit class Source(app: StreamApp) extends java.io.Serializable {
- def source[T](dataSource: DataSource, parallelism: Int): Stream[T] = {
- source(dataSource, parallelism, UserConfig.empty)
- }
-
- def source[T](dataSource: DataSource, parallelism: Int, description: String): Stream[T] = {
- source(dataSource, parallelism, UserConfig.empty, description)
- }
-
- def source[T](dataSource: DataSource, parallelism: Int, conf: UserConfig): Stream[T] = {
- source(dataSource, parallelism, conf, description = null)
- }
-
- def source[T](dataSource: DataSource, parallelism: Int, conf: UserConfig, description: String)
- : Stream[T] = {
+ def source[T](dataSource: DataSource, parallelism: Int = 1,
+ conf: UserConfig = UserConfig.empty, description: String = "source"): Stream[T] = { // adds dataSource to app.graph as a DataSourceOp vertex
implicit val sourceOp = DataSourceOp(dataSource, parallelism, conf, description)
app.graph.addVertex(sourceOp)
new Stream[T](app.graph, sourceOp)
}
+
def source[T](seq: Seq[T], parallelism: Int, description: String): Stream[T] = {
this.source(new CollectionDataSource[T](seq), parallelism, UserConfig.empty, description)
}
-
- def source[T](source: Class[_ <: Task], parallelism: Int, conf: UserConfig, description: String)
- : Stream[T] = {
- val sourceOp = ProcessorOp(source, parallelism, conf, Option(description).getOrElse("source"))
- app.graph.addVertex(sourceOp)
- new Stream[T](app.graph, sourceOp)
- }
}
}
@@ -115,7 +97,7 @@ class CollectionDataSource[T](seq: Seq[T]) extends DataSource {
override def read(): Message = {
if (iterator.hasNext) {
- Message(iterator.next())
+ Message(iterator.next(), Instant.now().toEpochMilli)
} else {
null
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
index 6eff20c..3003b98 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
@@ -19,9 +19,9 @@
package org.apache.gearpump.streaming.dsl.javaapi
import scala.collection.JavaConverters._
-
import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.dsl.Stream
+import org.apache.gearpump.streaming.dsl.window.api.Window
+import org.apache.gearpump.streaming.dsl.{Stream, WindowStream}
import org.apache.gearpump.streaming.javaapi.dsl.functions._
import org.apache.gearpump.streaming.task.Task
@@ -63,9 +63,13 @@ class JavaStream[T](val stream: Stream[T]) {
* Group by a stream and turns it to a list of sub-streams. Operations chained after
* groupBy applies to sub-streams.
*/
- def groupBy[Group](fn: GroupByFunction[T, Group], parallelism: Int, description: String)
- : JavaStream[T] = {
- new JavaStream[T](stream.groupBy({t: T => fn(t)}, parallelism, description))
+ def groupBy[GROUP](fn: GroupByFunction[T, GROUP],
+ parallelism: Int, description: String): JavaStream[T] = {
+ new JavaStream[T](stream.groupBy((t: T) => fn(t), parallelism, description)) // key each element by fn(t), not by fn itself
+ }
+
+ def window(win: Window, description: String): JavaWindowStream[T] = { // Java-API counterpart of Stream.window
+ new JavaWindowStream[T](stream.window(win, description))
}
/** Add a low level Processor to process messages */
@@ -75,3 +79,11 @@ class JavaStream[T](val stream: Stream[T]) {
new JavaStream[R](stream.process(processor, parallelism, conf, description))
}
}
+
+class JavaWindowStream[T](stream: WindowStream[T]) {
+ // Java-API wrapper around a WindowStream; groupBy turns it back into a JavaStream.
+ def groupBy[GROUP](fn: GroupByFunction[T, GROUP], parallelism: Int,
+ description: String): JavaStream[T] = {
+ new JavaStream[T](stream.groupBy((t: T) => fn(t), parallelism, description)) // key each element by fn(t), not by fn itself
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala
deleted file mode 100644
index 49d9dec..0000000
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.dsl.op
-
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.sink.DataSink
-import org.apache.gearpump.streaming.source.DataSource
-import org.apache.gearpump.streaming.task.Task
-
-/**
- * Operators for the DSL
- */
-sealed trait Op {
- def description: String
- def conf: UserConfig
-}
-
-/**
- * When translated to running DAG, SlaveOP can be attach to MasterOP or other SlaveOP
- * "Attach" means running in same Actor.
- */
-trait SlaveOp[T] extends Op
-
-case class FlatMapOp[T, R](
- fun: (T) => TraversableOnce[R], description: String, conf: UserConfig = UserConfig.empty)
- extends SlaveOp[T]
-
-case class ReduceOp[T](fun: (T, T) => T, description: String, conf: UserConfig = UserConfig.empty)
- extends SlaveOp[T]
-
-trait MasterOp extends Op
-
-trait ParameterizedOp[T] extends MasterOp
-
-case class MergeOp(description: String, override val conf: UserConfig = UserConfig.empty)
- extends MasterOp
-
-case class GroupByOp[T, R](
- fun: T => R, parallelism: Int, description: String,
- override val conf: UserConfig = UserConfig.empty)
- extends ParameterizedOp[T]
-
-case class ProcessorOp[T <: Task](
- processor: Class[T], parallelism: Int, conf: UserConfig, description: String)
- extends ParameterizedOp[T]
-
-case class DataSourceOp[T](
- dataSource: DataSource, parallelism: Int, conf: UserConfig, description: String)
- extends ParameterizedOp[T]
-
-case class DataSinkOp[T](
- dataSink: DataSink, parallelism: Int, conf: UserConfig, description: String)
- extends ParameterizedOp[T]
-
-/**
- * Contains operators which can be chained to single one.
- *
- * For example, flatmap().map().reduce() can be chained to single operator as
- * no data shuffling is required.
- * @param ops list of operations
- */
-case class OpChain(ops: List[Op]) extends Op {
- def head: Op = ops.head
- def last: Op = ops.last
-
- def description: String = null
-
- override def conf: UserConfig = {
- // The head's conf has priority
- ops.reverse.foldLeft(UserConfig.empty) { (conf, op) =>
- conf.withConfig(op.conf)
- }
- }
-}
-
-trait OpEdge
-
-/**
- * The upstream OP and downstream OP doesn't require network data shuffle.
- *
- * For example, map, flatmap operation doesn't require network shuffle, we can use Direct
- * to represent the relation with upstream operators.
- */
-case object Direct extends OpEdge
-
-/**
- * The upstream OP and downstream OP DOES require network data shuffle.
- *
- * For example, map, flatmap operation doesn't require network shuffle, we can use Direct
- * to represent the relation with upstream operators.
- */
-case object Shuffle extends OpEdge
-
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
new file mode 100644
index 0000000..2ec881b
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.partitioner
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.partitioner.UnicastPartitioner
+import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
+
+/**
+ * Partitions messages by the hashCode of the group key that a GroupByFn computes for each message.
+ *
+ * For example:
+ * {{{
+ * case class People(name: String, gender: String)
+ *
+ * object Test {
+ * // groupByFn is assumed to return the gender of each People message (construction elided).
+ * val groupByFn: GroupByFn[People, String] = ???
+ * val partitioner = new GroupByPartitioner(groupByFn)
+ * }
+ * }}}
+ *
+ * @param fn applied to each message via fn.groupBy(message); the hashCode of its result
+ * picks the partition. You must define hashCode() for the group key type.
+ */
+class GroupByPartitioner[T, Group](fn: GroupByFn[T, Group])
+ extends UnicastPartitioner {
+ override def getPartition(message: Message, partitionNum: Int, currentPartitionId: Int): Int = {
+ val hashCode = fn.groupBy(message).hashCode()
+ (hashCode & Integer.MAX_VALUE) % partitionNum // clear the sign bit so the result is in [0, partitionNum) for positive partitionNum
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala
deleted file mode 100644
index b2e2932..0000000
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.dsl.partitioner
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.partitioner.UnicastPartitioner
-
-/**
- * Partition messages by applying group by function first.
- *
- * For example:
- * {{{
- * case class People(name: String, gender: String)
- *
- * object Test{
- *
- * val groupBy: (People => String) = people => people.gender
- * val partitioner = GroupByPartitioner(groupBy)
- * }
- * }}}
- *
- * @param groupBy First apply message with groupBy function, then pick the hashCode of the output
- * to do the partitioning. You must define hashCode() for output type of groupBy function.
- */
-class GroupByPartitioner[T, GROUP](groupBy: T => GROUP = null) extends UnicastPartitioner {
- override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
- val hashCode = groupBy(msg.msg.asInstanceOf[T]).hashCode()
- (hashCode & Integer.MAX_VALUE) % partitionNum
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
new file mode 100644
index 0000000..744976b
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.plan
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.Processor.DefaultProcessor
+import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
+import org.apache.gearpump.streaming.{Constants, Processor}
+import org.apache.gearpump.streaming.dsl.task.TransformTask
+import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
+import org.apache.gearpump.streaming.sink.{DataSink, DataSinkProcessor}
+import org.apache.gearpump.streaming.source.{DataSource, DataSourceTask}
+import org.apache.gearpump.streaming.task.Task
+
+import scala.reflect.ClassTag
+
+/**
+ * A vertex on the logical plan. The Planner fuses Ops via chain() and turns each into a Processor.
+ */
+sealed trait Op {
+
+ def description: String
+
+ def userConfig: UserConfig
+
+ def chain(op: Op)(implicit system: ActorSystem): Op // fuse `op` downstream of this Op; unsupported pairs throw OpChainException
+
+ def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] // translate this vertex into a runtime Processor
+}
+
+/**
+ * A low level, user-supplied Task class; chain() always throws, so it is never fused with other Ops.
+ */
+case class ProcessorOp[T <: Task](
+ processor: Class[T],
+ parallelism: Int,
+ userConfig: UserConfig,
+ description: String)
+ extends Op {
+
+ def this( // resolves the Task class from the implicit ClassTag
+ parallelism: Int = 1,
+ userConfig: UserConfig = UserConfig.empty,
+ description: String = "processor")(implicit classTag: ClassTag[T]) = {
+ this(classTag.runtimeClass.asInstanceOf[Class[T]], parallelism, userConfig, description)
+ }
+
+ override def chain(other: Op)(implicit system: ActorSystem): Op = {
+ throw new OpChainException(this, other)
+ }
+
+ override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+ DefaultProcessor(parallelism, description, userConfig, processor)
+ }
+}
+
+/**
+ * A DataSource vertex; a downstream ChainableOp is fused into it under GEARPUMP_STREAMING_OPERATOR.
+ */
+case class DataSourceOp(
+ dataSource: DataSource,
+ parallelism: Int = 1,
+ userConfig: UserConfig = UserConfig.empty,
+ description: String = "source")
+ extends Op {
+
+ override def chain(other: Op)(implicit system: ActorSystem): Op = {
+ other match {
+ case op: ChainableOp[_, _] =>
+ DataSourceOp(dataSource, parallelism,
+ userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.fn), // NOTE(review): presumably replaces, not composes, an operator stored by an earlier chain() — confirm
+ description)
+ case _ =>
+ throw new OpChainException(this, other)
+ }
+ }
+
+ override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+ Processor[DataSourceTask[Any, Any]](parallelism, description,
+ userConfig.withValue(GEARPUMP_STREAMING_SOURCE, dataSource)) // the source is passed in the task's UserConfig
+ }
+}
+
+/**
+ * A DataSink vertex; chain() always throws OpChainException.
+ */
+case class DataSinkOp(
+ dataSink: DataSink,
+ parallelism: Int = 1,
+ userConfig: UserConfig = UserConfig.empty,
+ description: String = "sink")
+ extends Op {
+
+ override def chain(op: Op)(implicit system: ActorSystem): Op = {
+ throw new OpChainException(this, op)
+ }
+
+ override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+ DataSinkProcessor(dataSink, parallelism, description) // NOTE(review): userConfig is not forwarded here — confirm this is intended
+ }
+}
+
+/**
+ * A chain of single-input functions (e.g. flatMap, map, filter, reduce).
+ * It has no Processor of its own: chain() fuses it with a downstream ChainableOp,
+ * and upstream DataSourceOp, GroupByOp and MergeOp absorb it via their own chain().
+ */
+case class ChainableOp[IN, OUT](
+ fn: SingleInputFunction[IN, OUT]) extends Op {
+
+ override def description: String = fn.description
+
+ override def userConfig: UserConfig = UserConfig.empty
+
+ override def chain(other: Op)(implicit system: ActorSystem): Op = {
+ other match {
+ case op: ChainableOp[OUT, _] =>
+ // TODO: preserve type info (the OUT type argument is unchecked due to erasure)
+ ChainableOp(fn.andThen(op.fn))
+ case _ =>
+ throw new OpChainException(this, other)
+ }
+ }
+
+ override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+ throw new UnsupportedOperationException("ChainableOp cannot be translated to Processor")
+ }
+}
+
+/**
+ * A Processor with window aggregation, built by groupByFn; a downstream ChainableOp is stored under GEARPUMP_STREAMING_OPERATOR.
+ */
+case class GroupByOp[IN, GROUP](
+ groupByFn: GroupByFn[IN, GROUP],
+ parallelism: Int = 1,
+ description: String = "groupBy",
+ override val userConfig: UserConfig = UserConfig.empty)
+ extends Op {
+
+ override def chain(other: Op)(implicit system: ActorSystem): Op = {
+ other match {
+ case op: ChainableOp[_, _] =>
+ GroupByOp(groupByFn, parallelism, description,
+ userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.fn))
+ case _ =>
+ throw new OpChainException(this, other)
+ }
+ }
+
+ override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+ groupByFn.getProcessor(parallelism, description, userConfig) // the GroupByFn decides which Processor runs
+ }
+}
+
+/**
+ * A Processor transforming merged streams; it runs as a single TransformTask (parallelism 1).
+ */
+case class MergeOp(description: String, userConfig: UserConfig = UserConfig.empty)
+ extends Op {
+
+ override def chain(other: Op)(implicit system: ActorSystem): Op = {
+ other match {
+ case op: ChainableOp[_, _] =>
+ MergeOp(description, userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.fn))
+ case _ =>
+ throw new OpChainException(this, other)
+ }
+ }
+
+ override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+ Processor[TransformTask[Any, Any]](1, description, userConfig)
+ }
+
+}
+
+/**
+ * An edge on the logical plan.
+ */
+trait OpEdge
+
+/**
+ * The upstream OP and downstream OP don't require a network data shuffle (e.g. ChainableOp).
+ * The Planner may fuse both ends and otherwise uses a CoLocationPartitioner.
+ */
+case object Direct extends OpEdge
+
+/**
+ * The upstream OP and downstream OP DO require a network data shuffle (e.g. GroupByOp).
+ * The Planner uses a GroupByPartitioner into a GroupByOp and a HashPartitioner otherwise.
+ */
+case object Shuffle extends OpEdge
+
+/**
+ * Thrown by Op.chain when op2 cannot be fused downstream of op1.
+ */
+class OpChainException(op1: Op, op2: Op) extends RuntimeException(s"$op1 cannot be chained by $op2")
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
deleted file mode 100644
index b09d9b9..0000000
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.dsl.plan
-
-import scala.collection.TraversableOnce
-import akka.actor.ActorSystem
-import org.slf4j.Logger
-import org.apache.gearpump._
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.Processor
-import org.apache.gearpump.streaming.Processor.DefaultProcessor
-import org.apache.gearpump.streaming.dsl.op._
-import org.apache.gearpump.streaming.dsl.plan.OpTranslator._
-import org.apache.gearpump.streaming.sink.DataSinkProcessor
-import org.apache.gearpump.streaming.source.DataSourceTask
-import org.apache.gearpump.streaming.task.{Task, TaskContext}
-import org.apache.gearpump.util.LogUtil
-
-/**
- * Translates a OP to a TaskDescription
- */
-class OpTranslator extends java.io.Serializable {
- val LOG: Logger = LogUtil.getLogger(getClass)
-
- def translate(ops: OpChain)(implicit system: ActorSystem): Processor[_ <: Task] = {
-
- val baseConfig = ops.conf
-
- ops.ops.head match {
- case op: MasterOp =>
- val tail = ops.ops.tail
- val func = toFunction(tail)
- val userConfig = baseConfig.withValue(GEARPUMP_STREAMING_OPERATOR, func)
-
- op match {
- case DataSourceOp(dataSource, parallelism, conf, description) =>
- Processor[DataSourceTask[Any, Any]](parallelism,
- description = description + "." + func.description,
- userConfig.withValue(GEARPUMP_STREAMING_SOURCE, dataSource))
- case groupby@GroupByOp(_, parallelism, description, _) =>
- Processor[GroupByTask[Object, Object, Object]](parallelism,
- description = description + "." + func.description,
- userConfig.withValue(GEARPUMP_STREAMING_GROUPBY_FUNCTION, groupby))
- case merge: MergeOp =>
- Processor[TransformTask[Object, Object]](1,
- description = op.description + "." + func.description,
- userConfig)
- case ProcessorOp(processor, parallelism, conf, description) =>
- DefaultProcessor(parallelism,
- description = description + " " + func.description,
- userConfig, processor)
- case DataSinkOp(dataSink, parallelism, conf, description) =>
- DataSinkProcessor(dataSink, parallelism, description + func.description)
- }
- case op: SlaveOp[_] =>
- val func = toFunction(ops.ops)
- val userConfig = baseConfig.withValue(GEARPUMP_STREAMING_OPERATOR, func)
-
- Processor[TransformTask[Object, Object]](1,
- description = func.description,
- taskConf = userConfig)
- case chain: OpChain =>
- throw new RuntimeException("Not supposed to be called!")
- }
- }
-
- private def toFunction(ops: List[Op]): SingleInputFunction[Object, Object] = {
- val func: SingleInputFunction[Object, Object] = new DummyInputFunction[Object]()
- val totalFunction = ops.foldLeft(func) { (fun, op) =>
-
- val opFunction = op match {
- case flatmap: FlatMapOp[Object @unchecked, Object @unchecked] =>
- new FlatMapFunction(flatmap.fun, flatmap.description)
- case reduce: ReduceOp[Object @unchecked] =>
- new ReduceFunction(reduce.fun, reduce.description)
- case _ =>
- throw new RuntimeException("Not supposed to be called!")
- }
- fun.andThen(opFunction.asInstanceOf[SingleInputFunction[Object, Object]])
- }
- totalFunction.asInstanceOf[SingleInputFunction[Object, Object]]
- }
-}
-
-object OpTranslator {
-
- trait SingleInputFunction[IN, OUT] extends Serializable {
- def process(value: IN): TraversableOnce[OUT]
- def andThen[OUTER](other: SingleInputFunction[OUT, OUTER]): SingleInputFunction[IN, OUTER] = {
- new AndThen(this, other)
- }
-
- def description: String
- }
-
- class DummyInputFunction[T] extends SingleInputFunction[T, T] {
- override def andThen[OUTER](other: SingleInputFunction[T, OUTER])
- : SingleInputFunction[T, OUTER] = {
- other
- }
-
- // Should never be called
- override def process(value: T): TraversableOnce[T] = None
-
- override def description: String = ""
- }
-
- class AndThen[IN, MIDDLE, OUT](
- first: SingleInputFunction[IN, MIDDLE], second: SingleInputFunction[MIDDLE, OUT])
- extends SingleInputFunction[IN, OUT] {
-
- override def process(value: IN): TraversableOnce[OUT] = {
- first.process(value).flatMap(second.process)
- }
-
- override def description: String = {
- Option(first.description).flatMap { description =>
- Option(second.description).map(description + "." + _)
- }.orNull
- }
- }
-
- class FlatMapFunction[IN, OUT](fun: IN => TraversableOnce[OUT], descriptionMessage: String)
- extends SingleInputFunction[IN, OUT] {
-
- override def process(value: IN): TraversableOnce[OUT] = {
- fun(value)
- }
-
- override def description: String = {
- this.descriptionMessage
- }
- }
-
- class ReduceFunction[T](fun: (T, T) => T, descriptionMessage: String)
- extends SingleInputFunction[T, T] {
-
- private var state: Any = _
-
- override def process(value: T): TraversableOnce[T] = {
- if (state == null) {
- state = value
- } else {
- state = fun(state.asInstanceOf[T], value)
- }
- Some(state.asInstanceOf[T])
- }
-
- override def description: String = descriptionMessage
- }
-
- class GroupByTask[IN, GROUP, OUT](
- groupBy: IN => GROUP, taskContext: TaskContext, userConf: UserConfig)
- extends Task(taskContext, userConf) {
-
- def this(taskContext: TaskContext, userConf: UserConfig) = {
- this(userConf.getValue[GroupByOp[IN, GROUP]](
- GEARPUMP_STREAMING_GROUPBY_FUNCTION )(taskContext.system).get.fun,
- taskContext, userConf)
- }
-
- private var groups = Map.empty[GROUP, SingleInputFunction[IN, OUT]]
-
- override def onNext(msg: Message): Unit = {
- val time = msg.timestamp
-
- val group = groupBy(msg.msg.asInstanceOf[IN])
- if (!groups.contains(group)) {
- val operator =
- userConf.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get
- groups += group -> operator
- }
-
- val operator = groups(group)
-
- operator.process(msg.msg.asInstanceOf[IN]).foreach { msg =>
- taskContext.output(new Message(msg.asInstanceOf[AnyRef], time))
- }
- }
- }
-
- class TransformTask[IN, OUT](
- operator: Option[SingleInputFunction[IN, OUT]], taskContext: TaskContext,
- userConf: UserConfig) extends Task(taskContext, userConf) {
-
- def this(taskContext: TaskContext, userConf: UserConfig) = {
- this(userConf.getValue[SingleInputFunction[IN, OUT]](
- GEARPUMP_STREAMING_OPERATOR)(taskContext.system), taskContext, userConf)
- }
-
- override def onNext(msg: Message): Unit = {
- val time = msg.timestamp
-
- operator match {
- case Some(op) =>
- op.process(msg.msg.asInstanceOf[IN]).foreach { msg =>
- taskContext.output(new Message(msg.asInstanceOf[AnyRef], time))
- }
- case None =>
- taskContext.output(new Message(msg.msg, time))
- }
- }
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
index f5bbd65..16d5c06 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
@@ -22,7 +22,6 @@ import akka.actor.ActorSystem
import org.apache.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner, Partitioner}
import org.apache.gearpump.streaming.Processor
-import org.apache.gearpump.streaming.dsl.op._
import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
import org.apache.gearpump.streaming.task.Task
import org.apache.gearpump.util.Graph
@@ -33,64 +32,60 @@ class Planner {
* Converts Dag of Op to Dag of TaskDescription. TaskDescription is part of the low
* level Graph API.
*/
- def plan(dag: Graph[Op, OpEdge])(implicit system: ActorSystem)
- : Graph[Processor[_ <: Task], _ <: Partitioner] = {
+ def plan(dag: Graph[Op, OpEdge])
+ (implicit system: ActorSystem): Graph[Processor[_ <: Task], _ <: Partitioner] = {
- val opTranslator = new OpTranslator()
-
- val newDag = optimize(dag)
- newDag.mapEdge { (node1, edge, node2) =>
+ val graph = optimize(dag) // first fuse Ops connected by Direct edges
+ graph.mapEdge { (node1, edge, node2) =>
edge match {
case Shuffle =>
- node2.head match {
- case groupBy: GroupByOp[Any @unchecked, Any @unchecked] =>
- new GroupByPartitioner(groupBy.fun)
+ node2 match { // the partitioner depends on the downstream Op
+ case groupBy: GroupByOp[_, _] =>
+ new GroupByPartitioner(groupBy.groupByFn)
case _ => new HashPartitioner
}
case Direct =>
new CoLocationPartitioner
}
- }.mapVertex { opChain =>
- opTranslator.translate(opChain)
- }
+ }.mapVertex(_.getProcessor) // each (possibly fused) Op becomes one Processor
}
- private def optimize(dag: Graph[Op, OpEdge]): Graph[OpChain, OpEdge] = {
- val newGraph = dag.mapVertex(op => OpChain(List(op)))
-
- val nodes = newGraph.topologicalOrderWithCirclesIterator.toList.reverse
+ private def optimize(dag: Graph[Op, OpEdge])
+ (implicit system: ActorSystem): Graph[Op, OpEdge] = {
+ val graph = dag.copy // presumably a copy, so merge()'s mutations leave the caller's dag intact — confirm
+ val nodes = graph.topologicalOrderWithCirclesIterator.toList.reverse // visit in reverse topological order
for (node <- nodes) {
- val outGoingEdges = newGraph.outgoingEdgesOf(node)
+ val outGoingEdges = graph.outgoingEdgesOf(node) // NOTE(review): `nodes` is a snapshot and may hold vertices merge() already removed
for (edge <- outGoingEdges) {
- merge(newGraph, edge._1, edge._3)
+ merge(graph, edge._1, edge._3)
}
}
- newGraph
+ graph
}
- private def merge(dag: Graph[OpChain, OpEdge], node1: OpChain, node2: OpChain)
- : Graph[OpChain, OpEdge] = {
- if (dag.outDegreeOf(node1) == 1 &&
- dag.inDegreeOf(node2) == 1 &&
+ private def merge(graph: Graph[Op, OpEdge], node1: Op, node2: Op)
+ (implicit system: ActorSystem): Unit = { // fuses node1 -> node2 in place when they are joined by a single Direct edge
+ if (graph.outDegreeOf(node1) == 1 &&
+ graph.inDegreeOf(node2) == 1 &&
// For processor node, we don't allow it to merge with downstream operators
- !node1.head.isInstanceOf[ProcessorOp[_ <: Task]]) {
- val (_, edge, _) = dag.outgoingEdgesOf(node1).head
+ !node1.isInstanceOf[ProcessorOp[_ <: Task]] &&
+ !node2.isInstanceOf[ProcessorOp[_ <: Task]]) { // ...nor with upstream operators
+ val (_, edge, _) = graph.outgoingEdgesOf(node1).head
if (edge == Direct) {
- val opList = OpChain(node1.ops ++ node2.ops)
- dag.addVertex(opList)
- for (incomingEdge <- dag.incomingEdgesOf(node1)) {
- dag.addEdge(incomingEdge._1, incomingEdge._2, opList)
+ val chainedOp = node1.chain(node2) // NOTE(review): throws OpChainException if node2 is not chainable (e.g. a Direct edge into a GroupByOp) — confirm such edges never occur
+ graph.addVertex(chainedOp)
+ for (incomingEdge <- graph.incomingEdgesOf(node1)) {
+ graph.addEdge(incomingEdge._1, incomingEdge._2, chainedOp)
}
- for (outgoingEdge <- dag.outgoingEdgesOf(node2)) {
- dag.addEdge(opList, outgoingEdge._2, outgoingEdge._3)
+ for (outgoingEdge <- graph.outgoingEdgesOf(node2)) {
+ graph.addEdge(chainedOp, outgoingEdge._2, outgoingEdge._3)
}
// Remove the old vertex
- dag.removeVertex(node1)
- dag.removeVertex(node2)
+ graph.removeVertex(node1)
+ graph.removeVertex(node2)
}
}
- dag
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
new file mode 100644
index 0000000..609fbb0
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.plan.functions
+
+/**
+ * A single-input transformation used by the DSL plan.
+ *
+ * `process` is invoked per input element and may return zero or more outputs.
+ * `finish` returns output the function has held back (for example,
+ * `ReduceFunction` returns its accumulated value here), and `clearState`
+ * drops such held-back state. The defaults below describe a stateless function.
+ *
+ * @tparam IN  element type accepted by `process`
+ * @tparam OUT element type produced by `process` and `finish`
+ */
+trait SingleInputFunction[IN, OUT] extends Serializable {
+
+  /** Transforms one input element into zero or more output elements. */
+  def process(value: IN): TraversableOnce[OUT]
+
+  /** Chains `other` after this function: every output of this one becomes an input of `other`. */
+  def andThen[OUTER](other: SingleInputFunction[OUT, OUTER]): SingleInputFunction[IN, OUTER] =
+    new AndThen[IN, OUT, OUTER](this, other)
+
+  /** Returns held-back output; a stateless function has none. */
+  def finish(): TraversableOnce[OUT] = Nil
+
+  /** Drops held-back state; a no-op for a stateless function. */
+  def clearState(): Unit = ()
+
+  /** Human-readable description; `AndThen` joins the descriptions of its parts with ".". */
+  def description: String
+}
+
+/**
+ * Sequential composition of two [[SingleInputFunction]]s: every output of
+ * `first` is fed into `second`.
+ *
+ * @tparam IN     input type of `first`
+ * @tparam MIDDLE output type of `first`, input type of `second`
+ * @tparam OUT    output type of `second`
+ */
+class AndThen[IN, MIDDLE, OUT](
+    first: SingleInputFunction[IN, MIDDLE], second: SingleInputFunction[MIDDLE, OUT])
+  extends SingleInputFunction[IN, OUT] {
+
+  override def process(value: IN): TraversableOnce[OUT] = {
+    first.process(value).flatMap(second.process)
+  }
+
+  /**
+   * Flushes the whole chain.
+   *
+   * Whatever `first` has held back is pushed through `second.process`, and then
+   * `second.finish()` is always appended. Calling `second.finish()`
+   * unconditionally matters when `second.process` emits something during the
+   * flush while `second` also holds state of its own; that state must not be
+   * dropped.
+   *
+   * The result is materialized eagerly, so every `second.process` call and the
+   * `second.finish()` call happen inside this method, before a caller can
+   * invoke `clearState()`.
+   */
+  override def finish(): TraversableOnce[OUT] = {
+    val flushedThroughSecond = first.finish().toList.flatMap(second.process)
+    flushedThroughSecond ++ second.finish()
+  }
+
+  override def clearState(): Unit = {
+    first.clearState()
+    second.clearState()
+  }
+
+  /** `first.description + "." + second.description`, or null when either part is null. */
+  override def description: String = {
+    val joined = for {
+      head <- Option(first.description)
+      tail <- Option(second.description)
+    } yield head + "." + tail
+    joined.orNull
+  }
+}
+
+/**
+ * Stateless function that expands each input element into zero or more outputs.
+ *
+ * @param fn                 per-element expansion applied by `process`
+ * @param descriptionMessage value returned by `description`
+ */
+class FlatMapFunction[IN, OUT](fn: IN => TraversableOnce[OUT], descriptionMessage: String)
+  extends SingleInputFunction[IN, OUT] {
+
+  override def process(value: IN): TraversableOnce[OUT] = fn.apply(value)
+
+  override def description: String = descriptionMessage
+}
+
+
+/**
+ * Stateful function that folds every input into a running value with `fn`.
+ *
+ * `process` never emits anything. The running value, if one exists, is
+ * returned by `finish` and discarded by `clearState`.
+ *
+ * @param fn                 combiner applied as `fn(runningValue, nextInput)`
+ * @param descriptionMessage value returned by `description`
+ */
+class ReduceFunction[T](fn: (T, T) => T, descriptionMessage: String)
+  extends SingleInputFunction[T, T] {
+
+  private var current: Option[T] = None
+
+  override def process(value: T): TraversableOnce[T] = {
+    current = current match {
+      case Some(runningValue) => Some(fn(runningValue, value))
+      // Option(...) turns a null first input into None, so null never seeds the fold
+      case None => Option(value)
+    }
+    Nil
+  }
+
+  override def finish(): TraversableOnce[T] = current.toList
+
+  override def clearState(): Unit = {
+    current = None
+  }
+
+  override def description: String = descriptionMessage
+}
+
+/**
+ * Terminal function: hands every input to `emit` and produces no output of its own.
+ *
+ * Nothing may be chained after it, so `andThen` always throws.
+ *
+ * @param emit side-effecting sink invoked once per input element
+ */
+class EmitFunction[T](emit: T => Unit) extends SingleInputFunction[T, Unit] {
+
+  override def process(value: T): TraversableOnce[Unit] = {
+    emit.apply(value)
+    Nil
+  }
+
+  /** Always throws UnsupportedOperationException. */
+  override def andThen[R](other: SingleInputFunction[Unit, R]): SingleInputFunction[T, R] =
+    throw new UnsupportedOperationException("andThen is not supposed to be called on EmitFunction")
+
+  override def description: String = ""
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
new file mode 100644
index 0000000..4ee2fa8
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.task
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.dsl.window.api.CountWindowFn
+import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, GroupAlsoByWindow, WindowRunner}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+
+/**
+ * Task that feeds every message to a [[WindowRunner]] and triggers it each time
+ * the configured number of messages has arrived.
+ *
+ * The count comes from `groupBy.window.windowFn`, which is cast to [[CountWindowFn]].
+ *
+ * @param groupBy      group-by-window definition whose window function is a CountWindowFn
+ * @param windowRunner receives every message and is triggered once per full count
+ * @param taskContext  task context passed to [[Task]]
+ * @param userConfig   user configuration passed to [[Task]]
+ */
+class CountTriggerTask[IN, GROUP](
+    groupBy: GroupAlsoByWindow[IN, GROUP],
+    windowRunner: WindowRunner,
+    taskContext: TaskContext,
+    userConfig: UserConfig)
+  extends Task(taskContext, userConfig) {
+
+  /** Builds a [[DefaultWindowRunner]] for `groupBy` from the task context. */
+  def this(groupBy: GroupAlsoByWindow[IN, GROUP],
+      taskContext: TaskContext, userConfig: UserConfig) = {
+    this(groupBy, new DefaultWindowRunner(taskContext, userConfig, groupBy)(taskContext.system),
+      taskContext, userConfig)
+  }
+
+  /** Reads the group-by definition from `userConfig` under GEARPUMP_STREAMING_GROUPBY_FUNCTION. */
+  def this(taskContext: TaskContext, userConfig: UserConfig) = {
+    this(userConfig.getValue[GroupAlsoByWindow[IN, GROUP]](
+      GEARPUMP_STREAMING_GROUPBY_FUNCTION)(taskContext.system).get,
+      taskContext, userConfig)
+  }
+
+  // Number of messages that completes one window.
+  private val countPerWindow = groupBy.window.windowFn.asInstanceOf[CountWindowFn].size
+
+  // Messages received since the last trigger.
+  private var pending = 0
+
+  override def onNext(msg: Message): Unit = {
+    windowRunner.process(msg)
+    pending += 1
+    if (pending == countPerWindow) {
+      // NOTE(review): the trigger instant is always epoch-millis `countPerWindow`, not a
+      // per-batch value; presumably this matches how CountWindowFn assigns windows. Confirm.
+      val triggerTime = Instant.ofEpochMilli(countPerWindow)
+      windowRunner.trigger(triggerTime)
+      pending = 0
+    }
+  }
+}