You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/01/24 03:23:13 UTC
[01/12] beam git commit: [BEAM-1180] Implement GearpumpPipelineResult
Repository: beam
Updated Branches:
refs/heads/gearpump-runner 4c445dd0b -> 1ed16f11a
[BEAM-1180] Implement GearpumpPipelineResult
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/21554764
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/21554764
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/21554764
Branch: refs/heads/gearpump-runner
Commit: 21554764056c45ea18be1e844b4ca1bfb71e544a
Parents: 4c445dd
Author: manuzhang <ow...@gmail.com>
Authored: Tue Dec 20 10:39:56 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Wed Jan 4 12:59:08 2017 +0800
----------------------------------------------------------------------
runners/gearpump/pom.xml | 1 +
.../gearpump/GearpumpPipelineResult.java | 59 ++++++++++++++++++--
.../beam/runners/gearpump/GearpumpRunner.java | 4 +-
.../runners/gearpump/TestGearpumpRunner.java | 4 ++
.../translators/GroupByKeyTranslator.java | 1 -
5 files changed, 62 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/21554764/runners/gearpump/pom.xml
----------------------------------------------------------------------
diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml
index bb35ad7..777ad34 100644
--- a/runners/gearpump/pom.xml
+++ b/runners/gearpump/pom.xml
@@ -69,6 +69,7 @@
<dependenciesToScan>
<dependency>org.apache.beam:beam-sdks-java-core</dependency>
</dependenciesToScan>
+ <argLine>-noverify</argLine>
<excludes>
<!-- side input is not supported in Gearpump -->
<exclude>
http://git-wip-us.apache.org/repos/asf/beam/blob/21554764/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
index ed1201d..9c8f7b3 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.gearpump;
import java.io.IOException;
+import java.util.List;
import org.apache.beam.sdk.AggregatorRetrievalException;
import org.apache.beam.sdk.AggregatorValues;
@@ -26,31 +27,62 @@ import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.gearpump.cluster.MasterToAppMaster;
+import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterData;
+import org.apache.gearpump.cluster.client.ClientContext;
import org.joda.time.Duration;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
/**
* Result of executing a {@link Pipeline} with Gearpump.
*/
public class GearpumpPipelineResult implements PipelineResult {
+
+ private final ClientContext client;
+ private final int appId;
+ private final Duration defaultWaitDuration = Duration.standardSeconds(60);
+ private final Duration defaultWaitInterval = Duration.standardSeconds(10);
+
+ public GearpumpPipelineResult(ClientContext client, int appId) {
+ this.client = client;
+ this.appId = appId;
+ }
+
@Override
public State getState() {
- return null;
+ return getGearpumpState();
}
@Override
public State cancel() throws IOException {
- return null;
+ client.shutdown(appId);
+ return State.CANCELLED;
}
@Override
public State waitUntilFinish(Duration duration) {
- return null;
+ long start = System.currentTimeMillis();
+ do {
+ try {
+ Thread.sleep(defaultWaitInterval.getMillis());
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ } while (State.RUNNING == getGearpumpState()
+ && (System.currentTimeMillis() - start) < duration.getMillis());
+
+ if (State.RUNNING == getGearpumpState()) {
+ return State.DONE;
+ } else {
+ return State.FAILED;
+ }
}
@Override
public State waitUntilFinish() {
- return null;
+ return waitUntilFinish(defaultWaitDuration);
}
@Override
@@ -66,4 +98,23 @@ public class GearpumpPipelineResult implements PipelineResult {
return null;
}
+ private State getGearpumpState() {
+ String status = null;
+ List<AppMasterData> apps =
+ JavaConverters.<AppMasterData>seqAsJavaListConverter(
+ (Seq<AppMasterData>) client.listApps().appMasters()).asJava();
+ for (AppMasterData app: apps) {
+ if (app.appId() == appId) {
+ status = app.status();
+ }
+ }
+ if (null == status || status.equals(MasterToAppMaster.AppMasterNonExist())) {
+ return State.UNKNOWN;
+ } else if (status.equals(MasterToAppMaster.AppMasterActive())) {
+ return State.RUNNING;
+ } else {
+ return State.STOPPED;
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/beam/blob/21554764/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
index 4083922..9c44da3 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
@@ -107,9 +107,9 @@ public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
TranslationContext translationContext = new TranslationContext(streamApp, options);
GearpumpPipelineTranslator translator = new GearpumpPipelineTranslator(translationContext);
translator.translate(pipeline);
- streamApp.submit();
+ int appId = streamApp.submit();
- return null;
+ return new GearpumpPipelineResult(clientContext, appId);
}
private ClientContext getClientContext(GearpumpPipelineOptions options, Config config) {
http://git-wip-us.apache.org/repos/asf/beam/blob/21554764/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
index 89d31a6..ee31fb5 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
@@ -19,6 +19,7 @@
package org.apache.beam.runners.gearpump;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.PipelineRunner;
@@ -52,7 +53,10 @@ public class TestGearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
@Override
public GearpumpPipelineResult run(Pipeline pipeline) {
GearpumpPipelineResult result = delegate.run(pipeline);
+ PipelineResult.State state = result.waitUntilFinish();
cluster.stop();
+ assert(state == PipelineResult.State.DONE);
+
return result;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/21554764/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index d64f1bf..989957f 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -134,7 +134,6 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
private static class ValueToIterable<K, V>
implements MapFunction<WindowedValue<KV<K, V>>, WindowedValue<KV<K, Iterable<V>>>> {
-
@Override
public WindowedValue<KV<K, Iterable<V>>> apply(WindowedValue<KV<K, V>> wv) {
Iterable<V> values = Lists.newArrayList(wv.getValue().getValue());
[11/12] beam git commit: note thread is interrupted on InterruptedException
Posted by ke...@apache.org.
note thread is interrupted on InterruptedException
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d814857a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d814857a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d814857a
Branch: refs/heads/gearpump-runner
Commit: d814857a6c372ba3f87106d49d3ce1ef7c3c7766
Parents: 85dcfbd
Author: manuzhang <ow...@gmail.com>
Authored: Fri Jan 20 13:21:24 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Fri Jan 20 13:21:24 2017 +0800
----------------------------------------------------------------------
.../apache/beam/runners/gearpump/GearpumpPipelineResult.java | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d814857a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
index 3dd78de..9e53517 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
@@ -67,7 +67,13 @@ public class GearpumpPipelineResult implements PipelineResult {
do {
try {
Thread.sleep(defaultWaitInterval.getMillis());
- } catch (InterruptedException e) {
+ } catch (Exception e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ if (e instanceof RuntimeException) {
+ throw (RuntimeException) e;
+ }
throw new RuntimeException(e);
}
} while (State.RUNNING == getGearpumpState()
[10/12] beam git commit: Remove cache for Gearpump on travis
Posted by ke...@apache.org.
Remove cache for Gearpump on travis
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/85dcfbd1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/85dcfbd1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/85dcfbd1
Branch: refs/heads/gearpump-runner
Commit: 85dcfbd153acb4e450a4f0f94fc54b19b76507d3
Parents: 7613ec4
Author: manuzhang <ow...@gmail.com>
Authored: Fri Jan 20 08:33:04 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Fri Jan 20 10:52:11 2017 +0800
----------------------------------------------------------------------
.travis.yml | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/85dcfbd1/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index a806477..7dcd5d1 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -59,6 +59,7 @@ before_install:
install:
# Removing this here protects from inadvertent caching
- rm -rf "$HOME/.m2/repository/org/apache/beam"
+ - rm -rf "$HOME/.m2/repository/org/apache/gearpump"
script:
- travis_retry mvn --batch-mode --update-snapshots --no-snapshot-updates $MAVEN_OVERRIDE install && travis_retry bash -ex .travis/test_wordcount.sh
[12/12] beam git commit: This closes #1661: Implement GearpumpPipelineResult
Posted by ke...@apache.org.
This closes #1661: Implement GearpumpPipelineResult
note thread is interrupted on InterruptedException
Remove cache for Gearpump on travis
reduce timeout to wait for result
fix ParDo.BoundMulti translation
return encoded key for GroupByKey translation
support OutputTimeFn
update to latest gearpump dsl function interface
fix group by window
activate ROS on Gearpump by default
update ROS configurations
[BEAM-1180] Implement GearpumpPipelineResult
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1ed16f11
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1ed16f11
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1ed16f11
Branch: refs/heads/gearpump-runner
Commit: 1ed16f11a3fb24c3cc6773235651c4a9255d6fbc
Parents: 4c445dd d814857
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Jan 23 19:22:31 2017 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jan 23 19:22:31 2017 -0800
----------------------------------------------------------------------
.travis.yml | 1 +
runners/gearpump/pom.xml | 9 +-
.../gearpump/GearpumpPipelineResult.java | 65 ++++++++++++-
.../beam/runners/gearpump/GearpumpRunner.java | 7 +-
.../runners/gearpump/TestGearpumpRunner.java | 4 +
.../translators/GroupByKeyTranslator.java | 96 ++++++++++++++++----
.../translators/ParDoBoundMultiTranslator.java | 35 +++++--
.../translators/TranslationContext.java | 1 -
.../translators/WindowBoundTranslator.java | 49 ++++++++--
.../translators/functions/DoFnFunction.java | 21 ++++-
.../gearpump/translators/io/GearpumpSource.java | 4 +-
11 files changed, 238 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
[02/12] beam git commit: update ROS configurations
Posted by ke...@apache.org.
update ROS configurations
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cfdc971f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cfdc971f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cfdc971f
Branch: refs/heads/gearpump-runner
Commit: cfdc971f45ff716b7bd88b3e054ca7077454ab07
Parents: 2155476
Author: manuzhang <ow...@gmail.com>
Authored: Thu Jan 5 13:47:42 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Thu Jan 5 13:47:42 2017 +0800
----------------------------------------------------------------------
runners/gearpump/pom.xml | 6 ++++++
1 file changed, 6 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/cfdc971f/runners/gearpump/pom.xml
----------------------------------------------------------------------
diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml
index 777ad34..4e3722c 100644
--- a/runners/gearpump/pom.xml
+++ b/runners/gearpump/pom.xml
@@ -64,6 +64,12 @@
</goals>
<configuration>
<groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
+ <excludedGroups>
+ org.apache.beam.sdk.testing.UsesStatefulParDo,
+ org.apache.beam.sdk.testing.UsesTimersInParDo,
+ org.apache.beam.sdk.testing.UsesSplittableParDo,
+ org.apache.beam.sdk.testing.UsesMetrics
+ </excludedGroups>
<parallel>none</parallel>
<failIfNoTests>true</failIfNoTests>
<dependenciesToScan>
[06/12] beam git commit: support OutputTimeFn
Posted by ke...@apache.org.
support OutputTimeFn
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f6aaf0d9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f6aaf0d9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f6aaf0d9
Branch: refs/heads/gearpump-runner
Commit: f6aaf0d9ecd6b67ad6f7eed413af3fae3b3bdf6f
Parents: 3bf8263
Author: manuzhang <ow...@gmail.com>
Authored: Sat Jan 14 21:41:40 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Sat Jan 14 21:41:40 2017 +0800
----------------------------------------------------------------------
.../translators/GroupByKeyTranslator.java | 57 +++++++++++++++++---
.../translators/WindowBoundTranslator.java | 20 ++++---
2 files changed, 64 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f6aaf0d9/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index 4eaf755..e16a178 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
@@ -58,12 +59,16 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
JavaStream<WindowedValue<KV<K, V>>> inputStream =
context.getInputStream(input);
int parallelism = context.getPipelineOptions().getParallelism();
+ OutputTimeFn<? super BoundedWindow> outputTimeFn = (OutputTimeFn<? super BoundedWindow>)
+ input.getWindowingStrategy().getOutputTimeFn();
JavaStream<WindowedValue<KV<K, Iterable<V>>>> outputStream = inputStream
.window(Window.apply(new GearpumpWindowFn(input.getWindowingStrategy().getWindowFn()),
EventTimeTrigger$.MODULE$, Discarding$.MODULE$), "assign_window")
.groupBy(new GroupByFn<K, V>(), parallelism, "group_by_Key_and_Window")
.map(new ValueToIterable<K, V>(), "map_value_to_iterable")
- .reduce(new MergeValue<K, V>(), "merge_value");
+ .map(new KeyedByTimestamp<K, V>(), "keyed_by_timestamp")
+ .reduce(new Merge<K, V>(outputTimeFn), "merge")
+ .map(new Values<K, V>(), "values");
context.setOutputStream(context.getOutput(transform), outputStream);
}
@@ -141,15 +146,53 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
}
}
- private static class MergeValue<K, V> extends
- ReduceFunction<WindowedValue<KV<K, Iterable<V>>>> {
+ private static class KeyedByTimestamp<K, V>
+ extends MapFunction<WindowedValue<KV<K, Iterable<V>>>,
+ KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>>> {
@Override
- public WindowedValue<KV<K, Iterable<V>>> apply(WindowedValue<KV<K, Iterable<V>>> wv1,
- WindowedValue<KV<K, Iterable<V>>> wv2) {
- return WindowedValue.of(KV.of(wv1.getValue().getKey(),
+ public KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>> apply(
+ WindowedValue<KV<K, Iterable<V>>> wv) {
+ return KV.of(wv.getTimestamp(), wv);
+ }
+ }
+
+ private static class Merge<K, V> extends
+ ReduceFunction<KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>>> {
+
+ private final OutputTimeFn<? super BoundedWindow> outputTimeFn;
+
+ Merge(OutputTimeFn<? super BoundedWindow> outputTimeFn) {
+ this.outputTimeFn = outputTimeFn;
+ }
+
+ @Override
+ public KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>> apply(
+ KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>> kv1,
+ KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>> kv2) {
+ org.joda.time.Instant t1 = kv1.getKey();
+ org.joda.time.Instant t2 = kv2.getKey();
+
+ WindowedValue<KV<K, Iterable<V>>> wv1 = kv1.getValue();
+ WindowedValue<KV<K, Iterable<V>>> wv2 = kv2.getValue();
+
+ return KV.of(outputTimeFn.combine(t1, t2),
+ WindowedValue.of(KV.of(wv1.getValue().getKey(),
Iterables.concat(wv1.getValue().getValue(), wv2.getValue().getValue())),
- wv1.getTimestamp(), wv1.getWindows(), wv1.getPane());
+ wv1.getTimestamp(), wv1.getWindows(), wv1.getPane()));
+ }
+ }
+
+ private static class Values<K, V> extends
+ MapFunction<KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>>,
+ WindowedValue<KV<K, Iterable<V>>>> {
+
+ @Override
+ public WindowedValue<KV<K, Iterable<V>>> apply(KV<org.joda.time.Instant,
+ WindowedValue<KV<K, Iterable<V>>>> kv) {
+ org.joda.time.Instant timestamp = kv.getKey();
+ WindowedValue<KV<K, Iterable<V>>> wv = kv.getValue();
+ return WindowedValue.of(wv.getValue(), timestamp, wv.getWindows(), wv.getPane());
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f6aaf0d9/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
index d3c50a5..9bf1936 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
@@ -26,6 +26,7 @@ import java.util.LinkedList;
import java.util.List;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
@@ -53,9 +54,11 @@ public class WindowBoundTranslator<T> implements TransformTranslator<Window.Bou
transform.getOutputStrategyInternal(input.getWindowingStrategy());
WindowFn<T, BoundedWindow> windowFn =
(WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn();
+ OutputTimeFn<? super BoundedWindow> outputTimeFn = (OutputTimeFn<? super BoundedWindow>)
+ outputStrategy.getOutputTimeFn();
JavaStream<WindowedValue<T>> outputStream =
inputStream
- .flatMap(new AssignWindows(windowFn), "assign_windows")
+ .flatMap(new AssignWindows(windowFn, outputTimeFn), "assign_windows")
.process(AssignTimestampTask.class, 1, UserConfig.empty(), "assign_timestamp");
context.setOutputStream(context.getOutput(transform), outputStream);
@@ -64,17 +67,21 @@ public class WindowBoundTranslator<T> implements TransformTranslator<Window.Bou
private static class AssignWindows<T> extends
FlatMapFunction<WindowedValue<T>, WindowedValue<T>> {
- private final WindowFn<T, BoundedWindow> fn;
+ private final WindowFn<T, BoundedWindow> windowFn;
+ private final OutputTimeFn<? super BoundedWindow> outputTimeFn;
- AssignWindows(WindowFn<T, BoundedWindow> fn) {
- this.fn = fn;
+ AssignWindows(
+ WindowFn<T, BoundedWindow> windowFn,
+ OutputTimeFn<? super BoundedWindow> outputTimeFn) {
+ this.windowFn = windowFn;
+ this.outputTimeFn = outputTimeFn;
}
@Override
public Iterator<WindowedValue<T>> apply(final WindowedValue<T> value) {
List<WindowedValue<T>> ret = new LinkedList<>();
try {
- Collection<BoundedWindow> windows = fn.assignWindows(fn.new AssignContext() {
+ Collection<BoundedWindow> windows = windowFn.assignWindows(windowFn.new AssignContext() {
@Override
public T element() {
return value.getValue();
@@ -91,8 +98,9 @@ public class WindowBoundTranslator<T> implements TransformTranslator<Window.Bou
}
});
for (BoundedWindow window: windows) {
+ Instant timestamp = outputTimeFn.assignOutputTime(value.getTimestamp(), window);
ret.add(WindowedValue.of(
- value.getValue(), value.getTimestamp(), window, value.getPane()));
+ value.getValue(), timestamp, window, value.getPane()));
}
} catch (Exception e) {
throw new RuntimeException(e);
[03/12] beam git commit: activate ROS on Gearpump by default
Posted by ke...@apache.org.
activate ROS on Gearpump by default
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ea633d2c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ea633d2c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ea633d2c
Branch: refs/heads/gearpump-runner
Commit: ea633d2c02a621df09c5f4a6d0ab3824271c7db2
Parents: cfdc971
Author: manuzhang <ow...@gmail.com>
Authored: Sat Jan 7 10:47:03 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Sat Jan 7 10:47:03 2017 +0800
----------------------------------------------------------------------
runners/gearpump/pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/ea633d2c/runners/gearpump/pom.xml
----------------------------------------------------------------------
diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml
index 4e3722c..7c6fa76 100644
--- a/runners/gearpump/pom.xml
+++ b/runners/gearpump/pom.xml
@@ -49,7 +49,7 @@
<profiles>
<profile>
<id>local-runnable-on-service-tests</id>
- <activation><activeByDefault>false</activeByDefault></activation>
+ <activation><activeByDefault>true</activeByDefault></activation>
<build>
<plugins>
<plugin>
[07/12] beam git commit: return encoded key for GroupByKey translation
Posted by ke...@apache.org.
return encoded key for GroupByKey translation
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/364a3f08
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/364a3f08
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/364a3f08
Branch: refs/heads/gearpump-runner
Commit: 364a3f089747ff4761cb5b54c963c8a8013574a0
Parents: f6aaf0d
Author: manuzhang <ow...@gmail.com>
Authored: Mon Jan 16 11:16:05 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Mon Jan 16 11:16:05 2017 +0800
----------------------------------------------------------------------
.../translators/GroupByKeyTranslator.java | 24 ++++++++++++++++----
1 file changed, 20 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/364a3f08/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index e16a178..ac8e218 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -22,17 +22,22 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.io.Serializable;
+import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
@@ -56,6 +61,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
@Override
public void translate(GroupByKey<K, V> transform, TranslationContext context) {
PCollection<KV<K, V>> input = context.getInput(transform);
+ Coder<K> inputKeyCoder = ((KvCoder<K, V>) input.getCoder()).getKeyCoder();
JavaStream<WindowedValue<KV<K, V>>> inputStream =
context.getInputStream(input);
int parallelism = context.getPipelineOptions().getParallelism();
@@ -64,7 +70,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
JavaStream<WindowedValue<KV<K, Iterable<V>>>> outputStream = inputStream
.window(Window.apply(new GearpumpWindowFn(input.getWindowingStrategy().getWindowFn()),
EventTimeTrigger$.MODULE$, Discarding$.MODULE$), "assign_window")
- .groupBy(new GroupByFn<K, V>(), parallelism, "group_by_Key_and_Window")
+ .groupBy(new GroupByFn<K, V>(inputKeyCoder), parallelism, "group_by_Key_and_Window")
.map(new ValueToIterable<K, V>(), "map_value_to_iterable")
.map(new KeyedByTimestamp<K, V>(), "keyed_by_timestamp")
.reduce(new Merge<K, V>(outputTimeFn), "merge")
@@ -128,11 +134,21 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
}
private static class GroupByFn<K, V> extends
- GroupByFunction<WindowedValue<KV<K, V>>, K> {
+ GroupByFunction<WindowedValue<KV<K, V>>, ByteBuffer> {
+
+ private final Coder<K> keyCoder;
+
+ GroupByFn(Coder<K> keyCoder) {
+ this.keyCoder = keyCoder;
+ }
@Override
- public K apply(WindowedValue<KV<K, V>> wv) {
- return wv.getValue().getKey();
+ public ByteBuffer apply(WindowedValue<KV<K, V>> wv) {
+ try {
+ return ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, wv.getValue().getKey()));
+ } catch (CoderException e) {
+ throw new RuntimeException(e);
+ }
}
}
[08/12] beam git commit: fix ParDo.BoundMulti translation
Posted by ke...@apache.org.
fix ParDo.BoundMulti translation
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b2d326ff
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b2d326ff
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b2d326ff
Branch: refs/heads/gearpump-runner
Commit: b2d326ff73afca5c8e941c8006e9d74261a6b9df
Parents: 364a3f0
Author: manuzhang <ow...@gmail.com>
Authored: Mon Jan 16 12:31:26 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Mon Jan 16 12:31:26 2017 +0800
----------------------------------------------------------------------
.../gearpump/translators/ParDoBoundMultiTranslator.java | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b2d326ff/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
index 0d5b8bc..bf7073b 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
@@ -91,8 +91,7 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
private final DoFnRunnerFactory<InputT, OutputT> doFnRunnerFactory;
private DoFnRunner<InputT, OutputT> doFnRunner;
private final DoFn<InputT, OutputT> doFn;
- private final List<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> outputs = Lists
- .newArrayList();
+ private List<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> outputs;
public DoFnMultiFunction(
GearpumpPipelineOptions pipelineOptions,
@@ -127,6 +126,8 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
@Override
public Iterator<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> apply(WindowedValue<InputT> wv) {
+ outputs = Lists.newArrayList();
+
if (null == doFnRunner) {
doFnRunner = doFnRunnerFactory.createRunner();
}
@@ -166,6 +167,7 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
@Override
public WindowedValue<OutputT> apply(WindowedValue<KV<TupleTag<OutputT>, OutputT>> wv) {
+ // System.out.println(wv.getValue().getKey() + ":" + wv.getValue().getValue());
return WindowedValue.of(wv.getValue().getValue(), wv.getTimestamp(),
wv.getWindows(), wv.getPane());
}
[05/12] beam git commit: update to latest gearpump dsl function interface
Posted by ke...@apache.org.
update to latest gearpump dsl function interface
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3bf82638
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3bf82638
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3bf82638
Branch: refs/heads/gearpump-runner
Commit: 3bf82638096ae7aa91c7d3c862c2994772bee51b
Parents: e63d42d
Author: manuzhang <ow...@gmail.com>
Authored: Sat Jan 14 13:36:07 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Sat Jan 14 21:40:18 2017 +0800
----------------------------------------------------------------------
.../translators/GroupByKeyTranslator.java | 12 ++++----
.../translators/ParDoBoundMultiTranslator.java | 29 ++++++++++++++------
.../translators/WindowBoundTranslator.java | 4 +--
.../translators/functions/DoFnFunction.java | 21 +++++++++++---
4 files changed, 46 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/3bf82638/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index 8e3ffe3..4eaf755 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -36,15 +36,15 @@ import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
+import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction;
import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.functions.GroupByFunction;
import org.apache.gearpump.streaming.dsl.window.api.Discarding$;
import org.apache.gearpump.streaming.dsl.window.api.EventTimeTrigger$;
import org.apache.gearpump.streaming.dsl.window.api.Window;
import org.apache.gearpump.streaming.dsl.window.api.WindowFn;
import org.apache.gearpump.streaming.dsl.window.impl.Bucket;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.GroupByFunction;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.MapFunction;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.ReduceFunction;
import scala.collection.JavaConversions;
@@ -122,7 +122,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
}
}
- private static class GroupByFn<K, V> implements
+ private static class GroupByFn<K, V> extends
GroupByFunction<WindowedValue<KV<K, V>>, K> {
@Override
@@ -132,7 +132,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
}
private static class ValueToIterable<K, V>
- implements MapFunction<WindowedValue<KV<K, V>>, WindowedValue<KV<K, Iterable<V>>>> {
+ extends MapFunction<WindowedValue<KV<K, V>>, WindowedValue<KV<K, Iterable<V>>>> {
@Override
public WindowedValue<KV<K, Iterable<V>>> apply(WindowedValue<KV<K, V>> wv) {
@@ -141,7 +141,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
}
}
- private static class MergeValue<K, V> implements
+ private static class MergeValue<K, V> extends
ReduceFunction<WindowedValue<KV<K, Iterable<V>>>> {
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/3bf82638/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
index 24f9734..0d5b8bc 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
@@ -33,6 +33,7 @@ import org.apache.beam.runners.gearpump.translators.utils.NoOpSideInputReader;
import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -41,10 +42,10 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.gearpump.streaming.dsl.api.functions.FilterFunction;
+import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.FilterFunction;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.MapFunction;
+import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
/**
* {@link ParDo.BoundMulti} is translated to Gearpump flatMap function
@@ -83,12 +84,13 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
/**
* Gearpump {@link FlatMapFunction} wrapper over Beam {@link DoFnMultiFunction}.
*/
- private static class DoFnMultiFunction<InputT, OutputT> implements
- FlatMapFunction<WindowedValue<InputT>, WindowedValue<KV<TupleTag<OutputT>, OutputT>>>,
- DoFnRunners.OutputManager {
+ private static class DoFnMultiFunction<InputT, OutputT>
+ extends FlatMapFunction<WindowedValue<InputT>, WindowedValue<KV<TupleTag<OutputT>, OutputT>>>
+ implements DoFnRunners.OutputManager {
private final DoFnRunnerFactory<InputT, OutputT> doFnRunnerFactory;
private DoFnRunner<InputT, OutputT> doFnRunner;
+ private final DoFn<InputT, OutputT> doFn;
private final List<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> outputs = Lists
.newArrayList();
@@ -99,6 +101,7 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
TupleTagList sideOutputTags,
WindowingStrategy<?, ?> windowingStrategy,
SideInputReader sideInputReader) {
+ this.doFn = doFn;
this.doFnRunnerFactory = new DoFnRunnerFactory<>(
pipelineOptions,
doFn,
@@ -113,6 +116,16 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
}
@Override
+ public void setup() {
+ DoFnInvokers.invokerFor(doFn).invokeSetup();
+ }
+
+ @Override
+ public void teardown() {
+ DoFnInvokers.invokerFor(doFn).invokeTeardown();
+ }
+
+ @Override
public Iterator<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> apply(WindowedValue<InputT> wv) {
if (null == doFnRunner) {
doFnRunner = doFnRunnerFactory.createRunner();
@@ -133,7 +146,7 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
}
}
- private static class FilterByOutputTag<OutputT> implements
+ private static class FilterByOutputTag<OutputT> extends
FilterFunction<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> {
private final TupleTag<OutputT> tupleTag;
@@ -148,7 +161,7 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
}
}
- private static class ExtractOutput<OutputT> implements
+ private static class ExtractOutput<OutputT> extends
MapFunction<WindowedValue<KV<TupleTag<OutputT>, OutputT>>, WindowedValue<OutputT>> {
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/3bf82638/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
index 32dd5de..d3c50a5 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
@@ -34,8 +34,8 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.gearpump.Message;
import org.apache.gearpump.cluster.UserConfig;
import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
import org.apache.gearpump.streaming.javaapi.Task;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
import org.apache.gearpump.streaming.task.TaskContext;
import org.joda.time.Instant;
@@ -61,7 +61,7 @@ public class WindowBoundTranslator<T> implements TransformTranslator<Window.Bou
context.setOutputStream(context.getOutput(transform), outputStream);
}
- private static class AssignWindows<T> implements
+ private static class AssignWindows<T> extends
FlatMapFunction<WindowedValue<T>, WindowedValue<T>> {
private final WindowFn<T, BoundedWindow> fn;
http://git-wip-us.apache.org/repos/asf/beam/blob/3bf82638/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
index 42969fe..a66d3a4 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
@@ -30,30 +30,33 @@ import org.apache.beam.runners.gearpump.translators.utils.DoFnRunnerFactory;
import org.apache.beam.runners.gearpump.translators.utils.NoOpAggregatorFactory;
import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
-
-import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
+import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
/**
* Gearpump {@link FlatMapFunction} wrapper over Beam {@link DoFn}.
*/
-public class DoFnFunction<InputT, OutputT> implements
- FlatMapFunction<WindowedValue<InputT>, WindowedValue<OutputT>>, DoFnRunners.OutputManager {
+public class DoFnFunction<InputT, OutputT> extends
+ FlatMapFunction<WindowedValue<InputT>, WindowedValue<OutputT>> implements
+ DoFnRunners.OutputManager {
private final TupleTag<OutputT> mainTag = new TupleTag<OutputT>() {};
private List<WindowedValue<OutputT>> outputs = Lists.newArrayList();
private final DoFnRunnerFactory<InputT, OutputT> doFnRunnerFactory;
private DoFnRunner<InputT, OutputT> doFnRunner;
+ private final DoFn<InputT, OutputT> doFn;
public DoFnFunction(
GearpumpPipelineOptions pipelineOptions,
DoFn<InputT, OutputT> doFn,
WindowingStrategy<?, ?> windowingStrategy,
SideInputReader sideInputReader) {
+ this.doFn = doFn;
this.doFnRunnerFactory = new DoFnRunnerFactory<>(
pipelineOptions,
doFn,
@@ -68,6 +71,16 @@ public class DoFnFunction<InputT, OutputT> implements
}
@Override
+ public void setup() {
+ DoFnInvokers.invokerFor(doFn).invokeSetup();
+ }
+
+ @Override
+ public void teardown() {
+ DoFnInvokers.invokerFor(doFn).invokeTeardown();
+ }
+
+ @Override
public Iterator<WindowedValue<OutputT>> apply(WindowedValue<InputT> value) {
outputs = Lists.newArrayList();
[04/12] beam git commit: fix group by window
Posted by ke...@apache.org.
fix group by window
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e63d42d1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e63d42d1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e63d42d1
Branch: refs/heads/gearpump-runner
Commit: e63d42d1113728badc66285e7ce7a8ce204a82d9
Parents: ea633d2
Author: manuzhang <ow...@gmail.com>
Authored: Sat Jan 7 23:07:23 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Sat Jan 14 13:35:31 2017 +0800
----------------------------------------------------------------------
.../beam/runners/gearpump/GearpumpRunner.java | 3 ++-
.../translators/GroupByKeyTranslator.java | 4 +--
.../translators/TranslationContext.java | 1 -
.../translators/WindowBoundTranslator.java | 27 ++++++++++++++++++--
.../gearpump/translators/io/GearpumpSource.java | 4 +--
5 files changed, 30 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/e63d42d1/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
index 9c44da3..01fdb3b 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
@@ -102,8 +102,9 @@ public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
options.getSerializers());
ClientContext clientContext = getClientContext(options, config);
options.setClientContext(clientContext);
+ UserConfig userConfig = UserConfig.empty();
JavaStreamApp streamApp = new JavaStreamApp(
- appName, clientContext, UserConfig.empty());
+ appName, clientContext, userConfig);
TranslationContext translationContext = new TranslationContext(streamApp, options);
GearpumpPipelineTranslator translator = new GearpumpPipelineTranslator(translationContext);
translator.translate(pipeline);
http://git-wip-us.apache.org/repos/asf/beam/blob/e63d42d1/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index 989957f..8e3ffe3 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -37,7 +37,7 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-import org.apache.gearpump.streaming.dsl.window.api.Accumulating$;
+import org.apache.gearpump.streaming.dsl.window.api.Discarding$;
import org.apache.gearpump.streaming.dsl.window.api.EventTimeTrigger$;
import org.apache.gearpump.streaming.dsl.window.api.Window;
import org.apache.gearpump.streaming.dsl.window.api.WindowFn;
@@ -60,7 +60,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
int parallelism = context.getPipelineOptions().getParallelism();
JavaStream<WindowedValue<KV<K, Iterable<V>>>> outputStream = inputStream
.window(Window.apply(new GearpumpWindowFn(input.getWindowingStrategy().getWindowFn()),
- EventTimeTrigger$.MODULE$, Accumulating$.MODULE$), "assign_window")
+ EventTimeTrigger$.MODULE$, Discarding$.MODULE$), "assign_window")
.groupBy(new GroupByFn<K, V>(), parallelism, "group_by_Key_and_Window")
.map(new ValueToIterable<K, V>(), "map_value_to_iterable")
.reduce(new MergeValue<K, V>(), "merge_value");
http://git-wip-us.apache.org/repos/asf/beam/blob/e63d42d1/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
index 63fb619..b2cff8a 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
@@ -50,7 +50,6 @@ public class TranslationContext {
public TranslationContext(JavaStreamApp streamApp, GearpumpPipelineOptions pipelineOptions) {
this.streamApp = streamApp;
this.pipelineOptions = pipelineOptions;
-
}
public void setCurrentTransform(TransformHierarchy.Node treeNode) {
http://git-wip-us.apache.org/repos/asf/beam/blob/e63d42d1/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
index 11f30fc..32dd5de 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
@@ -31,8 +31,12 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.gearpump.Message;
+import org.apache.gearpump.cluster.UserConfig;
import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.javaapi.Task;
import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
+import org.apache.gearpump.streaming.task.TaskContext;
import org.joda.time.Instant;
/**
@@ -50,11 +54,13 @@ public class WindowBoundTranslator<T> implements TransformTranslator<Window.Bou
WindowFn<T, BoundedWindow> windowFn =
(WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn();
JavaStream<WindowedValue<T>> outputStream =
- inputStream.flatMap(new AssignWindows(windowFn), "assign_windows");
+ inputStream
+ .flatMap(new AssignWindows(windowFn), "assign_windows")
+ .process(AssignTimestampTask.class, 1, UserConfig.empty(), "assign_timestamp");
+
context.setOutputStream(context.getOutput(transform), outputStream);
}
-
private static class AssignWindows<T> implements
FlatMapFunction<WindowedValue<T>, WindowedValue<T>> {
@@ -94,4 +100,21 @@ public class WindowBoundTranslator<T> implements TransformTranslator<Window.Bou
return ret.iterator();
}
}
+
+ /**
+ * Assign WindowedValue timestamp to Gearpump message.
+ * @param <T> element type of WindowedValue
+ */
+ public static class AssignTimestampTask<T> extends Task {
+
+ public AssignTimestampTask(TaskContext taskContext, UserConfig userConfig) {
+ super(taskContext, userConfig);
+ }
+
+ @Override
+ public void onNext(Message message) {
+ final WindowedValue<T> value = (WindowedValue<T>) message.msg();
+ context.output(Message.apply(value, value.getTimestamp().getMillis()));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e63d42d1/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
index b266590..6e5b2de 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
@@ -28,8 +28,6 @@ import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.gearpump.Message;
@@ -79,7 +77,7 @@ public abstract class GearpumpSource<T> implements DataSource {
org.joda.time.Instant timestamp = reader.getCurrentTimestamp();
available = reader.advance();
message = Message.apply(
- WindowedValue.of(data, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING),
+ WindowedValue.valueInGlobalWindow(data),
timestamp.getMillis());
}
} catch (Exception e) {
[09/12] beam git commit: reduce timeout to wait for result
Posted by ke...@apache.org.
reduce timeout to wait for result
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7613ec44
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7613ec44
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7613ec44
Branch: refs/heads/gearpump-runner
Commit: 7613ec44cedf12d1e7bf80e8bb6a505f09653c4f
Parents: b2d326f
Author: manuzhang <ow...@gmail.com>
Authored: Mon Jan 16 13:25:12 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Mon Jan 16 13:25:12 2017 +0800
----------------------------------------------------------------------
.../org/apache/beam/runners/gearpump/GearpumpPipelineResult.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/7613ec44/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
index 9c8f7b3..3dd78de 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
@@ -42,8 +42,8 @@ public class GearpumpPipelineResult implements PipelineResult {
private final ClientContext client;
private final int appId;
- private final Duration defaultWaitDuration = Duration.standardSeconds(60);
- private final Duration defaultWaitInterval = Duration.standardSeconds(10);
+ private final Duration defaultWaitDuration = Duration.standardSeconds(30);
+ private final Duration defaultWaitInterval = Duration.standardSeconds(5);
public GearpumpPipelineResult(ClientContext client, int appId) {
this.client = client;