You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/03 22:59:26 UTC
[1/4] beam git commit: This closes #2152
Repository: beam
Updated Branches:
refs/heads/master 178381992 -> 7e9233bbd
This closes #2152
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7e9233bb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7e9233bb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7e9233bb
Branch: refs/heads/master
Commit: 7e9233bbd19546831cd76eddc33b51e0d4360f00
Parents: 1783819 3de44a3
Author: Thomas Groh <tg...@google.com>
Authored: Fri Mar 3 14:59:12 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Mar 3 14:59:12 2017 -0800
----------------------------------------------------------------------
.../beam/runners/dataflow/DataflowRunner.java | 92 +++---
.../dataflow/StreamingViewOverrides.java | 278 ++-----------------
.../apache/beam/sdk/transforms/CombineTest.java | 46 +++
3 files changed, 103 insertions(+), 313 deletions(-)
----------------------------------------------------------------------
[4/4] beam git commit: Only Override CreatePCollectionView in Streaming
Posted by tg...@apache.org.
Only Override CreatePCollectionView in Streaming
This permits us to use the appropriate view token for the
StreamingPCollectionViewWriterFn.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7b94c99b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7b94c99b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7b94c99b
Branch: refs/heads/master
Commit: 7b94c99be43d82bcab9370f63c0d63646146ca97
Parents: 1783819
Author: Thomas Groh <tg...@google.com>
Authored: Fri Mar 3 10:56:29 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Mar 3 14:59:12 2017 -0800
----------------------------------------------------------------------
.../beam/runners/dataflow/DataflowRunner.java | 92 +++---
.../dataflow/StreamingViewOverrides.java | 287 +++----------------
2 files changed, 76 insertions(+), 303 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/7b94c99b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 50b6b4f..c609b54 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -64,6 +64,7 @@ import org.apache.beam.runners.core.construction.ReplacementOutputs;
import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
import org.apache.beam.runners.core.construction.UnsupportedOverrideFactory;
import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
+import org.apache.beam.runners.dataflow.StreamingViewOverrides.StreamingCreatePCollectionViewFactory;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
@@ -75,6 +76,8 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
+import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.FileBasedSink;
@@ -94,18 +97,12 @@ import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Combine.GloballyAsSingletonView;
import org.apache.beam.sdk.transforms.Combine.GroupedValues;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.View.AsIterable;
-import org.apache.beam.sdk.transforms.View.AsList;
-import org.apache.beam.sdk.transforms.View.AsMap;
-import org.apache.beam.sdk.transforms.View.AsMultimap;
-import org.apache.beam.sdk.transforms.View.AsSingleton;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -326,29 +323,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
PTransformMatchers.classEqualTo(Read.Unbounded.class),
new ReflectiveRootOverrideFactory(StreamingUnboundedRead.class, this))
.put(
- PTransformMatchers.classEqualTo(GloballyAsSingletonView.class),
- new ReflectiveOneToOneOverrideFactory(
- StreamingViewOverrides.StreamingCombineGloballyAsSingletonView.class, this))
- .put(
- PTransformMatchers.classEqualTo(AsMap.class),
- new ReflectiveOneToOneOverrideFactory(
- StreamingViewOverrides.StreamingViewAsMap.class, this))
- .put(
- PTransformMatchers.classEqualTo(AsMultimap.class),
- new ReflectiveOneToOneOverrideFactory(
- StreamingViewOverrides.StreamingViewAsMultimap.class, this))
- .put(
- PTransformMatchers.classEqualTo(AsSingleton.class),
- new ReflectiveOneToOneOverrideFactory(
- StreamingViewOverrides.StreamingViewAsSingleton.class, this))
- .put(
- PTransformMatchers.classEqualTo(AsList.class),
- new ReflectiveOneToOneOverrideFactory(
- StreamingViewOverrides.StreamingViewAsList.class, this))
- .put(
- PTransformMatchers.classEqualTo(AsIterable.class),
- new ReflectiveOneToOneOverrideFactory(
- StreamingViewOverrides.StreamingViewAsIterable.class, this));
+ PTransformMatchers.classEqualTo(View.CreatePCollectionView.class),
+ new StreamingCreatePCollectionViewFactory());
} else {
// In batch mode must use the custom Pubsub bounded source/sink.
for (Class<? extends PTransform> unsupported :
@@ -719,30 +695,40 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
// have just recorded the full names during apply time.
if (!ptransformViewsWithNonDeterministicKeyCoders.isEmpty()) {
final SortedSet<String> ptransformViewNamesWithNonDeterministicKeyCoders = new TreeSet<>();
- pipeline.traverseTopologically(new PipelineVisitor() {
- @Override
- public void visitValue(PValue value, TransformHierarchy.Node producer) {
- }
-
- @Override
- public void visitPrimitiveTransform(TransformHierarchy.Node node) {
- if (ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
- ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
- }
- }
-
- @Override
- public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
- if (ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
- ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
- }
- return CompositeBehavior.ENTER_TRANSFORM;
- }
-
- @Override
- public void leaveCompositeTransform(TransformHierarchy.Node node) {
- }
- });
+ pipeline.traverseTopologically(
+ new PipelineVisitor() {
+ @Override
+ public void visitValue(PValue value, TransformHierarchy.Node producer) {}
+
+ @Override
+ public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+ if (ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
+ ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
+ }
+ }
+
+ @Override
+ public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
+ if (node.getTransform() instanceof View.AsMap
+ || node.getTransform() instanceof View.AsMultimap) {
+ PCollection<KV<?, ?>> input =
+ (PCollection<KV<?, ?>>) Iterables.getOnlyElement(node.getInputs()).getValue();
+ KvCoder<?, ?> inputCoder = (KvCoder) input.getCoder();
+ try {
+ inputCoder.getKeyCoder().verifyDeterministic();
+ } catch (NonDeterministicException e) {
+ ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
+ }
+ }
+ if (ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
+ ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
+ }
+ return CompositeBehavior.ENTER_TRANSFORM;
+ }
+
+ @Override
+ public void leaveCompositeTransform(TransformHierarchy.Node node) {}
+ });
LOG.warn("Unable to use indexed implementation for View.AsMap and View.AsMultimap for {} "
+ "because the key coder is not deterministic. Falling back to singleton implementation "
http://git-wip-us.apache.org/repos/asf/beam/blob/7b94c99b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
index bab115f..8e005cf 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
@@ -19,23 +19,18 @@
package org.apache.beam.runners.dataflow;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
-import java.util.Map;
+import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
import org.apache.beam.runners.dataflow.DataflowRunner.StreamingPCollectionViewWriterFn;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.util.PCollectionViews;
-import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
@@ -44,261 +39,58 @@ import org.apache.beam.sdk.values.PCollectionView;
* types.
*/
class StreamingViewOverrides {
- static class StreamingCombineGloballyAsSingletonView<InputT, OutputT>
- extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
- Combine.GloballyAsSingletonView<InputT, OutputT> transform;
-
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
- public StreamingCombineGloballyAsSingletonView(
- DataflowRunner runner,
- Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
- this.transform = transform;
- }
-
+ static class StreamingCreatePCollectionViewFactory<ElemT, ViewT>
+ extends SingleInputOutputOverrideFactory<
+ PCollection<ElemT>, PCollectionView<ViewT>, CreatePCollectionView<ElemT, ViewT>> {
@Override
- public PCollectionView<OutputT> expand(PCollection<InputT> input) {
- PCollection<OutputT> combined =
- input.apply(Combine.<InputT, OutputT>globally(transform.getCombineFn())
- .withoutDefaults()
- .withFanout(transform.getFanout()));
-
- PCollectionView<OutputT> view = PCollectionViews.singletonView(
- combined.getPipeline(),
- combined.getWindowingStrategy(),
- transform.getInsertDefault(),
- transform.getInsertDefault()
- ? transform.getCombineFn().defaultValue() : null,
- combined.getCoder());
- return combined
- .apply(ParDo.of(new WrapAsList<OutputT>()))
- .apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, combined.getCoder())))
- .apply(View.CreatePCollectionView.<OutputT, OutputT>of(view));
- }
-
- @Override
- protected String getKindString() {
- return "StreamingCombineGloballyAsSingletonView";
- }
- }
-
- private static class WrapAsList<T> extends DoFn<T, List<T>> {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(Arrays.asList(c.element()));
+ public PTransform<PCollection<ElemT>, PCollectionView<ViewT>> getReplacementTransform(
+ final CreatePCollectionView<ElemT, ViewT> transform) {
+ return new StreamingCreatePCollectionView<>(transform.getView());
}
- }
- /**
- * Specialized implementation for
- * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}
- * for the Dataflow runner in streaming mode.
- */
- static class StreamingViewAsMap<K, V>
- extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
- private final DataflowRunner runner;
+ private static class StreamingCreatePCollectionView<ElemT, ViewT>
+ extends PTransform<PCollection<ElemT>, PCollectionView<ViewT>> {
+ private final PCollectionView<ViewT> view;
- @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
- public StreamingViewAsMap(DataflowRunner runner, View.AsMap<K, V> transform) {
- this.runner = runner;
- }
-
- @Override
- public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
- PCollectionView<Map<K, V>> view =
- PCollectionViews.mapView(
- input.getPipeline(),
- input.getWindowingStrategy(),
- input.getCoder());
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
- try {
- inputCoder.getKeyCoder().verifyDeterministic();
- } catch (NonDeterministicException e) {
- runner.recordViewUsesNonDeterministicKeyCoder(this);
+ private StreamingCreatePCollectionView(PCollectionView<ViewT> view) {
+ this.view = view;
}
- return input
- .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
- .apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder())))
- .apply(View.CreatePCollectionView.<KV<K, V>, Map<K, V>>of(view));
- }
-
- @Override
- protected String getKindString() {
- return "StreamingViewAsMap";
- }
- }
-
- /**
- * Specialized expansion for {@link
- * org.apache.beam.sdk.transforms.View.AsMultimap View.AsMultimap} for the
- * Dataflow runner in streaming mode.
- */
- static class StreamingViewAsMultimap<K, V>
- extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
- private final DataflowRunner runner;
-
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
- public StreamingViewAsMultimap(DataflowRunner runner, View.AsMultimap<K, V> transform) {
- this.runner = runner;
- }
-
- @Override
- public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
- PCollectionView<Map<K, Iterable<V>>> view =
- PCollectionViews.multimapView(
- input.getPipeline(),
- input.getWindowingStrategy(),
- input.getCoder());
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
- try {
- inputCoder.getKeyCoder().verifyDeterministic();
- } catch (NonDeterministicException e) {
- runner.recordViewUsesNonDeterministicKeyCoder(this);
+ @Override
+ public PCollectionView<ViewT> expand(PCollection<ElemT> input) {
+ return input
+ .apply(Combine.globally(new Concatenate<ElemT>()).withoutDefaults())
+ .apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder())))
+ .apply(View.CreatePCollectionView.<ElemT, ViewT>of(view));
}
-
- return input
- .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
- .apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder())))
- .apply(View.CreatePCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
- }
-
- @Override
- protected String getKindString() {
- return "StreamingViewAsMultimap";
}
}
- /**
- * Specialized implementation for
- * {@link org.apache.beam.sdk.transforms.View.AsList View.AsList} for the
- * Dataflow runner in streaming mode.
- */
- static class StreamingViewAsList<T>
- extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
- public StreamingViewAsList(DataflowRunner runner, View.AsList<T> transform) {}
+ private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
+ private boolean hasDefaultValue;
+ private T defaultValue;
- @Override
- public PCollectionView<List<T>> expand(PCollection<T> input) {
- PCollectionView<List<T>> view =
- PCollectionViews.listView(
- input.getPipeline(),
- input.getWindowingStrategy(),
- input.getCoder());
-
- return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
- .apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder())))
- .apply(View.CreatePCollectionView.<T, List<T>>of(view));
+ SingletonCombine(boolean hasDefaultValue, T defaultValue) {
+ this.hasDefaultValue = hasDefaultValue;
+ this.defaultValue = defaultValue;
}
@Override
- protected String getKindString() {
- return "StreamingViewAsList";
- }
- }
-
- /**
- * Specialized implementation for
- * {@link org.apache.beam.sdk.transforms.View.AsIterable View.AsIterable} for the
- * Dataflow runner in streaming mode.
- */
- static class StreamingViewAsIterable<T>
- extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
- public StreamingViewAsIterable(DataflowRunner runner, View.AsIterable<T> transform) { }
-
- @Override
- public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
- PCollectionView<Iterable<T>> view =
- PCollectionViews.iterableView(
- input.getPipeline(),
- input.getWindowingStrategy(),
- input.getCoder());
-
- return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
- .apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder())))
- .apply(View.CreatePCollectionView.<T, Iterable<T>>of(view));
+ public T apply(T left, T right) {
+ throw new IllegalArgumentException(
+ "PCollection with more than one element "
+ + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
+ + "combine the PCollection into a single value");
}
@Override
- protected String getKindString() {
- return "StreamingViewAsIterable";
- }
- }
-
- /**
- * Specialized expansion for
- * {@link org.apache.beam.sdk.transforms.View.AsSingleton View.AsSingleton} for the
- * Dataflow runner in streaming mode.
- */
- static class StreamingViewAsSingleton<T>
- extends PTransform<PCollection<T>, PCollectionView<T>> {
- private View.AsSingleton<T> transform;
-
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
- public StreamingViewAsSingleton(DataflowRunner runner, View.AsSingleton<T> transform) {
- this.transform = transform;
- }
-
- @Override
- public PCollectionView<T> expand(PCollection<T> input) {
- Combine.Globally<T, T> combine = Combine.globally(
- new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
- if (!transform.hasDefaultValue()) {
- combine = combine.withoutDefaults();
- }
- return input.apply(combine.asSingletonView());
- }
-
- @Override
- protected String getKindString() {
- return "StreamingViewAsSingleton";
- }
-
- private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
- private boolean hasDefaultValue;
- private T defaultValue;
-
- SingletonCombine(boolean hasDefaultValue, T defaultValue) {
- this.hasDefaultValue = hasDefaultValue;
- this.defaultValue = defaultValue;
- }
-
- @Override
- public T apply(T left, T right) {
- throw new IllegalArgumentException("PCollection with more than one element "
- + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
- + "combine the PCollection into a single value");
- }
-
- @Override
- public T identity() {
- if (hasDefaultValue) {
- return defaultValue;
- } else {
- throw new IllegalArgumentException(
- "Empty PCollection accessed as a singleton view. "
- + "Consider setting withDefault to provide a default value");
- }
+ public T identity() {
+ if (hasDefaultValue) {
+ return defaultValue;
+ } else {
+ throw new IllegalArgumentException(
+ "Empty PCollection accessed as a singleton view. "
+ + "Consider setting withDefault to provide a default value");
}
}
}
@@ -306,11 +98,6 @@ class StreamingViewOverrides {
/**
* Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
*
- * <p>For internal use by {@link StreamingViewAsMap}, {@link StreamingViewAsMultimap},
- * {@link StreamingViewAsList}, {@link StreamingViewAsIterable}.
- * They require the input {@link PCollection} fits in memory.
- * For a large {@link PCollection} this is expected to crash!
- *
* @param <T> the type of elements to concatenate.
*/
private static class Concatenate<T> extends CombineFn<T, List<T>, List<T>> {
[2/4] beam git commit: Remove SingletonCombine
Posted by tg...@apache.org.
Remove SingletonCombine
It is no longer used after the update to StreamingViewOverrides.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/079966ca
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/079966ca
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/079966ca
Branch: refs/heads/master
Commit: 079966cad99442c63e0f3147a6361139bd601c8c
Parents: 7b94c99
Author: Thomas Groh <tg...@google.com>
Authored: Fri Mar 3 10:57:50 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Mar 3 14:59:12 2017 -0800
----------------------------------------------------------------------
.../dataflow/StreamingViewOverrides.java | 29 --------------------
1 file changed, 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/079966ca/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
index 8e005cf..5f0cb26 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
@@ -66,35 +66,6 @@ class StreamingViewOverrides {
}
}
- private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
- private boolean hasDefaultValue;
- private T defaultValue;
-
- SingletonCombine(boolean hasDefaultValue, T defaultValue) {
- this.hasDefaultValue = hasDefaultValue;
- this.defaultValue = defaultValue;
- }
-
- @Override
- public T apply(T left, T right) {
- throw new IllegalArgumentException(
- "PCollection with more than one element "
- + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
- + "combine the PCollection into a single value");
- }
-
- @Override
- public T identity() {
- if (hasDefaultValue) {
- return defaultValue;
- } else {
- throw new IllegalArgumentException(
- "Empty PCollection accessed as a singleton view. "
- + "Consider setting withDefault to provide a default value");
- }
- }
- }
-
/**
* Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
*
[3/4] beam git commit: Add a Test for windowed CombineGloballyAsSingletonView
Posted by tg...@apache.org.
Add a Test for windowed CombineGloballyAsSingletonView
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3de44a34
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3de44a34
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3de44a34
Branch: refs/heads/master
Commit: 3de44a348e3e0934c644c718255a43b8f42a3534
Parents: 079966c
Author: Thomas Groh <tg...@google.com>
Authored: Fri Mar 3 11:24:14 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Mar 3 14:59:12 2017 -0800
----------------------------------------------------------------------
.../apache/beam/sdk/transforms/CombineTest.java | 46 ++++++++++++++++++++
1 file changed, 46 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/3de44a34/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index 5b18384..6c62d0b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -53,6 +53,7 @@ import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
@@ -75,8 +76,10 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.TimestampedValue;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
+import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -627,6 +630,49 @@ public class CombineTest implements Serializable {
}
@Test
+ @Category(RunnableOnService.class)
+ public void testWindowedCombineGloballyAsSingletonView() {
+ FixedWindows windowFn = FixedWindows.of(Duration.standardMinutes(1));
+ final PCollectionView<Integer> view =
+ pipeline
+ .apply(
+ "CreateSideInput",
+ Create.timestamped(
+ TimestampedValue.of(1, new Instant(100)),
+ TimestampedValue.of(3, new Instant(100))))
+ .apply("WindowSideInput", Window.<Integer>into(windowFn))
+ .apply("CombineSideInput", Sum.integersGlobally().asSingletonView());
+
+ TimestampedValue<Void> nonEmptyElement = TimestampedValue.of(null, new Instant(100));
+ TimestampedValue<Void> emptyElement = TimestampedValue.atMinimumTimestamp(null);
+ PCollection<Integer> output =
+ pipeline
+ .apply(
+ "CreateMainInput",
+ Create.<Void>timestamped(nonEmptyElement, emptyElement).withCoder(VoidCoder.of()))
+ .apply("WindowMainInput", Window.<Void>into(windowFn))
+ .apply(
+ "OutputSideInput",
+ ParDo.of(
+ new DoFn<Void, Integer>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ c.output(c.sideInput(view));
+ }
+ })
+ .withSideInputs(view));
+
+ PAssert.that(output).containsInAnyOrder(4, 0);
+ PAssert.that(output)
+ .inWindow(windowFn.assignWindow(nonEmptyElement.getTimestamp()))
+ .containsInAnyOrder(4);
+ PAssert.that(output)
+ .inWindow(windowFn.assignWindow(emptyElement.getTimestamp()))
+ .containsInAnyOrder(0);
+ pipeline.run();
+ }
+
+ @Test
public void testCombineGetName() {
assertEquals("Combine.globally(SumInts)", Combine.globally(new SumInts()).getName());
assertEquals(