You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/01/24 03:23:13 UTC

[01/12] beam git commit: [BEAM-1180] Implement GearpumpPipelineResult

Repository: beam
Updated Branches:
  refs/heads/gearpump-runner 4c445dd0b -> 1ed16f11a


[BEAM-1180] Implement GearpumpPipelineResult


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/21554764
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/21554764
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/21554764

Branch: refs/heads/gearpump-runner
Commit: 21554764056c45ea18be1e844b4ca1bfb71e544a
Parents: 4c445dd
Author: manuzhang <ow...@gmail.com>
Authored: Tue Dec 20 10:39:56 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Wed Jan 4 12:59:08 2017 +0800

----------------------------------------------------------------------
 runners/gearpump/pom.xml                        |  1 +
 .../gearpump/GearpumpPipelineResult.java        | 59 ++++++++++++++++++--
 .../beam/runners/gearpump/GearpumpRunner.java   |  4 +-
 .../runners/gearpump/TestGearpumpRunner.java    |  4 ++
 .../translators/GroupByKeyTranslator.java       |  1 -
 5 files changed, 62 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/21554764/runners/gearpump/pom.xml
----------------------------------------------------------------------
diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml
index bb35ad7..777ad34 100644
--- a/runners/gearpump/pom.xml
+++ b/runners/gearpump/pom.xml
@@ -69,6 +69,7 @@
                   <dependenciesToScan>
                     <dependency>org.apache.beam:beam-sdks-java-core</dependency>
                   </dependenciesToScan>
+                  <argLine>-noverify</argLine>
                   <excludes>
                     <!-- side input is not supported in Gearpump -->
                     <exclude>

http://git-wip-us.apache.org/repos/asf/beam/blob/21554764/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
index ed1201d..9c8f7b3 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.gearpump;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.beam.sdk.AggregatorRetrievalException;
 import org.apache.beam.sdk.AggregatorValues;
@@ -26,31 +27,62 @@ import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.transforms.Aggregator;
 
+import org.apache.gearpump.cluster.MasterToAppMaster;
+import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterData;
+import org.apache.gearpump.cluster.client.ClientContext;
 import org.joda.time.Duration;
 
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
 
 /**
  * Result of executing a {@link Pipeline} with Gearpump.
  */
 public class GearpumpPipelineResult implements PipelineResult {
+
+  private final ClientContext client;
+  private final int appId;
+  private final Duration defaultWaitDuration = Duration.standardSeconds(60);
+  private final Duration defaultWaitInterval = Duration.standardSeconds(10);
+
+  public GearpumpPipelineResult(ClientContext client, int appId) {
+    this.client = client;
+    this.appId = appId;
+  }
+
   @Override
   public State getState() {
-    return null;
+    return getGearpumpState();
   }
 
   @Override
   public State cancel() throws IOException {
-    return null;
+    client.shutdown(appId);
+    return State.CANCELLED;
   }
 
   @Override
   public State waitUntilFinish(Duration duration) {
-    return null;
+    long start = System.currentTimeMillis();
+    do {
+      try {
+        Thread.sleep(defaultWaitInterval.getMillis());
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    } while (State.RUNNING == getGearpumpState()
+        && (System.currentTimeMillis() - start) < duration.getMillis());
+
+    if (State.RUNNING == getGearpumpState()) {
+      return State.DONE;
+    } else {
+      return State.FAILED;
+    }
   }
 
   @Override
   public State waitUntilFinish() {
-    return null;
+    return waitUntilFinish(defaultWaitDuration);
   }
 
   @Override
@@ -66,4 +98,23 @@ public class GearpumpPipelineResult implements PipelineResult {
     return null;
   }
 
+  private State getGearpumpState() {
+    String status = null;
+    List<AppMasterData> apps =
+        JavaConverters.<AppMasterData>seqAsJavaListConverter(
+            (Seq<AppMasterData>) client.listApps().appMasters()).asJava();
+    for (AppMasterData app: apps) {
+      if (app.appId() == appId) {
+        status = app.status();
+      }
+    }
+    if (null == status || status.equals(MasterToAppMaster.AppMasterNonExist())) {
+      return State.UNKNOWN;
+    } else if (status.equals(MasterToAppMaster.AppMasterActive())) {
+      return State.RUNNING;
+    } else {
+      return State.STOPPED;
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/21554764/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
index 4083922..9c44da3 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
@@ -107,9 +107,9 @@ public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
     TranslationContext translationContext = new TranslationContext(streamApp, options);
     GearpumpPipelineTranslator translator = new GearpumpPipelineTranslator(translationContext);
     translator.translate(pipeline);
-    streamApp.submit();
+    int appId = streamApp.submit();
 
-    return null;
+    return new GearpumpPipelineResult(clientContext, appId);
   }
 
   private ClientContext getClientContext(GearpumpPipelineOptions options, Config config) {

http://git-wip-us.apache.org/repos/asf/beam/blob/21554764/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
index 89d31a6..ee31fb5 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
@@ -19,6 +19,7 @@
 package org.apache.beam.runners.gearpump;
 
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.PipelineRunner;
@@ -52,7 +53,10 @@ public class TestGearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
   @Override
   public GearpumpPipelineResult run(Pipeline pipeline) {
     GearpumpPipelineResult result = delegate.run(pipeline);
+    PipelineResult.State state = result.waitUntilFinish();
     cluster.stop();
+    assert(state == PipelineResult.State.DONE);
+
     return result;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/21554764/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index d64f1bf..989957f 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -134,7 +134,6 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
   private static class ValueToIterable<K, V>
       implements MapFunction<WindowedValue<KV<K, V>>, WindowedValue<KV<K, Iterable<V>>>> {
 
-
     @Override
     public WindowedValue<KV<K, Iterable<V>>> apply(WindowedValue<KV<K, V>> wv) {
       Iterable<V> values = Lists.newArrayList(wv.getValue().getValue());


[11/12] beam git commit: note thread is interrupted on InterruptedException

Posted by ke...@apache.org.
note thread is interrupted on InterruptedException


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d814857a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d814857a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d814857a

Branch: refs/heads/gearpump-runner
Commit: d814857a6c372ba3f87106d49d3ce1ef7c3c7766
Parents: 85dcfbd
Author: manuzhang <ow...@gmail.com>
Authored: Fri Jan 20 13:21:24 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Fri Jan 20 13:21:24 2017 +0800

----------------------------------------------------------------------
 .../apache/beam/runners/gearpump/GearpumpPipelineResult.java | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d814857a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
index 3dd78de..9e53517 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
@@ -67,7 +67,13 @@ public class GearpumpPipelineResult implements PipelineResult {
     do {
       try {
         Thread.sleep(defaultWaitInterval.getMillis());
-      } catch (InterruptedException e) {
+      } catch (Exception e) {
+        if (e instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+        }
+        if (e instanceof RuntimeException) {
+          throw (RuntimeException) e;
+        }
         throw new RuntimeException(e);
       }
     } while (State.RUNNING == getGearpumpState()


[10/12] beam git commit: Remove cache for Gearpump on travis

Posted by ke...@apache.org.
Remove cache for Gearpump on travis


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/85dcfbd1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/85dcfbd1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/85dcfbd1

Branch: refs/heads/gearpump-runner
Commit: 85dcfbd153acb4e450a4f0f94fc54b19b76507d3
Parents: 7613ec4
Author: manuzhang <ow...@gmail.com>
Authored: Fri Jan 20 08:33:04 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Fri Jan 20 10:52:11 2017 +0800

----------------------------------------------------------------------
 .travis.yml | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/85dcfbd1/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index a806477..7dcd5d1 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -59,6 +59,7 @@ before_install:
 install:
   # Removing this here protects from inadvertent caching
   - rm -rf "$HOME/.m2/repository/org/apache/beam"
+  - rm -rf "$HOME/.m2/repository/org/apache/gearpump"
 
 script:
   - travis_retry mvn --batch-mode --update-snapshots --no-snapshot-updates $MAVEN_OVERRIDE install && travis_retry bash -ex .travis/test_wordcount.sh


[12/12] beam git commit: This closes #1661: Implement GearpumpPipelineResult

Posted by ke...@apache.org.
This closes #1661: Implement GearpumpPipelineResult

  note thread is interrupted on InterruptedException
  Remove cache for Gearpump on travis
  reduce timeout to wait for result
  fix ParDo.BoundMulti translation
  return encoded key for GroupByKey translation
  support OutputTimeFn
  update to latest gearpump dsl function interface
  fix group by window
  activate ROS on Gearpump by default
  update ROS configurations
  [BEAM-1180] Implement GearpumpPipelineResult


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1ed16f11
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1ed16f11
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1ed16f11

Branch: refs/heads/gearpump-runner
Commit: 1ed16f11a3fb24c3cc6773235651c4a9255d6fbc
Parents: 4c445dd d814857
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Jan 23 19:22:31 2017 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jan 23 19:22:31 2017 -0800

----------------------------------------------------------------------
 .travis.yml                                     |  1 +
 runners/gearpump/pom.xml                        |  9 +-
 .../gearpump/GearpumpPipelineResult.java        | 65 ++++++++++++-
 .../beam/runners/gearpump/GearpumpRunner.java   |  7 +-
 .../runners/gearpump/TestGearpumpRunner.java    |  4 +
 .../translators/GroupByKeyTranslator.java       | 96 ++++++++++++++++----
 .../translators/ParDoBoundMultiTranslator.java  | 35 +++++--
 .../translators/TranslationContext.java         |  1 -
 .../translators/WindowBoundTranslator.java      | 49 ++++++++--
 .../translators/functions/DoFnFunction.java     | 21 ++++-
 .../gearpump/translators/io/GearpumpSource.java |  4 +-
 11 files changed, 238 insertions(+), 54 deletions(-)
----------------------------------------------------------------------



[02/12] beam git commit: update ROS configurations

Posted by ke...@apache.org.
update ROS configurations


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cfdc971f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cfdc971f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cfdc971f

Branch: refs/heads/gearpump-runner
Commit: cfdc971f45ff716b7bd88b3e054ca7077454ab07
Parents: 2155476
Author: manuzhang <ow...@gmail.com>
Authored: Thu Jan 5 13:47:42 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Thu Jan 5 13:47:42 2017 +0800

----------------------------------------------------------------------
 runners/gearpump/pom.xml | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/cfdc971f/runners/gearpump/pom.xml
----------------------------------------------------------------------
diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml
index 777ad34..4e3722c 100644
--- a/runners/gearpump/pom.xml
+++ b/runners/gearpump/pom.xml
@@ -64,6 +64,12 @@
                 </goals>
                 <configuration>
                   <groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
+                  <excludedGroups>
+                    org.apache.beam.sdk.testing.UsesStatefulParDo,
+                    org.apache.beam.sdk.testing.UsesTimersInParDo,
+                    org.apache.beam.sdk.testing.UsesSplittableParDo,
+                    org.apache.beam.sdk.testing.UsesMetrics
+                  </excludedGroups>
                   <parallel>none</parallel>
                   <failIfNoTests>true</failIfNoTests>
                   <dependenciesToScan>


[06/12] beam git commit: support OutputTimeFn

Posted by ke...@apache.org.
support OutputTimeFn


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f6aaf0d9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f6aaf0d9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f6aaf0d9

Branch: refs/heads/gearpump-runner
Commit: f6aaf0d9ecd6b67ad6f7eed413af3fae3b3bdf6f
Parents: 3bf8263
Author: manuzhang <ow...@gmail.com>
Authored: Sat Jan 14 21:41:40 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Sat Jan 14 21:41:40 2017 +0800

----------------------------------------------------------------------
 .../translators/GroupByKeyTranslator.java       | 57 +++++++++++++++++---
 .../translators/WindowBoundTranslator.java      | 20 ++++---
 2 files changed, 64 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f6aaf0d9/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index 4eaf755..e16a178 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 
@@ -58,12 +59,16 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
     JavaStream<WindowedValue<KV<K, V>>> inputStream =
         context.getInputStream(input);
     int parallelism = context.getPipelineOptions().getParallelism();
+    OutputTimeFn<? super BoundedWindow> outputTimeFn = (OutputTimeFn<? super BoundedWindow>)
+        input.getWindowingStrategy().getOutputTimeFn();
     JavaStream<WindowedValue<KV<K, Iterable<V>>>> outputStream = inputStream
         .window(Window.apply(new GearpumpWindowFn(input.getWindowingStrategy().getWindowFn()),
             EventTimeTrigger$.MODULE$, Discarding$.MODULE$), "assign_window")
         .groupBy(new GroupByFn<K, V>(), parallelism, "group_by_Key_and_Window")
         .map(new ValueToIterable<K, V>(), "map_value_to_iterable")
-        .reduce(new MergeValue<K, V>(), "merge_value");
+        .map(new KeyedByTimestamp<K, V>(), "keyed_by_timestamp")
+        .reduce(new Merge<K, V>(outputTimeFn), "merge")
+        .map(new Values<K, V>(), "values");
 
     context.setOutputStream(context.getOutput(transform), outputStream);
   }
@@ -141,15 +146,53 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
     }
   }
 
-  private static class MergeValue<K, V> extends
-      ReduceFunction<WindowedValue<KV<K, Iterable<V>>>> {
+  private static class KeyedByTimestamp<K, V>
+      extends MapFunction<WindowedValue<KV<K, Iterable<V>>>,
+      KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>>> {
 
     @Override
-    public WindowedValue<KV<K, Iterable<V>>> apply(WindowedValue<KV<K, Iterable<V>>> wv1,
-        WindowedValue<KV<K, Iterable<V>>> wv2) {
-      return WindowedValue.of(KV.of(wv1.getValue().getKey(),
+    public KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>> apply(
+        WindowedValue<KV<K, Iterable<V>>> wv) {
+      return KV.of(wv.getTimestamp(), wv);
+    }
+  }
+
+  private static class Merge<K, V> extends
+      ReduceFunction<KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>>> {
+
+    private final OutputTimeFn<? super BoundedWindow> outputTimeFn;
+
+    Merge(OutputTimeFn<? super BoundedWindow> outputTimeFn) {
+      this.outputTimeFn = outputTimeFn;
+    }
+
+    @Override
+    public KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>> apply(
+        KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>> kv1,
+        KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>> kv2) {
+      org.joda.time.Instant t1 = kv1.getKey();
+      org.joda.time.Instant t2 = kv2.getKey();
+
+      WindowedValue<KV<K, Iterable<V>>> wv1 = kv1.getValue();
+      WindowedValue<KV<K, Iterable<V>>> wv2 = kv2.getValue();
+
+      return KV.of(outputTimeFn.combine(t1, t2),
+          WindowedValue.of(KV.of(wv1.getValue().getKey(),
               Iterables.concat(wv1.getValue().getValue(), wv2.getValue().getValue())),
-          wv1.getTimestamp(), wv1.getWindows(), wv1.getPane());
+              wv1.getTimestamp(), wv1.getWindows(), wv1.getPane()));
+    }
+  }
+
+  private static class Values<K, V> extends
+      MapFunction<KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>>,
+          WindowedValue<KV<K, Iterable<V>>>> {
+
+    @Override
+    public WindowedValue<KV<K, Iterable<V>>> apply(KV<org.joda.time.Instant,
+        WindowedValue<KV<K, Iterable<V>>>> kv) {
+      org.joda.time.Instant timestamp = kv.getKey();
+      WindowedValue<KV<K, Iterable<V>>> wv = kv.getValue();
+      return WindowedValue.of(wv.getValue(), timestamp, wv.getWindows(), wv.getPane());
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f6aaf0d9/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
index d3c50a5..9bf1936 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
@@ -26,6 +26,7 @@ import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -53,9 +54,11 @@ public class WindowBoundTranslator<T> implements  TransformTranslator<Window.Bou
         transform.getOutputStrategyInternal(input.getWindowingStrategy());
     WindowFn<T, BoundedWindow> windowFn =
         (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn();
+    OutputTimeFn<? super BoundedWindow> outputTimeFn = (OutputTimeFn<? super BoundedWindow>)
+        outputStrategy.getOutputTimeFn();
     JavaStream<WindowedValue<T>> outputStream =
         inputStream
-            .flatMap(new AssignWindows(windowFn), "assign_windows")
+            .flatMap(new AssignWindows(windowFn, outputTimeFn), "assign_windows")
             .process(AssignTimestampTask.class, 1, UserConfig.empty(), "assign_timestamp");
 
     context.setOutputStream(context.getOutput(transform), outputStream);
@@ -64,17 +67,21 @@ public class WindowBoundTranslator<T> implements  TransformTranslator<Window.Bou
   private static class AssignWindows<T> extends
       FlatMapFunction<WindowedValue<T>, WindowedValue<T>> {
 
-    private final WindowFn<T, BoundedWindow> fn;
+    private final WindowFn<T, BoundedWindow> windowFn;
+    private final OutputTimeFn<? super BoundedWindow> outputTimeFn;
 
-    AssignWindows(WindowFn<T, BoundedWindow> fn) {
-      this.fn = fn;
+    AssignWindows(
+        WindowFn<T, BoundedWindow> windowFn,
+        OutputTimeFn<? super BoundedWindow> outputTimeFn) {
+      this.windowFn = windowFn;
+      this.outputTimeFn = outputTimeFn;
     }
 
     @Override
     public Iterator<WindowedValue<T>> apply(final WindowedValue<T> value) {
       List<WindowedValue<T>>  ret = new LinkedList<>();
       try {
-        Collection<BoundedWindow> windows = fn.assignWindows(fn.new AssignContext() {
+        Collection<BoundedWindow> windows = windowFn.assignWindows(windowFn.new AssignContext() {
           @Override
           public T element() {
             return value.getValue();
@@ -91,8 +98,9 @@ public class WindowBoundTranslator<T> implements  TransformTranslator<Window.Bou
           }
         });
         for (BoundedWindow window: windows) {
+          Instant timestamp = outputTimeFn.assignOutputTime(value.getTimestamp(), window);
           ret.add(WindowedValue.of(
-              value.getValue(), value.getTimestamp(), window, value.getPane()));
+              value.getValue(), timestamp, window, value.getPane()));
         }
       } catch (Exception e) {
         throw new RuntimeException(e);


[03/12] beam git commit: activate ROS on Gearpump by default

Posted by ke...@apache.org.
activate ROS on Gearpump by default


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ea633d2c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ea633d2c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ea633d2c

Branch: refs/heads/gearpump-runner
Commit: ea633d2c02a621df09c5f4a6d0ab3824271c7db2
Parents: cfdc971
Author: manuzhang <ow...@gmail.com>
Authored: Sat Jan 7 10:47:03 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Sat Jan 7 10:47:03 2017 +0800

----------------------------------------------------------------------
 runners/gearpump/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ea633d2c/runners/gearpump/pom.xml
----------------------------------------------------------------------
diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml
index 4e3722c..7c6fa76 100644
--- a/runners/gearpump/pom.xml
+++ b/runners/gearpump/pom.xml
@@ -49,7 +49,7 @@
   <profiles>
     <profile>
       <id>local-runnable-on-service-tests</id>
-      <activation><activeByDefault>false</activeByDefault></activation>
+      <activation><activeByDefault>true</activeByDefault></activation>
       <build>
         <plugins>
           <plugin>


[07/12] beam git commit: return encoded key for GroupByKey translation

Posted by ke...@apache.org.
return encoded key for GroupByKey translation


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/364a3f08
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/364a3f08
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/364a3f08

Branch: refs/heads/gearpump-runner
Commit: 364a3f089747ff4761cb5b54c963c8a8013574a0
Parents: f6aaf0d
Author: manuzhang <ow...@gmail.com>
Authored: Mon Jan 16 11:16:05 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Mon Jan 16 11:16:05 2017 +0800

----------------------------------------------------------------------
 .../translators/GroupByKeyTranslator.java       | 24 ++++++++++++++++----
 1 file changed, 20 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/364a3f08/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index e16a178..ac8e218 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -22,17 +22,22 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
 import java.io.Serializable;
+import java.nio.ByteBuffer;
 import java.time.Instant;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 
@@ -56,6 +61,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
   @Override
   public void translate(GroupByKey<K, V> transform, TranslationContext context) {
     PCollection<KV<K, V>> input = context.getInput(transform);
+    Coder<K> inputKeyCoder = ((KvCoder<K, V>) input.getCoder()).getKeyCoder();
     JavaStream<WindowedValue<KV<K, V>>> inputStream =
         context.getInputStream(input);
     int parallelism = context.getPipelineOptions().getParallelism();
@@ -64,7 +70,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
     JavaStream<WindowedValue<KV<K, Iterable<V>>>> outputStream = inputStream
         .window(Window.apply(new GearpumpWindowFn(input.getWindowingStrategy().getWindowFn()),
             EventTimeTrigger$.MODULE$, Discarding$.MODULE$), "assign_window")
-        .groupBy(new GroupByFn<K, V>(), parallelism, "group_by_Key_and_Window")
+        .groupBy(new GroupByFn<K, V>(inputKeyCoder), parallelism, "group_by_Key_and_Window")
         .map(new ValueToIterable<K, V>(), "map_value_to_iterable")
         .map(new KeyedByTimestamp<K, V>(), "keyed_by_timestamp")
         .reduce(new Merge<K, V>(outputTimeFn), "merge")
@@ -128,11 +134,21 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
   }
 
   private static class GroupByFn<K, V> extends
-      GroupByFunction<WindowedValue<KV<K, V>>, K> {
+      GroupByFunction<WindowedValue<KV<K, V>>, ByteBuffer> {
+
+    private final Coder<K> keyCoder;
+
+    GroupByFn(Coder<K> keyCoder) {
+      this.keyCoder = keyCoder;
+    }
 
     @Override
-    public K apply(WindowedValue<KV<K, V>> wv) {
-      return wv.getValue().getKey();
+    public ByteBuffer apply(WindowedValue<KV<K, V>> wv) {
+      try {
+        return ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, wv.getValue().getKey()));
+      } catch (CoderException e) {
+        throw new RuntimeException(e);
+      }
     }
   }
 


[08/12] beam git commit: fix ParDo.BoundMulti translation

Posted by ke...@apache.org.
fix ParDo.BoundMulti translation


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b2d326ff
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b2d326ff
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b2d326ff

Branch: refs/heads/gearpump-runner
Commit: b2d326ff73afca5c8e941c8006e9d74261a6b9df
Parents: 364a3f0
Author: manuzhang <ow...@gmail.com>
Authored: Mon Jan 16 12:31:26 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Mon Jan 16 12:31:26 2017 +0800

----------------------------------------------------------------------
 .../gearpump/translators/ParDoBoundMultiTranslator.java        | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b2d326ff/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
index 0d5b8bc..bf7073b 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
@@ -91,8 +91,7 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
     private final DoFnRunnerFactory<InputT, OutputT> doFnRunnerFactory;
     private DoFnRunner<InputT, OutputT> doFnRunner;
     private final DoFn<InputT, OutputT> doFn;
-    private final List<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> outputs = Lists
-        .newArrayList();
+    private List<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> outputs;
 
     public DoFnMultiFunction(
         GearpumpPipelineOptions pipelineOptions,
@@ -127,6 +126,8 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
 
     @Override
     public Iterator<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> apply(WindowedValue<InputT> wv) {
+      outputs = Lists.newArrayList();
+
       if (null == doFnRunner) {
         doFnRunner = doFnRunnerFactory.createRunner();
       }
@@ -166,6 +167,7 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
 
     @Override
     public WindowedValue<OutputT> apply(WindowedValue<KV<TupleTag<OutputT>, OutputT>> wv) {
+      // System.out.println(wv.getValue().getKey() + ":" + wv.getValue().getValue());
       return WindowedValue.of(wv.getValue().getValue(), wv.getTimestamp(),
           wv.getWindows(), wv.getPane());
     }


[05/12] beam git commit: update to latest gearpump dsl function interface

Posted by ke...@apache.org.
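Update to the latest Gearpump DSL function interface: the function types moved to the dsl.api.functions / dsl.javaapi.functions packages and are now extended as classes rather than implemented as interfaces; DoFn setup/teardown are invoked from the new setup()/teardown() hooks.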


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3bf82638
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3bf82638
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3bf82638

Branch: refs/heads/gearpump-runner
Commit: 3bf82638096ae7aa91c7d3c862c2994772bee51b
Parents: e63d42d
Author: manuzhang <ow...@gmail.com>
Authored: Sat Jan 14 13:36:07 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Sat Jan 14 21:40:18 2017 +0800

----------------------------------------------------------------------
 .../translators/GroupByKeyTranslator.java       | 12 ++++----
 .../translators/ParDoBoundMultiTranslator.java  | 29 ++++++++++++++------
 .../translators/WindowBoundTranslator.java      |  4 +--
 .../translators/functions/DoFnFunction.java     | 21 +++++++++++---
 4 files changed, 46 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3bf82638/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index 8e3ffe3..4eaf755 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -36,15 +36,15 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
+import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.functions.GroupByFunction;
 import org.apache.gearpump.streaming.dsl.window.api.Discarding$;
 import org.apache.gearpump.streaming.dsl.window.api.EventTimeTrigger$;
 import org.apache.gearpump.streaming.dsl.window.api.Window;
 import org.apache.gearpump.streaming.dsl.window.api.WindowFn;
 import org.apache.gearpump.streaming.dsl.window.impl.Bucket;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.GroupByFunction;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.MapFunction;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.ReduceFunction;
 import scala.collection.JavaConversions;
 
 
@@ -122,7 +122,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
     }
   }
 
-  private static class GroupByFn<K, V> implements
+  private static class GroupByFn<K, V> extends
       GroupByFunction<WindowedValue<KV<K, V>>, K> {
 
     @Override
@@ -132,7 +132,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
   }
 
   private static class ValueToIterable<K, V>
-      implements MapFunction<WindowedValue<KV<K, V>>, WindowedValue<KV<K, Iterable<V>>>> {
+      extends MapFunction<WindowedValue<KV<K, V>>, WindowedValue<KV<K, Iterable<V>>>> {
 
     @Override
     public WindowedValue<KV<K, Iterable<V>>> apply(WindowedValue<KV<K, V>> wv) {
@@ -141,7 +141,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
     }
   }
 
-  private static class MergeValue<K, V> implements
+  private static class MergeValue<K, V> extends
       ReduceFunction<WindowedValue<KV<K, Iterable<V>>>> {
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/3bf82638/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
index 24f9734..0d5b8bc 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
@@ -33,6 +33,7 @@ import org.apache.beam.runners.gearpump.translators.utils.NoOpSideInputReader;
 import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -41,10 +42,10 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 
+import org.apache.gearpump.streaming.dsl.api.functions.FilterFunction;
+import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.FilterFunction;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.MapFunction;
+import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
 
 /**
  * {@link ParDo.BoundMulti} is translated to Gearpump flatMap function
@@ -83,12 +84,13 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
   /**
    * Gearpump {@link FlatMapFunction} wrapper over Beam {@link DoFnMultiFunction}.
    */
-  private static class DoFnMultiFunction<InputT, OutputT> implements
-      FlatMapFunction<WindowedValue<InputT>, WindowedValue<KV<TupleTag<OutputT>, OutputT>>>,
-      DoFnRunners.OutputManager {
+  private static class DoFnMultiFunction<InputT, OutputT>
+    extends FlatMapFunction<WindowedValue<InputT>, WindowedValue<KV<TupleTag<OutputT>, OutputT>>>
+    implements DoFnRunners.OutputManager {
 
     private final DoFnRunnerFactory<InputT, OutputT> doFnRunnerFactory;
     private DoFnRunner<InputT, OutputT> doFnRunner;
+    private final DoFn<InputT, OutputT> doFn;
     private final List<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> outputs = Lists
         .newArrayList();
 
@@ -99,6 +101,7 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
         TupleTagList sideOutputTags,
         WindowingStrategy<?, ?> windowingStrategy,
         SideInputReader sideInputReader) {
+      this.doFn = doFn;
       this.doFnRunnerFactory = new DoFnRunnerFactory<>(
           pipelineOptions,
           doFn,
@@ -113,6 +116,16 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
     }
 
     @Override
+    public void setup() {
+      DoFnInvokers.invokerFor(doFn).invokeSetup();
+    }
+
+    @Override
+    public void teardown() {
+      DoFnInvokers.invokerFor(doFn).invokeTeardown();
+    }
+
+    @Override
     public Iterator<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> apply(WindowedValue<InputT> wv) {
       if (null == doFnRunner) {
         doFnRunner = doFnRunnerFactory.createRunner();
@@ -133,7 +146,7 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
     }
   }
 
-  private static class FilterByOutputTag<OutputT> implements
+  private static class FilterByOutputTag<OutputT> extends
       FilterFunction<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> {
 
     private final TupleTag<OutputT> tupleTag;
@@ -148,7 +161,7 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
     }
   }
 
-  private static class ExtractOutput<OutputT> implements
+  private static class ExtractOutput<OutputT> extends
       MapFunction<WindowedValue<KV<TupleTag<OutputT>, OutputT>>, WindowedValue<OutputT>> {
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/3bf82638/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
index 32dd5de..d3c50a5 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
@@ -34,8 +34,8 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.gearpump.Message;
 import org.apache.gearpump.cluster.UserConfig;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
 import org.apache.gearpump.streaming.javaapi.Task;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
 import org.apache.gearpump.streaming.task.TaskContext;
 import org.joda.time.Instant;
 
@@ -61,7 +61,7 @@ public class WindowBoundTranslator<T> implements  TransformTranslator<Window.Bou
     context.setOutputStream(context.getOutput(transform), outputStream);
   }
 
-  private static class AssignWindows<T> implements
+  private static class AssignWindows<T> extends
       FlatMapFunction<WindowedValue<T>, WindowedValue<T>> {
 
     private final WindowFn<T, BoundedWindow> fn;

http://git-wip-us.apache.org/repos/asf/beam/blob/3bf82638/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
index 42969fe..a66d3a4 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
@@ -30,30 +30,33 @@ import org.apache.beam.runners.gearpump.translators.utils.DoFnRunnerFactory;
 import org.apache.beam.runners.gearpump.translators.utils.NoOpAggregatorFactory;
 import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
-
-import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
+import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
 
 /**
  * Gearpump {@link FlatMapFunction} wrapper over Beam {@link DoFn}.
  */
-public class DoFnFunction<InputT, OutputT> implements
-    FlatMapFunction<WindowedValue<InputT>, WindowedValue<OutputT>>, DoFnRunners.OutputManager {
+public class DoFnFunction<InputT, OutputT> extends
+    FlatMapFunction<WindowedValue<InputT>, WindowedValue<OutputT>> implements
+    DoFnRunners.OutputManager {
 
   private final TupleTag<OutputT> mainTag = new TupleTag<OutputT>() {};
   private List<WindowedValue<OutputT>> outputs = Lists.newArrayList();
   private final DoFnRunnerFactory<InputT, OutputT> doFnRunnerFactory;
   private DoFnRunner<InputT, OutputT> doFnRunner;
+  private final DoFn<InputT, OutputT> doFn;
 
   public DoFnFunction(
       GearpumpPipelineOptions pipelineOptions,
       DoFn<InputT, OutputT> doFn,
       WindowingStrategy<?, ?> windowingStrategy,
       SideInputReader sideInputReader) {
+    this.doFn = doFn;
     this.doFnRunnerFactory = new DoFnRunnerFactory<>(
         pipelineOptions,
         doFn,
@@ -68,6 +71,16 @@ public class DoFnFunction<InputT, OutputT> implements
   }
 
   @Override
+  public void setup() {
+    DoFnInvokers.invokerFor(doFn).invokeSetup();
+  }
+
+  @Override
+  public void teardown() {
+    DoFnInvokers.invokerFor(doFn).invokeTeardown();
+  }
+
+  @Override
   public Iterator<WindowedValue<OutputT>> apply(WindowedValue<InputT> value) {
     outputs = Lists.newArrayList();
 


[04/12] beam git commit: fix group by window

Posted by ke...@apache.org.
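Fix group by window: use discarding (instead of accumulating) window mode in GroupByKey, and attach each WindowedValue's timestamp to the Gearpump message after windows are assigned.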


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e63d42d1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e63d42d1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e63d42d1

Branch: refs/heads/gearpump-runner
Commit: e63d42d1113728badc66285e7ce7a8ce204a82d9
Parents: ea633d2
Author: manuzhang <ow...@gmail.com>
Authored: Sat Jan 7 23:07:23 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Sat Jan 14 13:35:31 2017 +0800

----------------------------------------------------------------------
 .../beam/runners/gearpump/GearpumpRunner.java   |  3 ++-
 .../translators/GroupByKeyTranslator.java       |  4 +--
 .../translators/TranslationContext.java         |  1 -
 .../translators/WindowBoundTranslator.java      | 27 ++++++++++++++++++--
 .../gearpump/translators/io/GearpumpSource.java |  4 +--
 5 files changed, 30 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e63d42d1/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
index 9c44da3..01fdb3b 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
@@ -102,8 +102,9 @@ public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
         options.getSerializers());
     ClientContext clientContext = getClientContext(options, config);
     options.setClientContext(clientContext);
+    UserConfig userConfig = UserConfig.empty();
     JavaStreamApp streamApp = new JavaStreamApp(
-        appName, clientContext, UserConfig.empty());
+        appName, clientContext, userConfig);
     TranslationContext translationContext = new TranslationContext(streamApp, options);
     GearpumpPipelineTranslator translator = new GearpumpPipelineTranslator(translationContext);
     translator.translate(pipeline);

http://git-wip-us.apache.org/repos/asf/beam/blob/e63d42d1/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index 989957f..8e3ffe3 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -37,7 +37,7 @@ import org.apache.beam.sdk.values.KV;
 
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-import org.apache.gearpump.streaming.dsl.window.api.Accumulating$;
+import org.apache.gearpump.streaming.dsl.window.api.Discarding$;
 import org.apache.gearpump.streaming.dsl.window.api.EventTimeTrigger$;
 import org.apache.gearpump.streaming.dsl.window.api.Window;
 import org.apache.gearpump.streaming.dsl.window.api.WindowFn;
@@ -60,7 +60,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
     int parallelism = context.getPipelineOptions().getParallelism();
     JavaStream<WindowedValue<KV<K, Iterable<V>>>> outputStream = inputStream
         .window(Window.apply(new GearpumpWindowFn(input.getWindowingStrategy().getWindowFn()),
-            EventTimeTrigger$.MODULE$, Accumulating$.MODULE$), "assign_window")
+            EventTimeTrigger$.MODULE$, Discarding$.MODULE$), "assign_window")
         .groupBy(new GroupByFn<K, V>(), parallelism, "group_by_Key_and_Window")
         .map(new ValueToIterable<K, V>(), "map_value_to_iterable")
         .reduce(new MergeValue<K, V>(), "merge_value");

http://git-wip-us.apache.org/repos/asf/beam/blob/e63d42d1/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
index 63fb619..b2cff8a 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
@@ -50,7 +50,6 @@ public class TranslationContext {
   public TranslationContext(JavaStreamApp streamApp, GearpumpPipelineOptions pipelineOptions) {
     this.streamApp = streamApp;
     this.pipelineOptions = pipelineOptions;
-
   }
 
   public void setCurrentTransform(TransformHierarchy.Node treeNode) {

http://git-wip-us.apache.org/repos/asf/beam/blob/e63d42d1/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
index 11f30fc..32dd5de 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
@@ -31,8 +31,12 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.gearpump.Message;
+import org.apache.gearpump.cluster.UserConfig;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.javaapi.Task;
 import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
+import org.apache.gearpump.streaming.task.TaskContext;
 import org.joda.time.Instant;
 
 /**
@@ -50,11 +54,13 @@ public class WindowBoundTranslator<T> implements  TransformTranslator<Window.Bou
     WindowFn<T, BoundedWindow> windowFn =
         (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn();
     JavaStream<WindowedValue<T>> outputStream =
-        inputStream.flatMap(new AssignWindows(windowFn), "assign_windows");
+        inputStream
+            .flatMap(new AssignWindows(windowFn), "assign_windows")
+            .process(AssignTimestampTask.class, 1, UserConfig.empty(), "assign_timestamp");
+
     context.setOutputStream(context.getOutput(transform), outputStream);
   }
 
-
   private static class AssignWindows<T> implements
       FlatMapFunction<WindowedValue<T>, WindowedValue<T>> {
 
@@ -94,4 +100,21 @@ public class WindowBoundTranslator<T> implements  TransformTranslator<Window.Bou
       return ret.iterator();
     }
   }
+
+  /**
+   * Assign WindowedValue timestamp to Gearpump message.
+   * @param <T> element type of WindowedValue
+   */
+  public static class AssignTimestampTask<T> extends Task {
+
+    public AssignTimestampTask(TaskContext taskContext, UserConfig userConfig) {
+      super(taskContext, userConfig);
+    }
+
+    @Override
+    public void onNext(Message message) {
+      final WindowedValue<T> value = (WindowedValue<T>) message.msg();
+      context.output(Message.apply(value, value.getTimestamp().getMillis()));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/e63d42d1/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
index b266590..6e5b2de 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
@@ -28,8 +28,6 @@ import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
 
 import org.apache.gearpump.Message;
@@ -79,7 +77,7 @@ public abstract class GearpumpSource<T> implements DataSource {
         org.joda.time.Instant timestamp = reader.getCurrentTimestamp();
         available = reader.advance();
         message = Message.apply(
-            WindowedValue.of(data, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING),
+            WindowedValue.valueInGlobalWindow(data),
             timestamp.getMillis());
       }
     } catch (Exception e) {


[09/12] beam git commit: reduce timeout to wait for result

Posted by ke...@apache.org.
Reduce the default timeout (60s to 30s) and wait interval (10s to 5s) used when waiting for the pipeline result.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7613ec44
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7613ec44
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7613ec44

Branch: refs/heads/gearpump-runner
Commit: 7613ec44cedf12d1e7bf80e8bb6a505f09653c4f
Parents: b2d326f
Author: manuzhang <ow...@gmail.com>
Authored: Mon Jan 16 13:25:12 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Mon Jan 16 13:25:12 2017 +0800

----------------------------------------------------------------------
 .../org/apache/beam/runners/gearpump/GearpumpPipelineResult.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7613ec44/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
index 9c8f7b3..3dd78de 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
@@ -42,8 +42,8 @@ public class GearpumpPipelineResult implements PipelineResult {
 
   private final ClientContext client;
   private final int appId;
-  private final Duration defaultWaitDuration = Duration.standardSeconds(60);
-  private final Duration defaultWaitInterval = Duration.standardSeconds(10);
+  private final Duration defaultWaitDuration = Duration.standardSeconds(30);
+  private final Duration defaultWaitInterval = Duration.standardSeconds(5);
 
   public GearpumpPipelineResult(ClientContext client, int appId) {
     this.client = client;