Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/06 16:40:56 UTC
[06/50] [abbrv] incubator-beam git commit: Improve Splittable DoFn
Improve Splittable DoFn
Makes Splittable DoFn behave more like a regular DoFn:
- Adds support for side inputs and side outputs to SDF
- Teaches `ProcessFn` to work with exploded windows inside the
`KeyedWorkItem`. It does so by un-exploding the windows in the
`Iterable<WindowedValue<ElementAndRestriction>>` back into a
single `WindowedValue`, which is safe because all copies are
guaranteed to have the same value and timestamp.
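A minimal sketch of the "un-exploding" step described above, assuming a simplified, hypothetical `Windowed<T>` class (value, millisecond timestamp, string window labels) in place of Beam's `WindowedValue`. Like the `implodeWindows` helper in the diff, it takes the first copy's value and timestamp and concatenates the windows of all copies; unlike that helper, it also checks the shared-value/shared-timestamp invariant explicitly. This is an illustration, not Beam's API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

public class ImplodeWindowsSketch {
  /** Hypothetical stand-in for Beam's WindowedValue: one value, one timestamp, several windows. */
  public static final class Windowed<T> {
    public final T value;
    public final long timestampMillis;
    public final List<String> windows;

    public Windowed(T value, long timestampMillis, List<String> windows) {
      this.value = value;
      this.timestampMillis = timestampMillis;
      this.windows = Collections.unmodifiableList(new ArrayList<>(windows));
    }
  }

  /** Merges copies of one value that differ only in their windows into a single Windowed. */
  public static <T> Windowed<T> implode(List<Windowed<T>> exploded) {
    if (exploded.isEmpty()) {
      throw new IllegalStateException("Got a work item with no elements");
    }
    Windowed<T> first = exploded.get(0);
    List<String> windows = new ArrayList<>();
    for (Windowed<T> copy : exploded) {
      // The commit relies on this invariant; this sketch verifies it instead of assuming it.
      if (!Objects.equals(copy.value, first.value)
          || copy.timestampMillis != first.timestampMillis) {
        throw new IllegalArgumentException("Exploded copies must share value and timestamp");
      }
      windows.addAll(copy.windows);
    }
    return new Windowed<>(first.value, first.timestampMillis, windows);
  }

  public static void main(String[] args) {
    // Two exploded copies of the same element, delivered in different windows.
    Windowed<String> merged =
        implode(
            Arrays.asList(
                new Windowed<String>("elem", 1000L, Arrays.asList("[0, 10)")),
                new Windowed<String>("elem", 1000L, Arrays.asList("[5, 15)"))));
    System.out.println(merged.value + " " + merged.windows);
  }
}
```

Merging the windows is only sound because, as the diff's javadoc notes, a splittable DoFn may not inspect its element's window, so outputs can inherit the full window set.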
Stops SplittableParDo.ProcessFn from using the (now unavailable)
OldDoFn state and timers API:
- Makes `ProcessFn` a primitive transform with its own
`ParDoEvaluator`. As a nice side effect, this lets the runner
provide additional hooks into it, e.g. giving the runner access
to the restriction tracker (in later PRs).
- For consistency, moves declaration of `GBKIntoKeyedWorkItems`
primitive transform into `SplittableParDo`, alongside the
`SplittableProcessElements` transform
- Preserves the compressed representation of `WindowedValue`s in
`PushbackSideInputDoFnRunner`
- Uses `OutputWindowedValue` in `SplittableParDo.ProcessFn`
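In the diff, the runner hands `ProcessFn` its state and timer access through setters (`setStateInternalsFactory`, `setTimerInternalsFactory`) and `ProcessFn` looks up per-key state with `stateInternalsForKey(key)`. The sketch below mimics only that setter-injection pattern, assuming hypothetical simplified types (`StateFactory`, `InMemoryStateFactory`, `CountingFn`) with a plain `Map` as the state; it is not Beam's state API.

```java
import java.util.HashMap;
import java.util.Map;

public class InjectedStateSketch {
  /** Hypothetical analogue of StateInternalsFactory: hands out state scoped to one key. */
  public interface StateFactory<K> {
    Map<String, Object> stateForKey(K key);
  }

  /** In-memory factory, roughly the kind of thing a local runner could inject. */
  public static final class InMemoryStateFactory<K> implements StateFactory<K> {
    private final Map<K, Map<String, Object>> byKey = new HashMap<>();

    @Override
    public Map<String, Object> stateForKey(K key) {
      return byKey.computeIfAbsent(key, k -> new HashMap<>());
    }
  }

  /** Hypothetical fn that, like ProcessFn, receives its state factory from the runner. */
  public static final class CountingFn {
    private StateFactory<String> stateFactory; // injected by the runner, not the pipeline author

    public void setStateFactory(StateFactory<String> stateFactory) {
      this.stateFactory = stateFactory;
    }

    /** Returns how many times this key has been processed, including this call. */
    public int process(String key) {
      if (stateFactory == null) {
        throw new IllegalStateException("Runner must call setStateFactory() before processing");
      }
      Map<String, Object> state = stateFactory.stateForKey(key);
      int count = (Integer) state.getOrDefault("count", 0) + 1;
      state.put("count", count);
      return count;
    }
  }

  public static void main(String[] args) {
    CountingFn fn = new CountingFn();
    fn.setStateFactory(new InMemoryStateFactory<String>());
    fn.process("a");
    System.out.println(fn.process("a") + " " + fn.process("b"));
  }
}
```

Keeping the factory out of the fn's constructor mirrors the diff, where these fields are `transient` and set by the runner after deserialization rather than captured in the serialized fn.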
Proper lifecycle management for the wrapped fn:
- Caches the underlying fn using `DoFnLifecycleManager`, so that its
`@Setup` and `@Teardown` methods are called.
- Calls the `@StartBundle` and `@FinishBundle` methods on the underlying
fn explicitly. Output from them is prohibited, since an SDF
is only allowed to output after a successful `RestrictionTracker.tryClaim`.
It's possible that an SDF should not be allowed to have
`@StartBundle`/`@FinishBundle` methods at all, but I'm not sure.
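A minimal sketch of the "call bundle hooks explicitly, but prohibit output" behavior. In the diff, `ProcessFn.wrapContext(Context)` overrides `output`, `outputWithTimestamp`, `sideOutput` and `sideOutputWithTimestamp` to throw `UnsupportedOperationException`. Here a hypothetical single-method `Emitter` interface and a hypothetical `BundleFn` interface stand in for `DoFn.Context` and the user's `@StartBundle`/`@FinishBundle` methods; the names are illustrative only.

```java
public class BundleContextSketch {
  /** Hypothetical stand-in for the output methods of DoFn.Context. */
  public interface Emitter<T> {
    void output(T value);
  }

  /** Hypothetical stand-in for a user fn with @StartBundle/@FinishBundle hooks. */
  public interface BundleFn<T> {
    void startBundle(Emitter<T> out);

    void finishBundle(Emitter<T> out);
  }

  /** Returns an emitter that rejects all output, mirroring ProcessFn's wrapped Context. */
  public static <T> Emitter<T> prohibitOutput() {
    return value -> {
      throw new UnsupportedOperationException(
          "Splittable DoFn can only output from @ProcessElement");
    };
  }

  /** Invokes the wrapped fn's bundle hooks explicitly, each with an output-rejecting context. */
  public static <T> void runBundleHooks(BundleFn<T> fn) {
    fn.startBundle(BundleContextSketch.<T>prohibitOutput());
    fn.finishBundle(BundleContextSketch.<T>prohibitOutput());
  }

  public static void main(String[] args) {
    BundleFn<String> emitsInStartBundle =
        new BundleFn<String>() {
          @Override
          public void startBundle(Emitter<String> out) {
            out.output("not allowed"); // Rejected: no claimed restriction yet.
          }

          @Override
          public void finishBundle(Emitter<String> out) {}
        };
    try {
      runBundleHooks(emitsInStartBundle);
    } catch (UnsupportedOperationException e) {
      System.out.println("Rejected: " + e.getMessage());
    }
  }
}
```

Failing loudly, rather than silently dropping the output, makes a misbehaving SDF visible at the first offending call.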
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/87ff5ac3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/87ff5ac3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/87ff5ac3
Branch: refs/heads/gearpump-runner
Commit: 87ff5ac36bb9cc62fa4864ffa7b5a5e495b9a4a1
Parents: fd4b631
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Oct 26 16:05:01 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Dec 1 14:15:55 2016 -0800
----------------------------------------------------------------------
.../core/ElementAndRestrictionCoder.java | 8 +
.../runners/core/GBKIntoKeyedWorkItems.java | 55 ---
.../beam/runners/core/SplittableParDo.java | 378 +++++++++++++++----
.../beam/runners/core/SplittableParDoTest.java | 134 +++++--
...ectGBKIntoKeyedWorkItemsOverrideFactory.java | 41 +-
.../beam/runners/direct/DirectGroupByKey.java | 2 +-
.../beam/runners/direct/DirectRunner.java | 8 +-
.../runners/direct/DoFnLifecycleManager.java | 4 +-
.../beam/runners/direct/ParDoEvaluator.java | 26 +-
.../runners/direct/ParDoEvaluatorFactory.java | 63 +++-
.../direct/ParDoMultiOverrideFactory.java | 2 +-
...littableProcessElementsEvaluatorFactory.java | 144 +++++++
.../direct/TransformEvaluatorRegistry.java | 5 +
.../beam/runners/direct/SplittableDoFnTest.java | 194 +++++++++-
.../org/apache/beam/sdk/transforms/DoFn.java | 12 +
.../apache/beam/sdk/transforms/DoFnTester.java | 51 ++-
.../sdk/util/state/TimerInternalsFactory.java | 36 ++
17 files changed, 905 insertions(+), 258 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
index 6dec8e2..64c1e14 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
@@ -64,4 +64,12 @@ public class ElementAndRestrictionCoder<ElementT, RestrictionT>
RestrictionT value = restrictionCoder.decode(inStream, context);
return ElementAndRestriction.of(key, value);
}
+
+ public Coder<ElementT> getElementCoder() {
+ return elementCoder;
+ }
+
+ public Coder<RestrictionT> getRestrictionCoder() {
+ return restrictionCoder;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java
deleted file mode 100644
index 304e349..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.KeyedWorkItemCoder;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * Interface for creating a runner-specific {@link GroupByKey GroupByKey-like} {@link PTransform}
- * that produces {@link KeyedWorkItem KeyedWorkItems} so that downstream transforms can access state
- * and timers.
- */
-@Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
-public class GBKIntoKeyedWorkItems<KeyT, InputT>
- extends PTransform<PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> {
- @Override
- public PCollection<KeyedWorkItem<KeyT, InputT>> apply(PCollection<KV<KeyT, InputT>> input) {
- checkArgument(input.getCoder() instanceof KvCoder,
- "Expected input coder to be KvCoder, but was %s",
- input.getCoder().getClass().getSimpleName());
-
- KvCoder<KeyT, InputT> kvCoder = (KvCoder<KeyT, InputT>) input.getCoder();
- Coder<KeyedWorkItem<KeyT, InputT>> coder = KeyedWorkItemCoder.of(
- kvCoder.getKeyCoder(), kvCoder.getValueCoder(),
- input.getWindowingStrategy().getWindowFn().windowCoder());
- PCollection<KeyedWorkItem<KeyT, InputT>> collection = PCollection.createPrimitiveOutputInternal(
- input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
- collection.setCoder((Coder) coder);
- return collection;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index c38ab2f..80fd17b 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -19,17 +19,22 @@ package org.apache.beam.runners.core;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
+import java.util.List;
import java.util.UUID;
import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -45,21 +50,30 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.KeyedWorkItemCoder;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.Timer;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
import org.apache.beam.sdk.util.state.StateNamespace;
import org.apache.beam.sdk.util.state.StateNamespaces;
import org.apache.beam.sdk.util.state.StateTag;
import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.util.state.TimerInternalsFactory;
import org.apache.beam.sdk.util.state.ValueState;
import org.apache.beam.sdk.util.state.WatermarkHoldState;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.sdk.values.TypedPValue;
import org.joda.time.Instant;
/**
@@ -80,31 +94,53 @@ import org.joda.time.Instant;
* ParDo.of(splittable DoFn)}, but not for direct use by pipeline writers.
*/
@Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
-public class SplittableParDo<
- InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
- extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
- private final DoFn<InputT, OutputT> fn;
- private final DoFnSignature signature;
+public class SplittableParDo<InputT, OutputT, RestrictionT>
+ extends PTransform<PCollection<InputT>, PCollectionTuple> {
+ private final ParDo.BoundMulti<InputT, OutputT> parDo;
/**
- * Creates the transform for the given original {@link ParDo} and {@link DoFn}.
+ * Creates the transform for the given original multi-output {@link ParDo}.
*
- * @param fn The splittable {@link DoFn} inside the original {@link ParDo} transform.
+ * @param parDo The splittable {@link ParDo} transform.
*/
- public SplittableParDo(DoFn<InputT, OutputT> fn) {
- checkNotNull(fn, "fn must not be null");
- this.fn = fn;
- this.signature = DoFnSignatures.getSignature(fn.getClass());
- checkArgument(signature.processElement().isSplittable(), "fn must be a splittable DoFn");
+ public SplittableParDo(ParDo.BoundMulti<InputT, OutputT> parDo) {
+ checkNotNull(parDo, "parDo must not be null");
+ this.parDo = parDo;
+ checkArgument(
+ DoFnSignatures.getSignature(parDo.getNewFn().getClass()).processElement().isSplittable(),
+ "fn must be a splittable DoFn");
}
@Override
- public PCollection<OutputT> apply(PCollection<InputT> input) {
- PCollection.IsBounded isFnBounded = signature.isBoundedPerElement();
+ public PCollectionTuple apply(PCollection<InputT> input) {
+ return applyTyped(input);
+ }
+
+ private PCollectionTuple applyTyped(PCollection<InputT> input) {
+ DoFn<InputT, OutputT> fn = parDo.getNewFn();
Coder<RestrictionT> restrictionCoder =
- DoFnInvokers
- .invokerFor(fn)
+ DoFnInvokers.invokerFor(fn)
.invokeGetRestrictionCoder(input.getPipeline().getCoderRegistry());
+ PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>> keyedWorkItems =
+ applySplitIntoKeyedWorkItems(input, fn, restrictionCoder);
+ return keyedWorkItems.apply(
+ "Process",
+ new ProcessElements<>(
+ fn,
+ input.getCoder(),
+ restrictionCoder,
+ input.getWindowingStrategy(),
+ parDo.getSideInputs(),
+ parDo.getMainOutputTag(),
+ parDo.getSideOutputTags()));
+ }
+
+ private static <InputT, OutputT, RestrictionT>
+ PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>
+ applySplitIntoKeyedWorkItems(
+ PCollection<InputT> input,
+ DoFn<InputT, OutputT> fn,
+ Coder<RestrictionT> restrictionCoder) {
Coder<ElementAndRestriction<InputT, RestrictionT>> splitCoder =
ElementAndRestrictionCoder.of(input.getCoder(), restrictionCoder);
@@ -121,23 +157,133 @@ public class SplittableParDo<
WithKeys.of(new RandomUniqueKeyFn<ElementAndRestriction<InputT, RestrictionT>>()))
.apply(
"Group by key",
- new GBKIntoKeyedWorkItems<String, ElementAndRestriction<InputT, RestrictionT>>());
+ new GBKIntoKeyedWorkItems<String, ElementAndRestriction<InputT, RestrictionT>>())
+ .setCoder(
+ KeyedWorkItemCoder.of(
+ StringUtf8Coder.of(),
+ splitCoder,
+ input.getWindowingStrategy().getWindowFn().windowCoder()));
checkArgument(
keyedWorkItems.getWindowingStrategy().getWindowFn() instanceof GlobalWindows,
"GBKIntoKeyedWorkItems must produce a globally windowed collection, "
+ "but windowing strategy was: %s",
keyedWorkItems.getWindowingStrategy());
- return keyedWorkItems
- .apply(
- "Process",
- ParDo.of(
- new ProcessFn<InputT, OutputT, RestrictionT, TrackerT>(
- fn,
- input.getCoder(),
- restrictionCoder,
- input.getWindowingStrategy().getWindowFn().windowCoder())))
- .setIsBoundedInternal(input.isBounded().and(isFnBounded))
- .setWindowingStrategyInternal(input.getWindowingStrategy());
+ return keyedWorkItems;
+ }
+
+ /**
+ * Runner-specific primitive {@link GroupByKey GroupByKey-like} {@link PTransform} that produces
+ * {@link KeyedWorkItem KeyedWorkItems} so that downstream transforms can access state and timers.
+ *
+ * <p>Unlike a real {@link GroupByKey}, ignores the input's windowing and triggering strategy and
+ * emits output immediately.
+ */
+ public static class GBKIntoKeyedWorkItems<KeyT, InputT>
+ extends PTransform<PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> {
+ @Override
+ public PCollection<KeyedWorkItem<KeyT, InputT>> apply(PCollection<KV<KeyT, InputT>> input) {
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(), WindowingStrategy.globalDefault(), input.isBounded());
+ }
+ }
+
+ /**
+ * Runner-specific primitive {@link PTransform} that invokes the {@link DoFn.ProcessElement}
+ * method for a splittable {@link DoFn}.
+ */
+ public static class ProcessElements<InputT, OutputT, RestrictionT>
+ extends PTransform<
+ PCollection<? extends KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>,
+ PCollectionTuple> {
+ private final DoFn<InputT, OutputT> fn;
+ private final Coder<InputT> elementCoder;
+ private final Coder<RestrictionT> restrictionCoder;
+ private final WindowingStrategy<?, ?> windowingStrategy;
+ private final List<PCollectionView<?>> sideInputs;
+ private final TupleTag<OutputT> mainOutputTag;
+ private final TupleTagList sideOutputTags;
+
+ /**
+ * @param fn the splittable {@link DoFn}.
+ * @param windowingStrategy the {@link WindowingStrategy} of the input collection.
+ * @param sideInputs list of side inputs that should be available to the {@link DoFn}.
+ * @param mainOutputTag {@link TupleTag Tag} of the {@link DoFn DoFn's} main output.
+ * @param sideOutputTags {@link TupleTagList Tags} of the {@link DoFn DoFn's} side outputs.
+ */
+ public ProcessElements(
+ DoFn<InputT, OutputT> fn,
+ Coder<InputT> elementCoder,
+ Coder<RestrictionT> restrictionCoder,
+ WindowingStrategy<?, ?> windowingStrategy,
+ List<PCollectionView<?>> sideInputs,
+ TupleTag<OutputT> mainOutputTag,
+ TupleTagList sideOutputTags) {
+ this.fn = fn;
+ this.elementCoder = elementCoder;
+ this.restrictionCoder = restrictionCoder;
+ this.windowingStrategy = windowingStrategy;
+ this.sideInputs = sideInputs;
+ this.mainOutputTag = mainOutputTag;
+ this.sideOutputTags = sideOutputTags;
+ }
+
+ public DoFn<InputT, OutputT> getFn() {
+ return fn;
+ }
+
+ public List<PCollectionView<?>> getSideInputs() {
+ return sideInputs;
+ }
+
+ public TupleTag<OutputT> getMainOutputTag() {
+ return mainOutputTag;
+ }
+
+ public TupleTagList getSideOutputTags() {
+ return sideOutputTags;
+ }
+
+ public ProcessFn<InputT, OutputT, RestrictionT, ?> newProcessFn(DoFn<InputT, OutputT> fn) {
+ return new SplittableParDo.ProcessFn<>(
+ fn, elementCoder, restrictionCoder, windowingStrategy.getWindowFn().windowCoder());
+ }
+
+ @Override
+ public PCollectionTuple apply(
+ PCollection<? extends KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>
+ input) {
+ DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
+ PCollectionTuple outputs =
+ PCollectionTuple.ofPrimitiveOutputsInternal(
+ input.getPipeline(),
+ TupleTagList.of(mainOutputTag).and(sideOutputTags.getAll()),
+ windowingStrategy,
+ input.isBounded().and(signature.isBoundedPerElement()));
+
+ // Set output type descriptor similarly to how ParDo.BoundMulti does it.
+ outputs.get(mainOutputTag).setTypeDescriptorInternal(fn.getOutputTypeDescriptor());
+
+ return outputs;
+ }
+
+ @Override
+ public <T> Coder<T> getDefaultOutputCoder(
+ PCollection<? extends KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>
+ input,
+ TypedPValue<T> output)
+ throws CannotProvideCoderException {
+ // Similar logic to ParDo.BoundMulti.getDefaultOutputCoder.
+ @SuppressWarnings("unchecked")
+ KeyedWorkItemCoder<String, ElementAndRestriction<InputT, RestrictionT>> kwiCoder =
+ (KeyedWorkItemCoder) input.getCoder();
+ Coder<InputT> inputCoder =
+ ((ElementAndRestrictionCoder<InputT, RestrictionT>) kwiCoder.getElementCoder())
+ .getElementCoder();
+ return input
+ .getPipeline()
+ .getCoderRegistry()
+ .getDefaultCoder(output.getTypeDescriptor(), fn.getInputTypeDescriptor(), inputCoder);
+ }
}
/**
@@ -182,15 +328,11 @@ public class SplittableParDo<
* The heart of splittable {@link DoFn} execution: processes a single (element, restriction) pair
* by creating a tracker for the restriction and checkpointing/resuming processing later if
* necessary.
- *
- * <p>TODO: This uses deprecated OldDoFn since DoFn does not provide access to state/timer
- * internals. This should be rewritten to use the <a href="https://s.apache.org/beam-state">State
- * and Timers API</a> once it is available.
*/
@VisibleForTesting
- static class ProcessFn<
+ public static class ProcessFn<
InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
- extends OldDoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> {
+ extends DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> {
// Commit at least once every 10k output records. This keeps the watermark advancing
// smoothly, and ensures that not too much work will have to be reprocessed in the event of
// a crash.
@@ -227,30 +369,56 @@ public class SplittableParDo<
*/
private StateTag<Object, ValueState<RestrictionT>> restrictionTag;
+ private transient StateInternalsFactory<String> stateInternalsFactory;
+ private transient TimerInternalsFactory<String> timerInternalsFactory;
+ private transient OutputWindowedValue<OutputT> outputWindowedValue;
+
private final DoFn<InputT, OutputT> fn;
private final Coder<? extends BoundedWindow> windowCoder;
private transient DoFnInvoker<InputT, OutputT> invoker;
- ProcessFn(
+ public ProcessFn(
DoFn<InputT, OutputT> fn,
Coder<InputT> elementCoder,
Coder<RestrictionT> restrictionCoder,
Coder<? extends BoundedWindow> windowCoder) {
this.fn = fn;
+ this.invoker = DoFnInvokers.invokerFor(fn);
this.windowCoder = windowCoder;
- elementTag =
+ this.elementTag =
StateTags.value("element", WindowedValue.getFullCoder(elementCoder, this.windowCoder));
- restrictionTag = StateTags.value("restriction", restrictionCoder);
+ this.restrictionTag = StateTags.value("restriction", restrictionCoder);
}
- @Override
- public void setup() throws Exception {
- invoker = DoFnInvokers.invokerFor(fn);
+ public void setStateInternalsFactory(StateInternalsFactory<String> stateInternalsFactory) {
+ this.stateInternalsFactory = stateInternalsFactory;
}
- @Override
+ public void setTimerInternalsFactory(TimerInternalsFactory<String> timerInternalsFactory) {
+ this.timerInternalsFactory = timerInternalsFactory;
+ }
+
+ public void setOutputWindowedValue(OutputWindowedValue<OutputT> outputWindowedValue) {
+ this.outputWindowedValue = outputWindowedValue;
+ }
+
+ @StartBundle
+ public void startBundle(Context c) throws Exception {
+ invoker.invokeStartBundle(wrapContext(c));
+ }
+
+ @FinishBundle
+ public void finishBundle(Context c) throws Exception {
+ invoker.invokeFinishBundle(wrapContext(c));
+ }
+
+ @ProcessElement
public void processElement(final ProcessContext c) {
+ StateInternals<String> stateInternals =
+ stateInternalsFactory.stateInternalsForKey(c.element().key());
+ TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey(c.element().key());
+
// Initialize state (element and restriction) depending on whether this is the seed call.
// The seed call is the first call for this element, which actually has the element.
// Subsequent calls are timer firings and the element has to be retrieved from the state.
@@ -258,17 +426,23 @@ public class SplittableParDo<
boolean isSeedCall = (timer == null);
StateNamespace stateNamespace = isSeedCall ? StateNamespaces.global() : timer.getNamespace();
ValueState<WindowedValue<InputT>> elementState =
- c.windowingInternals().stateInternals().state(stateNamespace, elementTag);
+ stateInternals.state(stateNamespace, elementTag);
ValueState<RestrictionT> restrictionState =
- c.windowingInternals().stateInternals().state(stateNamespace, restrictionTag);
+ stateInternals.state(stateNamespace, restrictionTag);
WatermarkHoldState<GlobalWindow> holdState =
- c.windowingInternals().stateInternals().state(stateNamespace, watermarkHoldTag);
+ stateInternals.state(stateNamespace, watermarkHoldTag);
ElementAndRestriction<WindowedValue<InputT>, RestrictionT> elementAndRestriction;
if (isSeedCall) {
// The element and restriction are available in c.element().
+ // elementsIterable() will, by construction of SplittableParDo, contain the same value
+ // potentially in several different windows. We implode this into a single WindowedValue
+ // in order to simplify the rest of the code and avoid iterating over elementsIterable()
+ // explicitly. The windows of this WindowedValue will be propagated to windows of the
+ // output. This is correct because a splittable DoFn is not allowed to inspect the window
+ // of its element.
WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue =
- Iterables.getOnlyElement(c.element().elementsIterable());
+ implodeWindows(c.element().elementsIterable());
WindowedValue<InputT> element = windowedValue.withValue(windowedValue.getValue().element());
elementState.write(element);
elementAndRestriction =
@@ -290,7 +464,7 @@ public class SplittableParDo<
DoFn.ProcessContinuation cont =
invoker.invokeProcessElement(
wrapTracker(
- tracker, makeContext(c, elementAndRestriction.element(), tracker, residual)));
+ tracker, wrapContext(c, elementAndRestriction.element(), tracker, residual)));
if (residual[0] == null) {
// This means the call completed unsolicited, and the context produced by makeContext()
// did not take a checkpoint. Take one now.
@@ -307,19 +481,85 @@ public class SplittableParDo<
}
restrictionState.write(residual[0]);
Instant futureOutputWatermark = cont.getWatermark();
- if (futureOutputWatermark != null) {
- holdState.add(futureOutputWatermark);
+ if (futureOutputWatermark == null) {
+ futureOutputWatermark = elementAndRestriction.element().getTimestamp();
}
+ Instant wakeupTime = timerInternals.currentProcessingTime().plus(cont.resumeDelay());
+ holdState.add(futureOutputWatermark);
// Set a timer to continue processing this element.
- TimerInternals timerInternals = c.windowingInternals().timerInternals();
timerInternals.setTimer(
- TimerInternals.TimerData.of(
- stateNamespace,
- timerInternals.currentProcessingTime().plus(cont.resumeDelay()),
- TimeDomain.PROCESSING_TIME));
+ TimerInternals.TimerData.of(stateNamespace, wakeupTime, TimeDomain.PROCESSING_TIME));
+ }
+
+ /**
+ * Does the opposite of {@link WindowedValue#explodeWindows()} - creates a single {@link
+ * WindowedValue} from a collection of {@link WindowedValue}'s that is known to contain copies
+ * of the same value with the same timestamp, but different window sets.
+ *
+ * <p>This is only legal to do because we know that {@link RandomUniqueKeyFn} created unique
+ * keys for every {@link ElementAndRestriction}, so if there's multiple {@link WindowedValue}'s
+ * for the same key, that means only that the windows of that {@link ElementAndRestriction} are
+ * being delivered separately rather than all at once. It is also legal to do because splittable
+ * {@link DoFn} is not allowed to access the window of its element, so we can propagate the full
+ * set of windows of its input to its output.
+ */
+ private static <InputT, RestrictionT>
+ WindowedValue<ElementAndRestriction<InputT, RestrictionT>> implodeWindows(
+ Iterable<WindowedValue<ElementAndRestriction<InputT, RestrictionT>>> values) {
+ WindowedValue<ElementAndRestriction<InputT, RestrictionT>> first =
+ Iterables.getFirst(values, null);
+ checkState(first != null, "Got a KeyedWorkItem with no elements and no timers");
+ ImmutableList.Builder<BoundedWindow> windows = ImmutableList.builder();
+ for (WindowedValue<ElementAndRestriction<InputT, RestrictionT>> value : values) {
+ windows.addAll(value.getWindows());
+ }
+ return WindowedValue.of(
+ first.getValue(), first.getTimestamp(), windows.build(), first.getPane());
+ }
+
+ private DoFn<InputT, OutputT>.Context wrapContext(final Context baseContext) {
+ return fn.new Context() {
+ @Override
+ public PipelineOptions getPipelineOptions() {
+ return baseContext.getPipelineOptions();
+ }
+
+ @Override
+ public void output(OutputT output) {
+ throwUnsupportedOutput();
+ }
+
+ @Override
+ public void outputWithTimestamp(OutputT output, Instant timestamp) {
+ throwUnsupportedOutput();
+ }
+
+ @Override
+ public <T> void sideOutput(TupleTag<T> tag, T output) {
+ throwUnsupportedOutput();
+ }
+
+ @Override
+ public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ throwUnsupportedOutput();
+ }
+
+ @Override
+ protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
+ String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
+ return fn.createAggregator(name, combiner);
+ }
+
+ private void throwUnsupportedOutput() {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Splittable DoFn can only output from @%s",
+ ProcessElement.class.getSimpleName()));
+ }
+ };
}
- private DoFn<InputT, OutputT>.ProcessContext makeContext(
+ private DoFn<InputT, OutputT>.ProcessContext wrapContext(
final ProcessContext baseContext,
final WindowedValue<InputT> element,
final TrackerT tracker,
@@ -340,17 +580,14 @@ public class SplittableParDo<
}
public void output(OutputT output) {
- baseContext
- .windowingInternals()
- .outputWindowedValue(
- output, element.getTimestamp(), element.getWindows(), element.getPane());
+ outputWindowedValue.outputWindowedValue(
+ output, element.getTimestamp(), element.getWindows(), element.getPane());
noteOutput();
}
public void outputWithTimestamp(OutputT output, Instant timestamp) {
- baseContext
- .windowingInternals()
- .outputWindowedValue(output, timestamp, element.getWindows(), element.getPane());
+ outputWindowedValue.outputWindowedValue(
+ output, timestamp, element.getWindows(), element.getPane());
noteOutput();
}
@@ -370,17 +607,15 @@ public class SplittableParDo<
}
public <T> void sideOutput(TupleTag<T> tag, T output) {
- // TODO: I'm not sure how to implement this correctly: there's no
- // "internals.sideOutputWindowedValue".
- throw new UnsupportedOperationException(
- "Side outputs not yet supported by splittable DoFn");
+ outputWindowedValue.sideOutputWindowedValue(
+ tag, output, element.getTimestamp(), element.getWindows(), element.getPane());
+ noteOutput();
}
public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
- // TODO: I'm not sure how to implement this correctly: there's no
- // "internals.sideOutputWindowedValue".
- throw new UnsupportedOperationException(
- "Side outputs not yet supported by splittable DoFn");
+ outputWindowedValue.sideOutputWindowedValue(
+ tag, output, timestamp, element.getWindows(), element.getPane());
+ noteOutput();
}
@Override
@@ -393,8 +628,7 @@ public class SplittableParDo<
/**
* Creates an {@link DoFnInvoker.ArgumentProvider} that provides the given tracker as well as
- * the given
- * {@link ProcessContext} (which is also provided when a {@link Context} is requested.
+ * the given {@link ProcessContext} (which is also provided when a {@link Context} is requested.
*/
private DoFnInvoker.ArgumentProvider<InputT, OutputT> wrapTracker(
TrackerT tracker, DoFn<InputT, OutputT>.ProcessContext processContext) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index 29ff838..990d892 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -29,6 +29,7 @@ import static org.junit.Assert.assertTrue;
import java.io.Serializable;
import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
@@ -38,6 +39,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnTester;
+import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -47,8 +49,13 @@ import org.apache.beam.sdk.util.KeyedWorkItem;
import org.apache.beam.sdk.util.KeyedWorkItems;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
+import org.apache.beam.sdk.util.state.TimerInternalsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;
@@ -120,6 +127,12 @@ public class SplittableParDoTest {
.setIsBoundedInternal(PCollection.IsBounded.BOUNDED);
}
+ private static final TupleTag<String> MAIN_OUTPUT_TAG = new TupleTag<String>() {};
+
+ private ParDo.BoundMulti<Integer, String> makeParDo(DoFn<Integer, String> fn) {
+ return ParDo.of(fn).withOutputTags(MAIN_OUTPUT_TAG, TupleTagList.empty());
+ }
+
@Test
public void testBoundednessForBoundedFn() {
Pipeline pipeline = TestPipeline.create();
@@ -128,14 +141,15 @@ public class SplittableParDoTest {
"Applying a bounded SDF to a bounded collection produces a bounded collection",
PCollection.IsBounded.BOUNDED,
makeBoundedCollection(pipeline)
- .apply("bounded to bounded", new SplittableParDo<>(boundedFn))
- .isBounded());
+ .apply("bounded to bounded", new SplittableParDo<>(makeParDo(boundedFn)))
+ .get(MAIN_OUTPUT_TAG).isBounded());
assertEquals(
"Applying a bounded SDF to an unbounded collection produces an unbounded collection",
PCollection.IsBounded.UNBOUNDED,
makeUnboundedCollection(pipeline)
- .apply("bounded to unbounded", new SplittableParDo<>(boundedFn))
- .isBounded());
+ .apply(
+ "bounded to unbounded", new SplittableParDo<>(makeParDo(boundedFn)))
+ .get(MAIN_OUTPUT_TAG).isBounded());
}
@Test
@@ -146,18 +160,27 @@ public class SplittableParDoTest {
"Applying an unbounded SDF to a bounded collection produces an unbounded collection",
PCollection.IsBounded.UNBOUNDED,
makeBoundedCollection(pipeline)
- .apply("unbounded to bounded", new SplittableParDo<>(unboundedFn))
- .isBounded());
+ .apply(
+ "unbounded to bounded",
+ new SplittableParDo<>(makeParDo(unboundedFn)))
+ .get(MAIN_OUTPUT_TAG).isBounded());
assertEquals(
"Applying an unbounded SDF to an unbounded collection produces an unbounded collection",
PCollection.IsBounded.UNBOUNDED,
makeUnboundedCollection(pipeline)
- .apply("unbounded to unbounded", new SplittableParDo<>(unboundedFn))
- .isBounded());
+ .apply(
+ "unbounded to unbounded",
+ new SplittableParDo<>(makeParDo(unboundedFn)))
+ .get(MAIN_OUTPUT_TAG).isBounded());
}
// ------------------------------- Tests for ProcessFn ---------------------------------
+ enum WindowExplosion {
+ EXPLODE_WINDOWS,
+ DO_NOT_EXPLODE_WINDOWS
+ }
+
/**
* A helper for testing {@link SplittableParDo.ProcessFn} on 1 element (but possibly over multiple
* {@link DoFn.ProcessElement} calls).
@@ -179,6 +202,46 @@ public class SplittableParDoTest {
new SplittableParDo.ProcessFn<>(
fn, inputCoder, restrictionCoder, IntervalWindow.getCoder());
this.tester = DoFnTester.of(processFn);
+ processFn.setStateInternalsFactory(
+ new StateInternalsFactory<String>() {
+ @Override
+ public StateInternals<String> stateInternalsForKey(String key) {
+ return tester.getStateInternals();
+ }
+ });
+ processFn.setTimerInternalsFactory(
+ new TimerInternalsFactory<String>() {
+ @Override
+ public TimerInternals timerInternalsForKey(String key) {
+ return tester.getTimerInternals();
+ }
+ });
+ processFn.setOutputWindowedValue(
+ new OutputWindowedValue<OutputT>() {
+ @Override
+ public void outputWindowedValue(
+ OutputT output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
+ tester
+ .getMutableOutput(tester.getMainOutputTag())
+ .add(WindowedValue.of(output, timestamp, windows, pane));
+ }
+
+ @Override
+ public <SideOutputT> void sideOutputWindowedValue(
+ TupleTag<SideOutputT> tag,
+ SideOutputT output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
+ tester.getMutableOutput(tag).add(WindowedValue.of(output, timestamp, windows, pane));
+ }
+ });
+ // Do not clone since ProcessFn references non-serializable DoFnTester itself
+ // through the state/timer/output callbacks.
+ this.tester.setCloningBehavior(DoFnTester.CloningBehavior.DO_NOT_CLONE);
this.tester.startBundle();
this.tester.advanceProcessingTime(currentProcessingTime);
@@ -192,12 +255,24 @@ public class SplittableParDoTest {
ElementAndRestriction.of(element, restriction),
currentProcessingTime,
GlobalWindow.INSTANCE,
- PaneInfo.ON_TIME_AND_ONLY_FIRING));
+ PaneInfo.ON_TIME_AND_ONLY_FIRING),
+ WindowExplosion.DO_NOT_EXPLODE_WINDOWS);
}
- void startElement(WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue)
+ void startElement(
+ WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue,
+ WindowExplosion explosion)
throws Exception {
- tester.processElement(KeyedWorkItems.elementsWorkItem("key", Arrays.asList(windowedValue)));
+ switch (explosion) {
+ case EXPLODE_WINDOWS:
+ tester.processElement(
+ KeyedWorkItems.elementsWorkItem("key", windowedValue.explodeWindows()));
+ break;
+ case DO_NOT_EXPLODE_WINDOWS:
+ tester.processElement(
+ KeyedWorkItems.elementsWorkItem("key", Arrays.asList(windowedValue)));
+ break;
+ }
}
/**
@@ -253,9 +328,6 @@ public class SplittableParDoTest {
DoFn<Integer, String> fn = new ToStringFn();
Instant base = Instant.now();
- ProcessFnTester<Integer, String, SomeRestriction, SomeRestrictionTracker> tester =
- new ProcessFnTester<>(
- base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(SomeRestriction.class));
IntervalWindow w1 =
new IntervalWindow(
@@ -267,20 +339,26 @@ public class SplittableParDoTest {
new IntervalWindow(
base.minus(Duration.standardMinutes(3)), base.plus(Duration.standardMinutes(3)));
- tester.startElement(
- WindowedValue.of(
- ElementAndRestriction.of(42, new SomeRestriction()),
- base,
- Arrays.asList(w1, w2, w3),
- PaneInfo.ON_TIME_AND_ONLY_FIRING));
-
- for (IntervalWindow w : new IntervalWindow[] {w1, w2, w3}) {
- assertEquals(
- Arrays.asList(
- TimestampedValue.of("42a", base),
- TimestampedValue.of("42b", base),
- TimestampedValue.of("42c", base)),
- tester.peekOutputElementsInWindow(w));
+ for (WindowExplosion explosion : WindowExplosion.values()) {
+ ProcessFnTester<Integer, String, SomeRestriction, SomeRestrictionTracker> tester =
+ new ProcessFnTester<>(
+ base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(SomeRestriction.class));
+ tester.startElement(
+ WindowedValue.of(
+ ElementAndRestriction.of(42, new SomeRestriction()),
+ base,
+ Arrays.asList(w1, w2, w3),
+ PaneInfo.ON_TIME_AND_ONLY_FIRING),
+ explosion);
+
+ for (IntervalWindow w : new IntervalWindow[] {w1, w2, w3}) {
+ assertEquals(
+ Arrays.asList(
+ TimestampedValue.of("42a", base),
+ TimestampedValue.of("42b", base),
+ TimestampedValue.of("42c", base)),
+ tester.peekOutputElementsInWindow(w));
+ }
}
}
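The new WindowExplosion cases check that ProcessFn produces the same output whether it receives one value in three windows or three single-window copies of it. Below is a minimal, self-contained sketch of that idea. `SimpleWindowedValue`, its `explodeWindows()` and `unexplode()` are hypothetical simplifications, not Beam's `WindowedValue` API: windows are modeled as strings and the pane is omitted. Merging is safe only because every exploded piece shares the same value and timestamp, which the sketch checks explicitly.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

/** Hypothetical, simplified stand-in for a windowed value: value, timestamp and windows. */
final class SimpleWindowedValue<T> {
  final T value;
  final long timestamp;
  final Set<String> windows; // windows identified by name, for illustration only

  SimpleWindowedValue(T value, long timestamp, Set<String> windows) {
    this.value = value;
    this.timestamp = timestamp;
    this.windows = Collections.unmodifiableSet(new LinkedHashSet<>(windows));
  }

  /** Splits a value in N windows into N values that are each in exactly one window. */
  List<SimpleWindowedValue<T>> explodeWindows() {
    List<SimpleWindowedValue<T>> result = new ArrayList<>();
    for (String w : windows) {
      result.add(new SimpleWindowedValue<>(value, timestamp, Collections.singleton(w)));
    }
    return result;
  }

  /** Merges exploded pieces back into one value; all pieces must share value and timestamp. */
  static <T> SimpleWindowedValue<T> unexplode(List<SimpleWindowedValue<T>> exploded) {
    if (exploded.isEmpty()) {
      throw new IllegalArgumentException("nothing to merge");
    }
    SimpleWindowedValue<T> first = exploded.get(0);
    Set<String> windows = new LinkedHashSet<>();
    for (SimpleWindowedValue<T> piece : exploded) {
      if (!Objects.equals(piece.value, first.value) || piece.timestamp != first.timestamp) {
        throw new IllegalArgumentException("exploded values must share value and timestamp");
      }
      windows.addAll(piece.windows);
    }
    return new SimpleWindowedValue<>(first.value, first.timestamp, windows);
  }
}
```

Under these assumptions, exploding and then un-exploding is lossless, so a consumer that un-explodes first sees identical input in both test modes.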
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
index 680a971..04becd7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
@@ -17,48 +17,23 @@
*/
package org.apache.beam.runners.direct;
-import static com.google.common.base.Preconditions.checkArgument;
-
-import org.apache.beam.runners.core.GBKIntoKeyedWorkItems;
-import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.runners.core.SplittableParDo;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.KeyedWorkItemCoder;
-import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
-/** Provides an implementation of {@link GBKIntoKeyedWorkItems} for the Direct Runner. */
+/**
+ * Provides an implementation of {@link SplittableParDo.GBKIntoKeyedWorkItems} for the Direct
+ * Runner.
+ */
class DirectGBKIntoKeyedWorkItemsOverrideFactory<KeyT, InputT>
implements PTransformOverrideFactory<
PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>,
- GBKIntoKeyedWorkItems<KeyT, InputT>> {
+ SplittableParDo.GBKIntoKeyedWorkItems<KeyT, InputT>> {
@Override
public PTransform<PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>>
- override(GBKIntoKeyedWorkItems<KeyT, InputT> transform) {
- return new DirectGBKIntoKeyedWorkItems<>(transform.getName());
- }
-
- /** The Direct Runner specific implementation of {@link GBKIntoKeyedWorkItems}. */
- private static class DirectGBKIntoKeyedWorkItems<KeyT, InputT>
- extends PTransform<PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> {
- DirectGBKIntoKeyedWorkItems(String name) {
- super(name);
- }
-
- @Override
- public PCollection<KeyedWorkItem<KeyT, InputT>> apply(PCollection<KV<KeyT, InputT>> input) {
- checkArgument(input.getCoder() instanceof KvCoder);
- KvCoder<KeyT, InputT> kvCoder = (KvCoder<KeyT, InputT>) input.getCoder();
- return input
- // TODO: Perhaps windowing strategy should instead be set by ReifyTAW, or by DGBKO
- .setWindowingStrategyInternal(WindowingStrategy.globalDefault())
- .apply(new DirectGroupByKey.DirectGroupByKeyOnly<KeyT, InputT>())
- .setCoder(
- KeyedWorkItemCoder.of(
- kvCoder.getKeyCoder(),
- kvCoder.getValueCoder(),
- input.getWindowingStrategy().getWindowFn().windowCoder()));
- }
+ override(SplittableParDo.GBKIntoKeyedWorkItems<KeyT, InputT> transform) {
+ return new DirectGroupByKey.DirectGroupByKeyOnly<>();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
index 219314a..efee801 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
@@ -65,7 +65,7 @@ class DirectGroupByKey<K, V>
KeyedWorkItemCoder.of(
inputCoder.getKeyCoder(),
inputCoder.getValueCoder(),
- input.getWindowingStrategy().getWindowFn().windowCoder()))
+ inputWindowingStrategy.getWindowFn().windowCoder()))
// Group each key's values by window, merging windows as needed.
.apply(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index f71e109..82de9ab 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -30,7 +30,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
-import org.apache.beam.runners.core.GBKIntoKeyedWorkItems;
+import org.apache.beam.runners.core.SplittableParDo;
import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
@@ -88,7 +88,7 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
.put(ParDo.Bound.class, new ParDoSingleViaMultiOverrideFactory())
.put(ParDo.BoundMulti.class, new ParDoMultiOverrideFactory())
.put(
- GBKIntoKeyedWorkItems.class,
+ SplittableParDo.GBKIntoKeyedWorkItems.class,
new DirectGBKIntoKeyedWorkItemsOverrideFactory())
.build();
@@ -307,8 +307,8 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
@SuppressWarnings("rawtypes")
KeyedPValueTrackingVisitor keyedPValueVisitor =
KeyedPValueTrackingVisitor.create(
- ImmutableSet.<Class<? extends PTransform>>of(
- GBKIntoKeyedWorkItems.class,
+ ImmutableSet.of(
+ SplittableParDo.GBKIntoKeyedWorkItems.class,
DirectGroupByKeyOnly.class,
DirectGroupAlsoByWindow.class));
pipeline.traverseTopologically(keyedPValueVisitor);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java
index 67d957c..cd644a6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java
@@ -56,9 +56,9 @@ class DoFnLifecycleManager {
thrownOnTeardown = new ConcurrentHashMap<>();
}
- public DoFn<?, ?> get() throws Exception {
+ public <InputT, OutputT> DoFn<InputT, OutputT> get() throws Exception {
Thread currentThread = Thread.currentThread();
- return outstanding.get(currentThread);
+ return (DoFn<InputT, OutputT>) outstanding.get(currentThread);
}
public void remove() throws Exception {
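The commit routes the wrapped fn through DoFnLifecycleManager so that its @Setup and @Teardown methods run. The following is a simplified, self-contained model of that per-thread caching, loosely based on the `outstanding.get(currentThread)` lookup visible above. `LifecycleFn` and `PerThreadFnCache` are hypothetical names, not Beam classes, and the real manager also deserializes clones of the fn, which this sketch leaves out.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

/** Hypothetical hooks standing in for a DoFn's @Setup and @Teardown methods. */
interface LifecycleFn {
  void setup();

  void teardown();
}

/** Caches one fn instance per thread: setup() on first use, teardown() on removal. */
final class PerThreadFnCache<FnT extends LifecycleFn> {
  private final Supplier<FnT> factory;
  private final ConcurrentMap<Thread, FnT> outstanding = new ConcurrentHashMap<>();

  PerThreadFnCache(Supplier<FnT> factory) {
    this.factory = factory;
  }

  /** Returns this thread's instance, creating it and calling setup() only the first time. */
  FnT get() {
    return outstanding.computeIfAbsent(
        Thread.currentThread(),
        thread -> {
          FnT fn = factory.get();
          fn.setup();
          return fn;
        });
  }

  /** Discards this thread's instance, if any, after calling its teardown(). */
  void remove() {
    FnT fn = outstanding.remove(Thread.currentThread());
    if (fn != null) {
      fn.teardown();
    }
  }
}
```

Keying by thread keeps each instance confined to one thread at a time. This matters because the SDF evaluator obtains its fn with `fnManager.get()` and must hand the same instance back to `remove()` when an evaluation fails.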
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index 750e5f1..504ddc4 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.direct;
import com.google.common.collect.ImmutableList;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -58,9 +57,9 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
Map<TupleTag<?>, UncommittedBundle<?>> outputBundles = new HashMap<>();
for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) {
outputBundles.put(
- outputEntry.getKey(),
- evaluationContext.createBundle(outputEntry.getValue()));
+ outputEntry.getKey(), evaluationContext.createBundle(outputEntry.getValue()));
}
+ BundleOutputManager outputManager = BundleOutputManager.create(outputBundles);
ReadyCheckingSideInputReader sideInputReader =
evaluationContext.createSideInputReader(sideInputs);
@@ -69,7 +68,7 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
evaluationContext.getPipelineOptions(),
fn,
sideInputReader,
- BundleOutputManager.create(outputBundles),
+ outputManager,
mainOutputTag,
sideOutputTags,
stepContext,
@@ -85,12 +84,7 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
}
return new ParDoEvaluator<>(
- evaluationContext,
- runner,
- application,
- aggregatorChanges,
- outputBundles.values(),
- stepContext);
+ evaluationContext, runner, application, aggregatorChanges, outputManager, stepContext);
}
////////////////////////////////////////////////////////////////////////////////////////////////
@@ -99,7 +93,7 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
private final PushbackSideInputDoFnRunner<InputT, ?> fnRunner;
private final AppliedPTransform<?, ?, ?> transform;
private final AggregatorContainer.Mutator aggregatorChanges;
- private final Collection<UncommittedBundle<?>> outputBundles;
+ private final BundleOutputManager outputManager;
private final DirectStepContext stepContext;
private final ImmutableList.Builder<WindowedValue<InputT>> unprocessedElements;
@@ -109,17 +103,21 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
PushbackSideInputDoFnRunner<InputT, ?> fnRunner,
AppliedPTransform<?, ?, ?> transform,
AggregatorContainer.Mutator aggregatorChanges,
- Collection<UncommittedBundle<?>> outputBundles,
+ BundleOutputManager outputManager,
DirectStepContext stepContext) {
this.evaluationContext = evaluationContext;
this.fnRunner = fnRunner;
this.transform = transform;
- this.outputBundles = outputBundles;
+ this.outputManager = outputManager;
this.stepContext = stepContext;
this.aggregatorChanges = aggregatorChanges;
this.unprocessedElements = ImmutableList.builder();
}
+ public BundleOutputManager getOutputManager() {
+ return outputManager;
+ }
+
@Override
public void processElement(WindowedValue<InputT> element) {
try {
@@ -147,7 +145,7 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
resultBuilder = StepTransformResult.withoutHold(transform);
}
return resultBuilder
- .addOutput(outputBundles)
+ .addOutput(outputManager.bundles.values())
.withTimerUpdate(stepContext.getTimerUpdate())
.withAggregatorChanges(aggregatorChanges)
.addUnprocessedElements(unprocessedElements.build())
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
index 02e034a..ec5dc2c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
@@ -57,6 +57,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
public <T> TransformEvaluator<T> forApplication(
AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) throws Exception {
+ @SuppressWarnings("unchecked")
AppliedPTransform<PCollection<InputT>, PCollectionTuple, ParDo.BoundMulti<InputT, OutputT>>
parDoApplication =
(AppliedPTransform<
@@ -93,13 +94,12 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
*/
@SuppressWarnings({"unchecked", "rawtypes"})
TransformEvaluator<InputT> createEvaluator(
- AppliedPTransform<PCollection<?>, PCollectionTuple, ?>
- application,
- StructuralKey<?> inputBundleKey,
- DoFn<InputT, OutputT> doFn,
- List<PCollectionView<?>> sideInputs,
- TupleTag<OutputT> mainOutputTag,
- List<TupleTag<?>> sideOutputTags)
+ AppliedPTransform<PCollection<InputT>, PCollectionTuple, ?> application,
+ StructuralKey<?> inputBundleKey,
+ DoFn<InputT, OutputT> doFn,
+ List<PCollectionView<?>> sideInputs,
+ TupleTag<OutputT> mainOutputTag,
+ List<TupleTag<?>> sideOutputTags)
throws Exception {
String stepName = evaluationContext.getStepName(application);
DirectStepContext stepContext =
@@ -107,21 +107,40 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
.getExecutionContext(application, inputBundleKey)
.getOrCreateStepContext(stepName, stepName);
- DoFnLifecycleManager fnManager = fnClones.getUnchecked(doFn);
+ DoFnLifecycleManager fnManager = getManagerForCloneOf(doFn);
+ return DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(
+ createParDoEvaluator(
+ application,
+ sideInputs,
+ mainOutputTag,
+ sideOutputTags,
+ stepContext,
+ fnManager.<InputT, OutputT>get(),
+ fnManager),
+ fnManager);
+ }
+
+ ParDoEvaluator<InputT, OutputT> createParDoEvaluator(
+ AppliedPTransform<PCollection<InputT>, PCollectionTuple, ?> application,
+ List<PCollectionView<?>> sideInputs,
+ TupleTag<OutputT> mainOutputTag,
+ List<TupleTag<?>> sideOutputTags,
+ DirectStepContext stepContext,
+ DoFn<InputT, OutputT> fn,
+ DoFnLifecycleManager fnManager)
+ throws Exception {
try {
- return DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(
- ParDoEvaluator.<InputT, OutputT>create(
- evaluationContext,
- stepContext,
- application,
- application.getInput().getWindowingStrategy(),
- fnManager.get(),
- sideInputs,
- mainOutputTag,
- sideOutputTags,
- application.getOutput().getAll()),
- fnManager);
+ return ParDoEvaluator.create(
+ evaluationContext,
+ stepContext,
+ application,
+ application.getInput().getWindowingStrategy(),
+ fn,
+ sideInputs,
+ mainOutputTag,
+ sideOutputTags,
+ application.getOutput().getAll());
} catch (Exception e) {
try {
fnManager.remove();
@@ -134,4 +153,8 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
throw e;
}
}
+
+ public DoFnLifecycleManager getManagerForCloneOf(DoFn<?, ?> fn) {
+ return fnClones.getUnchecked(fn);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 8db5159..9c9256d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -49,7 +49,7 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
DoFn<InputT, OutputT> fn = transform.getNewFn();
DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
if (signature.processElement().isSplittable()) {
- return new SplittableParDo(fn);
+ return new SplittableParDo(transform);
} else if (signature.stateDeclarations().size() > 0
|| signature.timerDeclarations().size() > 0) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
new file mode 100644
index 0000000..0eca710
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import java.util.Collection;
+import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.runners.core.ElementAndRestriction;
+import org.apache.beam.runners.core.OutputWindowedValue;
+import org.apache.beam.runners.core.SplittableParDo;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
+import org.apache.beam.sdk.util.state.TimerInternalsFactory;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+
+class SplittableProcessElementsEvaluatorFactory<InputT, OutputT, RestrictionT>
+ implements TransformEvaluatorFactory {
+ private final ParDoEvaluatorFactory<
+ KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
+ delegateFactory;
+ private final EvaluationContext evaluationContext;
+
+ SplittableProcessElementsEvaluatorFactory(EvaluationContext evaluationContext) {
+ this.evaluationContext = evaluationContext;
+ this.delegateFactory = new ParDoEvaluatorFactory<>(evaluationContext);
+ }
+
+ @Override
+ public <T> TransformEvaluator<T> forApplication(
+ AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) throws Exception {
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ TransformEvaluator<T> evaluator =
+ (TransformEvaluator<T>)
+ createEvaluator((AppliedPTransform) application, (CommittedBundle) inputBundle);
+ return evaluator;
+ }
+
+ @Override
+ public void cleanup() throws Exception {
+ delegateFactory.cleanup();
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private TransformEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>
+ createEvaluator(
+ AppliedPTransform<
+ PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>,
+ PCollectionTuple, SplittableParDo.ProcessElements<InputT, OutputT, RestrictionT>>
+ application,
+ CommittedBundle<InputT> inputBundle)
+ throws Exception {
+ final SplittableParDo.ProcessElements<InputT, OutputT, RestrictionT> transform =
+ application.getTransform();
+
+ DoFnLifecycleManager fnManager = delegateFactory.getManagerForCloneOf(transform.getFn());
+
+ SplittableParDo.ProcessFn<InputT, OutputT, RestrictionT, ?> processFn =
+ transform.newProcessFn(fnManager.<InputT, OutputT>get());
+
+ String stepName = evaluationContext.getStepName(application);
+ final DirectExecutionContext.DirectStepContext stepContext =
+ evaluationContext
+ .getExecutionContext(application, inputBundle.getKey())
+ .getOrCreateStepContext(stepName, stepName);
+
+ ParDoEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
+ parDoEvaluator =
+ delegateFactory.createParDoEvaluator(
+ application,
+ transform.getSideInputs(),
+ transform.getMainOutputTag(),
+ transform.getSideOutputTags().getAll(),
+ stepContext,
+ processFn,
+ fnManager);
+
+ processFn.setStateInternalsFactory(
+ new StateInternalsFactory<String>() {
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ @Override
+ public StateInternals<String> stateInternalsForKey(String key) {
+ return (StateInternals) stepContext.stateInternals();
+ }
+ });
+
+ processFn.setTimerInternalsFactory(
+ new TimerInternalsFactory<String>() {
+ @Override
+ public TimerInternals timerInternalsForKey(String key) {
+ return stepContext.timerInternals();
+ }
+ });
+
+ final OutputManager outputManager = parDoEvaluator.getOutputManager();
+ processFn.setOutputWindowedValue(
+ new OutputWindowedValue<OutputT>() {
+ @Override
+ public void outputWindowedValue(
+ OutputT output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
+ outputManager.output(
+ transform.getMainOutputTag(), WindowedValue.of(output, timestamp, windows, pane));
+ }
+
+ @Override
+ public <SideOutputT> void sideOutputWindowedValue(
+ TupleTag<SideOutputT> tag,
+ SideOutputT output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
+ outputManager.output(tag, WindowedValue.of(output, timestamp, windows, pane));
+ }
+ });
+
+ return DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(parDoEvaluator, fnManager);
+ }
+}
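SplittableProcessElementsEvaluatorFactory adapts the evaluator's output manager into an OutputWindowedValue. Main output is written under the transform's main output tag, and side outputs are written under the tag the caller supplies. Below is a stripped-down, self-contained sketch of that routing. `TaggedOutputCollector` and `RoutingOutput` are hypothetical, tags are plain strings, and timestamps, windows and panes are omitted, so this is not Beam's OutputManager API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for an output manager: collects outputs into one list per tag. */
final class TaggedOutputCollector {
  private final Map<String, List<Object>> bundles = new HashMap<>();

  void output(String tag, Object value) {
    bundles.computeIfAbsent(tag, k -> new ArrayList<>()).add(value);
  }

  /** Returns a read-only view of everything written under the tag (empty if none). */
  List<Object> get(String tag) {
    return Collections.unmodifiableList(bundles.getOrDefault(tag, new ArrayList<>()));
  }
}

/** Routes main output to a fixed main tag and each side output to its own tag. */
final class RoutingOutput<OutputT> {
  private final String mainTag;
  private final TaggedOutputCollector collector;

  RoutingOutput(String mainTag, TaggedOutputCollector collector) {
    this.mainTag = mainTag;
    this.collector = collector;
  }

  void output(OutputT value) {
    collector.output(mainTag, value);
  }

  <SideOutputT> void sideOutput(String tag, SideOutputT value) {
    collector.output(tag, value);
  }
}
```

Because the adapter only forwards to the evaluator's existing output manager, ProcessFn's outputs land in the same bundles that ParDoEvaluator reports in its result.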
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index a4c462a..1ddf9f4 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.beam.runners.core.SplittableParDo;
import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
@@ -61,6 +62,10 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
.put(
TestStreamEvaluatorFactory.DirectTestStreamFactory.DirectTestStream.class,
new TestStreamEvaluatorFactory(ctxt))
+ // Runner-specific primitive used in expansion of SplittableParDo
+ .put(
+ SplittableParDo.ProcessElements.class,
+ new SplittableProcessElementsEvaluatorFactory<>(ctxt))
.build();
return new TransformEvaluatorRegistry(primitives);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java
index c164ce6..f9e833f 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import java.io.Serializable;
import java.util.ArrayList;
@@ -32,20 +33,28 @@ import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.Keys;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.MutableDateTime;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -66,6 +75,11 @@ public class SplittableDoFnTest {
this.from = from;
this.to = to;
}
+
+ @Override
+ public String toString() {
+ return "OffsetRange{" + "from=" + from + ", to=" + to + '}';
+ }
}
private static class OffsetRangeTracker implements RestrictionTracker<OffsetRange> {
@@ -140,11 +154,8 @@ public class SplittableDoFnTest {
}
}
- @Ignore(
- "BEAM-801: SplittableParDo uses unsupported OldDoFn features that are not available in DoFn; "
- + "It must be implemented as a primitive.")
@Test
- public void testPairWithIndexBasic() throws ClassNotFoundException {
+ public void testPairWithIndexBasic() {
Pipeline p = TestPipeline.create();
p.getOptions().setRunner(DirectRunner.class);
PCollection<KV<String, Integer>> res =
@@ -167,11 +178,8 @@ public class SplittableDoFnTest {
p.run();
}
- @Ignore(
- "BEAM-801: SplittableParDo uses unsupported OldDoFn features that are not available in DoFn; "
- + "It must be implemented as a primitive.")
@Test
- public void testPairWithIndexWindowedTimestamped() throws ClassNotFoundException {
+ public void testPairWithIndexWindowedTimestamped() {
// Tests that Splittable DoFn correctly propagates windowing strategy, windows and timestamps
// of elements in the input collection.
Pipeline p = TestPipeline.create();
@@ -228,4 +236,172 @@ public class SplittableDoFnTest {
}
p.run();
}
+
+ private static class SDFWithSideInputsAndOutputs extends DoFn<Integer, String> {
+ private final PCollectionView<String> sideInput;
+ private final TupleTag<String> sideOutput;
+
+ private SDFWithSideInputsAndOutputs(
+ PCollectionView<String> sideInput, TupleTag<String> sideOutput) {
+ this.sideInput = sideInput;
+ this.sideOutput = sideOutput;
+ }
+
+ @ProcessElement
+ public void process(ProcessContext c, OffsetRangeTracker tracker) {
+ checkState(tracker.tryClaim(tracker.currentRestriction().from));
+ String side = c.sideInput(sideInput);
+ c.output("main:" + side + ":" + c.element());
+ c.sideOutput(sideOutput, "side:" + side + ":" + c.element());
+ }
+
+ @GetInitialRestriction
+ public OffsetRange getInitialRestriction(Integer value) {
+ return new OffsetRange(0, 1);
+ }
+
+ @NewTracker
+ public OffsetRangeTracker newTracker(OffsetRange range) {
+ return new OffsetRangeTracker(range);
+ }
+ }
+
+ @Test
+ public void testSideInputsAndOutputs() throws Exception {
+ Pipeline p = TestPipeline.create();
+ p.getOptions().setRunner(DirectRunner.class);
+
+ PCollectionView<String> sideInput =
+ p.apply("side input", Create.of("foo")).apply(View.<String>asSingleton());
+ TupleTag<String> mainOutputTag = new TupleTag<>("main");
+ TupleTag<String> sideOutputTag = new TupleTag<>("side");
+
+ PCollectionTuple res =
+ p.apply("input", Create.of(0, 1, 2))
+ .apply(
+ ParDo.of(new SDFWithSideInputsAndOutputs(sideInput, sideOutputTag))
+ .withSideInputs(sideInput)
+ .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)));
+ res.get(mainOutputTag).setCoder(StringUtf8Coder.of());
+ res.get(sideOutputTag).setCoder(StringUtf8Coder.of());
+
+ PAssert.that(res.get(mainOutputTag))
+ .containsInAnyOrder(Arrays.asList("main:foo:0", "main:foo:1", "main:foo:2"));
+ PAssert.that(res.get(sideOutputTag))
+ .containsInAnyOrder(Arrays.asList("side:foo:0", "side:foo:1", "side:foo:2"));
+
+ p.run();
+ }
+
+ @Test
+ public void testLateData() throws Exception {
+ Pipeline p = TestPipeline.create();
+ p.getOptions().setRunner(DirectRunner.class);
+
+ Instant base = Instant.now();
+
+ TestStream<String> stream =
+ TestStream.create(StringUtf8Coder.of())
+ .advanceWatermarkTo(base)
+ .addElements("aa")
+ .advanceWatermarkTo(base.plus(Duration.standardSeconds(5)))
+ .addElements(TimestampedValue.of("bb", base.minus(Duration.standardHours(1))))
+ .advanceProcessingTime(Duration.standardHours(1))
+ .advanceWatermarkToInfinity();
+
+ PCollection<String> input =
+ p.apply(stream)
+ .apply(
+ Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
+ .withAllowedLateness(Duration.standardMinutes(1)));
+
+ PCollection<KV<String, Integer>> afterSDF =
+ input
+ .apply(ParDo.of(new PairStringWithIndexToLength()))
+ .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));
+
+ PCollection<String> nonLate =
+ afterSDF.apply(GroupByKey.<String, Integer>create()).apply(Keys.<String>create());
+
+ // The splittable DoFn itself should not drop any data and act as pass-through.
+ PAssert.that(afterSDF)
+ .containsInAnyOrder(
+ Arrays.asList(KV.of("aa", 0), KV.of("aa", 1), KV.of("bb", 0), KV.of("bb", 1)));
+
+ // But it should preserve the windowing strategy of the data, including allowed lateness:
+ // the follow-up GBK should drop the late data.
+ assertEquals(afterSDF.getWindowingStrategy(), input.getWindowingStrategy());
+ PAssert.that(nonLate).containsInAnyOrder("aa");
+
+ p.run();
+ }
+
+ private static class SDFWithLifecycle extends DoFn<String, String> {
+ private enum State {
+ BEFORE_SETUP,
+ OUTSIDE_BUNDLE,
+ INSIDE_BUNDLE,
+ TORN_DOWN
+ }
+
+ private State state = State.BEFORE_SETUP;
+
+ @ProcessElement
+ public void processElement(ProcessContext c, OffsetRangeTracker tracker) {
+ assertEquals(State.INSIDE_BUNDLE, state);
+ assertTrue(tracker.tryClaim(0));
+ c.output(c.element());
+ }
+
+ @GetInitialRestriction
+ public OffsetRange getInitialRestriction(String value) {
+ return new OffsetRange(0, 1);
+ }
+
+ @NewTracker
+ public OffsetRangeTracker newTracker(OffsetRange range) {
+ return new OffsetRangeTracker(range);
+ }
+
+ @Setup
+ public void setUp() {
+ assertEquals(State.BEFORE_SETUP, state);
+ state = State.OUTSIDE_BUNDLE;
+ }
+
+ @StartBundle
+ public void startBundle(Context c) {
+ assertEquals(State.OUTSIDE_BUNDLE, state);
+ state = State.INSIDE_BUNDLE;
+ }
+
+ @FinishBundle
+ public void finishBundle(Context c) {
+ assertEquals(State.INSIDE_BUNDLE, state);
+ state = State.OUTSIDE_BUNDLE;
+ }
+
+ @Teardown
+ public void tearDown() {
+ assertEquals(State.OUTSIDE_BUNDLE, state);
+ state = State.TORN_DOWN;
+ }
+ }
+
+ @Test
+ public void testLifecycleMethods() throws Exception {
+ Pipeline p = TestPipeline.create();
+ p.getOptions().setRunner(DirectRunner.class);
+
+ PCollection<String> res =
+ p.apply(Create.of("a", "b", "c")).apply(ParDo.of(new SDFWithLifecycle()));
+
+ PAssert.that(res).containsInAnyOrder("a", "b", "c");
+
+ p.run();
+ }
+
+ // TODO (https://issues.apache.org/jira/browse/BEAM-988): Test that Splittable DoFn
+ // emits output immediately (i.e. has a pass-through trigger) regardless of input's
+ // windowing/triggering strategy.
}
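Each new test has the same shape: @ProcessElement claims an offset with tryClaim and only then outputs. The sketch below is a minimal, Beam-free model of that claim loop. It assumes a tracker that accepts strictly increasing offsets inside [from, to). SketchOffsetRange, SketchOffsetRangeTracker and ClaimLoopDemo are hypothetical names, and none of them is Beam's RestrictionTracker interface. The loop emits one output per successful claim, which mirrors how PairStringWithIndexToLength produces KV("aa", 0) and KV("aa", 1) in testLateData.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical half-open offset range [from, to); not Beam's class. */
final class SketchOffsetRange {
  final long from;
  final long to;

  SketchOffsetRange(long from, long to) {
    if (from > to) {
      throw new IllegalArgumentException("from > to: " + from + " > " + to);
    }
    this.from = from;
    this.to = to;
  }

  @Override
  public String toString() {
    return "OffsetRange{from=" + from + ", to=" + to + '}';
  }
}

/** Hypothetical tracker: claims must be increasing and fall inside the range. */
final class SketchOffsetRangeTracker {
  private final SketchOffsetRange range;
  private long lastClaimed = Long.MIN_VALUE;

  SketchOffsetRangeTracker(SketchOffsetRange range) {
    this.range = range;
  }

  SketchOffsetRange currentRestriction() {
    return range;
  }

  /** Returns true and records the claim if i is in range; false once the range is exhausted. */
  boolean tryClaim(long i) {
    if (i <= lastClaimed) {
      throw new IllegalStateException("Offsets must be claimed in increasing order: " + i);
    }
    if (i < range.from || i >= range.to) {
      return false;
    }
    lastClaimed = i;
    return true;
  }
}

final class ClaimLoopDemo {
  /** Outputs "element:offset" only for offsets that were successfully claimed. */
  static List<String> process(String element, SketchOffsetRangeTracker tracker) {
    List<String> out = new ArrayList<>();
    for (long i = tracker.currentRestriction().from; tracker.tryClaim(i); i++) {
      out.add(element + ":" + i);
    }
    return out;
  }
}
```

Because output happens only inside the loop body, nothing is emitted for an offset the tracker refused. This is the invariant the commit relies on when it forbids output from @StartBundle and @FinishBundle.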
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 3f1a3f9..7aabec9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -120,6 +120,9 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
* should be in, throwing an exception if the {@code WindowFn} attempts
* to access any information about the input element. The output element
* will have a timestamp of negative infinity.
+ *
+ * <p><i>Note:</i> A splittable {@link DoFn} is not allowed to output from
+ * {@link StartBundle} or {@link FinishBundle} methods.
*/
public abstract void output(OutputT output);
@@ -142,6 +145,9 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
* should be in, throwing an exception if the {@code WindowFn} attempts
* to access any information about the input element except for the
* timestamp.
+ *
+ * <p><i>Note:</i> A splittable {@link DoFn} is not allowed to output from
+ * {@link StartBundle} or {@link FinishBundle} methods.
*/
public abstract void outputWithTimestamp(OutputT output, Instant timestamp);
@@ -168,6 +174,9 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
* to access any information about the input element. The output element
* will have a timestamp of negative infinity.
*
+ * <p><i>Note:</i> A splittable {@link DoFn} is not allowed to output from
+ * {@link StartBundle} or {@link FinishBundle} methods.
+ *
* @see ParDo#withOutputTags
*/
public abstract <T> void sideOutput(TupleTag<T> tag, T output);
@@ -192,6 +201,9 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
* to access any information about the input element except for the
* timestamp.
*
+ * <p><i>Note:</i> A splittable {@link DoFn} is not allowed to output from
+ * {@link StartBundle} or {@link FinishBundle} methods.
+ *
* @see ParDo#withOutputTags
*/
public abstract <T> void sideOutputWithTimestamp(
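The Javadoc notes above say a splittable DoFn may not output from @StartBundle or @FinishBundle. A runner-side wrapper could enforce this by tracking the same lifecycle states that the SDFWithLifecycle test checks, and accepting output only while an element is being processed. The following is a minimal sketch under that assumption. SplittableOutputGuard and its methods are hypothetical and do not describe how ProcessFn actually implements the check.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical guard modeled on the SDFWithLifecycle states: enforces
 * setup -> startBundle -> (processElement)* -> finishBundle -> teardown,
 * and rejects any output that is not made from within processElement.
 */
final class SplittableOutputGuard {
  private enum State { BEFORE_SETUP, OUTSIDE_BUNDLE, INSIDE_BUNDLE, TORN_DOWN }

  private State state = State.BEFORE_SETUP;
  private boolean inProcessElement = false;
  private final List<String> outputs = new ArrayList<>();

  void setup() { transition(State.BEFORE_SETUP, State.OUTSIDE_BUNDLE); }

  void startBundle() { transition(State.OUTSIDE_BUNDLE, State.INSIDE_BUNDLE); }

  void finishBundle() { transition(State.INSIDE_BUNDLE, State.OUTSIDE_BUNDLE); }

  void teardown() { transition(State.OUTSIDE_BUNDLE, State.TORN_DOWN); }

  void beginProcessElement() {
    requireState(State.INSIDE_BUNDLE);
    inProcessElement = true;
  }

  void endProcessElement() { inProcessElement = false; }

  /** Accepts output only during processElement; startBundle/finishBundle output is rejected. */
  void output(String value) {
    if (!inProcessElement) {
      throw new IllegalStateException(
          "A splittable DoFn may only output from @ProcessElement; state=" + state);
    }
    outputs.add(value);
  }

  List<String> outputs() { return outputs; }

  private void transition(State expected, State next) {
    requireState(expected);
    state = next;
  }

  private void requireState(State expected) {
    if (state != expected) {
      throw new IllegalStateException("Expected " + expected + " but was " + state);
    }
  }
}
```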
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index daa8a06..0c6043f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -140,6 +140,15 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
windowValues.put(window, value);
}
+ @SuppressWarnings("unchecked")
+ public <K> StateInternals<K> getStateInternals() {
+ return (StateInternals<K>) stateInternals;
+ }
+
+ public TimerInternals getTimerInternals() {
+ return timerInternals;
+ }
+
/**
* When a {@link DoFnTester} should clone the {@link DoFn} under test and how it should manage
* the lifecycle of the {@link DoFn}.
@@ -321,7 +330,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
*
*/
public List<OutputT> peekOutputElements() {
- // TODO: Should we return an unmodifiable list?
return Lists.transform(
peekOutputElementsWithTimestamp(),
new Function<TimestampedValue<OutputT>, OutputT>() {
@@ -344,7 +352,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
@Experimental
public List<TimestampedValue<OutputT>> peekOutputElementsWithTimestamp() {
// TODO: Should we return an unmodifiable list?
- return Lists.transform(getOutput(mainOutputTag),
+ return Lists.transform(getImmutableOutput(mainOutputTag),
new Function<WindowedValue<OutputT>, TimestampedValue<OutputT>>() {
@Override
@SuppressWarnings("unchecked")
@@ -370,7 +378,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
TupleTag<OutputT> tag,
BoundedWindow window) {
ImmutableList.Builder<TimestampedValue<OutputT>> valuesBuilder = ImmutableList.builder();
- for (WindowedValue<OutputT> value : getOutput(tag)) {
+ for (WindowedValue<OutputT> value : getImmutableOutput(tag)) {
if (value.getWindows().contains(window)) {
valuesBuilder.add(TimestampedValue.of(value.getValue(), value.getTimestamp()));
}
@@ -384,7 +392,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
* @see #peekOutputElements
*/
public void clearOutputElements() {
- peekOutputElements().clear();
+ getMutableOutput(mainOutputTag).clear();
}
/**
@@ -425,7 +433,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
*/
public <T> List<T> peekSideOutputElements(TupleTag<T> tag) {
// TODO: Should we return an unmodifiable list?
- return Lists.transform(getOutput(tag),
+ return Lists.transform(getImmutableOutput(tag),
new Function<WindowedValue<T>, T>() {
@SuppressWarnings("unchecked")
@Override
@@ -441,7 +449,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
* @see #peekSideOutputElements
*/
public <T> void clearSideOutputElements(TupleTag<T> tag) {
- peekSideOutputElements(tag).clear();
+ getMutableOutput(tag).clear();
}
/**
@@ -502,10 +510,25 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
return combiner.extractOutput(accumulator);
}
- private <T> List<WindowedValue<T>> getOutput(TupleTag<T> tag) {
+ private <T> List<WindowedValue<T>> getImmutableOutput(TupleTag<T> tag) {
@SuppressWarnings({"unchecked", "rawtypes"})
List<WindowedValue<T>> elems = (List) outputs.get(tag);
- return MoreObjects.firstNonNull(elems, Collections.<WindowedValue<T>>emptyList());
+ return ImmutableList.copyOf(
+ MoreObjects.firstNonNull(elems, Collections.<WindowedValue<T>>emptyList()));
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public <T> List<WindowedValue<T>> getMutableOutput(TupleTag<T> tag) {
+ List<WindowedValue<T>> outputList = (List) outputs.get(tag);
+ if (outputList == null) {
+ outputList = new ArrayList<>();
+ outputs.put(tag, (List) outputList);
+ }
+ return outputList;
+ }
+
+ public TupleTag<OutputT> getMainOutputTag() {
+ return mainOutputTag;
}
private TestContext createContext(OldDoFn<InputT, OutputT> fn) {
@@ -590,17 +613,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
}
public <T> void noteOutput(TupleTag<T> tag, WindowedValue<T> output) {
- getOutputList(tag).add(output);
- }
-
- private <T> List<WindowedValue<T>> getOutputList(TupleTag<T> tag) {
- @SuppressWarnings({"unchecked", "rawtypes"})
- List<WindowedValue<T>> outputList = (List) outputs.get(tag);
- if (outputList == null) {
- outputList = new ArrayList<>();
- outputs.put(tag, (List) outputList);
- }
- return outputList;
+ getMutableOutput(tag).add(output);
}
}
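After this change, the peek methods in DoFnTester transform an ImmutableList.copyOf snapshot. Clearing a peeked list therefore can no longer reach the stored output (an ImmutableList rejects mutation anyway), so clearOutputElements and clearSideOutputElements now go through getMutableOutput. The sketch below is a simplified, Guava-free model of that read-snapshot, write-live split. OutputBuffer is a hypothetical class that uses string tags in place of TupleTag and plain strings in place of WindowedValue.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of DoFnTester's output bookkeeping: reads return an immutable
 * snapshot, while writes and clears go through the live per-tag list.
 */
final class OutputBuffer {
  private final Map<String, List<String>> outputs = new HashMap<>();

  /** Live list for a tag, created on first use (analogous to getMutableOutput). */
  List<String> getMutableOutput(String tag) {
    return outputs.computeIfAbsent(tag, t -> new ArrayList<>());
  }

  /** Unmodifiable copy for a tag (analogous to getImmutableOutput); empty if none recorded. */
  List<String> getImmutableOutput(String tag) {
    List<String> elems = outputs.get(tag);
    return elems == null
        ? Collections.<String>emptyList()
        : Collections.unmodifiableList(new ArrayList<>(elems));
  }

  /** Records an output (analogous to noteOutput). */
  void noteOutput(String tag, String value) {
    getMutableOutput(tag).add(value);
  }

  /** Clears stored output through the live list, not through a snapshot. */
  void clearOutput(String tag) {
    getMutableOutput(tag).clear();
  }
}
```

A snapshot taken before further outputs keeps its contents, and any attempt to mutate it throws UnsupportedOperationException. Callers who peek therefore cannot accidentally alter the tester's state.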
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerInternalsFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerInternalsFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerInternalsFactory.java
new file mode 100644
index 0000000..b9c3d5e
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerInternalsFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util.state;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.util.TimerInternals;
+
+/**
+ * A factory for providing {@link TimerInternals} for a particular key.
+ *
+ * <p>Because it will generally be embedded in a {@link org.apache.beam.sdk.transforms.DoFn DoFn},
+ * albeit at execution time, it is marked {@link Serializable}.
+ */
+@Experimental(Kind.STATE)
+public interface TimerInternalsFactory<K> {
+
+ /** Returns {@link TimerInternals} for the provided key. */
+ TimerInternals timerInternalsForKey(K key);
+}
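TimerInternalsFactory only declares a per-key lookup, and this commit does not show how a runner implements it. Note that, as committed, the interface does not itself extend Serializable, even though its Javadoc says it is marked Serializable. The sketch below is a minimal, Beam-free illustration of a per-key factory that caches one timer holder per key. SimpleTimers, SimpleTimersFactory and CachingTimersFactory are hypothetical stand-ins for TimerInternals and TimerInternalsFactory. The sketch's interface extends Serializable, on the assumption that this is what the Javadoc intends.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

/** Hypothetical stand-in for TimerInternals: a sorted set of timer timestamps in millis. */
final class SimpleTimers {
  private final TreeSet<Long> timers = new TreeSet<>();

  void setTimer(long timestampMillis) { timers.add(timestampMillis); }

  /** Earliest pending timer, or null if none is set. */
  Long nextTimer() { return timers.isEmpty() ? null : timers.first(); }
}

/** Hypothetical analogue of TimerInternalsFactory; extends Serializable as its Javadoc describes. */
interface SimpleTimersFactory<K> extends Serializable {
  SimpleTimers timersForKey(K key);
}

/** Returns the same SimpleTimers instance for repeated lookups of one key. */
final class CachingTimersFactory<K> implements SimpleTimersFactory<K> {
  // transient: per-key timer state is not serialized; it is recreated lazily after deserialization
  private transient Map<K, SimpleTimers> byKey;

  @Override
  public SimpleTimers timersForKey(K key) {
    if (byKey == null) {
      byKey = new HashMap<>();
    }
    return byKey.computeIfAbsent(key, k -> new SimpleTimers());
  }
}
```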