You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2017/01/13 09:13:08 UTC
[2/2] incubator-gearpump git commit: [GEARPUMP-262] Add setup and teardown to user defined functions
[GEARPUMP-262] Add setup and teardown to user defined functions
Author: manuzhang <ow...@gmail.com>
Closes #131 from manuzhang/setup_teardown.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/a23a40f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/a23a40f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/a23a40f5
Branch: refs/heads/master
Commit: a23a40f5e558a1a5f10503f80204d9c6e690e0bd
Parents: 385a612
Author: manuzhang <ow...@gmail.com>
Authored: Fri Jan 13 17:11:54 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Fri Jan 13 17:12:38 2017 +0800
----------------------------------------------------------------------
.../examples/kafka/dsl/KafkaReadWrite.scala | 4 +-
.../examples/wordcountjava/dsl/WordCount.java | 47 ++-
.../wordcount/dsl/WindowedWordCount.scala | 2 +-
.../examples/wordcount/dsl/WordCount.scala | 4 +-
.../external/hbase/dsl/HBaseDSLSink.scala | 5 +-
.../gearpump/streaming/kafka/dsl/KafkaDSL.scala | 12 +-
.../javaapi/dsl/functions/FilterFunction.java | 30 --
.../javaapi/dsl/functions/FlatMapFunction.java | 32 ---
.../javaapi/dsl/functions/GroupByFunction.java | 31 --
.../javaapi/dsl/functions/MapFunction.java | 31 --
.../javaapi/dsl/functions/ReduceFunction.java | 30 --
.../apache/gearpump/streaming/dsl/Stream.scala | 245 ----------------
.../gearpump/streaming/dsl/StreamApp.scala | 109 -------
.../dsl/api/functions/FilterFunction.scala | 42 +++
.../dsl/api/functions/MapFunction.scala | 43 +++
.../dsl/api/functions/ReduceFunction.scala | 42 +++
.../streaming/dsl/javaapi/JavaStream.scala | 18 +-
.../streaming/dsl/javaapi/JavaStreamApp.scala | 5 +-
.../dsl/javaapi/functions/FlatMapFunction.scala | 32 +++
.../dsl/javaapi/functions/GroupByFunction.scala | 28 ++
.../apache/gearpump/streaming/dsl/plan/OP.scala | 4 +-
.../plan/functions/SingleInputFunction.scala | 66 +++--
.../streaming/dsl/scalaapi/Stream.scala | 287 +++++++++++++++++++
.../streaming/dsl/scalaapi/StreamApp.scala | 109 +++++++
.../scalaapi/functions/FlatMapFunction.scala | 103 +++++++
.../functions/SerializableFunction.scala | 32 +++
.../streaming/dsl/task/TransformTask.scala | 5 +-
.../dsl/window/impl/WindowRunner.scala | 16 +-
.../gearpump/streaming/dsl/StreamAppSpec.scala | 72 -----
.../gearpump/streaming/dsl/StreamSpec.scala | 128 ---------
.../gearpump/streaming/dsl/plan/OpSpec.scala | 13 +-
.../streaming/dsl/plan/PlannerSpec.scala | 15 +-
.../functions/SingleInputFunctionSpec.scala | 202 ++++++-------
.../streaming/dsl/scalaapi/StreamAppSpec.scala | 73 +++++
.../streaming/dsl/scalaapi/StreamSpec.scala | 129 +++++++++
35 files changed, 1154 insertions(+), 892 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala
index 49d3619..cbfe57a 100644
--- a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala
+++ b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala
@@ -21,8 +21,8 @@ package org.apache.gearpump.streaming.examples.kafka.dsl
import java.util.Properties
import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.cluster.main.{CLIOption, ArgumentsParser}
-import org.apache.gearpump.streaming.dsl.StreamApp
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp
import org.apache.gearpump.streaming.kafka.KafkaStoreFactory
import org.apache.gearpump.streaming.kafka.dsl.KafkaDSL
import org.apache.gearpump.streaming.kafka.dsl.KafkaDSL._
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
index d4866ed..2942861 100644
--- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
+++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
@@ -25,12 +25,17 @@ import org.apache.gearpump.cluster.UserConfig;
import org.apache.gearpump.cluster.client.ClientContext;
import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
+import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction;
+import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
+import org.apache.gearpump.streaming.dsl.javaapi.functions.GroupByFunction;
import org.apache.gearpump.streaming.source.DataSource;
import org.apache.gearpump.streaming.task.TaskContext;
import scala.Tuple2;
import java.time.Instant;
import java.util.Arrays;
+import java.util.Iterator;
/** Java version of WordCount with high level DSL API */
public class WordCount {
@@ -46,15 +51,13 @@ public class WordCount {
JavaStream<String> sentence = app.source(new StringSource("This is a good start, bingo!! bingo!!"),
1, UserConfig.empty(), "source");
- JavaStream<String> words = sentence.flatMap(s -> Arrays.asList(s.split("\\s+")).iterator(),
- "flatMap");
+ JavaStream<String> words = sentence.flatMap(new Split(), "flatMap");
- JavaStream<Tuple2<String, Integer>> ones = words.map(s -> new Tuple2<>(s, 1), "map");
+ JavaStream<Tuple2<String, Integer>> ones = words.map(new Ones(), "map");
- JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(Tuple2::_1, 1, "groupBy");
+ JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(new TupleKey(), 1, "groupBy");
- JavaStream<Tuple2<String, Integer>> wordcount = groupedOnes.reduce(
- (t1, t2) -> new Tuple2<>(t1._1(), t1._2() + t2._2()), "reduce");
+ JavaStream<Tuple2<String, Integer>> wordcount = groupedOnes.reduce(new Count(), "reduce");
wordcount.log();
@@ -88,4 +91,36 @@ public class WordCount {
return Instant.now();
}
}
+
+ private static class Split extends FlatMapFunction<String, String> {
+
+ @Override
+ public Iterator<String> apply(String s) {
+ return Arrays.asList(s.split("\\s+")).iterator();
+ }
+ }
+
+ private static class Ones extends MapFunction<String, Tuple2<String, Integer>> {
+
+ @Override
+ public Tuple2<String, Integer> apply(String s) {
+ return new Tuple2<>(s, 1);
+ }
+ }
+
+ private static class Count extends ReduceFunction<Tuple2<String, Integer>> {
+
+ @Override
+ public Tuple2<String, Integer> apply(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
+ return new Tuple2<>(t1._1(), t1._2() + t2._2());
+ }
+ }
+
+ private static class TupleKey extends GroupByFunction<Tuple2<String, Integer>, String> {
+
+ @Override
+ public String apply(Tuple2<String, Integer> tuple) {
+ return tuple._1();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
index 4f43fd4..401eac0 100644
--- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
+++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
@@ -22,7 +22,7 @@ import java.time.{Duration, Instant}
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.client.ClientContext
import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-import org.apache.gearpump.streaming.dsl.{LoggerSink, StreamApp}
+import org.apache.gearpump.streaming.dsl.scalaapi.{LoggerSink, StreamApp}
import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, FixedWindow}
import org.apache.gearpump.streaming.source.DataSource
import org.apache.gearpump.streaming.task.TaskContext
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
index 22f597c..1cbfb22 100644
--- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
+++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
@@ -20,8 +20,8 @@ package org.apache.gearpump.streaming.examples.wordcount.dsl
import org.apache.gearpump.cluster.client.ClientContext
import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-import org.apache.gearpump.streaming.dsl.StreamApp
-import org.apache.gearpump.streaming.dsl.StreamApp._
+import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp
+import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp._
import org.apache.gearpump.util.AkkaApp
/** Same WordCount with High level DSL syntax */
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala
----------------------------------------------------------------------
diff --git a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala
index 2417763..22efa89 100644
--- a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala
+++ b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala
@@ -18,13 +18,10 @@
package org.apache.gearpump.external.hbase.dsl
import scala.language.implicitConversions
-
import org.apache.hadoop.conf.Configuration
-
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.external.hbase.HBaseSink
-import org.apache.gearpump.streaming.dsl.Stream
-import org.apache.gearpump.streaming.dsl.Stream.Sink
+import org.apache.gearpump.streaming.dsl.scalaapi.Stream
/** Create a HBase DSL Sink */
class HBaseDSLSink[T](stream: Stream[T]) {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala
index f1bb26a..996ae0b 100644
--- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala
@@ -21,7 +21,7 @@ import java.util.Properties
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.dsl
-import org.apache.gearpump.streaming.dsl.StreamApp
+import org.apache.gearpump.streaming.dsl.scalaapi.{Stream, StreamApp}
import org.apache.gearpump.streaming.kafka.{KafkaSink, KafkaSource}
import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
@@ -44,7 +44,7 @@ object KafkaDSL {
parallelism: Int = 1,
config: UserConfig = UserConfig.empty,
description: String = "KafkaSource"
- ): dsl.Stream[T] = {
+ ): Stream[T] = {
app.source[T](new KafkaSource(topics, properties), parallelism, config, description)
}
@@ -66,19 +66,19 @@ object KafkaDSL {
properties: Properties,
parallelism: Int = 1,
config: UserConfig = UserConfig.empty,
- description: String = "KafkaSource"): dsl.Stream[T] = {
+ description: String = "KafkaSource"): Stream[T] = {
val source = new KafkaSource(topics, properties)
source.setCheckpointStore(checkpointStoreFactory)
app.source[T](source, parallelism, config, description)
}
import scala.language.implicitConversions
- implicit def streamToKafkaDSL[T](stream: dsl.Stream[T]): KafkaDSL[T] = {
+ implicit def streamToKafkaDSL[T](stream: Stream[T]): KafkaDSL[T] = {
new KafkaDSL[T](stream)
}
}
-class KafkaDSL[T](stream: dsl.Stream[T]) {
+class KafkaDSL[T](stream: Stream[T]) {
/**
* Sinks data to Kafka
@@ -94,7 +94,7 @@ class KafkaDSL[T](stream: dsl.Stream[T]) {
properties: Properties,
parallelism: Int = 1,
userConfig: UserConfig = UserConfig.empty,
- description: String = "KafkaSink"): dsl.Stream[T] = {
+ description: String = "KafkaSink"): Stream[T] = {
stream.sink(new KafkaSink(topic, properties), parallelism, userConfig, description)
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
deleted file mode 100644
index f07ceff..0000000
--- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.javaapi.dsl.functions;
-
-import java.io.Serializable;
-
-/**
- * Filter function
- *
- * @param <T> Message of type T
- */
-public interface FilterFunction<T> extends Serializable {
- boolean apply(T t);
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java
deleted file mode 100644
index 9788dd2..0000000
--- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.javaapi.dsl.functions;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-/**
- * Function that converts a value of type T to a iterator of values of type R.
- *
- * @param <T> Input value type
- * @param <R> Return value type
- */
-public interface FlatMapFunction<T, R> extends Serializable {
- Iterator<R> apply(T t);
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java
deleted file mode 100644
index 6c71280..0000000
--- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.javaapi.dsl.functions;
-
-import java.io.Serializable;
-
-/**
- * GroupBy function which assign value of type T to groups
- *
- * @param <T> Input value type
- * @param <Group> Group Type
- */
-public interface GroupByFunction<T, Group> extends Serializable {
- Group apply(T t);
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/MapFunction.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/MapFunction.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/MapFunction.java
deleted file mode 100644
index e1fc821..0000000
--- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/MapFunction.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.javaapi.dsl.functions;
-
-import java.io.Serializable;
-
-/**
- * Function that map a value of type T to value of type R
- *
- * @param <T> Input value type
- * @param <R> Output value type
- */
-public interface MapFunction<T, R> extends Serializable {
- R apply(T t);
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java
deleted file mode 100644
index 2bcac60..0000000
--- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.javaapi.dsl.functions;
-
-import java.io.Serializable;
-
-/**
- * Function that applies reduce operation
- *
- * @param <T> Input value type
- */
-public interface ReduceFunction<T> extends Serializable {
- T apply(T t1, T t2);
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala
deleted file mode 100644
index 440a45e..0000000
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.dsl
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.dsl.plan._
-import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapFunction, ReduceFunction}
-import org.apache.gearpump.streaming.dsl.window.api._
-import org.apache.gearpump.streaming.dsl.window.impl._
-import org.apache.gearpump.streaming.sink.DataSink
-import org.apache.gearpump.streaming.task.{Task, TaskContext}
-import org.apache.gearpump.util.Graph
-import org.slf4j.{Logger, LoggerFactory}
-
-import scala.language.implicitConversions
-
-class Stream[T](
- private val graph: Graph[Op, OpEdge], private val thisNode: Op,
- private val edge: Option[OpEdge] = None) {
-
- /**
- * converts a value[T] to a list of value[R]
- *
- * @param fn FlatMap function
- * @param description The description message for this operation
- * @return A new stream with type [R]
- */
- def flatMap[R](fn: T => TraversableOnce[R], description: String = "flatMap"): Stream[R] = {
- val flatMapOp = ChainableOp(new FlatMapFunction[T, R](fn, description))
- graph.addVertex(flatMapOp)
- graph.addEdge(thisNode, edge.getOrElse(Direct), flatMapOp)
- new Stream[R](graph, flatMapOp)
- }
-
- /**
- * Maps message of type T message of type R
- *
- * @param fn Function
- * @return A new stream with type [R]
- */
- def map[R](fn: T => R, description: String = "map"): Stream[R] = {
- this.flatMap({ data =>
- Option(fn(data))
- }, description)
- }
-
- /**
- * Keeps records when fun(T) == true
- *
- * @param fn the filter
- * @return a new stream after filter
- */
- def filter(fn: T => Boolean, description: String = "filter"): Stream[T] = {
- this.flatMap({ data =>
- if (fn(data)) Option(data) else None
- }, description)
- }
-
- /**
- * Reduces operations.
- *
- * @param fn reduction function
- * @param description description message for this operator
- * @return a new stream after reduction
- */
- def reduce(fn: (T, T) => T, description: String = "reduce"): Stream[T] = {
- val reduceOp = ChainableOp(new ReduceFunction(fn, description))
- graph.addVertex(reduceOp)
- graph.addEdge(thisNode, edge.getOrElse(Direct), reduceOp)
- new Stream(graph, reduceOp)
- }
-
- /**
- * Log to task log file
- */
- def log(): Unit = {
- this.map(msg => {
- LoggerFactory.getLogger("dsl").info(msg.toString)
- msg
- }, "log")
- }
-
- /**
- * Merges data from two stream into one
- *
- * @param other the other stream
- * @return the merged stream
- */
- def merge(other: Stream[T], description: String = "merge"): Stream[T] = {
- val mergeOp = MergeOp(description, UserConfig.empty)
- graph.addVertex(mergeOp)
- graph.addEdge(thisNode, edge.getOrElse(Direct), mergeOp)
- graph.addEdge(other.thisNode, other.edge.getOrElse(Shuffle), mergeOp)
- new Stream[T](graph, mergeOp)
- }
-
- /**
- * Group by function (T => Group)
- *
- * For example, we have T type, People(name: String, gender: String, age: Int)
- * groupBy[People](_.gender) will group the people by gender.
- *
- * You can append other combinators after groupBy
- *
- * For example,
- * {{{
- * Stream[People].groupBy(_.gender).flatMap(..).filter(..).reduce(..)
- * }}}
- *
- * @param fn Group by function
- * @param parallelism Parallelism level
- * @param description The description
- * @return the grouped stream
- */
- def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1,
- description: String = "groupBy"): Stream[T] = {
- window(CountWindow.apply(1).accumulating)
- .groupBy[GROUP](fn, parallelism, description)
- }
-
- /**
- * Window function
- *
- * @param win window definition
- * @param description window description
- * @return [[WindowStream]] where groupBy could be applied
- */
- def window(win: Window, description: String = "window"): WindowStream[T] = {
- new WindowStream[T](graph, edge, thisNode, win, description)
- }
-
- /**
- * Connects with a low level Processor(TaskDescription)
- *
- * @param processor a user defined processor
- * @param parallelism parallelism level
- * @return new stream after processing with type [R]
- */
- def process[R](
- processor: Class[_ <: Task], parallelism: Int, conf: UserConfig = UserConfig.empty,
- description: String = "process"): Stream[R] = {
- val processorOp = ProcessorOp(processor, parallelism, conf, description)
- graph.addVertex(processorOp)
- graph.addEdge(thisNode, edge.getOrElse(Shuffle), processorOp)
- new Stream[R](graph, processorOp, Some(Shuffle))
- }
-}
-
-class WindowStream[T](graph: Graph[Op, OpEdge], edge: Option[OpEdge], thisNode: Op,
- window: Window, winDesc: String) {
-
- def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1,
- description: String = "groupBy"): Stream[T] = {
- val groupBy: GroupByFn[T, (GROUP, List[Bucket])] = GroupAlsoByWindow(fn, window)
- val groupOp = GroupByOp[T, (GROUP, List[Bucket])](groupBy, parallelism,
- s"$winDesc.$description")
- graph.addVertex(groupOp)
- graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp)
- new Stream[T](graph, groupOp)
- }
-}
-
-class KVStream[K, V](stream: Stream[Tuple2[K, V]]) {
- /**
- * GroupBy key
- *
- * Applies to Stream[Tuple2[K,V]]
- *
- * @param parallelism the parallelism for this operation
- * @return the new KV stream
- */
- def groupByKey(parallelism: Int = 1): Stream[Tuple2[K, V]] = {
- stream.groupBy(Stream.getTupleKey[K, V], parallelism, "groupByKey")
- }
-
- /**
- * Sum the value of the tuples
- *
- * Apply to Stream[Tuple2[K,V]], V must be of type Number
- *
- * For input (key, value1), (key, value2), will generate (key, value1 + value2)
- * @param numeric the numeric operations
- * @return the sum stream
- */
- def sum(implicit numeric: Numeric[V]): Stream[(K, V)] = {
- stream.reduce(Stream.sumByKey[K, V](numeric), "sum")
- }
-}
-
-object Stream {
-
- def apply[T](graph: Graph[Op, OpEdge], node: Op, edge: Option[OpEdge]): Stream[T] = {
- new Stream[T](graph, node, edge)
- }
-
- def getTupleKey[K, V](tuple: Tuple2[K, V]): K = tuple._1
-
- def sumByKey[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, V]) => Tuple2[K, V]
- = (tuple1, tuple2) => Tuple2(tuple1._1, numeric.plus(tuple1._2, tuple2._2))
-
- implicit def streamToKVStream[K, V](stream: Stream[Tuple2[K, V]]): KVStream[K, V] = {
- new KVStream(stream)
- }
-
- implicit class Sink[T](stream: Stream[T]) extends java.io.Serializable {
- def sink(dataSink: DataSink, parallelism: Int = 1,
- conf: UserConfig = UserConfig.empty, description: String = "sink"): Stream[T] = {
- implicit val sink = DataSinkOp(dataSink, parallelism, conf, description)
- stream.graph.addVertex(sink)
- stream.graph.addEdge(stream.thisNode, Shuffle, sink)
- new Stream[T](stream.graph, sink)
- }
- }
-}
-
-class LoggerSink[T] extends DataSink {
- var logger: Logger = _
-
- override def open(context: TaskContext): Unit = {
- this.logger = context.logger
- }
-
- override def write(message: Message): Unit = {
- logger.info("logging message " + message.msg)
- }
-
- override def close(): Unit = Unit
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
deleted file mode 100644
index 8116146..0000000
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.dsl
-
-import java.time.Instant
-
-import akka.actor.ActorSystem
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.streaming.StreamApplication
-import org.apache.gearpump.streaming.dsl.plan._
-import org.apache.gearpump.streaming.source.DataSource
-import org.apache.gearpump.streaming.task.TaskContext
-import org.apache.gearpump.util.Graph
-import org.apache.gearpump.Message
-
-import scala.language.implicitConversions
-
-/**
- * Example:
- * {{{
- * val data = "This is a good start, bingo!! bingo!!"
- * app.fromCollection(data.lines.toList).
- * // word => (word, count)
- * flatMap(line => line.split("[\\s]+")).map((_, 1)).
- * // (word, count1), (word, count2) => (word, count1 + count2)
- * groupBy(kv => kv._1).reduce(sum(_, _))
- *
- * val appId = context.submit(app)
- * context.close()
- * }}}
- *
- * @param name name of app
- */
-class StreamApp(
- name: String, system: ActorSystem, userConfig: UserConfig,
- private val graph: Graph[Op, OpEdge]) {
-
- def this(name: String, system: ActorSystem, userConfig: UserConfig) = {
- this(name, system, userConfig, Graph.empty[Op, OpEdge])
- }
-
- def plan(): StreamApplication = {
- implicit val actorSystem = system
- val planner = new Planner
- val dag = planner.plan(graph)
- StreamApplication(name, dag, userConfig)
- }
-}
-
-object StreamApp {
- def apply(name: String, context: ClientContext, userConfig: UserConfig = UserConfig.empty)
- : StreamApp = {
- new StreamApp(name, context.system, userConfig)
- }
-
- implicit def streamAppToApplication(streamApp: StreamApp): StreamApplication = {
- streamApp.plan()
- }
-
- implicit class Source(app: StreamApp) extends java.io.Serializable {
-
- def source[T](dataSource: DataSource, parallelism: Int = 1,
- conf: UserConfig = UserConfig.empty, description: String = "source"): Stream[T] = {
- implicit val sourceOp = DataSourceOp(dataSource, parallelism, conf, description)
- app.graph.addVertex(sourceOp)
- new Stream[T](app.graph, sourceOp)
- }
-
- def source[T](seq: Seq[T], parallelism: Int, description: String): Stream[T] = {
- this.source(new CollectionDataSource[T](seq), parallelism, UserConfig.empty, description)
- }
- }
-}
-
-/** A test message source which generated message sequence repeatedly. */
-class CollectionDataSource[T](seq: Seq[T]) extends DataSource {
- private lazy val iterator: Iterator[T] = seq.iterator
-
- override def open(context: TaskContext, startTime: Instant): Unit = {}
-
- override def read(): Message = {
- if (iterator.hasNext) {
- Message(iterator.next(), Instant.now().toEpochMilli)
- } else {
- null
- }
- }
-
- override def close(): Unit = {}
-
- override def getWatermark: Instant = Instant.now()
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala
new file mode 100644
index 0000000..e4e7309
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.api.functions
+
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
+
+object FilterFunction {
+
+ def apply[T](fn: T => Boolean): FilterFunction[T] = {
+ new FilterFunction[T] {
+ override def apply(t: T): Boolean = {
+ fn(t)
+ }
+ }
+ }
+}
+
+/**
+ * Returns true to keep the input and false otherwise.
+ *
+ * @tparam T Input value type
+ */
+abstract class FilterFunction[T] extends SerializableFunction {
+
+ def apply(t: T): Boolean
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala
new file mode 100644
index 0000000..70fe9d4
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.api.functions
+
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
+
+object MapFunction {
+
+ def apply[T, R](fn: T => R): MapFunction[T, R] = {
+ new MapFunction[T, R] {
+ override def apply(t: T): R = {
+ fn(t)
+ }
+ }
+ }
+}
+
+/**
+ * Transforms an input into an output of possibly different types.
+ *
+ * @tparam T Input value type
+ * @tparam R Output value type
+ */
+abstract class MapFunction[T, R] extends SerializableFunction {
+
+ def apply(t: T): R
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala
new file mode 100644
index 0000000..25b12be
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.api.functions
+
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
+
+object ReduceFunction {
+
+ def apply[T](fn: (T, T) => T): ReduceFunction[T] = {
+ new ReduceFunction[T] {
+ override def apply(t1: T, t2: T): T = {
+ fn(t1, t2)
+ }
+ }
+ }
+}
+
+/**
+ * Combines two inputs into one output of the same type.
+ *
+ * @tparam T Type of both inputs and output
+ */
+abstract class ReduceFunction[T] extends SerializableFunction {
+
+ def apply(t1: T, t2: T): T
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
index f2654ea..7f3c250 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
@@ -15,14 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.gearpump.streaming.dsl.javaapi
-import scala.collection.JavaConverters._
import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction, ReduceFunction}
+import org.apache.gearpump.streaming.dsl.javaapi.functions.{FlatMapFunction => JFlatMapFunction, GroupByFunction}
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
+import org.apache.gearpump.streaming.dsl.scalaapi.{Stream, WindowStream}
import org.apache.gearpump.streaming.dsl.window.api.Window
-import org.apache.gearpump.streaming.dsl.{Stream, WindowStream}
-import org.apache.gearpump.streaming.javaapi.dsl.functions._
import org.apache.gearpump.streaming.task.Task
/**
@@ -31,23 +31,23 @@ import org.apache.gearpump.streaming.task.Task
class JavaStream[T](val stream: Stream[T]) {
/** FlatMap on stream */
- def flatMap[R](fn: FlatMapFunction[T, R], description: String): JavaStream[R] = {
- new JavaStream[R](stream.flatMap({ t: T => fn(t).asScala }, description))
+ def flatMap[R](fn: JFlatMapFunction[T, R], description: String): JavaStream[R] = {
+ new JavaStream[R](stream.flatMap(FlatMapFunction(fn), description)) // forward caller's label
}
/** Map on stream */
def map[R](fn: MapFunction[T, R], description: String): JavaStream[R] = {
- new JavaStream[R](stream.map({ t: T => fn(t) }, description))
+ new JavaStream[R](stream.flatMap(FlatMapFunction(fn), description))
}
/** Only keep the messages that FilterFunction returns true. */
def filter(fn: FilterFunction[T], description: String): JavaStream[T] = {
- new JavaStream[T](stream.filter({ t: T => fn(t) }, description))
+ new JavaStream[T](stream.flatMap(FlatMapFunction(fn), description))
}
/** Does aggregation on the stream */
def reduce(fn: ReduceFunction[T], description: String): JavaStream[T] = {
- new JavaStream[T](stream.reduce({ (t1: T, t2: T) => fn(t1, t2) }, description))
+ new JavaStream[T](stream.reduce(fn, description))
}
def log(): Unit = {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
index 82a284e..b8d1f4c 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
@@ -19,13 +19,14 @@
package org.apache.gearpump.streaming.dsl.javaapi
import java.util.Collection
-import scala.collection.JavaConverters._
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.streaming.dsl.{CollectionDataSource, StreamApp}
+import org.apache.gearpump.streaming.dsl.scalaapi.{CollectionDataSource, StreamApp}
import org.apache.gearpump.streaming.source.DataSource
+import scala.collection.JavaConverters._
+
class JavaStreamApp(name: String, context: ClientContext, userConfig: UserConfig) {
private val streamApp = StreamApp(name, context, userConfig)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
new file mode 100644
index 0000000..85d597d
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.javaapi.functions
+
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
+
+/**
+ * Transforms one input into zero or more outputs of possibly different types.
+ * This Java version of FlatMapFunction returns a java.util.Iterator.
+ *
+ * @tparam T Input value type
+ * @tparam R Output value type
+ */
+abstract class FlatMapFunction[T, R] extends SerializableFunction {
+
+ def apply(t: T): java.util.Iterator[R]
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala
new file mode 100644
index 0000000..7656cba
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.javaapi.functions
+
+import org.apache.gearpump.streaming.dsl.api.functions.MapFunction
+
+/**
+ * Assigns the input value into a group.
+ *
+ * @tparam T Input value type
+ * @tparam GROUP Group value type
+ */
+abstract class GroupByFunction[T, GROUP] extends MapFunction[T, GROUP]
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
index f15d875..82ea7c7 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
@@ -22,7 +22,7 @@ import akka.actor.ActorSystem
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.Constants._
import org.apache.gearpump.streaming.Processor.DefaultProcessor
-import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
+import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, SingleInputFunction}
import org.apache.gearpump.streaming.{Constants, Processor}
import org.apache.gearpump.streaming.dsl.task.TransformTask
import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
@@ -134,7 +134,7 @@ case class ChainableOp[IN, OUT](
other match {
case op: ChainableOp[OUT, _] =>
// TODO: preserve type info
- ChainableOp(fn.andThen(op.fn))
+ ChainableOp(AndThen(fn, op.fn))
case _ =>
throw new OpChainException(this, other)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
index 5322648..687fd2e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
@@ -17,23 +17,35 @@
*/
package org.apache.gearpump.streaming.dsl.plan.functions
-trait SingleInputFunction[IN, OUT] extends Serializable {
+import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
+
+/**
+ * Internal function to process single input
+ *
+ * @tparam IN input value type
+ * @tparam OUT output value type
+ */
+sealed trait SingleInputFunction[IN, OUT] extends java.io.Serializable {
+
+ def setup(): Unit = {}
+
def process(value: IN): TraversableOnce[OUT]
- def andThen[OUTER](other: SingleInputFunction[OUT, OUTER]): SingleInputFunction[IN, OUTER] = {
- AndThen(this, other)
- }
+
def finish(): TraversableOnce[OUT] = None
- def clearState(): Unit = {}
+
+ def teardown(): Unit = {}
+
def description: String
}
-case class AndThen[IN, MIDDLE, OUT](
- first: SingleInputFunction[IN, MIDDLE], second: SingleInputFunction[MIDDLE, OUT])
+case class AndThen[IN, MIDDLE, OUT](first: SingleInputFunction[IN, MIDDLE],
+ second: SingleInputFunction[MIDDLE, OUT])
extends SingleInputFunction[IN, OUT] {
- override def andThen[OUTER](
- other: SingleInputFunction[OUT, OUTER]): SingleInputFunction[IN, OUTER] = {
- first.andThen(second.andThen(other))
+ override def setup(): Unit = {
+ first.setup()
+ second.setup()
}
override def process(value: IN): TraversableOnce[OUT] = {
@@ -49,9 +61,9 @@ case class AndThen[IN, MIDDLE, OUT](
}
}
- override def clearState(): Unit = {
- first.clearState()
- second.clearState()
+ override def teardown(): Unit = {
+ first.teardown()
+ second.teardown()
}
override def description: String = {
@@ -61,22 +73,31 @@ case class AndThen[IN, MIDDLE, OUT](
}
}
-class FlatMapFunction[IN, OUT](fn: IN => TraversableOnce[OUT], descriptionMessage: String)
+class FlatMapper[IN, OUT](fn: FlatMapFunction[IN, OUT], val description: String)
extends SingleInputFunction[IN, OUT] {
+ override def setup(): Unit = {
+ fn.setup()
+ }
+
override def process(value: IN): TraversableOnce[OUT] = {
fn(value)
}
- override def description: String = descriptionMessage
+ override def teardown(): Unit = {
+ fn.teardown()
+ }
}
-
-class ReduceFunction[T](fn: (T, T) => T, descriptionMessage: String)
+class Reducer[T](fn: ReduceFunction[T], val description: String)
extends SingleInputFunction[T, T] {
private var state: Option[T] = None
+ override def setup(): Unit = {
+ fn.setup()
+ }
+
override def process(value: T): TraversableOnce[T] = {
if (state.isEmpty) {
state = Option(value)
@@ -90,23 +111,18 @@ class ReduceFunction[T](fn: (T, T) => T, descriptionMessage: String)
state
}
- override def clearState(): Unit = {
+ override def teardown(): Unit = {
state = None
+ fn.teardown()
}
-
- override def description: String = descriptionMessage
}
-class EmitFunction[T](emit: T => Unit) extends SingleInputFunction[T, Unit] {
+class Emit[T](emit: T => Unit) extends SingleInputFunction[T, Unit] {
override def process(value: T): TraversableOnce[Unit] = {
emit(value)
None
}
- override def andThen[R](other: SingleInputFunction[Unit, R]): SingleInputFunction[T, R] = {
- throw new UnsupportedOperationException("andThen is not supposed to be called on EmitFunction")
- }
-
override def description: String = ""
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
new file mode 100644
index 0000000..430d795
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.scalaapi
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction, ReduceFunction}
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
+import org.apache.gearpump.streaming.dsl.plan._
+import org.apache.gearpump.streaming.dsl.plan.functions._
+import org.apache.gearpump.streaming.dsl.window.api._
+import org.apache.gearpump.streaming.dsl.window.impl.{Bucket, GroupAlsoByWindow}
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+import org.apache.gearpump.util.Graph
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.language.implicitConversions
+
+class Stream[T](
+ private val graph: Graph[Op, OpEdge], private val thisNode: Op,
+ private val edge: Option[OpEdge] = None) {
+
+ /**
+ * Returns a new stream by applying a flatMap function to each element
+ * and flatten the results.
+ *
+ * @param fn flatMap function
+ * @param description The description message for this operation
+ * @return A new stream with type [R]
+ */
+ def flatMap[R](fn: T => TraversableOnce[R], description: String = "flatMap"): Stream[R] = {
+ this.flatMap(FlatMapFunction(fn), description)
+ }
+
+ /**
+ * Returns a new stream by applying a flatMap function to each element
+ * and flatten the results.
+ *
+ * @param fn flatMap function
+ * @param description The description message for this operation
+ * @return A new stream with type [R]
+ */
+ def flatMap[R](fn: FlatMapFunction[T, R], description: String): Stream[R] = {
+ transform(new FlatMapper[T, R](fn, description))
+ }
+
+ /**
+ * Returns a new stream by applying a map function to each element.
+ *
+ * @param fn map function
+ * @return A new stream with type [R]
+ */
+ def map[R](fn: T => R, description: String = "map"): Stream[R] = {
+ this.map(MapFunction(fn), description)
+ }
+
+ /**
+ * Returns a new stream by applying a map function to each element.
+ *
+ * @param fn map function
+ * @return A new stream with type [R]
+ */
+ def map[R](fn: MapFunction[T, R], description: String): Stream[R] = {
+ this.flatMap(FlatMapFunction(fn), description)
+ }
+
+ /**
+ * Returns a new Stream keeping the elements that satisfy the filter function.
+ *
+ * @param fn filter function
+ * @return a new stream after filter
+ */
+ def filter(fn: T => Boolean, description: String = "filter"): Stream[T] = {
+ this.filter(FilterFunction(fn), description)
+ }
+
+ /**
+ * Returns a new Stream keeping the elements that satisfy the filter function.
+ *
+ * @param fn filter function
+ * @return a new stream after filter
+ */
+ def filter(fn: FilterFunction[T], description: String): Stream[T] = {
+ this.flatMap(FlatMapFunction(fn), description)
+ }
+ /**
+ * Returns a new stream by applying a reduce function over all the elements.
+ *
+ * @param fn reduce function
+ * @param description description message for this operator
+ * @return a new stream after reduce
+ */
+ def reduce(fn: (T, T) => T, description: String = "reduce"): Stream[T] = {
+ reduce(ReduceFunction(fn), description)
+ }
+
+ /**
+ * Returns a new stream by applying a reduce function over all the elements.
+ *
+ * @param fn reduce function
+ * @param description description message for this operator
+ * @return a new stream after reduce
+ */
+ def reduce(fn: ReduceFunction[T], description: String): Stream[T] = {
+ transform(new Reducer[T](fn, description))
+ }
+
+ private def transform[R](fn: SingleInputFunction[T, R]): Stream[R] = {
+ val op = ChainableOp(fn)
+ graph.addVertex(op)
+ graph.addEdge(thisNode, edge.getOrElse(Direct), op)
+ new Stream(graph, op)
+ }
+
+ /**
+ * Log to task log file
+ */
+ def log(): Unit = {
+ this.map(msg => {
+ LoggerFactory.getLogger("dsl").info(msg.toString)
+ msg
+ }, "log")
+ }
+
+ /**
+ * Merges data from two stream into one
+ *
+ * @param other the other stream
+ * @return the merged stream
+ */
+ def merge(other: Stream[T], description: String = "merge"): Stream[T] = {
+ val mergeOp = MergeOp(description, UserConfig.empty)
+ graph.addVertex(mergeOp)
+ graph.addEdge(thisNode, edge.getOrElse(Direct), mergeOp)
+ graph.addEdge(other.thisNode, other.edge.getOrElse(Shuffle), mergeOp)
+ new Stream[T](graph, mergeOp)
+ }
+
+ /**
+ * Group by function (T => Group)
+ *
+ * For example, we have T type, People(name: String, gender: String, age: Int)
+ * groupBy[People](_.gender) will group the people by gender.
+ *
+ * You can append other combinators after groupBy
+ *
+ * For example,
+ * {{{
+ * Stream[People].groupBy(_.gender).flatMap(..).filter(..).reduce(..)
+ * }}}
+ *
+ * @param fn Group by function
+ * @param parallelism Parallelism level
+ * @param description The description
+ * @return the grouped stream
+ */
+ def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1,
+ description: String = "groupBy"): Stream[T] = {
+ window(CountWindow.apply(1).accumulating)
+ .groupBy[GROUP](fn, parallelism, description)
+ }
+
+ /**
+ * Window function
+ *
+ * @param win window definition
+ * @param description window description
+ * @return [[WindowStream]] where groupBy could be applied
+ */
+ def window(win: Window, description: String = "window"): WindowStream[T] = {
+ new WindowStream[T](graph, edge, thisNode, win, description)
+ }
+
+ /**
+ * Connects with a low level Processor(TaskDescription)
+ *
+ * @param processor a user defined processor
+ * @param parallelism parallelism level
+ * @return new stream after processing with type [R]
+ */
+ def process[R](
+ processor: Class[_ <: Task], parallelism: Int, conf: UserConfig = UserConfig.empty,
+ description: String = "process"): Stream[R] = {
+ val processorOp = ProcessorOp(processor, parallelism, conf, description)
+ graph.addVertex(processorOp)
+ graph.addEdge(thisNode, edge.getOrElse(Shuffle), processorOp)
+ new Stream[R](graph, processorOp, Some(Shuffle))
+ }
+}
+
+class WindowStream[T](graph: Graph[Op, OpEdge], edge: Option[OpEdge], thisNode: Op,
+ window: Window, winDesc: String) {
+
+ def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1,
+ description: String = "groupBy"): Stream[T] = {
+ val groupBy: GroupByFn[T, (GROUP, List[Bucket])] = GroupAlsoByWindow(fn, window)
+ val groupOp = GroupByOp[T, (GROUP, List[Bucket])](groupBy, parallelism,
+ s"$winDesc.$description")
+ graph.addVertex(groupOp)
+ graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp)
+ new Stream[T](graph, groupOp)
+ }
+}
+
+class KVStream[K, V](stream: Stream[Tuple2[K, V]]) {
+ /**
+ * GroupBy key
+ *
+ * Applies to Stream[Tuple2[K,V]]
+ *
+ * @param parallelism the parallelism for this operation
+ * @return the new KV stream
+ */
+ def groupByKey(parallelism: Int = 1): Stream[Tuple2[K, V]] = {
+ stream.groupBy(Stream.getTupleKey[K, V], parallelism, "groupByKey")
+ }
+
+ /**
+ * Sum the value of the tuples
+ *
+ * Apply to Stream[Tuple2[K,V]], V must be of type Number
+ *
+ * For input (key, value1), (key, value2), will generate (key, value1 + value2)
+ * @param numeric the numeric operations
+ * @return the sum stream
+ */
+ def sum(implicit numeric: Numeric[V]): Stream[(K, V)] = {
+ stream.reduce(Stream.sumByKey[K, V](numeric), "sum")
+ }
+}
+
+object Stream {
+
+ def apply[T](graph: Graph[Op, OpEdge], node: Op, edge: Option[OpEdge]): Stream[T] = {
+ new Stream[T](graph, node, edge)
+ }
+
+ def getTupleKey[K, V](tuple: Tuple2[K, V]): K = tuple._1
+
+ def sumByKey[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, V]) => Tuple2[K, V]
+ = (tuple1, tuple2) => Tuple2(tuple1._1, numeric.plus(tuple1._2, tuple2._2))
+
+ implicit def streamToKVStream[K, V](stream: Stream[Tuple2[K, V]]): KVStream[K, V] = {
+ new KVStream(stream)
+ }
+
+ implicit class Sink[T](stream: Stream[T]) extends java.io.Serializable {
+ def sink(dataSink: DataSink, parallelism: Int = 1,
+ conf: UserConfig = UserConfig.empty, description: String = "sink"): Stream[T] = {
+ implicit val sink = DataSinkOp(dataSink, parallelism, conf, description)
+ stream.graph.addVertex(sink)
+ stream.graph.addEdge(stream.thisNode, Shuffle, sink)
+ new Stream[T](stream.graph, sink)
+ }
+ }
+}
+
+class LoggerSink[T] extends DataSink { // DataSink that logs every message it receives
+ var logger: Logger = _ // unset until open() assigns the task's logger
+
+ override def open(context: TaskContext): Unit = {
+ this.logger = context.logger
+ }
+
+ override def write(message: Message): Unit = {
+ logger.info("logging message " + message.msg)
+ }
+
+ override def close(): Unit = {} // no-op
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
new file mode 100644
index 0000000..d6eed2e
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.scalaapi
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.streaming.StreamApplication
+import org.apache.gearpump.streaming.dsl.plan._
+import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.util.Graph
+
+import scala.language.implicitConversions
+
+/**
+ * Entry point of the stream DSL: holds the operator graph and turns it into a
+ * [[StreamApplication]] with `plan()`.
+ *
+ * Example:
+ * {{{
+ * val data = "This is a good start, bingo!! bingo!!"
+ * app.source(data.lines.toList, 1, "source").
+ *   // word => (word, count)
+ *   flatMap(line => line.split("[\\s]+")).map((_, 1)).
+ *   // (word, count1), (word, count2) => (word, count1 + count2)
+ *   groupBy(kv => kv._1).reduce(sum(_, _))
+ *
+ * val appId = context.submit(app)
+ * context.close()
+ * }}}
+ *
+ * @param name name of app
+ * @param system actor system made implicitly available while planning
+ * @param userConfig configuration passed on to the resulting [[StreamApplication]]
+ * @param graph operator graph the application is planned from
+ */
+class StreamApp(
+    name: String, system: ActorSystem, userConfig: UserConfig,
+    private val graph: Graph[Op, OpEdge]) {
+
+  /** Creates an application that starts from an empty operator graph. */
+  def this(name: String, system: ActorSystem, userConfig: UserConfig) = {
+    this(name, system, userConfig, Graph.empty[Op, OpEdge])
+  }
+
+  /**
+   * Runs a `Planner` over the operator graph and wraps the result in a
+   * [[StreamApplication]].
+   *
+   * @return the application built from `name`, the planned DAG and `userConfig`
+   */
+  def plan(): StreamApplication = {
+    // Explicit type keeps implicit resolution predictable.
+    // NOTE(review): presumably consumed by the Planner calls below - confirm.
+    implicit val actorSystem: ActorSystem = system
+    val planner = new Planner
+    val dag = planner.plan(graph)
+    StreamApplication(name, dag, userConfig)
+  }
+}
+
+/** Factory and implicit enrichments for [[StreamApp]]. */
+object StreamApp {
+
+  /** Creates a [[StreamApp]] with an empty graph, using the actor system of `context`. */
+  def apply(name: String, context: ClientContext, userConfig: UserConfig = UserConfig.empty)
+    : StreamApp = new StreamApp(name, context.system, userConfig)
+
+  /** Plans `streamApp` wherever a [[StreamApplication]] is expected. */
+  implicit def streamAppToApplication(streamApp: StreamApp): StreamApplication =
+    streamApp.plan()
+
+  /** Adds the `source` operations to [[StreamApp]]. */
+  implicit class Source(app: StreamApp) extends java.io.Serializable {
+
+    /**
+     * Adds a `DataSourceOp` built from the arguments to the application's graph and
+     * returns a stream starting at that operator.
+     */
+    def source[T](dataSource: DataSource, parallelism: Int = 1,
+        conf: UserConfig = UserConfig.empty, description: String = "source"): Stream[T] = {
+      val graph = app.graph
+      implicit val sourceOp = DataSourceOp(dataSource, parallelism, conf, description)
+      graph.addVertex(sourceOp)
+      new Stream[T](graph, sourceOp)
+    }
+
+    /**
+     * Variant that reads the elements of `seq` through a [[CollectionDataSource]],
+     * with an empty user config.
+     */
+    def source[T](seq: Seq[T], parallelism: Int, description: String): Stream[T] = {
+      val dataSource = new CollectionDataSource[T](seq)
+      source[T](dataSource, parallelism, UserConfig.empty, description)
+    }
+  }
+}
+
+/**
+ * A test message source that emits each element of `seq` once, in iteration order.
+ *
+ * The elements are not replayed: once the sequence is exhausted, `read()` returns `null`.
+ *
+ * @param seq elements to emit as message payloads
+ */
+class CollectionDataSource[T](seq: Seq[T]) extends DataSource {
+  // lazy: the iterator is only created on first use in read()
+  private lazy val iterator: Iterator[T] = seq.iterator
+
+  override def open(context: TaskContext, startTime: Instant): Unit = {}
+
+  /**
+   * Wraps the next element in a [[Message]] stamped with the current wall-clock time.
+   *
+   * @return the next message, or `null` when no element is left
+   *         (NOTE(review): presumably the "no message" signal of `DataSource` - confirm)
+   */
+  override def read(): Message =
+    if (iterator.hasNext) Message(iterator.next(), Instant.now().toEpochMilli) else null
+
+  override def close(): Unit = {}
+
+  /** Reports the current wall-clock time as the watermark. */
+  override def getWatermark: Instant = Instant.now()
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
new file mode 100644
index 0000000..f10a3db
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.scalaapi.functions
+
+import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction}
+import org.apache.gearpump.streaming.dsl.javaapi.functions.{FlatMapFunction => JFlatMapFunction}
+
+import scala.collection.JavaConverters._
+
+/** Adapters that turn other kinds of user function into a Scala [[FlatMapFunction]]. */
+object FlatMapFunction {
+
+  /**
+   * Wraps a Java flat-map function. `setup` and `teardown` are forwarded and the
+   * result of `fn.apply` is converted with `asScala`.
+   */
+  def apply[T, R](fn: JFlatMapFunction[T, R]): FlatMapFunction[T, R] =
+    new FlatMapFunction[T, R] {
+      override def setup(): Unit = fn.setup()
+
+      override def apply(t: T): TraversableOnce[R] = fn.apply(t).asScala
+
+      override def teardown(): Unit = fn.teardown()
+    }
+
+  /** Wraps a plain Scala closure; `setup` and `teardown` are not overridden. */
+  def apply[T, R](fn: T => TraversableOnce[R]): FlatMapFunction[T, R] =
+    new FlatMapFunction[T, R] {
+      override def apply(t: T): TraversableOnce[R] = fn(t)
+    }
+
+  /**
+   * Wraps a [[MapFunction]]. Each input yields the single mapped value, or nothing
+   * when `fn` returns `null` (the result goes through `Option(...)`).
+   */
+  def apply[T, R](fn: MapFunction[T, R]): FlatMapFunction[T, R] =
+    new FlatMapFunction[T, R] {
+      override def setup(): Unit = fn.setup()
+
+      override def apply(t: T): TraversableOnce[R] = Option(fn(t))
+
+      override def teardown(): Unit = fn.teardown()
+    }
+
+  /**
+   * Wraps a [[FilterFunction]]. An input is passed through when `fn` accepts it and
+   * dropped otherwise; an accepted `null` input also yields nothing. The type
+   * parameter `R` is not used by this overload.
+   */
+  def apply[T, R](fn: FilterFunction[T]): FlatMapFunction[T, T] =
+    new FlatMapFunction[T, T] {
+      override def setup(): Unit = fn.setup()
+
+      override def apply(t: T): TraversableOnce[T] = if (fn(t)) Option(t) else None
+
+      override def teardown(): Unit = fn.teardown()
+    }
+}
+
+/**
+ * Transforms one input into zero or more outputs of possibly different types.
+ * This Scala version of FlatMapFunction returns a TraversableOnce.
+ *
+ * @tparam T Input value type
+ * @tparam R Output value type
+ */
+abstract class FlatMapFunction[T, R] extends SerializableFunction {
+
+ def apply(t: T): TraversableOnce[R]
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala
new file mode 100644
index 0000000..ab88bf1
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.scalaapi.functions
+
+/**
+ * Superclass for all user defined function interfaces.
+ * This ensures all functions are serializable and provides the common methods
+ * setup and teardown, both of which default to no-ops. Users should not
+ * extend this class directly but subclasses like [[FlatMapFunction]].
+ */
+abstract class SerializableFunction extends java.io.Serializable {
+
+ def setup(): Unit = {}
+
+ def teardown(): Unit = {}
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
index e35f085..c13a4fb 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
@@ -23,9 +23,8 @@ import org.apache.gearpump.streaming.Constants._
import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
import org.apache.gearpump.streaming.task.{Task, TaskContext}
-class TransformTask[IN, OUT](
- operator: Option[SingleInputFunction[IN, OUT]], taskContext: TaskContext,
- userConf: UserConfig) extends Task(taskContext, userConf) {
+class TransformTask[IN, OUT](operator: Option[SingleInputFunction[IN, OUT]],
+ taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
def this(taskContext: TaskContext, userConf: UserConfig) = {
this(userConf.getValue[SingleInputFunction[IN, OUT]](
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
index d87a9e4..223a4af 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
@@ -28,7 +28,7 @@ import com.gs.collections.impl.map.mutable.UnifiedMap
import com.gs.collections.impl.map.sorted.mutable.TreeSortedMap
import com.gs.collections.impl.set.mutable.UnifiedSet
import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.dsl.plan.functions.{EmitFunction, SingleInputFunction}
+import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, Emit, SingleInputFunction}
import org.apache.gearpump.streaming.dsl.window.api.Discarding
import org.apache.gearpump.streaming.task.TaskContext
import org.apache.gearpump.util.LogUtil
@@ -39,7 +39,6 @@ trait WindowRunner {
def process(message: Message): Unit
def trigger(time: Instant): Unit
-
}
object DefaultWindowRunner {
@@ -59,7 +58,6 @@ class DefaultWindowRunner[IN, GROUP, OUT](
private val windowGroups = new UnifiedMap[WindowGroup[GROUP], FastList[IN]]
private val groupFns = new UnifiedMap[GROUP, SingleInputFunction[IN, OUT]]
-
override def process(message: Message): Unit = {
val (group, buckets) = groupBy.groupBy(message)
buckets.foreach { bucket =>
@@ -72,8 +70,11 @@ class DefaultWindowRunner[IN, GROUP, OUT](
inputs.add(message.msg.asInstanceOf[IN])
windowGroups.put(wg, inputs)
}
- groupFns.putIfAbsent(group,
- userConfig.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get)
+ if (!groupFns.containsKey(group)) {
+ val fn = userConfig.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get
+ fn.setup()
+ groupFns.put(group, fn)
+ }
}
override def trigger(time: Instant): Unit = {
@@ -88,8 +89,7 @@ class DefaultWindowRunner[IN, GROUP, OUT](
wgs.forEach(new Procedure[WindowGroup[GROUP]] {
override def value(each: WindowGroup[GROUP]): Unit = {
val inputs = windowGroups.remove(each)
- val reduceFn = groupFns.get(each.group)
- .andThen[Unit](new EmitFunction[OUT](emitResult(_, time)))
+ val reduceFn = AndThen(groupFns.get(each.group), new Emit[OUT](emitResult(_, time)))
inputs.forEach(new Procedure[IN] {
override def value(t: IN): Unit = {
// .toList forces eager evaluation
@@ -99,7 +99,7 @@ class DefaultWindowRunner[IN, GROUP, OUT](
// .toList forces eager evaluation
reduceFn.finish().toList
if (groupBy.window.accumulationMode == Discarding) {
- reduceFn.clearState()
+ reduceFn.teardown()
}
}
})
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
deleted file mode 100644
index db4db93..0000000
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.dsl
-
-import akka.actor.ActorSystem
-import org.apache.gearpump.cluster.TestUtil
-import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.streaming.partitioner.PartitionerDescription
-import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication}
-import org.apache.gearpump.streaming.source.DataSourceTask
-import org.apache.gearpump.util.Graph
-import org.mockito.Mockito.when
-import org.scalatest._
-import org.scalatest.mock.MockitoSugar
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar {
-
- implicit var system: ActorSystem = _
-
- override def beforeAll(): Unit = {
- system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
- }
-
- override def afterAll(): Unit = {
- system.terminate()
- Await.result(system.whenTerminated, Duration.Inf)
- }
-
- it should "be able to generate multiple new streams" in {
- val context: ClientContext = mock[ClientContext]
- when(context.system).thenReturn(system)
-
- val dsl = StreamApp("dsl", context)
- dsl.source(List("A"), 2, "A") shouldBe a [Stream[_]]
- dsl.source(List("B"), 3, "B") shouldBe a [Stream[_]]
-
- val application = dsl.plan()
- application shouldBe a [StreamApplication]
- application.name shouldBe "dsl"
- val dag = application.userConfig
- .getValue[Graph[ProcessorDescription, PartitionerDescription]](StreamApplication.DAG).get
- dag.vertices.size shouldBe 2
- dag.vertices.foreach { processor =>
- processor.taskClass shouldBe classOf[DataSourceTask[_, _]].getName
- if (processor.description == "A") {
- processor.parallelism shouldBe 2
- } else if (processor.description == "B") {
- processor.parallelism shouldBe 3
- } else {
- fail(s"undefined source ${processor.description}")
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
deleted file mode 100644
index 8def61e..0000000
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.dsl
-
-import akka.actor._
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.cluster.{TestUtil, UserConfig}
-import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, HashPartitioner, PartitionerDescription}
-import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication}
-import org.apache.gearpump.streaming.dsl.StreamSpec.Join
-import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
-import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask}
-import org.apache.gearpump.streaming.source.DataSourceTask
-import org.apache.gearpump.streaming.task.{Task, TaskContext}
-import org.apache.gearpump.util.Graph
-import org.apache.gearpump.util.Graph._
-import org.mockito.Mockito.when
-import org.scalatest._
-import org.scalatest.mock.MockitoSugar
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-import scala.util.{Either, Left, Right}
-
-class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar {
-
- implicit var system: ActorSystem = _
-
- override def beforeAll(): Unit = {
- system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
- }
-
- override def afterAll(): Unit = {
- system.terminate()
- Await.result(system.whenTerminated, Duration.Inf)
- }
-
- it should "translate the DSL to a DAG" in {
- val context: ClientContext = mock[ClientContext]
- when(context.system).thenReturn(system)
-
- val dsl = StreamApp("dsl", context)
-
- val data =
- """
- five four three two one
- five four three two
- five four three
- five four
- five
- """
- val stream = dsl.source(data.lines.toList, 1, "").
- flatMap(line => line.split("[\\s]+")).filter(_.nonEmpty).
- map(word => (word, 1)).
- groupBy(_._1, parallelism = 2).
- reduce((left, right) => (left._1, left._2 + right._2)).
- map[Either[(String, Int), String]](Left(_))
-
- val query = dsl.source(List("two"), 1, "").map[Either[(String, Int), String]](Right(_))
- stream.merge(query).process[(String, Int)](classOf[Join], 1)
-
- val app: StreamApplication = dsl.plan()
- val dag = app.userConfig
- .getValue[Graph[ProcessorDescription, PartitionerDescription]](StreamApplication.DAG).get
-
- val dagTopology = dag.mapVertex(_.taskClass).mapEdge { (node1, edge, node2) =>
- edge.partitionerFactory.partitioner.getClass.getName
- }
- val expectedDagTopology = getExpectedDagTopology
-
- dagTopology.vertices.toSet should contain theSameElementsAs expectedDagTopology.vertices.toSet
- dagTopology.edges.toSet should contain theSameElementsAs expectedDagTopology.edges.toSet
- }
-
- private def getExpectedDagTopology: Graph[String, String] = {
- val source = classOf[DataSourceTask[_, _]].getName
- val group = classOf[CountTriggerTask[_, _]].getName
- val merge = classOf[TransformTask[_, _]].getName
- val join = classOf[Join].getName
-
- val hash = classOf[HashPartitioner].getName
- val groupBy = classOf[GroupByPartitioner[_, _]].getName
- val colocation = classOf[CoLocationPartitioner].getName
-
- val expectedDagTopology = Graph(
- source ~ groupBy ~> group ~ colocation ~> merge ~ hash ~> join,
- source ~ hash ~> merge
- )
- expectedDagTopology
- }
-}
-
-object StreamSpec {
-
- class Join(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
-
- var query: String = _
-
- override def onNext(msg: Message): Unit = {
- msg.msg match {
- case Left(wordCount: (String @unchecked, Int @unchecked)) =>
- if (query != null && wordCount._1 == query) {
- taskContext.output(new Message(wordCount))
- }
-
- case Right(query: String) =>
- this.query = query
- }
- }
- }
-}