You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/01/24 03:23:16 UTC
[04/12] beam git commit: fix group by window
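Fix group by window: use discarding (instead of accumulating) mode for GroupByKey windows, stamp each Gearpump message with its WindowedValue timestamp after window assignment, and wrap source values with WindowedValue.valueInGlobalWindow.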
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e63d42d1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e63d42d1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e63d42d1
Branch: refs/heads/gearpump-runner
Commit: e63d42d1113728badc66285e7ce7a8ce204a82d9
Parents: ea633d2
Author: manuzhang <ow...@gmail.com>
Authored: Sat Jan 7 23:07:23 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Sat Jan 14 13:35:31 2017 +0800
----------------------------------------------------------------------
.../beam/runners/gearpump/GearpumpRunner.java | 3 ++-
.../translators/GroupByKeyTranslator.java | 4 +--
.../translators/TranslationContext.java | 1 -
.../translators/WindowBoundTranslator.java | 27 ++++++++++++++++++--
.../gearpump/translators/io/GearpumpSource.java | 4 +--
5 files changed, 30 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/e63d42d1/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
index 9c44da3..01fdb3b 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
@@ -102,8 +102,9 @@ public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
options.getSerializers());
ClientContext clientContext = getClientContext(options, config);
options.setClientContext(clientContext);
+ UserConfig userConfig = UserConfig.empty();
JavaStreamApp streamApp = new JavaStreamApp(
- appName, clientContext, UserConfig.empty());
+ appName, clientContext, userConfig);
TranslationContext translationContext = new TranslationContext(streamApp, options);
GearpumpPipelineTranslator translator = new GearpumpPipelineTranslator(translationContext);
translator.translate(pipeline);
http://git-wip-us.apache.org/repos/asf/beam/blob/e63d42d1/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index 989957f..8e3ffe3 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -37,7 +37,7 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-import org.apache.gearpump.streaming.dsl.window.api.Accumulating$;
+import org.apache.gearpump.streaming.dsl.window.api.Discarding$;
import org.apache.gearpump.streaming.dsl.window.api.EventTimeTrigger$;
import org.apache.gearpump.streaming.dsl.window.api.Window;
import org.apache.gearpump.streaming.dsl.window.api.WindowFn;
@@ -60,7 +60,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
int parallelism = context.getPipelineOptions().getParallelism();
JavaStream<WindowedValue<KV<K, Iterable<V>>>> outputStream = inputStream
.window(Window.apply(new GearpumpWindowFn(input.getWindowingStrategy().getWindowFn()),
- EventTimeTrigger$.MODULE$, Accumulating$.MODULE$), "assign_window")
+ EventTimeTrigger$.MODULE$, Discarding$.MODULE$), "assign_window")
.groupBy(new GroupByFn<K, V>(), parallelism, "group_by_Key_and_Window")
.map(new ValueToIterable<K, V>(), "map_value_to_iterable")
.reduce(new MergeValue<K, V>(), "merge_value");
http://git-wip-us.apache.org/repos/asf/beam/blob/e63d42d1/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
index 63fb619..b2cff8a 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
@@ -50,7 +50,6 @@ public class TranslationContext {
public TranslationContext(JavaStreamApp streamApp, GearpumpPipelineOptions pipelineOptions) {
this.streamApp = streamApp;
this.pipelineOptions = pipelineOptions;
-
}
public void setCurrentTransform(TransformHierarchy.Node treeNode) {
http://git-wip-us.apache.org/repos/asf/beam/blob/e63d42d1/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
index 11f30fc..32dd5de 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
@@ -31,8 +31,12 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.gearpump.Message;
+import org.apache.gearpump.cluster.UserConfig;
import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.javaapi.Task;
import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
+import org.apache.gearpump.streaming.task.TaskContext;
import org.joda.time.Instant;
/**
@@ -50,11 +54,13 @@ public class WindowBoundTranslator<T> implements TransformTranslator<Window.Bou
WindowFn<T, BoundedWindow> windowFn =
(WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn();
JavaStream<WindowedValue<T>> outputStream =
- inputStream.flatMap(new AssignWindows(windowFn), "assign_windows");
+ inputStream
+ .flatMap(new AssignWindows(windowFn), "assign_windows")
+ .process(AssignTimestampTask.class, 1, UserConfig.empty(), "assign_timestamp");
+
context.setOutputStream(context.getOutput(transform), outputStream);
}
-
private static class AssignWindows<T> implements
FlatMapFunction<WindowedValue<T>, WindowedValue<T>> {
@@ -94,4 +100,21 @@ public class WindowBoundTranslator<T> implements TransformTranslator<Window.Bou
return ret.iterator();
}
}
+
+ /**
+ * Assign WindowedValue timestamp to Gearpump message.
+ * @param <T> element type of WindowedValue
+ */
+ public static class AssignTimestampTask<T> extends Task {
+
+ public AssignTimestampTask(TaskContext taskContext, UserConfig userConfig) {
+ super(taskContext, userConfig);
+ }
+
+ @Override
+ public void onNext(Message message) {
+ final WindowedValue<T> value = (WindowedValue<T>) message.msg();
+ context.output(Message.apply(value, value.getTimestamp().getMillis()));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e63d42d1/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
index b266590..6e5b2de 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
@@ -28,8 +28,6 @@ import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.gearpump.Message;
@@ -79,7 +77,7 @@ public abstract class GearpumpSource<T> implements DataSource {
org.joda.time.Instant timestamp = reader.getCurrentTimestamp();
available = reader.advance();
message = Message.apply(
- WindowedValue.of(data, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING),
+ WindowedValue.valueInGlobalWindow(data),
timestamp.getMillis());
}
} catch (Exception e) {