You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/16 21:18:44 UTC
[2/6] incubator-beam git commit: Add Window.Bound translator
Add Window.Bound translator
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/85d54ab2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/85d54ab2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/85d54ab2
Branch: refs/heads/gearpump-runner
Commit: 85d54ab20f21297da25059ed7b4c8ed02e93bb74
Parents: 46d3563
Author: manuzhang <ow...@gmail.com>
Authored: Fri Dec 16 16:49:06 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Fri Dec 16 16:49:06 2016 +0800
----------------------------------------------------------------------
.../gearpump/GearpumpPipelineTranslator.java | 3 +
.../translators/WindowBoundTranslator.java | 97 ++++++++++++++++++++
2 files changed, 100 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/85d54ab2/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
index 84dfeec..20624ed 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
@@ -29,6 +29,7 @@ import org.apache.beam.runners.gearpump.translators.ReadBoundedTranslator;
import org.apache.beam.runners.gearpump.translators.ReadUnboundedTranslator;
import org.apache.beam.runners.gearpump.translators.TransformTranslator;
import org.apache.beam.runners.gearpump.translators.TranslationContext;
+import org.apache.beam.runners.gearpump.translators.WindowBoundTranslator;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.runners.TransformHierarchy;
@@ -37,6 +38,7 @@ import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PValue;
import org.apache.gearpump.util.Graph;
@@ -71,6 +73,7 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default
registerTransformTranslator(Flatten.FlattenPCollectionList.class,
new FlattenPCollectionTranslator());
registerTransformTranslator(ParDo.BoundMulti.class, new ParDoBoundMultiTranslator());
+ registerTransformTranslator(Window.Bound.class, new WindowBoundTranslator());
registerTransformTranslator(Create.Values.class, new CreateValuesTranslator());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/85d54ab2/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
new file mode 100644
index 0000000..11f30fc
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import com.google.common.collect.Iterables;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
+import org.joda.time.Instant;
+
+/**
+ * Translates {@link Window.Bound} into a Gearpump {@code flatMap} step that re-assigns
+ * every input element to the windows chosen by the transform's output {@link WindowFn}.
+ *
+ * @param <T> element type of the {@link PCollection} being windowed
+ */
+@SuppressWarnings("unchecked")
+public class WindowBoundTranslator<T> implements TransformTranslator<Window.Bound<T>> {
+
+  /**
+   * Fetches the stream registered for the transform's input, applies window assignment to
+   * it, and registers the resulting stream for the transform's output.
+   *
+   * @param transform the {@link Window.Bound} transform being translated
+   * @param context translation context used to look up the input stream and to store the
+   *     output stream
+   */
+  @Override
+  public void translate(Window.Bound<T> transform, TranslationContext context) {
+    PCollection<T> input = context.getInput(transform);
+    JavaStream<WindowedValue<T>> inputStream = context.getInputStream(input);
+    WindowingStrategy<?, ?> outputStrategy =
+        transform.getOutputStrategyInternal(input.getWindowingStrategy());
+    // The strategy is only available with wildcard type arguments here, so the window
+    // function has to be narrowed with an unchecked cast.
+    WindowFn<T, BoundedWindow> windowFn =
+        (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn();
+    // Parameterized (not raw) instantiation, so the compiler checks that the function's
+    // element type matches the stream's element type.
+    JavaStream<WindowedValue<T>> outputStream =
+        inputStream.flatMap(new AssignWindows<>(windowFn), "assign_windows");
+    context.setOutputStream(context.getOutput(transform), outputStream);
+  }
+
+
+  /**
+   * Flat-map function that emits one {@link WindowedValue} for each window the wrapped
+   * {@link WindowFn} assigns to an incoming value.
+   *
+   * @param <T> element type carried by the windowed values
+   */
+  private static class AssignWindows<T> implements
+      FlatMapFunction<WindowedValue<T>, WindowedValue<T>> {
+
+    private final WindowFn<T, BoundedWindow> fn;
+
+    AssignWindows(WindowFn<T, BoundedWindow> fn) {
+      this.fn = fn;
+    }
+
+    /**
+     * Re-windows a single value.
+     *
+     * @param value the incoming windowed value
+     * @return an iterator over new windowed values, one per assigned window, each carrying
+     *     the original element, timestamp and pane
+     * @throws RuntimeException wrapping any exception raised while assigning windows or
+     *     building the results
+     */
+    @Override
+    public Iterator<WindowedValue<T>> apply(final WindowedValue<T> value) {
+      List<WindowedValue<T>> ret = new LinkedList<>();
+      try {
+        // Expose the incoming value to the window function through an AssignContext view.
+        Collection<BoundedWindow> windows = fn.assignWindows(fn.new AssignContext() {
+          @Override
+          public T element() {
+            return value.getValue();
+          }
+
+          @Override
+          public Instant timestamp() {
+            return value.getTimestamp();
+          }
+
+          @Override
+          public BoundedWindow window() {
+            // Relies on the value being in exactly one window at this point;
+            // getOnlyElement rejects any other count.
+            return Iterables.getOnlyElement(value.getWindows());
+          }
+        });
+        for (BoundedWindow window: windows) {
+          ret.add(WindowedValue.of(
+              value.getValue(), value.getTimestamp(), window, value.getPane()));
+        }
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      return ret.iterator();
+    }
+  }
+}