You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/03/08 22:25:20 UTC
[5/9] beam git commit: Explicitly GBK before stateful ParDo in
Dataflow batch
Explicitly GBK before stateful ParDo in Dataflow batch
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5e2afa29
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5e2afa29
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5e2afa29
Branch: refs/heads/release-0.6.0
Commit: 5e2afa29a3a0fe93e662b2fe7173c1641c253cd5
Parents: 92c5b5b
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Mar 2 14:29:56 2017 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Wed Mar 8 13:41:01 2017 -0800
----------------------------------------------------------------------
.../dataflow/BatchStatefulParDoOverrides.java | 283 +++++++++++++++++++
.../dataflow/DataflowPipelineTranslator.java | 5 +-
.../beam/runners/dataflow/DataflowRunner.java | 6 +
.../beam/runners/dataflow/util/DoFnInfo.java | 9 +
.../BatchStatefulParDoOverridesTest.java | 169 +++++++++++
.../DataflowPipelineTranslatorTest.java | 72 +++++
.../apache/beam/sdk/transforms/ParDoTest.java | 117 ++++++++
7 files changed, 660 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2afa29/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
new file mode 100644
index 0000000..91f84ab
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Iterables;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.construction.ReplacementOutputs;
+import org.apache.beam.runners.dataflow.BatchViewOverrides.GroupByKeyAndSortValuesOnly;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ParDo.BoundMulti;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.joda.time.Instant;
+
+/**
+ * {@link PTransformOverrideFactory PTransformOverrideFactories} that expand to correctly implement
+ * stateful {@link ParDo} using window-unaware {@link GroupByKeyAndSortValuesOnly} to linearize
+ * processing per key.
+ *
+ * <p>This implementation relies on implementation details of the Dataflow runner, specifically
+ * standard fusion behavior of {@link ParDo} transforms following a {@link GroupByKey}.
+ */
+public class BatchStatefulParDoOverrides {
+
+ /**
+ * Returns a {@link PTransformOverrideFactory} that replaces a single-output
+ * {@link ParDo} with a composite transform specialized for the {@link DataflowRunner}.
+ */
+ public static <K, InputT, OutputT>
+ PTransformOverrideFactory<
+ PCollection<KV<K, InputT>>, PCollection<OutputT>, ParDo.Bound<KV<K, InputT>, OutputT>>
+ singleOutputOverrideFactory() {
+ return new SingleOutputOverrideFactory<>();
+ }
+
+ /**
+ * Returns a {@link PTransformOverrideFactory} that replaces a multi-output
+ * {@link ParDo} with a composite transform specialized for the {@link DataflowRunner}.
+ */
+ public static <K, InputT, OutputT>
+ PTransformOverrideFactory<
+ PCollection<KV<K, InputT>>, PCollectionTuple,
+ ParDo.BoundMulti<KV<K, InputT>, OutputT>>
+ multiOutputOverrideFactory() {
+ return new MultiOutputOverrideFactory<>();
+ }
+
+ private static class SingleOutputOverrideFactory<K, InputT, OutputT>
+ implements PTransformOverrideFactory<
+ PCollection<KV<K, InputT>>, PCollection<OutputT>, ParDo.Bound<KV<K, InputT>, OutputT>> {
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public PTransform<PCollection<KV<K, InputT>>, PCollection<OutputT>> getReplacementTransform(
+ ParDo.Bound<KV<K, InputT>, OutputT> originalParDo) {
+ return new StatefulSingleOutputParDo<>(originalParDo);
+ }
+
+ @Override
+ public PCollection<KV<K, InputT>> getInput(List<TaggedPValue> inputs, Pipeline p) {
+ return (PCollection<KV<K, InputT>>) Iterables.getOnlyElement(inputs).getValue();
+ }
+
+ @Override
+ public Map<PValue, ReplacementOutput> mapOutputs(
+ List<TaggedPValue> outputs, PCollection<OutputT> newOutput) {
+ return ReplacementOutputs.singleton(outputs, newOutput);
+ }
+ }
+
+ private static class MultiOutputOverrideFactory<K, InputT, OutputT>
+ implements PTransformOverrideFactory<
+ PCollection<KV<K, InputT>>, PCollectionTuple, ParDo.BoundMulti<KV<K, InputT>, OutputT>> {
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public PTransform<PCollection<KV<K, InputT>>, PCollectionTuple> getReplacementTransform(
+ BoundMulti<KV<K, InputT>, OutputT> originalParDo) {
+ return new StatefulMultiOutputParDo<>(originalParDo);
+ }
+
+ @Override
+ public PCollection<KV<K, InputT>> getInput(List<TaggedPValue> inputs, Pipeline p) {
+ return (PCollection<KV<K, InputT>>) Iterables.getOnlyElement(inputs).getValue();
+ }
+
+ @Override
+ public Map<PValue, ReplacementOutput> mapOutputs(
+ List<TaggedPValue> outputs, PCollectionTuple newOutput) {
+ return ReplacementOutputs.tagged(outputs, newOutput);
+ }
+ }
+
+ static class StatefulSingleOutputParDo<K, InputT, OutputT>
+ extends PTransform<PCollection<KV<K, InputT>>, PCollection<OutputT>> {
+
+ private final ParDo.Bound<KV<K, InputT>, OutputT> originalParDo;
+
+ StatefulSingleOutputParDo(ParDo.Bound<KV<K, InputT>, OutputT> originalParDo) {
+ this.originalParDo = originalParDo;
+ }
+
+ ParDo.Bound<KV<K, InputT>, OutputT> getOriginalParDo() {
+ return originalParDo;
+ }
+
+ @Override
+ public PCollection<OutputT> expand(PCollection<KV<K, InputT>> input) {
+ DoFn<KV<K, InputT>, OutputT> fn = originalParDo.getFn();
+ verifyFnIsStateful(fn);
+
+ PTransform<
+ PCollection<? extends KV<K, Iterable<KV<Instant, WindowedValue<KV<K, InputT>>>>>>,
+ PCollection<OutputT>>
+ statefulParDo =
+ ParDo.of(new BatchStatefulDoFn<>(fn)).withSideInputs(originalParDo.getSideInputs());
+
+ return input.apply(new GbkBeforeStatefulParDo<K, InputT>()).apply(statefulParDo);
+ }
+ }
+
+ static class StatefulMultiOutputParDo<K, InputT, OutputT>
+ extends PTransform<PCollection<KV<K, InputT>>, PCollectionTuple> {
+
+ private final BoundMulti<KV<K, InputT>, OutputT> originalParDo;
+
+ StatefulMultiOutputParDo(ParDo.BoundMulti<KV<K, InputT>, OutputT> originalParDo) {
+ this.originalParDo = originalParDo;
+ }
+
+ @Override
+ public PCollectionTuple expand(PCollection<KV<K, InputT>> input) {
+ DoFn<KV<K, InputT>, OutputT> fn = originalParDo.getFn();
+ verifyFnIsStateful(fn);
+
+ PTransform<
+ PCollection<? extends KV<K, Iterable<KV<Instant, WindowedValue<KV<K, InputT>>>>>>,
+ PCollectionTuple>
+ statefulParDo =
+ ParDo.of(new BatchStatefulDoFn<K, InputT, OutputT>(fn))
+ .withSideInputs(originalParDo.getSideInputs())
+ .withOutputTags(
+ originalParDo.getMainOutputTag(), originalParDo.getSideOutputTags());
+
+ return input.apply(new GbkBeforeStatefulParDo<K, InputT>()).apply(statefulParDo);
+ }
+
+ public BoundMulti<KV<K, InputT>, OutputT> getOriginalParDo() {
+ return originalParDo;
+ }
+ }
+
+ static class GbkBeforeStatefulParDo<K, V>
+ extends PTransform<
+ PCollection<KV<K, V>>,
+ PCollection<KV<K, Iterable<KV<Instant, WindowedValue<KV<K, V>>>>>>> {
+
+ @Override
+ public PCollection<KV<K, Iterable<KV<Instant, WindowedValue<KV<K, V>>>>>> expand(
+ PCollection<KV<K, V>> input) {
+
+ WindowingStrategy<?, ?> inputWindowingStrategy = input.getWindowingStrategy();
+
+ // A KvCoder is required since this goes through GBK. Further, WindowedValueCoder
+ // is not registered by default, so we explicitly set the relevant coders.
+ checkState(
+ input.getCoder() instanceof KvCoder,
+ "Input to a %s using state requires a %s, but the coder was %s",
+ ParDo.class.getSimpleName(),
+ KvCoder.class.getSimpleName(),
+ input.getCoder());
+ KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder();
+ Coder<K> keyCoder = kvCoder.getKeyCoder();
+ Coder<? extends BoundedWindow> windowCoder =
+ inputWindowingStrategy.getWindowFn().windowCoder();
+
+ return input
+ // Stash the original timestamps, etc, for when it is fed to the user's DoFn
+ .apply("ReifyWindows", ParDo.of(new ReifyWindowedValueFn<K, V>()))
+ .setCoder(
+ KvCoder.of(
+ keyCoder,
+ KvCoder.of(InstantCoder.of(), WindowedValue.getFullCoder(kvCoder, windowCoder))))
+
+ // Group by key and sort by timestamp, dropping windows as they are reified
+ .apply(
+ "PartitionKeys",
+ new GroupByKeyAndSortValuesOnly<K, Instant, WindowedValue<KV<K, V>>>())
+
+ // The GBKO sets the windowing strategy to the global default
+ .setWindowingStrategyInternal(inputWindowingStrategy);
+ }
+ }
+
+ /** A key-preserving {@link DoFn} that reifies a windowed value. */
+ static class ReifyWindowedValueFn<K, V>
+ extends DoFn<KV<K, V>, KV<K, KV<Instant, WindowedValue<KV<K, V>>>>> {
+ @ProcessElement
+ public void processElement(final ProcessContext c, final BoundedWindow window) {
+ c.output(
+ KV.of(
+ c.element().getKey(),
+ KV.of(
+ c.timestamp(), WindowedValue.of(c.element(), c.timestamp(), window, c.pane()))));
+ }
+ }
+
+ /**
+ * A key-preserving {@link DoFn} that explodes an iterable that has been grouped by key and
+ * window.
+ */
+ public static class BatchStatefulDoFn<K, V, OutputT>
+ extends DoFn<KV<K, Iterable<KV<Instant, WindowedValue<KV<K, V>>>>>, OutputT> {
+
+ private final DoFn<KV<K, V>, OutputT> underlyingDoFn;
+
+ BatchStatefulDoFn(DoFn<KV<K, V>, OutputT> underlyingDoFn) {
+ this.underlyingDoFn = underlyingDoFn;
+ }
+
+ public DoFn<KV<K, V>, OutputT> getUnderlyingDoFn() {
+ return underlyingDoFn;
+ }
+
+ @ProcessElement
+ public void processElement(final ProcessContext c, final BoundedWindow window) {
+ throw new UnsupportedOperationException(
+ "BatchStatefulDoFn.ProcessElement should never be invoked");
+ }
+
+ @Override
+ public TypeDescriptor<OutputT> getOutputTypeDescriptor() {
+ return underlyingDoFn.getOutputTypeDescriptor();
+ }
+ }
+
+ private static <InputT, OutputT> void verifyFnIsStateful(DoFn<InputT, OutputT> fn) {
+ DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
+
+ // It is still correct to use this without state or timers, but a bad idea.
+ // Since it is internal it should never be misused, so it is OK to crash.
+ checkState(
+ signature.usesState() || signature.usesTimers(),
+ "%s used for %s that does not use state or timers.",
+ BatchStatefulParDoOverrides.class.getSimpleName(),
+ ParDo.class.getSimpleName());
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2afa29/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 7e559e9..fc47593 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -956,7 +956,10 @@ public class DataflowPipelineTranslator {
DoFnInfo.forFn(
fn, windowingStrategy, sideInputs, inputCoder, mainOutput, outputMap))));
- if (signature.usesState() || signature.usesTimers()) {
+ // Setting USES_KEYED_STATE will cause an ungrouped shuffle, which works
+ // in streaming but does not work in batch
+ if (context.getPipelineOptions().isStreaming()
+ && (signature.usesState() || signature.usesTimers())) {
stepContext.addInput(PropertyNames.USES_KEYED_STATE, "true");
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2afa29/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 15147f1..b782786 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -341,6 +341,12 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
UnsupportedOverrideFactory.withMessage(
"The DataflowRunner in batch mode does not support Read.Unbounded"));
ptoverrides
+ // State and timer pardos are implemented by expansion to GBK-then-ParDo
+ .put(PTransformMatchers.stateOrTimerParDoMulti(),
+ BatchStatefulParDoOverrides.multiOutputOverrideFactory())
+ .put(PTransformMatchers.stateOrTimerParDoSingle(),
+ BatchStatefulParDoOverrides.singleOutputOverrideFactory())
+
// Write uses views internally
.put(PTransformMatchers.classEqualTo(Write.class), new BatchWriteFactory(this))
.put(
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2afa29/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
index 4d80a39..55c62ae 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
@@ -72,6 +72,15 @@ public class DoFnInfo<InputT, OutputT> implements Serializable {
outputMap);
}
+ public DoFnInfo<InputT, OutputT> withFn(DoFn<InputT, OutputT> newFn) {
+ return DoFnInfo.forFn(newFn,
+ windowingStrategy,
+ sideInputViews,
+ inputCoder,
+ mainOutput,
+ outputMap);
+ }
+
private DoFnInfo(
DoFn<InputT, OutputT> doFn,
WindowingStrategy<?, ?> windowingStrategy,
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2afa29/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
new file mode 100644
index 0000000..ef3e414
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.dataflow;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.LinkedList;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.dataflow.BatchStatefulParDoOverrides.StatefulMultiOutputParDo;
+import org.apache.beam.runners.dataflow.BatchStatefulParDoOverrides.StatefulSingleOutputParDo;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.TestCredential;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.util.state.StateSpec;
+import org.apache.beam.sdk.util.state.StateSpecs;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/** Tests for {@link BatchStatefulParDoOverrides}. */
+@RunWith(JUnit4.class)
+public class BatchStatefulParDoOverridesTest implements Serializable {
+
+ @Test
+ public void testSingleOutputOverrideNonCrashing() throws Exception {
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setRunner(DataflowRunner.class);
+ Pipeline pipeline = Pipeline.create(options);
+
+ DummyStatefulDoFn fn = new DummyStatefulDoFn();
+ pipeline.apply(Create.of(KV.of(1, 2))).apply(ParDo.of(fn));
+
+ DataflowRunner runner = (DataflowRunner) pipeline.getRunner();
+ runner.replaceTransforms(pipeline);
+ assertThat(findBatchStatefulDoFn(pipeline), equalTo((DoFn) fn));
+ }
+
+ @Test
+ public void testMultiOutputOverrideNonCrashing() throws Exception {
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setRunner(DataflowRunner.class);
+ Pipeline pipeline = Pipeline.create(options);
+
+ TupleTag<Integer> mainOutputTag = new TupleTag<Integer>() {};
+
+ DummyStatefulDoFn fn = new DummyStatefulDoFn();
+ pipeline
+ .apply(Create.of(KV.of(1, 2)))
+ .apply(ParDo.withOutputTags(mainOutputTag, TupleTagList.empty()).of(fn));
+
+ DataflowRunner runner = (DataflowRunner) pipeline.getRunner();
+ runner.replaceTransforms(pipeline);
+ assertThat(findBatchStatefulDoFn(pipeline), equalTo((DoFn) fn));
+ }
+
+ private static DummyStatefulDoFn findBatchStatefulDoFn(Pipeline p) {
+ FindBatchStatefulDoFnVisitor findBatchStatefulDoFnVisitor = new FindBatchStatefulDoFnVisitor();
+ p.traverseTopologically(findBatchStatefulDoFnVisitor);
+ return (DummyStatefulDoFn) findBatchStatefulDoFnVisitor.getStatefulDoFn();
+ }
+
+ private static class DummyStatefulDoFn extends DoFn<KV<Integer, Integer>, Integer> {
+
+ @StateId("foo")
+ private final StateSpec<Object, ValueState<Integer>> spec = StateSpecs.value(VarIntCoder.of());
+
+ @ProcessElement
+ public void processElem(ProcessContext c) {
+ // noop
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ return other instanceof DummyStatefulDoFn;
+ }
+
+ @Override
+ public int hashCode() {
+ return getClass().hashCode();
+ }
+ }
+
+ private static class FindBatchStatefulDoFnVisitor extends PipelineVisitor.Defaults {
+
+ @Nullable private DoFn<?, ?> batchStatefulDoFn;
+
+ public DoFn<?, ?> getStatefulDoFn() {
+ assertThat(batchStatefulDoFn, not(nullValue()));
+ return batchStatefulDoFn;
+ }
+
+ @Override
+ public CompositeBehavior enterCompositeTransform(Node node) {
+ if (node.getTransform() instanceof StatefulSingleOutputParDo) {
+ batchStatefulDoFn =
+ ((StatefulSingleOutputParDo) node.getTransform()).getOriginalParDo().getFn();
+ return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
+ } else if (node.getTransform() instanceof StatefulMultiOutputParDo) {
+ batchStatefulDoFn =
+ ((StatefulMultiOutputParDo) node.getTransform()).getOriginalParDo().getFn();
+ return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
+ } else {
+ return CompositeBehavior.ENTER_TRANSFORM;
+ }
+ }
+ }
+
+ private static DataflowPipelineOptions buildPipelineOptions() throws IOException {
+ GcsUtil mockGcsUtil = mock(GcsUtil.class);
+ when(mockGcsUtil.expand(any(GcsPath.class))).then(new Answer<List<GcsPath>>() {
+ @Override
+ public List<GcsPath> answer(InvocationOnMock invocation) throws Throwable {
+ return ImmutableList.of((GcsPath) invocation.getArguments()[0]);
+ }
+ });
+ when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(true);
+
+ DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+ options.setRunner(DataflowRunner.class);
+ options.setGcpCredential(new TestCredential());
+ options.setJobName("some-job-name");
+ options.setProject("some-project");
+ options.setTempLocation(GcsPath.fromComponents("somebucket", "some/path").toString());
+ options.setFilesToStage(new LinkedList<String>());
+ options.setGcsUtil(mockGcsUtil);
+ return options;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2afa29/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index d4271e5..660e92e 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -22,8 +22,10 @@ import static org.apache.beam.sdk.util.Structs.getDictionary;
import static org.apache.beam.sdk.util.Structs.getString;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.Matchers.not;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -81,10 +83,15 @@ import org.apache.beam.sdk.util.Structs;
import org.apache.beam.sdk.util.TestCredential;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.util.state.StateSpec;
+import org.apache.beam.sdk.util.state.StateSpecs;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -825,6 +832,71 @@ public class DataflowPipelineTranslatorTest implements Serializable {
assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
}
+ /**
+ * Smoke test to fail fast if translation of a stateful ParDo
+ * in batch breaks.
+ */
+ @Test
+ public void testBatchStatefulParDoTranslation() throws Exception {
+ DataflowPipelineOptions options = buildPipelineOptions();
+ DataflowRunner runner = DataflowRunner.fromOptions(options);
+ options.setStreaming(false);
+ DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
+
+ Pipeline pipeline = Pipeline.create(options);
+
+ TupleTag<Integer> mainOutputTag = new TupleTag<Integer>() {};
+
+ pipeline
+ .apply(Create.of(KV.of(1, 1)))
+ .apply(
+ ParDo.withOutputTags(mainOutputTag, TupleTagList.empty()).of(
+ new DoFn<KV<Integer, Integer>, Integer>() {
+ @StateId("unused")
+ final StateSpec<Object, ValueState<Integer>> stateSpec =
+ StateSpecs.value(VarIntCoder.of());
+
+ @ProcessElement
+ public void process(ProcessContext c) {
+ // noop
+ }
+ }));
+
+ runner.replaceTransforms(pipeline);
+
+ Job job =
+ translator
+ .translate(
+ pipeline,
+ runner,
+ Collections.<DataflowPackage>emptyList())
+ .getJob();
+
+ // The job should look like:
+ // 0. ParallelRead (Create)
+ // 1. ParDo(ReifyWVs)
+ // 2. GroupByKeyAndSortValuesOnly
+ // 3. A ParDo over grouped and sorted KVs that is executed via ungrouping service-side
+
+ List<Step> steps = job.getSteps();
+ assertEquals(4, steps.size());
+
+ Step createStep = steps.get(0);
+ assertEquals("ParallelRead", createStep.getKind());
+
+ Step reifyWindowedValueStep = steps.get(1);
+ assertEquals("ParallelDo", reifyWindowedValueStep.getKind());
+
+ Step gbkStep = steps.get(2);
+ assertEquals("GroupByKey", gbkStep.getKind());
+
+ Step statefulParDoStep = steps.get(3);
+ assertEquals("ParallelDo", statefulParDoStep.getKind());
+ assertThat(
+ (String) statefulParDoStep.getProperties().get(PropertyNames.USES_KEYED_STATE),
+ not(equalTo("true")));
+ }
+
@Test
public void testToSingletonTranslationWithIsmSideInput() throws Exception {
// A "change detector" test that makes sure the translation
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2afa29/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 6db0af4..e58f78e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -48,6 +48,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -1506,6 +1507,54 @@ public class ParDoTest implements Serializable {
@Test
@Category({RunnableOnService.class, UsesStatefulParDo.class})
+ public void testValueStateDedup() {
+ final String stateId = "foo";
+
+ DoFn<KV<Integer, Integer>, Integer> onePerKey =
+ new DoFn<KV<Integer, Integer>, Integer>() {
+
+ @StateId(stateId)
+ private final StateSpec<Object, ValueState<Integer>> seenSpec =
+ StateSpecs.value(VarIntCoder.of());
+
+ @ProcessElement
+ public void processElement(
+ ProcessContext c, @StateId(stateId) ValueState<Integer> seenState) {
+ Integer seen = MoreObjects.firstNonNull(seenState.read(), 0);
+
+ if (seen == 0) {
+ seenState.write(seen + 1);
+ c.output(c.element().getValue());
+ }
+ }
+ };
+
+ int numKeys = 50;
+ // A big enough list that we can see some deduping
+ List<KV<Integer, Integer>> input = new ArrayList<>();
+
+ // The output should have no dupes
+ Set<Integer> expectedOutput = new HashSet<>();
+
+ for (int key = 0; key < numKeys; ++key) {
+ int output = 1000 + key;
+ expectedOutput.add(output);
+
+ for (int i = 0; i < 15; ++i) {
+ input.add(KV.of(key, output));
+ }
+ }
+
+ Collections.shuffle(input);
+
+ PCollection<Integer> output = pipeline.apply(Create.of(input)).apply(ParDo.of(onePerKey));
+
+ PAssert.that(output).containsInAnyOrder(expectedOutput);
+ pipeline.run();
+ }
+
+ @Test
+ @Category({RunnableOnService.class, UsesStatefulParDo.class})
public void testValueStateFixedWindows() {
final String stateId = "foo";
@@ -1936,6 +1985,74 @@ public class ParDoTest implements Serializable {
pipeline.run();
}
+ /**
+ * Tests that event time timers for multiple keys both fire. This particularly exercises
+ * implementations that may GC in ways not simply governed by the watermark.
+ */
+ @Test
+ @Category({RunnableOnService.class, UsesTimersInParDo.class})
+ public void testEventTimeTimerMultipleKeys() throws Exception {
+ final String timerId = "foo";
+ final String stateId = "sizzle";
+
+ final int offset = 5000;
+ final int timerOutput = 4093;
+
+ DoFn<KV<String, Integer>, KV<String, Integer>> fn =
+ new DoFn<KV<String, Integer>, KV<String, Integer>>() {
+
+ @TimerId(timerId)
+ private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @StateId(stateId)
+ private final StateSpec<Object, ValueState<String>> stateSpec =
+ StateSpecs.value(StringUtf8Coder.of());
+
+ @ProcessElement
+ public void processElement(
+ ProcessContext context,
+ @TimerId(timerId) Timer timer,
+ @StateId(stateId) ValueState<String> state,
+ BoundedWindow window) {
+ timer.set(window.maxTimestamp());
+ state.write(context.element().getKey());
+ context.output(
+ KV.of(context.element().getKey(), context.element().getValue() + offset));
+ }
+
+ @OnTimer(timerId)
+ public void onTimer(OnTimerContext context, @StateId(stateId) ValueState<String> state) {
+ context.output(KV.of(state.read(), timerOutput));
+ }
+ };
+
+ // Enough keys that we exercise interesting code paths
+ int numKeys = 50;
+ List<KV<String, Integer>> input = new ArrayList<>();
+ List<KV<String, Integer>> expectedOutput = new ArrayList<>();
+
+ for (Integer key = 0; key < numKeys; ++key) {
+ // Each key should have just one final output at GC time
+ expectedOutput.add(KV.of(key.toString(), timerOutput));
+
+ for (int i = 0; i < 15; ++i) {
+ // Each input should be output with the offset added
+ input.add(KV.of(key.toString(), i));
+ expectedOutput.add(KV.of(key.toString(), i + offset));
+ }
+ }
+
+ Collections.shuffle(input);
+
+ PCollection<KV<String, Integer>> output =
+ pipeline
+ .apply(
+ Create.of(input))
+ .apply(ParDo.of(fn));
+ PAssert.that(output).containsInAnyOrder(expectedOutput);
+ pipeline.run();
+ }
+
@Test
@Category({RunnableOnService.class, UsesTimersInParDo.class})
public void testAbsoluteProcessingTimeTimerRejected() throws Exception {