You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/21 02:53:13 UTC
[5/7] incubator-beam git commit: Add SimpleDoFnRunner for new DoFn
and construction via DoFnRunners
Add SimpleDoFnRunner for new DoFn and construction via DoFnRunners
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fe0b7bff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fe0b7bff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fe0b7bff
Branch: refs/heads/master
Commit: fe0b7bff6bacbd4c5920dc547a014dd6d8d2eabb
Parents: 8ff6c9d
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Oct 19 20:38:23 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Oct 20 18:32:06 2016 -0700
----------------------------------------------------------------------
.../apache/beam/runners/core/DoFnRunners.java | 120 +++-
.../beam/runners/core/SimpleDoFnRunner.java | 584 +++++++++++++++++++
2 files changed, 703 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fe0b7bff/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index 13cd161..15f7b7e 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -18,11 +18,12 @@
package org.apache.beam.runners.core;
import java.util.List;
-
import org.apache.beam.runners.core.DoFnRunner.ReduceFnExecutor;
+import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.ExecutionContext.StepContext;
@@ -48,6 +49,35 @@ public class DoFnRunners {
}
/**
+ * Returns an implementation of {@link DoFnRunner} for a {@link DoFn}.
+ *
+ * <p>If the {@link DoFn} observes the window, this runner will explode the windows of a
+ * compressed {@link WindowedValue}. It is the responsibility of the runner to perform any key
+ * partitioning needed, etc.
+ */
+  static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner(
+      PipelineOptions options,
+      DoFn<InputT, OutputT> fn,
+      SideInputReader sideInputReader,
+      OutputManager outputManager,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      StepContext stepContext,
+      AggregatorFactory aggregatorFactory,
+      WindowingStrategy<?, ?> windowingStrategy) {
+    // Plain pass-through: every argument reaches the SimpleDoFnRunner constructor unchanged.
+    DoFnRunner<InputT, OutputT> runner =
+        new SimpleDoFnRunner<>(
+            options,
+            fn,
+            sideInputReader,
+            outputManager,
+            mainOutputTag,
+            sideOutputTags,
+            stepContext,
+            aggregatorFactory,
+            windowingStrategy);
+    return runner;
+  }
+
+ /**
* Returns a basic implementation of {@link DoFnRunner} that works for most {@link OldDoFn DoFns}.
*
* <p>It invokes {@link OldDoFn#processElement} for each input.
@@ -92,7 +122,45 @@ public class DoFnRunners {
droppedDueToLatenessAggregator);
}
+  /**
+   * Creates the default {@link DoFnRunner} for the provided {@link DoFn}.
+   *
+   * <p>All arguments are forwarded unchanged to {@code simpleRunner}.
+   */
+  public static <InputT, OutputT> DoFnRunner<InputT, OutputT> createDefault(
+      PipelineOptions options,
+      DoFn<InputT, OutputT> doFn,
+      SideInputReader sideInputReader,
+      OutputManager outputManager,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      StepContext stepContext,
+      AggregatorFactory aggregatorFactory,
+      WindowingStrategy<?, ?> windowingStrategy) {
+    // Unlike for OldDoFn, there is no ReduceFnExecutor that is a new DoFn, and window-exploded
+    // processing is achieved within the simple runner, so no special casing is needed here.
+    DoFnRunner<InputT, OutputT> runner =
+        simpleRunner(
+            options,
+            doFn,
+            sideInputReader,
+            outputManager,
+            mainOutputTag,
+            sideOutputTags,
+            stepContext,
+            aggregatorFactory,
+            windowingStrategy);
+    return runner;
+  }
+
+ /**
+ * Creates a {@link DoFnRunner} for the provided {@link OldDoFn}.
+ *
+ * <p>In particular, if the {@link OldDoFn} is a {@link ReduceFnExecutor}, a specialized
+ * implementation detail of streaming {@link GroupAlsoByWindow}, then it will create a special
+ * runner that operates on {@link KeyedWorkItem KeyedWorkItems}, drops late data and counts
+ * dropped elements.
+ *
+ * @deprecated please port uses of {@link OldDoFn} to use {@link DoFn}
+ */
+ @Deprecated
public static <InputT, OutputT> DoFnRunner<InputT, OutputT> createDefault(
PipelineOptions options,
OldDoFn<InputT, OutputT> doFn,
@@ -136,4 +204,54 @@ public class DoFnRunners {
return runner;
}
}
+
+  /**
+   * Creates the right kind of {@link DoFnRunner} for an object that can be either a {@link DoFn} or
+   * {@link OldDoFn}. This can be used so that the client need not explicitly reference either such
+   * class, but merely deserialize a payload and pass it to this method.
+   *
+   * @param deserializedFn the user function; expected to be a {@link DoFn} or an {@link OldDoFn}
+   * @return the runner produced by the {@code createDefault} overload matching the function's type
+   * @throws IllegalArgumentException if {@code deserializedFn} is {@code null} or is neither a
+   *     {@link DoFn} nor an {@link OldDoFn}
+   * @deprecated for migration purposes only for services where users may still submit either {@link
+   *     OldDoFn} or {@link DoFn}. If you know that you have a {@link DoFn} then you should use the
+   *     variant for that instead.
+   */
+  @Deprecated
+  public static <InputT, OutputT> DoFnRunner<InputT, OutputT> createDefault(
+      PipelineOptions options,
+      Object deserializedFn,
+      SideInputReader sideInputReader,
+      OutputManager outputManager,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      StepContext stepContext,
+      AggregatorFactory aggregatorFactory,
+      WindowingStrategy<?, ?> windowingStrategy) {
+    if (deserializedFn instanceof DoFn) {
+      // The element types of the payload cannot be checked at runtime; the caller vouches for
+      // them through the type arguments it requests.
+      @SuppressWarnings("unchecked")
+      DoFn<InputT, OutputT> doFn = (DoFn<InputT, OutputT>) deserializedFn;
+      return createDefault(
+          options,
+          doFn,
+          sideInputReader,
+          outputManager,
+          mainOutputTag,
+          sideOutputTags,
+          stepContext,
+          aggregatorFactory,
+          windowingStrategy);
+    }
+
+    if (deserializedFn instanceof OldDoFn) {
+      @SuppressWarnings("unchecked")
+      OldDoFn<InputT, OutputT> oldDoFn = (OldDoFn<InputT, OutputT>) deserializedFn;
+      return createDefault(
+          options,
+          oldDoFn,
+          sideInputReader,
+          outputManager,
+          mainOutputTag,
+          sideOutputTags,
+          stepContext,
+          aggregatorFactory,
+          windowingStrategy);
+    }
+
+    // instanceof is false for null, so a null payload lands here too; guard getClass() so the
+    // caller gets the descriptive IllegalArgumentException rather than a NullPointerException.
+    throw new IllegalArgumentException(
+        String.format(
+            "Cannot create %s for %s of class %s",
+            DoFnRunner.class.getSimpleName(),
+            deserializedFn,
+            deserializedFn == null ? null : deserializedFn.getClass()));
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fe0b7bff/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
new file mode 100644
index 0000000..d5ecbc9
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -0,0 +1,584 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.InputProvider;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.ExecutionContext.StepContext;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+import org.joda.time.format.PeriodFormat;
+
+/**
+ * Runs a {@link DoFn} by constructing the appropriate contexts and passing them in.
+ *
+ * @param <InputT> the type of the {@link DoFn} (main) input elements
+ * @param <OutputT> the type of the {@link DoFn} (main) output elements
+ */
+public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
+
+ /** The {@link DoFn} being run. */
+ private final DoFn<InputT, OutputT> fn;
+
+ /** The invoker built from {@link #fn} in the constructor; calls into the DoFn go through it. */
+ private final DoFnInvoker<InputT, OutputT> invoker;
+
+ /** The context used for running the {@link DoFn}. */
+ private final DoFnContext<InputT, OutputT> context;
+
+ /** The output manager given to the constructor; the same instance is handed to the context. */
+ private final OutputManager outputManager;
+
+ /** The main output tag given to the constructor; the same tag is handed to the context. */
+ private final TupleTag<OutputT> mainOutputTag;
+
+ /**
+ * True when the process-element signature does not report using a single window. In that case
+ * {@link #processElement} passes the element through as-is; otherwise it first explodes the
+ * element into one value per window.
+ */
+ private final boolean ignoresWindow;
+
+ public SimpleDoFnRunner(
+ PipelineOptions options,
+ DoFn<InputT, OutputT> fn,
+ SideInputReader sideInputReader,
+ OutputManager outputManager,
+ TupleTag<OutputT> mainOutputTag,
+ List<TupleTag<?>> sideOutputTags,
+ StepContext stepContext,
+ AggregatorFactory aggregatorFactory,
+ WindowingStrategy<?, ?> windowingStrategy) {
+ this.fn = fn;
+ this.ignoresWindow =
+ !DoFnSignatures.INSTANCE.getSignature(fn.getClass()).processElement().usesSingleWindow();
+ this.invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn);
+ this.outputManager = outputManager;
+ this.mainOutputTag = mainOutputTag;
+ this.context =
+ new DoFnContext<>(
+ options,
+ fn,
+ sideInputReader,
+ outputManager,
+ mainOutputTag,
+ sideOutputTags,
+ stepContext,
+ aggregatorFactory,
+ windowingStrategy == null ? null : windowingStrategy.getWindowFn());
+ }
+
+  @Override
+  public void startBundle() {
+    try {
+      // User code may run inside this call, using the bundle-level context.
+      invoker.invokeStartBundle(context);
+    } catch (Throwable userFailure) {
+      // Treated as an exception in user code.
+      throw wrapUserCodeException(userFailure);
+    }
+  }
+
+  @Override
+  public void processElement(WindowedValue<InputT> compressedElem) {
+    if (!ignoresWindow) {
+      // The DoFn uses a single window: invoke it once per window of the compressed element.
+      for (WindowedValue<InputT> singleWindowElem : compressedElem.explodeWindows()) {
+        invokeProcessElement(singleWindowElem);
+      }
+      return;
+    }
+
+    // Otherwise the compressed element is processed in a single invocation.
+    invokeProcessElement(compressedElem);
+  }
+
+  /**
+   * Invokes the {@link DoFn}'s process method on a single {@link WindowedValue}.
+   *
+   * <p>No window explosion happens here; {@link #processElement} decides whether the value is
+   * exploded into its windows before this method is called.
+   */
+  private void invokeProcessElement(WindowedValue<InputT> elem) {
+    final DoFn<InputT, OutputT>.ProcessContext elementContext = createProcessContext(elem);
+    final DoFn.ExtraContextFactory<InputT, OutputT> extras = createExtraContextFactory(elem);
+
+    try {
+      // User code may run inside this call.
+      invoker.invokeProcessElement(elementContext, extras);
+    } catch (Exception userFailure) {
+      throw wrapUserCodeException(userFailure);
+    }
+  }
+
+  @Override
+  public void finishBundle() {
+    try {
+      // User code may run inside this call, using the bundle-level context.
+      invoker.invokeFinishBundle(context);
+    } catch (Throwable userFailure) {
+      // Treated as an exception in user code.
+      throw wrapUserCodeException(userFailure);
+    }
+  }
+
+  /** Creates the per-element {@link DoFn.ProcessContext} wrapping {@code elem}. */
+  private DoFn<InputT, OutputT>.ProcessContext createProcessContext(WindowedValue<InputT> elem) {
+    DoFnProcessContext<InputT, OutputT> elementContext =
+        new DoFnProcessContext<InputT, OutputT>(fn, context, elem);
+    return elementContext;
+  }
+
+ private DoFn.ExtraContextFactory<InputT, OutputT> createExtraContextFactory(
+ WindowedValue<InputT> elem) {
+ return new DoFnExtraContextFactory<InputT, OutputT>(elem.getWindows(), elem.getPane());
+ }
+
+  /**
+   * Converts a failure caught around an invoker call into the exception to rethrow.
+   *
+   * <p>The result is returned rather than thrown so that call sites can write
+   * {@code throw wrapUserCodeException(t)}, which also tells the compiler that control flow ends.
+   *
+   * @param t the failure that was caught
+   * @return the result of {@link UserCodeException#wrapIf}, asked to wrap only when the function
+   *     is not a system DoFn
+   */
+  private RuntimeException wrapUserCodeException(Throwable t) {
+    return UserCodeException.wrapIf(!isSystemDoFn(), t);
+  }
+
+  /**
+   * Returns whether the {@link DoFn} being run is marked with {@link SystemDoFnInternal}.
+   *
+   * <p>The annotation is looked up on the DoFn's own class. The invoker is a separate object
+   * created by {@code DoFnInvokers} for the DoFn, so its class is not where a DoFn author's
+   * annotation would be found.
+   */
+  private boolean isSystemDoFn() {
+    return fn.getClass().isAnnotationPresent(SystemDoFnInternal.class);
+  }
+
+  /**
+   * A concrete implementation of {@code DoFn.Context} used for running a {@link DoFn}.
+   *
+   * <p>Outputs are handed to the {@link OutputManager} and, when a {@link StepContext} is present,
+   * also noted on it. The number of distinct output tags is capped at
+   * {@link #MAX_SIDE_OUTPUTS}.
+   *
+   * @param <InputT> the type of the {@link DoFn} (main) input elements
+   * @param <OutputT> the type of the {@link DoFn} (main) output elements
+   */
+  private static class DoFnContext<InputT, OutputT> extends DoFn<InputT, OutputT>.Context {
+    private static final int MAX_SIDE_OUTPUTS = 1000;
+
+    final PipelineOptions options;
+    final DoFn<InputT, OutputT> fn;
+    final SideInputReader sideInputReader;
+    final OutputManager outputManager;
+    final TupleTag<OutputT> mainOutputTag;
+    final StepContext stepContext;
+    final AggregatorFactory aggregatorFactory;
+
+    /** Used to assign windows to outputs that carry none; may be {@code null}. */
+    final WindowFn<?, ?> windowFn;
+
+    /**
+     * The set of known output tags, some of which may be undeclared, so we can throw an exception
+     * when it exceeds {@link #MAX_SIDE_OUTPUTS}.
+     */
+    private final Set<TupleTag<?>> outputTags;
+
+    public DoFnContext(
+        PipelineOptions options,
+        DoFn<InputT, OutputT> fn,
+        SideInputReader sideInputReader,
+        OutputManager outputManager,
+        TupleTag<OutputT> mainOutputTag,
+        List<TupleTag<?>> sideOutputTags,
+        StepContext stepContext,
+        AggregatorFactory aggregatorFactory,
+        WindowFn<?, ?> windowFn) {
+      fn.super();
+      this.options = options;
+      this.fn = fn;
+      this.sideInputReader = sideInputReader;
+      this.outputManager = outputManager;
+      this.mainOutputTag = mainOutputTag;
+
+      // Seed the known tags with everything that was declared up front.
+      this.outputTags = Sets.newHashSet();
+      outputTags.add(mainOutputTag);
+      outputTags.addAll(sideOutputTags);
+
+      this.stepContext = stepContext;
+      this.aggregatorFactory = aggregatorFactory;
+      this.windowFn = windowFn;
+    }
+
+    //////////////////////////////////////////////////////////////////////////////
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return options;
+    }
+
+    /**
+     * Builds the {@link WindowedValue} for an output.
+     *
+     * <p>A {@code null} timestamp is replaced by {@link BoundedWindow#TIMESTAMP_MIN_VALUE}. When
+     * {@code windows} is {@code null}, the windows are assigned by {@link #windowFn}, which is
+     * given a context exposing only the originally supplied timestamp (if any); element and
+     * window access are rejected.
+     *
+     * @throws IllegalStateException if {@code windows} is {@code null} and this context has no
+     *     {@link WindowFn} to assign them with
+     */
+    <T, W extends BoundedWindow> WindowedValue<T> makeWindowedValue(
+        T output, Instant timestamp, Collection<W> windows, PaneInfo pane) {
+      final Instant inputTimestamp = timestamp;
+
+      if (timestamp == null) {
+        timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
+      }
+
+      if (windows == null) {
+        if (windowFn == null) {
+          // Without this check the failure would surface as a NullPointerException that the
+          // catch block below reports as a user-code error.
+          throw new IllegalStateException(
+              "Cannot assign windows to an output that has none: "
+                  + "no WindowFn was provided to this context");
+        }
+        try {
+          // The windowFn can never succeed at accessing the element, so its type does not
+          // matter here
+          @SuppressWarnings("unchecked")
+          WindowFn<Object, W> objectWindowFn = (WindowFn<Object, W>) windowFn;
+          windows =
+              objectWindowFn.assignWindows(
+                  objectWindowFn.new AssignContext() {
+                    @Override
+                    public Object element() {
+                      throw new UnsupportedOperationException(
+                          "WindowFn attempted to access input element when none was available");
+                    }
+
+                    @Override
+                    public Instant timestamp() {
+                      if (inputTimestamp == null) {
+                        throw new UnsupportedOperationException(
+                            "WindowFn attempted to access input timestamp when none was available");
+                      }
+                      return inputTimestamp;
+                    }
+
+                    @Override
+                    public W window() {
+                      throw new UnsupportedOperationException(
+                          "WindowFn attempted to access input windows when none were available");
+                    }
+                  });
+        } catch (Exception e) {
+          throw UserCodeException.wrap(e);
+        }
+      }
+
+      return WindowedValue.of(output, timestamp, windows, pane);
+    }
+
+    /**
+     * Reads the side input {@code view} for the side-input window that the view's WindowFn maps
+     * {@code mainInputWindow} to.
+     *
+     * @throws IllegalArgumentException if the side input reader does not contain {@code view}
+     */
+    public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
+      if (!sideInputReader.contains(view)) {
+        throw new IllegalArgumentException("calling sideInput() with unknown view");
+      }
+      BoundedWindow sideInputWindow =
+          view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow);
+      return sideInputReader.get(view, sideInputWindow);
+    }
+
+    /** Builds a windowed value from the parts and emits it to the main output. */
+    void outputWindowedValue(
+        OutputT output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo pane) {
+      outputWindowedValue(makeWindowedValue(output, timestamp, windows, pane));
+    }
+
+    /** Emits {@code windowedElem} to the main output and notes it on the step context, if any. */
+    void outputWindowedValue(WindowedValue<OutputT> windowedElem) {
+      outputManager.output(mainOutputTag, windowedElem);
+      if (stepContext != null) {
+        stepContext.noteOutput(windowedElem);
+      }
+    }
+
+    /** Builds a windowed value from the parts and emits it to the output identified by tag. */
+    private <T> void sideOutputWindowedValue(
+        TupleTag<T> tag,
+        T output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo pane) {
+      sideOutputWindowedValue(tag, makeWindowedValue(output, timestamp, windows, pane));
+    }
+
+    /**
+     * Emits {@code windowedElem} to the output identified by {@code tag}, registering the tag if
+     * it has not been seen before.
+     *
+     * @throws IllegalArgumentException if registering a new tag would exceed the tag limit
+     */
+    private <T> void sideOutputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedElem) {
+      if (!outputTags.contains(tag)) {
+        // This tag wasn't declared nor was it seen before during this execution.
+        // Thus, this must be a new, undeclared and unconsumed output.
+        // To prevent likely user errors, enforce the limit on the number of side
+        // outputs.
+        if (outputTags.size() >= MAX_SIDE_OUTPUTS) {
+          throw new IllegalArgumentException(
+              "the number of side outputs has exceeded a limit of " + MAX_SIDE_OUTPUTS);
+        }
+        outputTags.add(tag);
+      }
+
+      outputManager.output(tag, windowedElem);
+      if (stepContext != null) {
+        stepContext.noteSideOutput(tag, windowedElem);
+      }
+    }
+
+    // Following implementations of output, outputWithTimestamp, and sideOutput
+    // are only accessible in DoFn.startBundle and DoFn.finishBundle, and will be shadowed by
+    // ProcessContext's versions in DoFn.processElement.
+    @Override
+    public void output(OutputT output) {
+      outputWindowedValue(output, null, null, PaneInfo.NO_FIRING);
+    }
+
+    @Override
+    public void outputWithTimestamp(OutputT output, Instant timestamp) {
+      outputWindowedValue(output, timestamp, null, PaneInfo.NO_FIRING);
+    }
+
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+      checkNotNull(tag, "TupleTag passed to sideOutput cannot be null");
+      sideOutputWindowedValue(tag, output, null, null, PaneInfo.NO_FIRING);
+    }
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+      checkNotNull(tag, "TupleTag passed to sideOutputWithTimestamp cannot be null");
+      sideOutputWindowedValue(tag, output, timestamp, null, PaneInfo.NO_FIRING);
+    }
+
+    @Override
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
+        String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+      checkNotNull(combiner, "Combiner passed to createAggregator cannot be null");
+      return aggregatorFactory.createAggregatorForDoFn(fn.getClass(), stepContext, name, combiner);
+    }
+  }
+
+  /**
+   * The {@link DoFn.ProcessContext} handed to a {@link DoFn} while it processes one element.
+   *
+   * <p>Element-specific accessors read from the wrapped {@link WindowedValue}; everything else is
+   * forwarded to the shared {@link DoFnContext}.
+   *
+   * @param <InputT> the type of the {@link DoFn} (main) input elements
+   * @param <OutputT> the type of the {@link DoFn} (main) output elements
+   */
+  private static class DoFnProcessContext<InputT, OutputT>
+      extends DoFn<InputT, OutputT>.ProcessContext {
+
+    final DoFn<InputT, OutputT> fn;
+    final DoFnContext<InputT, OutputT> context;
+    final WindowedValue<InputT> windowedValue;
+
+    public DoFnProcessContext(
+        DoFn<InputT, OutputT> fn,
+        DoFnContext<InputT, OutputT> context,
+        WindowedValue<InputT> windowedValue) {
+      fn.super();
+      this.fn = fn;
+      this.context = context;
+      this.windowedValue = windowedValue;
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return context.getPipelineOptions();
+    }
+
+    @Override
+    public InputT element() {
+      return windowedValue.getValue();
+    }
+
+    /**
+     * Reads a side input for the window of the current element, which must be in exactly one
+     * window (or in none, when the context's WindowFn is {@link GlobalWindows}).
+     */
+    @Override
+    public <T> T sideInput(PCollectionView<T> view) {
+      checkNotNull(view, "View passed to sideInput cannot be null");
+      Iterator<? extends BoundedWindow> remaining = windows().iterator();
+      final BoundedWindow mainInputWindow;
+      if (remaining.hasNext()) {
+        mainInputWindow = remaining.next();
+        if (remaining.hasNext()) {
+          throw new IllegalStateException(
+              "sideInput called when main input element is in multiple windows");
+        }
+      } else if (context.windowFn instanceof GlobalWindows) {
+        // TODO: Remove this once GroupByKeyOnly no longer outputs elements
+        // without windows
+        mainInputWindow = GlobalWindow.INSTANCE;
+      } else {
+        throw new IllegalStateException(
+            "sideInput called when main input element is not in any windows");
+      }
+      return context.sideInput(view, mainInputWindow);
+    }
+
+    @Override
+    public PaneInfo pane() {
+      return windowedValue.getPane();
+    }
+
+    @Override
+    public void output(OutputT output) {
+      // Same timestamp, windows and pane as the input; only the value is swapped.
+      WindowedValue<OutputT> result = windowedValue.withValue(output);
+      context.outputWindowedValue(result);
+    }
+
+    @Override
+    public void outputWithTimestamp(OutputT output, Instant timestamp) {
+      checkTimestamp(timestamp);
+      context.outputWindowedValue(
+          output, timestamp, windowedValue.getWindows(), windowedValue.getPane());
+    }
+
+    /** Forwards to the shared context without any per-element checks. */
+    void outputWindowedValue(
+        OutputT output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo pane) {
+      context.outputWindowedValue(output, timestamp, windows, pane);
+    }
+
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+      checkNotNull(tag, "Tag passed to sideOutput cannot be null");
+      WindowedValue<T> result = windowedValue.withValue(output);
+      context.sideOutputWindowedValue(tag, result);
+    }
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+      checkNotNull(tag, "Tag passed to sideOutputWithTimestamp cannot be null");
+      checkTimestamp(timestamp);
+      context.sideOutputWindowedValue(
+          tag, output, timestamp, windowedValue.getWindows(), windowedValue.getPane());
+    }
+
+    @Override
+    public Instant timestamp() {
+      return windowedValue.getTimestamp();
+    }
+
+    public Collection<? extends BoundedWindow> windows() {
+      return windowedValue.getWindows();
+    }
+
+    /**
+     * Rejects output timestamps earlier than the input timestamp minus the DoFn's allowed skew.
+     *
+     * @throws IllegalArgumentException if {@code timestamp} is too early
+     */
+    private void checkTimestamp(Instant timestamp) {
+      Instant earliestAllowed =
+          windowedValue.getTimestamp().minus(fn.getAllowedTimestampSkew());
+      if (!timestamp.isBefore(earliestAllowed)) {
+        return;
+      }
+      throw new IllegalArgumentException(
+          String.format(
+              "Cannot output with timestamp %s. Output timestamps must be no earlier than the "
+                  + "timestamp of the current input (%s) minus the allowed skew (%s). See the "
+                  + "DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed "
+                  + "skew.",
+              timestamp,
+              windowedValue.getTimestamp(),
+              PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod())));
+    }
+
+    @Override
+    protected <AggregatorInputT, AggregatorOutputT>
+        Aggregator<AggregatorInputT, AggregatorOutputT> createAggregator(
+            String name, CombineFn<AggregatorInputT, ?, AggregatorOutputT> combiner) {
+      return context.createAggregator(name, combiner);
+    }
+  }
+
+  /**
+   * Supplies the extra arguments a {@link DoFn}'s process method may request for one element.
+   *
+   * <p>Only the window and {@link WindowingInternals} are provided; the remaining accessors throw
+   * {@link UnsupportedOperationException}. This is an inner class because it reads the enclosing
+   * runner's {@code context}.
+   */
+  private class DoFnExtraContextFactory<InputT, OutputT>
+      implements DoFn.ExtraContextFactory<InputT, OutputT> {
+
+    /** The windows of the current element. */
+    private final Collection<? extends BoundedWindow> windows;
+
+    /** The pane of the current element. */
+    private final PaneInfo pane;
+
+    public DoFnExtraContextFactory(Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+      this.windows = windows;
+      this.pane = pane;
+    }
+
+    /**
+     * Returns the single window of the current element.
+     *
+     * @throws IllegalStateException if the element is in no window or in more than one window
+     */
+    @Override
+    public BoundedWindow window() {
+      Iterator<? extends BoundedWindow> remaining = windows.iterator();
+      if (!remaining.hasNext()) {
+        throw new IllegalStateException(
+            "window() called when the current element is not in any windows");
+      }
+      BoundedWindow onlyWindow = remaining.next();
+      if (remaining.hasNext()) {
+        throw new IllegalStateException(
+            "window() called when the current element is in multiple windows");
+      }
+      return onlyWindow;
+    }
+
+    @Override
+    public InputProvider<InputT> inputProvider() {
+      throw new UnsupportedOperationException("InputProvider parameters are not supported.");
+    }
+
+    @Override
+    public OutputReceiver<OutputT> outputReceiver() {
+      throw new UnsupportedOperationException("OutputReceiver parameters are not supported.");
+    }
+
+    @Override
+    public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
+      throw new UnsupportedOperationException("RestrictionTracker parameters are not supported.");
+    }
+
+    /**
+     * Returns a {@link WindowingInternals} view over the current element's windows and pane,
+     * backed by the enclosing runner's step context and side input reader.
+     */
+    @Override
+    public WindowingInternals<InputT, OutputT> windowingInternals() {
+      return new WindowingInternals<InputT, OutputT>() {
+        @Override
+        public Collection<? extends BoundedWindow> windows() {
+          return windows;
+        }
+
+        @Override
+        public PaneInfo pane() {
+          return pane;
+        }
+
+        @Override
+        public TimerInternals timerInternals() {
+          return context.stepContext.timerInternals();
+        }
+
+        @Override
+        public <T> void writePCollectionViewData(
+            TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder)
+            throws IOException {
+          @SuppressWarnings("unchecked")
+          Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>) context.windowFn.windowCoder();
+
+          // window() here is the enclosing factory's method: the element's single window.
+          context.stepContext.writePCollectionViewData(
+              tag,
+              data,
+              IterableCoder.of(WindowedValue.getFullCoder(elemCoder, windowCoder)),
+              window(),
+              windowCoder);
+        }
+
+        @Override
+        public StateInternals<?> stateInternals() {
+          return context.stepContext.stateInternals();
+        }
+
+        @Override
+        public void outputWindowedValue(
+            OutputT output,
+            Instant timestamp,
+            Collection<? extends BoundedWindow> windows,
+            PaneInfo pane) {
+          // Fail loudly instead of silently discarding the value.
+          throw new UnsupportedOperationException(
+              "Outputting through WindowingInternals is not supported.");
+        }
+
+        @Override
+        public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
+          return context.sideInput(view, mainInputWindow);
+        }
+      };
+    }
+  }
+}