You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/03 16:39:32 UTC
[1/2] incubator-beam git commit: [BEAM-79] Port Gearpump runner from OldDoFn to new DoFn
Repository: incubator-beam
Updated Branches:
refs/heads/gearpump-runner 3933b5577 -> 323ec1188
[BEAM-79] Port Gearpump runner from OldDoFn to new DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/45570b9c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/45570b9c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/45570b9c
Branch: refs/heads/gearpump-runner
Commit: 45570b9c7ebb11080deca3346fc601c69796612a
Parents: 3933b55
Author: manuzhang <ow...@gmail.com>
Authored: Mon Oct 31 11:52:22 2016 +0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Nov 3 09:38:41 2016 -0700
----------------------------------------------------------------------
.../gearpump/GearpumpPipelineTranslator.java | 2 +-
.../translators/ParDoBoundMultiTranslator.java | 17 +-
.../translators/ParDoBoundTranslator.java | 3 +-
.../translators/functions/DoFnFunction.java | 19 +-
.../translators/utils/DoFnRunnerFactory.java | 77 +++
.../translators/utils/GearpumpDoFnRunner.java | 516 -------------------
.../utils/NoOpAggregatorFactory.java | 41 ++
7 files changed, 143 insertions(+), 532 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45570b9c/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
index 5045ae4..8588fff 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
@@ -108,7 +108,7 @@ public class GearpumpPipelineTranslator implements Pipeline.PipelineVisitor {
@Override
public void visitValue(PValue value, TransformTreeNode producer) {
- LOG.info("visiting value {}", value);
+ LOG.debug("visiting value {}", value);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45570b9c/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
index 2b49684..54f1c3f 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
@@ -27,11 +27,11 @@ import java.util.Map;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
-import org.apache.beam.runners.gearpump.translators.utils.GearpumpDoFnRunner;
+import org.apache.beam.runners.gearpump.translators.utils.DoFnRunnerFactory;
+import org.apache.beam.runners.gearpump.translators.utils.NoOpAggregatorFactory;
import org.apache.beam.runners.gearpump.translators.utils.NoOpSideInputReader;
import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
@@ -64,7 +64,7 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
JavaStream<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> outputStream = inputStream.flatMap(
new DoFnMultiFunction<>(
context.getPipelineOptions(),
- transform.getFn(),
+ transform.getNewFn(),
transform.getMainOutputTag(),
transform.getSideOutputTags(),
inputT.getWindowingStrategy(),
@@ -87,18 +87,19 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
FlatMapFunction<WindowedValue<InputT>, WindowedValue<KV<TupleTag<OutputT>, OutputT>>>,
DoFnRunners.OutputManager {
- private final DoFnRunner<InputT, OutputT> doFnRunner;
+ private final DoFnRunnerFactory<InputT, OutputT> doFnRunnerFactory;
+ private DoFnRunner<InputT, OutputT> doFnRunner;
private final List<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> outputs = Lists
.newArrayList();
public DoFnMultiFunction(
GearpumpPipelineOptions pipelineOptions,
- OldDoFn<InputT, OutputT> doFn,
+ DoFn<InputT, OutputT> doFn,
TupleTag<OutputT> mainOutputTag,
TupleTagList sideOutputTags,
WindowingStrategy<?, ?> windowingStrategy,
SideInputReader sideInputReader) {
- this.doFnRunner = new GearpumpDoFnRunner<>(
+ this.doFnRunnerFactory = new DoFnRunnerFactory<>(
pipelineOptions,
doFn,
sideInputReader,
@@ -106,12 +107,16 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
mainOutputTag,
sideOutputTags.getAll(),
new NoOpStepContext(),
+ new NoOpAggregatorFactory(),
windowingStrategy
);
}
@Override
public Iterator<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> apply(WindowedValue<InputT> wv) {
+ if (null == doFnRunner) {
+ doFnRunner = doFnRunnerFactory.createRunner();
+ }
doFnRunner.startBundle();
doFnRunner.processElement(wv);
doFnRunner.finishBundle();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45570b9c/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
index b97cbb4..a796c83 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
@@ -21,7 +21,6 @@ package org.apache.beam.runners.gearpump.translators;
import org.apache.beam.runners.gearpump.translators.functions.DoFnFunction;
import org.apache.beam.runners.gearpump.translators.utils.NoOpSideInputReader;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -39,7 +38,7 @@ public class ParDoBoundTranslator<InputT, OutputT> implements
@Override
public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) {
- OldDoFn<InputT, OutputT> doFn = transform.getFn();
+ DoFn<InputT, OutputT> doFn = transform.getNewFn();
PCollection<OutputT> output = context.getOutput(transform);
WindowingStrategy<?, ?> windowingStrategy = output.getWindowingStrategy();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45570b9c/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
index 8d16356..42969fe 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
@@ -26,10 +26,10 @@ import java.util.List;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
-import org.apache.beam.runners.gearpump.translators.utils.GearpumpDoFnRunner;
+import org.apache.beam.runners.gearpump.translators.utils.DoFnRunnerFactory;
+import org.apache.beam.runners.gearpump.translators.utils.NoOpAggregatorFactory;
import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -44,17 +44,17 @@ import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
public class DoFnFunction<InputT, OutputT> implements
FlatMapFunction<WindowedValue<InputT>, WindowedValue<OutputT>>, DoFnRunners.OutputManager {
- private final TupleTag<OutputT> mainTag = new TupleTag<OutputT>() {
- };
- private final DoFnRunner<InputT, OutputT> doFnRunner;
+ private final TupleTag<OutputT> mainTag = new TupleTag<OutputT>() {};
private List<WindowedValue<OutputT>> outputs = Lists.newArrayList();
+ private final DoFnRunnerFactory<InputT, OutputT> doFnRunnerFactory;
+ private DoFnRunner<InputT, OutputT> doFnRunner;
public DoFnFunction(
GearpumpPipelineOptions pipelineOptions,
- OldDoFn<InputT, OutputT> doFn,
+ DoFn<InputT, OutputT> doFn,
WindowingStrategy<?, ?> windowingStrategy,
SideInputReader sideInputReader) {
- this.doFnRunner = new GearpumpDoFnRunner<>(
+ this.doFnRunnerFactory = new DoFnRunnerFactory<>(
pipelineOptions,
doFn,
sideInputReader,
@@ -62,6 +62,7 @@ public class DoFnFunction<InputT, OutputT> implements
mainTag,
TupleTagList.empty().getAll(),
new NoOpStepContext(),
+ new NoOpAggregatorFactory(),
windowingStrategy
);
}
@@ -70,6 +71,10 @@ public class DoFnFunction<InputT, OutputT> implements
public Iterator<WindowedValue<OutputT>> apply(WindowedValue<InputT> value) {
outputs = Lists.newArrayList();
+ if (null == doFnRunner) {
+ doFnRunner = doFnRunnerFactory.createRunner();
+ }
+
doFnRunner.startBundle();
doFnRunner.processElement(value);
doFnRunner.finishBundle();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45570b9c/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
new file mode 100644
index 0000000..7119a87
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators.utils;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.SimpleDoFnRunner;
+import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.ExecutionContext;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * A serializable factory for the {@link DoFnRunner} that executes a {@link DoFn}.
+ *
+ * <p>All runner inputs are captured at construction time; {@link #createRunner()} then builds
+ * the runner via {@link DoFnRunners#createDefault} (presumably a {@link SimpleDoFnRunner}).
+ * The Gearpump functions keep this factory in a field and create the runner lazily on first
+ * use rather than holding a runner directly.
+ *
+ * <p>NOTE(review): {@code options} is {@code transient}, so it is {@code null} in a factory
+ * restored by Java deserialization, and runners created from such a factory receive
+ * {@code null} pipeline options. Confirm no executed {@link DoFn} needs them, or ship the
+ * options in a serializable form.
+ */
+public class DoFnRunnerFactory<InputT, OutputT> implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  private final DoFn<InputT, OutputT> fn;
+  // Not serialized with the factory; see the class-level note.
+  private final transient PipelineOptions options;
+  private final SideInputReader sideInputReader;
+  private final DoFnRunners.OutputManager outputManager;
+  private final TupleTag<OutputT> mainOutputTag;
+  private final List<TupleTag<?>> sideOutputTags;
+  private final ExecutionContext.StepContext stepContext;
+  private final AggregatorFactory aggregatorFactory;
+  private final WindowingStrategy<?, ?> windowingStrategy;
+
+  /**
+   * Captures everything needed to build a {@link DoFnRunner} later.
+   *
+   * @param pipelineOptions options handed to created runners (not serialized)
+   * @param doFn the user function to execute
+   * @param sideInputReader side input reader passed to created runners
+   * @param outputManager receiver of the main and side outputs
+   * @param mainOutputTag tag of the main output
+   * @param sideOutputTags tags of the declared side outputs
+   * @param stepContext step context passed to created runners
+   * @param aggregatorFactory aggregator factory passed to created runners
+   * @param windowingStrategy windowing strategy passed to created runners
+   */
+  public DoFnRunnerFactory(
+      GearpumpPipelineOptions pipelineOptions,
+      DoFn<InputT, OutputT> doFn,
+      SideInputReader sideInputReader,
+      DoFnRunners.OutputManager outputManager,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      ExecutionContext.StepContext stepContext,
+      AggregatorFactory aggregatorFactory,
+      WindowingStrategy<?, ?> windowingStrategy) {
+    this.fn = doFn;
+    this.options = pipelineOptions;
+    this.sideInputReader = sideInputReader;
+    this.outputManager = outputManager;
+    this.mainOutputTag = mainOutputTag;
+    this.sideOutputTags = sideOutputTags;
+    this.stepContext = stepContext;
+    this.aggregatorFactory = aggregatorFactory;
+    this.windowingStrategy = windowingStrategy;
+  }
+
+  /**
+   * Builds a new {@link DoFnRunner} from the captured arguments. Each call returns a fresh
+   * runner, so callers should cache the result.
+   *
+   * @return a new runner for the wrapped {@link DoFn}
+   */
+  public DoFnRunner<InputT, OutputT> createRunner() {
+    return DoFnRunners.createDefault(options, fn, sideInputReader, outputManager, mainOutputTag,
+        sideOutputTags, stepContext, aggregatorFactory, windowingStrategy);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45570b9c/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java
deleted file mode 100644
index ec86a8d..0000000
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java
+++ /dev/null
@@ -1,516 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.gearpump.translators.utils;
-
-import static org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.beam.runners.core.DoFnRunner;
-import org.apache.beam.runners.core.DoFnRunners;
-import org.apache.beam.runners.core.SimpleDoFnRunner;
-import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.ExecutionContext;
-import org.apache.beam.sdk.util.SideInputReader;
-import org.apache.beam.sdk.util.SystemDoFnInternal;
-import org.apache.beam.sdk.util.TimerInternals;
-import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingInternals;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-
-import org.joda.time.Instant;
-
-
-/**
- * a serializable {@link SimpleDoFnRunner}.
- */
-public class GearpumpDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT>,
- Serializable {
-
- private final OldDoFn<InputT, OutputT> fn;
- private final transient PipelineOptions options;
- private final SideInputReader sideInputReader;
- private final DoFnRunners.OutputManager outputManager;
- private final TupleTag<OutputT> mainOutputTag;
- private final List<TupleTag<?>> sideOutputTags;
- private final ExecutionContext.StepContext stepContext;
- private final WindowFn<?, ?> windowFn;
- private DoFnContext<InputT, OutputT> context;
-
- public GearpumpDoFnRunner(
- GearpumpPipelineOptions pipelineOptions,
- OldDoFn<InputT, OutputT> doFn,
- SideInputReader sideInputReader,
- DoFnRunners.OutputManager outputManager,
- TupleTag<OutputT> mainOutputTag,
- List<TupleTag<?>> sideOutputTags,
- ExecutionContext.StepContext stepContext,
- WindowingStrategy<?, ?> windowingStrategy) {
- this.fn = doFn;
- this.options = pipelineOptions;
- this.sideInputReader = sideInputReader;
- this.outputManager = outputManager;
- this.mainOutputTag = mainOutputTag;
- this.sideOutputTags = sideOutputTags;
- this.stepContext = stepContext;
- this.windowFn = windowingStrategy == null ? null : windowingStrategy.getWindowFn();
- }
-
- @Override
- public void startBundle() {
- // This can contain user code. Wrap it in case it throws an exception.
- try {
- if (null == context) {
- this.context = new DoFnContext<>(
- options,
- fn,
- sideInputReader,
- outputManager,
- mainOutputTag,
- sideOutputTags,
- stepContext,
- windowFn
- );
- }
- fn.startBundle(context);
- } catch (Throwable t) {
- // Exception in user code.
- throw wrapUserCodeException(t);
- }
- }
-
- @Override
- public void processElement(WindowedValue<InputT> elem) {
- if (elem.getWindows().size() <= 1
- || (!OldDoFn.RequiresWindowAccess.class.isAssignableFrom(fn.getClass())
- && context.sideInputReader.isEmpty())) {
- invokeProcessElement(elem);
- } else {
- // We could modify the windowed value (and the processContext) to
- // avoid repeated allocations, but this is more straightforward.
- for (BoundedWindow window : elem.getWindows()) {
- invokeProcessElement(WindowedValue.of(
- elem.getValue(), elem.getTimestamp(), window, elem.getPane()));
- }
- }
- }
-
- @Override
- public void finishBundle() {
- // This can contain user code. Wrap it in case it throws an exception.
- try {
- fn.finishBundle(context);
- } catch (Throwable t) {
- // Exception in user code.
- throw wrapUserCodeException(t);
- }
- }
-
- private void invokeProcessElement(WindowedValue<InputT> elem) {
- final OldDoFn<InputT, OutputT>.ProcessContext processContext =
- new DoFnProcessContext<>(fn, context, elem);
- // This can contain user code. Wrap it in case it throws an exception.
- try {
- fn.processElement(processContext);
- } catch (Exception ex) {
- throw wrapUserCodeException(ex);
- }
- }
-
- private RuntimeException wrapUserCodeException(Throwable t) {
- throw UserCodeException.wrapIf(!isSystemDoFn(), t);
- }
-
- private boolean isSystemDoFn() {
- return fn.getClass().isAnnotationPresent(SystemDoFnInternal.class);
- }
-
- /**
- * A concrete implementation of {@code DoFn.Context} used for running a {@link DoFn}.
- *
- * @param <InputT> the type of the DoFn's (main) input elements
- * @param <OutputT> the type of the DoFn's (main) output elements
- */
- private static class DoFnContext<InputT, OutputT>
- extends OldDoFn<InputT, OutputT>.Context {
- private static final int MAX_SIDE_OUTPUTS = 1000;
-
- final transient PipelineOptions options;
- final OldDoFn<InputT, OutputT> fn;
- final SideInputReader sideInputReader;
- final DoFnRunners.OutputManager outputManager;
- final TupleTag<OutputT> mainOutputTag;
- final ExecutionContext.StepContext stepContext;
- final WindowFn<?, ?> windowFn;
-
- /**
- * The set of known output tags, some of which may be undeclared, so we can throw an
- * exception when it exceeds {@link #MAX_SIDE_OUTPUTS}.
- */
- private final Set<TupleTag<?>> outputTags;
-
- public DoFnContext(PipelineOptions options,
- OldDoFn<InputT, OutputT> fn,
- SideInputReader sideInputReader,
- DoFnRunners.OutputManager outputManager,
- TupleTag<OutputT> mainOutputTag,
- List<TupleTag<?>> sideOutputTags,
- ExecutionContext.StepContext stepContext,
- WindowFn<?, ?> windowFn) {
- fn.super();
- this.options = options;
- this.fn = fn;
- this.sideInputReader = sideInputReader;
- this.outputManager = outputManager;
- this.mainOutputTag = mainOutputTag;
- this.outputTags = Sets.newHashSet();
-
- outputTags.add(mainOutputTag);
- for (TupleTag<?> sideOutputTag : sideOutputTags) {
- outputTags.add(sideOutputTag);
- }
-
- this.stepContext = stepContext;
- this.windowFn = windowFn;
- super.setupDelegateAggregators();
- }
-
- //////////////////////////////////////////////////////////////////////////////
-
- @Override
- public PipelineOptions getPipelineOptions() {
- return options;
- }
-
- <T, W extends BoundedWindow> WindowedValue<T> makeWindowedValue(
- T output, Instant timestamp, Collection<W> windows, PaneInfo pane) {
- final Instant inputTimestamp = timestamp;
-
- if (timestamp == null) {
- timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
- }
-
- if (windows == null) {
- try {
- // The windowFn can never succeed at accessing the element, so its type does not
- // matter here
- @SuppressWarnings("unchecked")
- WindowFn<Object, W> objectWindowFn = (WindowFn<Object, W>) windowFn;
- windows = objectWindowFn.assignWindows(objectWindowFn.new AssignContext() {
- @Override
- public Object element() {
- throw new UnsupportedOperationException(
- "WindowFn attempted to access input element when none was available");
- }
-
- @Override
- public Instant timestamp() {
- if (inputTimestamp == null) {
- throw new UnsupportedOperationException(
- "WindowFn attempted to access input timestamp when none was available");
- }
- return inputTimestamp;
- }
-
- @Override
- public BoundedWindow window() {
- throw new UnsupportedOperationException(
- "WindowFn attempted to access input windows when none were available");
- }
- });
- } catch (Exception e) {
- throw UserCodeException.wrap(e);
- }
- }
-
- return WindowedValue.of(output, timestamp, windows, pane);
- }
-
- public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
- if (!sideInputReader.contains(view)) {
- throw new IllegalArgumentException("calling sideInput() with unknown view");
- }
- BoundedWindow sideInputWindow =
- view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow);
- return sideInputReader.get(view, sideInputWindow);
- }
-
- void outputWindowedValue(
- OutputT output,
- Instant timestamp,
- Collection<? extends BoundedWindow> windows,
- PaneInfo pane) {
- outputWindowedValue(makeWindowedValue(output, timestamp, windows, pane));
- }
-
- void outputWindowedValue(WindowedValue<OutputT> windowedElem) {
- outputManager.output(mainOutputTag, windowedElem);
- if (stepContext != null) {
- stepContext.noteOutput(windowedElem);
- }
- }
-
- protected <T> void sideOutputWindowedValue(TupleTag<T> tag,
- T output,
- Instant timestamp,
- Collection<? extends BoundedWindow> windows,
- PaneInfo pane) {
- sideOutputWindowedValue(tag, makeWindowedValue(output, timestamp, windows, pane));
- }
-
- protected <T> void sideOutputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedElem) {
- if (!outputTags.contains(tag)) {
- // This tag wasn't declared nor was it seen before during this execution.
- // Thus, this must be a new, undeclared and unconsumed output.
- // To prevent likely user errors, enforce the limit on the number of side
- // outputs.
- if (outputTags.size() >= MAX_SIDE_OUTPUTS) {
- throw new IllegalArgumentException(
- "the number of side outputs has exceeded a limit of " + MAX_SIDE_OUTPUTS);
- }
- outputTags.add(tag);
- }
-
- outputManager.output(tag, windowedElem);
- if (stepContext != null) {
- stepContext.noteSideOutput(tag, windowedElem);
- }
- }
-
- // Following implementations of output, outputWithTimestamp, and sideOutput
- // are only accessible in DoFn.startBundle and DoFn.finishBundle, and will be shadowed by
- // ProcessContext's versions in DoFn.processElement.
- @Override
- public void output(OutputT output) {
- outputWindowedValue(output, null, null, PaneInfo.NO_FIRING);
- }
-
- @Override
- public void outputWithTimestamp(OutputT output, Instant timestamp) {
- outputWindowedValue(output, timestamp, null, PaneInfo.NO_FIRING);
- }
-
- @Override
- public <T> void sideOutput(TupleTag<T> tag, T output) {
- checkNotNull(tag, "TupleTag passed to sideOutput cannot be null");
- sideOutputWindowedValue(tag, output, null, null, PaneInfo.NO_FIRING);
- }
-
- @Override
- public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
- checkNotNull(tag, "TupleTag passed to sideOutputWithTimestamp cannot be null");
- sideOutputWindowedValue(tag, output, timestamp, null, PaneInfo.NO_FIRING);
- }
-
- @Override
- protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
- String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
- checkNotNull(combiner,
- "Combiner passed to createAggregator cannot be null");
- throw new UnsupportedOperationException("aggregator not supported in Gearpump runner");
- }
- }
-
-
- /**
- * A concrete implementation of {@code DoFn.ProcessContext} used for
- * running a {@link DoFn} over a single element.
- *
- * @param <InputT> the type of the DoFn's (main) input elements
- * @param <OutputT> the type of the DoFn's (main) output elements
- */
- private static class DoFnProcessContext<InputT, OutputT>
- extends OldDoFn<InputT, OutputT>.ProcessContext {
-
-
- final OldDoFn<InputT, OutputT> fn;
- final DoFnContext<InputT, OutputT> context;
- final WindowedValue<InputT> windowedValue;
-
- public DoFnProcessContext(OldDoFn<InputT, OutputT> fn,
- DoFnContext<InputT, OutputT> context,
- WindowedValue<InputT> windowedValue) {
- fn.super();
- this.fn = fn;
- this.context = context;
- this.windowedValue = windowedValue;
- }
-
- @Override
- public PipelineOptions getPipelineOptions() {
- return context.getPipelineOptions();
- }
-
- @Override
- public InputT element() {
- return windowedValue.getValue();
- }
-
- @Override
- public <T> T sideInput(PCollectionView<T> view) {
- checkNotNull(view, "View passed to sideInput cannot be null");
- Iterator<? extends BoundedWindow> windowIter = windows().iterator();
- BoundedWindow window;
- if (!windowIter.hasNext()) {
- if (context.windowFn instanceof GlobalWindows) {
- // TODO: Remove this once GroupByKeyOnly no longer outputs elements
- // without windows
- window = GlobalWindow.INSTANCE;
- } else {
- throw new IllegalStateException(
- "sideInput called when main input element is not in any windows");
- }
- } else {
- window = windowIter.next();
- if (windowIter.hasNext()) {
- throw new IllegalStateException(
- "sideInput called when main input element is in multiple windows");
- }
- }
- return context.sideInput(view, window);
- }
-
- @Override
- public BoundedWindow window() {
- if (!(fn instanceof OldDoFn.RequiresWindowAccess)) {
- throw new UnsupportedOperationException(
- "window() is only available in the context of a DoFn marked as RequiresWindow.");
- }
- return Iterables.getOnlyElement(windows());
- }
-
- @Override
- public PaneInfo pane() {
- return windowedValue.getPane();
- }
-
- @Override
- public void output(OutputT output) {
- context.outputWindowedValue(windowedValue.withValue(output));
- }
-
- @Override
- public void outputWithTimestamp(OutputT output, Instant timestamp) {
- context.outputWindowedValue(output, timestamp,
- windowedValue.getWindows(), windowedValue.getPane());
- }
-
- @Override
- public <T> void sideOutput(TupleTag<T> tag, T output) {
- checkNotNull(tag, "Tag passed to sideOutput cannot be null");
- context.sideOutputWindowedValue(tag, windowedValue.withValue(output));
- }
-
- @Override
- public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
- checkNotNull(tag, "Tag passed to sideOutputWithTimestamp cannot be null");
- context.sideOutputWindowedValue(
- tag, output, timestamp, windowedValue.getWindows(), windowedValue.getPane());
- }
-
- @Override
- public Instant timestamp() {
- return windowedValue.getTimestamp();
- }
-
- public Collection<? extends BoundedWindow> windows() {
- return windowedValue.getWindows();
- }
-
- @Override
- public WindowingInternals<InputT, OutputT> windowingInternals() {
- return new WindowingInternals<InputT, OutputT>() {
- @Override
- public void outputWindowedValue(OutputT output, Instant timestamp,
- Collection<? extends BoundedWindow> windows, PaneInfo pane) {
- context.outputWindowedValue(output, timestamp, windows, pane);
- }
-
- @Override
- public Collection<? extends BoundedWindow> windows() {
- return windowedValue.getWindows();
- }
-
- @Override
- public PaneInfo pane() {
- return windowedValue.getPane();
- }
-
- @Override
- public TimerInternals timerInternals() {
- return context.stepContext.timerInternals();
- }
-
- @Override
- public <T> void writePCollectionViewData(
- TupleTag<?> tag,
- Iterable<WindowedValue<T>> data,
- Coder<T> elemCoder) throws IOException {
- @SuppressWarnings("unchecked")
- Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>) context.windowFn.windowCoder();
-
- context.stepContext.writePCollectionViewData(
- tag, data, IterableCoder.of(WindowedValue.getFullCoder(elemCoder, windowCoder)),
- window(), windowCoder);
- }
-
- @Override
- public StateInternals<?> stateInternals() {
- return context.stepContext.stateInternals();
- }
-
- @Override
- public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
- return context.sideInput(view, mainInputWindow);
- }
- };
- }
-
- @Override
- protected <AggregatorInputT, AggregatorOutputT> Aggregator<AggregatorInputT, AggregatorOutputT>
- createAggregatorInternal(
- String name, Combine.CombineFn<AggregatorInputT, ?, AggregatorOutputT> combiner) {
- return context.createAggregatorInternal(name, combiner);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45570b9c/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java
new file mode 100644
index 0000000..cd404a5
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators.utils;
+
+import java.io.Serializable;
+
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.util.ExecutionContext;
+
+/**
+ * no-op aggregator factory.
+ */
+public class NoOpAggregatorFactory implements AggregatorFactory, Serializable {
+
+ @Override
+ public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
+ Class<?> fnClass,
+ ExecutionContext.StepContext stepContext,
+ String aggregatorName,
+ Combine.CombineFn<InputT, AccumT, OutputT> combine) {
+ return null;
+ }
+}
[2/2] incubator-beam git commit: This closes #1234
Posted by ke...@apache.org.
This closes #1234
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/323ec118
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/323ec118
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/323ec118
Branch: refs/heads/gearpump-runner
Commit: 323ec1188d2dffcdad640701e1827f90965994a8
Parents: 3933b55 45570b9
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Nov 3 09:39:17 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Nov 3 09:39:17 2016 -0700
----------------------------------------------------------------------
.../gearpump/GearpumpPipelineTranslator.java | 2 +-
.../translators/ParDoBoundMultiTranslator.java | 17 +-
.../translators/ParDoBoundTranslator.java | 3 +-
.../translators/functions/DoFnFunction.java | 19 +-
.../translators/utils/DoFnRunnerFactory.java | 77 +++
.../translators/utils/GearpumpDoFnRunner.java | 516 -------------------
.../utils/NoOpAggregatorFactory.java | 41 ++
7 files changed, 143 insertions(+), 532 deletions(-)
----------------------------------------------------------------------