You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/12/08 03:45:21 UTC
incubator-gearpump git commit: [GEARPUMP-245] Invoke GroupByFunction.apply in JavaStream DSL
Repository: incubator-gearpump
Updated Branches:
refs/heads/master 5cf79bfd3 -> 791f45a0c
[GEARPUMP-245] Invoke GroupByFunction.apply in JavaStream DSL
Be sure to do all of the following to help us incorporate your contribution
quickly and easily:
- [x] Make sure the commit message is formatted like:
`[GEARPUMP-<Jira issue #>] Meaningful description of pull request`
- [x] Make sure tests pass via `sbt clean test`.
- [x] Make sure old documentation affected by the pull request has been updated and new documentation added for new functionality.
Author: manuzhang <ow...@gmail.com>
Closes #119 from manuzhang/GEARPUMP-246.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/791f45a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/791f45a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/791f45a0
Branch: refs/heads/master
Commit: 791f45a0cd789bf2b2190167f701800fc4315be0
Parents: 5cf79bf
Author: manuzhang <ow...@gmail.com>
Authored: Thu Dec 8 11:45:06 2016 +0800
Committer: huafengw <fv...@gmail.com>
Committed: Thu Dec 8 11:45:06 2016 +0800
----------------------------------------------------------------------
.../examples/wordcountjava/dsl/WordCount.java | 82 +++++++++++---------
.../streaming/dsl/javaapi/JavaStream.scala | 2 +-
.../streaming/dsl/task/CountTriggerTask.scala | 1 -
.../dsl/task/EventTimeTriggerTask.scala | 1 -
.../dsl/task/ProcessingTimeTriggerTask.scala | 1 -
5 files changed, 45 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/791f45a0/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
index 0ecc42e..a453d8c 100644
--- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
+++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
@@ -19,21 +19,18 @@
package org.apache.gearpump.streaming.examples.wordcountjava.dsl;
import com.typesafe.config.Config;
+import org.apache.gearpump.Message;
import org.apache.gearpump.cluster.ClusterConfig;
import org.apache.gearpump.cluster.UserConfig;
import org.apache.gearpump.cluster.client.ClientContext;
import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.GroupByFunction;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.MapFunction;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.ReduceFunction;
+import org.apache.gearpump.streaming.source.DataSource;
+import org.apache.gearpump.streaming.task.TaskContext;
import scala.Tuple2;
-import java.util.ArrayList;
+import java.time.Instant;
import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
/** Java version of WordCount with high level DSL API */
public class WordCount {
@@ -45,41 +42,50 @@ public class WordCount {
public static void main(Config akkaConf, String[] args) throws InterruptedException {
ClientContext context = new ClientContext(akkaConf);
JavaStreamApp app = new JavaStreamApp("JavaDSL", context, UserConfig.empty());
- List<String> source = new ArrayList<>(Arrays.asList("This is a good start, bingo!! bingo!!"));
-
- JavaStream<String> sentence = app.source(source, 1, UserConfig.empty(), "source");
-
- JavaStream<String> words = sentence.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public Iterator<String> apply(String s) {
- return new ArrayList<String>(Arrays.asList(s.split("\\s+"))).iterator();
- }
- }, "flatMap");
-
- JavaStream<Tuple2<String, Integer>> ones = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
- @Override
- public Tuple2<String, Integer> apply(String s) {
- return new Tuple2<String, Integer>(s, 1);
- }
- }, "map");
-
- JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(new GroupByFunction<Tuple2<String, Integer>, String>() {
- @Override
- public String apply(Tuple2<String, Integer> tuple) {
- return tuple._1();
- }
- }, 1, "groupBy");
-
- JavaStream<Tuple2<String, Integer>> wordcount = groupedOnes.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
- @Override
- public Tuple2<String, Integer> apply(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
- return new Tuple2<String, Integer>(t1._1(), t1._2() + t2._2());
- }
- }, "reduce");
+
+ JavaStream<String> sentence = app.source(new StringSource("This is a good start, bingo!! bingo!!"),
+ 1, UserConfig.empty(), "source");
+
+ JavaStream<String> words = sentence.flatMap(s -> Arrays.asList(s.split("\\s+")).iterator(),
+ "flatMap");
+
+ JavaStream<Tuple2<String, Integer>> ones = words.map(s -> new Tuple2<>(s, 1), "map");
+
+ JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(Tuple2::_1, 1, "groupBy");
+
+ JavaStream<Tuple2<String, Integer>> wordcount = groupedOnes.reduce(
+ (t1, t2) -> new Tuple2<>(t1._1(), t1._2() + t2._2()), "reduce");
wordcount.log();
app.run();
context.close();
}
+
+ private static class StringSource implements DataSource {
+
+ private final String str;
+
+ StringSource(String str) {
+ this.str = str;
+ }
+
+ @Override
+ public void open(TaskContext context, Instant startTime) {
+ }
+
+ @Override
+ public Message read() {
+ return Message.apply(str, Instant.now().toEpochMilli());
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public Instant getWatermark() {
+ return Instant.now();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/791f45a0/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
index 3003b98..f68731e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
@@ -65,7 +65,7 @@ class JavaStream[T](val stream: Stream[T]) {
*/
def groupBy[GROUP](fn: GroupByFunction[T, GROUP],
parallelism: Int, description: String): JavaStream[T] = {
- new JavaStream[T](stream.groupBy((t: T) => fn, parallelism, description))
+ new JavaStream[T](stream.groupBy(fn.apply, parallelism, description))
}
def window(win: Window, description: String): JavaWindowStream[T] = {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/791f45a0/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
index 4ee2fa8..06f2964 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
@@ -19,7 +19,6 @@ package org.apache.gearpump.streaming.dsl.task
import java.time.Instant
-import akka.actor.ActorSystem
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.Constants._
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/791f45a0/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala
index 4b7649f..0674339 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala
@@ -19,7 +19,6 @@ package org.apache.gearpump.streaming.dsl.task
import java.time.Instant
-import akka.actor.ActorSystem
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.Constants._
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/791f45a0/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala
index 980a54b..78ba762 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala
@@ -21,7 +21,6 @@ import java.time.Instant
import java.util.concurrent.TimeUnit
import akka.actor.Actor.Receive
-import akka.actor.ActorSystem
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.Constants._