You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/01/24 03:23:17 UTC
[05/12] beam git commit: update to latest gearpump dsl function interface
update to latest gearpump dsl function interface
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3bf82638
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3bf82638
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3bf82638
Branch: refs/heads/gearpump-runner
Commit: 3bf82638096ae7aa91c7d3c862c2994772bee51b
Parents: e63d42d
Author: manuzhang <ow...@gmail.com>
Authored: Sat Jan 14 13:36:07 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Sat Jan 14 21:40:18 2017 +0800
----------------------------------------------------------------------
.../translators/GroupByKeyTranslator.java | 12 ++++----
.../translators/ParDoBoundMultiTranslator.java | 29 ++++++++++++++------
.../translators/WindowBoundTranslator.java | 4 +--
.../translators/functions/DoFnFunction.java | 21 +++++++++++---
4 files changed, 46 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/3bf82638/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index 8e3ffe3..4eaf755 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -36,15 +36,15 @@ import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
+import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction;
import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.functions.GroupByFunction;
import org.apache.gearpump.streaming.dsl.window.api.Discarding$;
import org.apache.gearpump.streaming.dsl.window.api.EventTimeTrigger$;
import org.apache.gearpump.streaming.dsl.window.api.Window;
import org.apache.gearpump.streaming.dsl.window.api.WindowFn;
import org.apache.gearpump.streaming.dsl.window.impl.Bucket;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.GroupByFunction;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.MapFunction;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.ReduceFunction;
import scala.collection.JavaConversions;
@@ -122,7 +122,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
}
}
- private static class GroupByFn<K, V> implements
+ private static class GroupByFn<K, V> extends
GroupByFunction<WindowedValue<KV<K, V>>, K> {
@Override
@@ -132,7 +132,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
}
private static class ValueToIterable<K, V>
- implements MapFunction<WindowedValue<KV<K, V>>, WindowedValue<KV<K, Iterable<V>>>> {
+ extends MapFunction<WindowedValue<KV<K, V>>, WindowedValue<KV<K, Iterable<V>>>> {
@Override
public WindowedValue<KV<K, Iterable<V>>> apply(WindowedValue<KV<K, V>> wv) {
@@ -141,7 +141,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
}
}
- private static class MergeValue<K, V> implements
+ private static class MergeValue<K, V> extends
ReduceFunction<WindowedValue<KV<K, Iterable<V>>>> {
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/3bf82638/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
index 24f9734..0d5b8bc 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
@@ -33,6 +33,7 @@ import org.apache.beam.runners.gearpump.translators.utils.NoOpSideInputReader;
import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -41,10 +42,10 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.gearpump.streaming.dsl.api.functions.FilterFunction;
+import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.FilterFunction;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.MapFunction;
+import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
/**
* {@link ParDo.BoundMulti} is translated to Gearpump flatMap function
@@ -83,12 +84,13 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
/**
* Gearpump {@link FlatMapFunction} wrapper over Beam {@link DoFnMultiFunction}.
*/
- private static class DoFnMultiFunction<InputT, OutputT> implements
- FlatMapFunction<WindowedValue<InputT>, WindowedValue<KV<TupleTag<OutputT>, OutputT>>>,
- DoFnRunners.OutputManager {
+ private static class DoFnMultiFunction<InputT, OutputT>
+ extends FlatMapFunction<WindowedValue<InputT>, WindowedValue<KV<TupleTag<OutputT>, OutputT>>>
+ implements DoFnRunners.OutputManager {
private final DoFnRunnerFactory<InputT, OutputT> doFnRunnerFactory;
private DoFnRunner<InputT, OutputT> doFnRunner;
+ private final DoFn<InputT, OutputT> doFn;
private final List<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> outputs = Lists
.newArrayList();
@@ -99,6 +101,7 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
TupleTagList sideOutputTags,
WindowingStrategy<?, ?> windowingStrategy,
SideInputReader sideInputReader) {
+ this.doFn = doFn;
this.doFnRunnerFactory = new DoFnRunnerFactory<>(
pipelineOptions,
doFn,
@@ -113,6 +116,16 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
}
@Override
+ public void setup() {
+ DoFnInvokers.invokerFor(doFn).invokeSetup();
+ }
+
+ @Override
+ public void teardown() {
+ DoFnInvokers.invokerFor(doFn).invokeTeardown();
+ }
+
+ @Override
public Iterator<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> apply(WindowedValue<InputT> wv) {
if (null == doFnRunner) {
doFnRunner = doFnRunnerFactory.createRunner();
@@ -133,7 +146,7 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
}
}
- private static class FilterByOutputTag<OutputT> implements
+ private static class FilterByOutputTag<OutputT> extends
FilterFunction<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> {
private final TupleTag<OutputT> tupleTag;
@@ -148,7 +161,7 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
}
}
- private static class ExtractOutput<OutputT> implements
+ private static class ExtractOutput<OutputT> extends
MapFunction<WindowedValue<KV<TupleTag<OutputT>, OutputT>>, WindowedValue<OutputT>> {
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/3bf82638/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
index 32dd5de..d3c50a5 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
@@ -34,8 +34,8 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.gearpump.Message;
import org.apache.gearpump.cluster.UserConfig;
import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
import org.apache.gearpump.streaming.javaapi.Task;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
import org.apache.gearpump.streaming.task.TaskContext;
import org.joda.time.Instant;
@@ -61,7 +61,7 @@ public class WindowBoundTranslator<T> implements TransformTranslator<Window.Bou
context.setOutputStream(context.getOutput(transform), outputStream);
}
- private static class AssignWindows<T> implements
+ private static class AssignWindows<T> extends
FlatMapFunction<WindowedValue<T>, WindowedValue<T>> {
private final WindowFn<T, BoundedWindow> fn;
http://git-wip-us.apache.org/repos/asf/beam/blob/3bf82638/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
index 42969fe..a66d3a4 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
@@ -30,30 +30,33 @@ import org.apache.beam.runners.gearpump.translators.utils.DoFnRunnerFactory;
import org.apache.beam.runners.gearpump.translators.utils.NoOpAggregatorFactory;
import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
-
-import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
+import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
/**
* Gearpump {@link FlatMapFunction} wrapper over Beam {@link DoFn}.
*/
-public class DoFnFunction<InputT, OutputT> implements
- FlatMapFunction<WindowedValue<InputT>, WindowedValue<OutputT>>, DoFnRunners.OutputManager {
+public class DoFnFunction<InputT, OutputT> extends
+ FlatMapFunction<WindowedValue<InputT>, WindowedValue<OutputT>> implements
+ DoFnRunners.OutputManager {
private final TupleTag<OutputT> mainTag = new TupleTag<OutputT>() {};
private List<WindowedValue<OutputT>> outputs = Lists.newArrayList();
private final DoFnRunnerFactory<InputT, OutputT> doFnRunnerFactory;
private DoFnRunner<InputT, OutputT> doFnRunner;
+ private final DoFn<InputT, OutputT> doFn;
public DoFnFunction(
GearpumpPipelineOptions pipelineOptions,
DoFn<InputT, OutputT> doFn,
WindowingStrategy<?, ?> windowingStrategy,
SideInputReader sideInputReader) {
+ this.doFn = doFn;
this.doFnRunnerFactory = new DoFnRunnerFactory<>(
pipelineOptions,
doFn,
@@ -68,6 +71,16 @@ public class DoFnFunction<InputT, OutputT> implements
}
@Override
+ public void setup() {
+ DoFnInvokers.invokerFor(doFn).invokeSetup();
+ }
+
+ @Override
+ public void teardown() {
+ DoFnInvokers.invokerFor(doFn).invokeTeardown();
+ }
+
+ @Override
public Iterator<WindowedValue<OutputT>> apply(WindowedValue<InputT> value) {
outputs = Lists.newArrayList();