You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/12/08 03:45:21 UTC

incubator-gearpump git commit: [GEARPUMP-245] Invoke GroupByFunction.apply in JavaStream DSL

Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 5cf79bfd3 -> 791f45a0c


[GEARPUMP-245] Invoke GroupByFunction.apply in JavaStream DSL

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [x] Make sure the commit message is formatted like:
   `[GEARPUMP-<Jira issue #>] Meaningful description of pull request`
 - [x] Make sure tests pass via `sbt clean test`.
 - [x] Make sure old documentation affected by the pull request has been updated and new documentation added for new functionality.

Author: manuzhang <ow...@gmail.com>

Closes #119 from manuzhang/GEARPUMP-246.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/791f45a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/791f45a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/791f45a0

Branch: refs/heads/master
Commit: 791f45a0cd789bf2b2190167f701800fc4315be0
Parents: 5cf79bf
Author: manuzhang <ow...@gmail.com>
Authored: Thu Dec 8 11:45:06 2016 +0800
Committer: huafengw <fv...@gmail.com>
Committed: Thu Dec 8 11:45:06 2016 +0800

----------------------------------------------------------------------
 .../examples/wordcountjava/dsl/WordCount.java   | 82 +++++++++++---------
 .../streaming/dsl/javaapi/JavaStream.scala      |  2 +-
 .../streaming/dsl/task/CountTriggerTask.scala   |  1 -
 .../dsl/task/EventTimeTriggerTask.scala         |  1 -
 .../dsl/task/ProcessingTimeTriggerTask.scala    |  1 -
 5 files changed, 45 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/791f45a0/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
index 0ecc42e..a453d8c 100644
--- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
+++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
@@ -19,21 +19,18 @@
 package org.apache.gearpump.streaming.examples.wordcountjava.dsl;
 
 import com.typesafe.config.Config;
+import org.apache.gearpump.Message;
 import org.apache.gearpump.cluster.ClusterConfig;
 import org.apache.gearpump.cluster.UserConfig;
 import org.apache.gearpump.cluster.client.ClientContext;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.GroupByFunction;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.MapFunction;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.ReduceFunction;
+import org.apache.gearpump.streaming.source.DataSource;
+import org.apache.gearpump.streaming.task.TaskContext;
 import scala.Tuple2;
 
-import java.util.ArrayList;
+import java.time.Instant;
 import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
 
 /** Java version of WordCount with high level DSL API */
 public class WordCount {
@@ -45,41 +42,50 @@ public class WordCount {
   public static void main(Config akkaConf, String[] args) throws InterruptedException {
     ClientContext context = new ClientContext(akkaConf);
     JavaStreamApp app = new JavaStreamApp("JavaDSL", context, UserConfig.empty());
-    List<String> source = new ArrayList<>(Arrays.asList("This is a good start, bingo!! bingo!!"));
-
-    JavaStream<String> sentence = app.source(source, 1, UserConfig.empty(), "source");
-
-    JavaStream<String> words = sentence.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterator<String> apply(String s) {
-        return new ArrayList<String>(Arrays.asList(s.split("\\s+"))).iterator();
-      }
-    }, "flatMap");
-
-    JavaStream<Tuple2<String, Integer>> ones = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
-      @Override
-      public Tuple2<String, Integer> apply(String s) {
-        return new Tuple2<String, Integer>(s, 1);
-      }
-    }, "map");
-
-    JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(new GroupByFunction<Tuple2<String, Integer>, String>() {
-      @Override
-      public String apply(Tuple2<String, Integer> tuple) {
-        return tuple._1();
-      }
-    }, 1, "groupBy");
-
-    JavaStream<Tuple2<String, Integer>> wordcount = groupedOnes.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
-      @Override
-      public Tuple2<String, Integer> apply(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
-        return new Tuple2<String, Integer>(t1._1(), t1._2() + t2._2());
-      }
-    }, "reduce");
+
+    JavaStream<String> sentence = app.source(new StringSource("This is a good start, bingo!! bingo!!"),
+        1, UserConfig.empty(), "source");
+
+    JavaStream<String> words = sentence.flatMap(s -> Arrays.asList(s.split("\\s+")).iterator(),
+        "flatMap");
+
+    JavaStream<Tuple2<String, Integer>> ones = words.map(s -> new Tuple2<>(s, 1), "map");
+
+    JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(Tuple2::_1, 1, "groupBy");
+
+    JavaStream<Tuple2<String, Integer>> wordcount = groupedOnes.reduce(
+        (t1, t2) -> new Tuple2<>(t1._1(), t1._2() + t2._2()), "reduce");
 
     wordcount.log();
 
     app.run();
     context.close();
   }
+
+  private static class StringSource implements DataSource {
+
+    private final String str;
+
+    StringSource(String str) {
+      this.str = str;
+    }
+
+    @Override
+    public void open(TaskContext context, Instant startTime) {
+    }
+
+    @Override
+    public Message read() {
+      return Message.apply(str, Instant.now().toEpochMilli());
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public Instant getWatermark() {
+      return Instant.now();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/791f45a0/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
index 3003b98..f68731e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
@@ -65,7 +65,7 @@ class JavaStream[T](val stream: Stream[T]) {
    */
   def groupBy[GROUP](fn: GroupByFunction[T, GROUP],
       parallelism: Int, description: String): JavaStream[T] = {
-    new JavaStream[T](stream.groupBy((t: T) => fn, parallelism, description))
+    new JavaStream[T](stream.groupBy(fn.apply, parallelism, description))
   }
 
   def window(win: Window, description: String): JavaWindowStream[T] = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/791f45a0/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
index 4ee2fa8..06f2964 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
@@ -19,7 +19,6 @@ package org.apache.gearpump.streaming.dsl.task
 
 import java.time.Instant
 
-import akka.actor.ActorSystem
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.Constants._

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/791f45a0/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala
index 4b7649f..0674339 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala
@@ -19,7 +19,6 @@ package org.apache.gearpump.streaming.dsl.task
 
 import java.time.Instant
 
-import akka.actor.ActorSystem
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.Constants._

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/791f45a0/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala
index 980a54b..78ba762 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala
@@ -21,7 +21,6 @@ import java.time.Instant
 import java.util.concurrent.TimeUnit
 
 import akka.actor.Actor.Receive
-import akka.actor.ActorSystem
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.Constants._