You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/03 22:59:26 UTC

[1/4] beam git commit: This closes #2152

Repository: beam
Updated Branches:
  refs/heads/master 178381992 -> 7e9233bbd


This closes #2152


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7e9233bb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7e9233bb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7e9233bb

Branch: refs/heads/master
Commit: 7e9233bbd19546831cd76eddc33b51e0d4360f00
Parents: 1783819 3de44a3
Author: Thomas Groh <tg...@google.com>
Authored: Fri Mar 3 14:59:12 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Mar 3 14:59:12 2017 -0800

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   |  92 +++---
 .../dataflow/StreamingViewOverrides.java        | 278 ++-----------------
 .../apache/beam/sdk/transforms/CombineTest.java |  46 +++
 3 files changed, 103 insertions(+), 313 deletions(-)
----------------------------------------------------------------------



[4/4] beam git commit: Only Override CreatePCollectionView in Streaming

Posted by tg...@apache.org.
Only Override CreatePCollectionView in Streaming

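Replace the per-view-type streaming overrides (View.AsMap,
View.AsMultimap, View.AsSingleton, View.AsList, View.AsIterable and
Combine.GloballyAsSingletonView) with a single override of
View.CreatePCollectionView. Because the replacement reuses the
PCollectionView of the original transform, this permits us to use the
appropriate view token for the StreamingPCollectionViewWriterFn.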


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7b94c99b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7b94c99b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7b94c99b

Branch: refs/heads/master
Commit: 7b94c99be43d82bcab9370f63c0d63646146ca97
Parents: 1783819
Author: Thomas Groh <tg...@google.com>
Authored: Fri Mar 3 10:56:29 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Mar 3 14:59:12 2017 -0800

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   |  92 +++---
 .../dataflow/StreamingViewOverrides.java        | 287 +++----------------
 2 files changed, 76 insertions(+), 303 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7b94c99b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 50b6b4f..c609b54 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -64,6 +64,7 @@ import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.runners.core.construction.UnsupportedOverrideFactory;
 import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
+import org.apache.beam.runners.dataflow.StreamingViewOverrides.StreamingCreatePCollectionViewFactory;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
@@ -75,6 +76,8 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
+import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.FileBasedSink;
@@ -94,18 +97,12 @@ import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Combine.GloballyAsSingletonView;
 import org.apache.beam.sdk.transforms.Combine.GroupedValues;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.View.AsIterable;
-import org.apache.beam.sdk.transforms.View.AsList;
-import org.apache.beam.sdk.transforms.View.AsMap;
-import org.apache.beam.sdk.transforms.View.AsMultimap;
-import org.apache.beam.sdk.transforms.View.AsSingleton;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -326,29 +323,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
               PTransformMatchers.classEqualTo(Read.Unbounded.class),
               new ReflectiveRootOverrideFactory(StreamingUnboundedRead.class, this))
           .put(
-              PTransformMatchers.classEqualTo(GloballyAsSingletonView.class),
-              new ReflectiveOneToOneOverrideFactory(
-                  StreamingViewOverrides.StreamingCombineGloballyAsSingletonView.class, this))
-          .put(
-              PTransformMatchers.classEqualTo(AsMap.class),
-              new ReflectiveOneToOneOverrideFactory(
-                  StreamingViewOverrides.StreamingViewAsMap.class, this))
-          .put(
-              PTransformMatchers.classEqualTo(AsMultimap.class),
-              new ReflectiveOneToOneOverrideFactory(
-                  StreamingViewOverrides.StreamingViewAsMultimap.class, this))
-          .put(
-              PTransformMatchers.classEqualTo(AsSingleton.class),
-              new ReflectiveOneToOneOverrideFactory(
-                  StreamingViewOverrides.StreamingViewAsSingleton.class, this))
-          .put(
-              PTransformMatchers.classEqualTo(AsList.class),
-              new ReflectiveOneToOneOverrideFactory(
-                  StreamingViewOverrides.StreamingViewAsList.class, this))
-          .put(
-              PTransformMatchers.classEqualTo(AsIterable.class),
-              new ReflectiveOneToOneOverrideFactory(
-                  StreamingViewOverrides.StreamingViewAsIterable.class, this));
+              PTransformMatchers.classEqualTo(View.CreatePCollectionView.class),
+              new StreamingCreatePCollectionViewFactory());
     } else {
       // In batch mode must use the custom Pubsub bounded source/sink.
       for (Class<? extends PTransform> unsupported :
@@ -719,30 +695,40 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     // have just recorded the full names during apply time.
     if (!ptransformViewsWithNonDeterministicKeyCoders.isEmpty()) {
       final SortedSet<String> ptransformViewNamesWithNonDeterministicKeyCoders = new TreeSet<>();
-      pipeline.traverseTopologically(new PipelineVisitor() {
-        @Override
-        public void visitValue(PValue value, TransformHierarchy.Node producer) {
-        }
-
-        @Override
-        public void visitPrimitiveTransform(TransformHierarchy.Node node) {
-          if (ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
-            ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
-          }
-        }
-
-        @Override
-        public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
-          if (ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
-            ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
-          }
-          return CompositeBehavior.ENTER_TRANSFORM;
-        }
-
-        @Override
-        public void leaveCompositeTransform(TransformHierarchy.Node node) {
-        }
-      });
+      pipeline.traverseTopologically(
+          new PipelineVisitor() {
+            @Override
+            public void visitValue(PValue value, TransformHierarchy.Node producer) {}
+
+            @Override
+            public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+              if (ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
+                ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
+              }
+            }
+
+            @Override
+            public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
+              if (node.getTransform() instanceof View.AsMap
+                  || node.getTransform() instanceof View.AsMultimap) {
+                PCollection<KV<?, ?>> input =
+                    (PCollection<KV<?, ?>>) Iterables.getOnlyElement(node.getInputs()).getValue();
+                KvCoder<?, ?> inputCoder = (KvCoder) input.getCoder();
+                try {
+                  inputCoder.getKeyCoder().verifyDeterministic();
+                } catch (NonDeterministicException e) {
+                  ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
+                }
+              }
+              if (ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
+                ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
+              }
+              return CompositeBehavior.ENTER_TRANSFORM;
+            }
+
+            @Override
+            public void leaveCompositeTransform(TransformHierarchy.Node node) {}
+          });
 
       LOG.warn("Unable to use indexed implementation for View.AsMap and View.AsMultimap for {} "
           + "because the key coder is not deterministic. Falling back to singleton implementation "

http://git-wip-us.apache.org/repos/asf/beam/blob/7b94c99b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
index bab115f..8e005cf 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
@@ -19,23 +19,18 @@
 package org.apache.beam.runners.dataflow;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
+import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.runners.dataflow.DataflowRunner.StreamingPCollectionViewWriterFn;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
 import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.util.PCollectionViews;
-import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 
@@ -44,261 +39,58 @@ import org.apache.beam.sdk.values.PCollectionView;
  * types.
  */
 class StreamingViewOverrides {
-  static class StreamingCombineGloballyAsSingletonView<InputT, OutputT>
-      extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
-    Combine.GloballyAsSingletonView<InputT, OutputT> transform;
-
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public StreamingCombineGloballyAsSingletonView(
-        DataflowRunner runner,
-        Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
-      this.transform = transform;
-    }
-
+  static class StreamingCreatePCollectionViewFactory<ElemT, ViewT>
+      extends SingleInputOutputOverrideFactory<
+          PCollection<ElemT>, PCollectionView<ViewT>, CreatePCollectionView<ElemT, ViewT>> {
     @Override
-    public PCollectionView<OutputT> expand(PCollection<InputT> input) {
-      PCollection<OutputT> combined =
-          input.apply(Combine.<InputT, OutputT>globally(transform.getCombineFn())
-              .withoutDefaults()
-              .withFanout(transform.getFanout()));
-
-      PCollectionView<OutputT> view = PCollectionViews.singletonView(
-          combined.getPipeline(),
-          combined.getWindowingStrategy(),
-          transform.getInsertDefault(),
-          transform.getInsertDefault()
-              ? transform.getCombineFn().defaultValue() : null,
-          combined.getCoder());
-      return combined
-          .apply(ParDo.of(new WrapAsList<OutputT>()))
-          .apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, combined.getCoder())))
-          .apply(View.CreatePCollectionView.<OutputT, OutputT>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingCombineGloballyAsSingletonView";
-    }
-  }
-
-  private static class WrapAsList<T> extends DoFn<T, List<T>> {
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      c.output(Arrays.asList(c.element()));
+    public PTransform<PCollection<ElemT>, PCollectionView<ViewT>> getReplacementTransform(
+        final CreatePCollectionView<ElemT, ViewT> transform) {
+      return new StreamingCreatePCollectionView<>(transform.getView());
     }
-  }
 
-  /**
-   * Specialized implementation for
-   * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}
-   * for the Dataflow runner in streaming mode.
-   */
-  static class StreamingViewAsMap<K, V>
-      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
-    private final DataflowRunner runner;
+    private static class StreamingCreatePCollectionView<ElemT, ViewT>
+        extends PTransform<PCollection<ElemT>, PCollectionView<ViewT>> {
+      private final PCollectionView<ViewT> view;
 
-    @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public StreamingViewAsMap(DataflowRunner runner, View.AsMap<K, V> transform) {
-      this.runner = runner;
-    }
-
-    @Override
-    public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
-      PCollectionView<Map<K, V>> view =
-          PCollectionViews.mapView(
-              input.getPipeline(),
-              input.getWindowingStrategy(),
-              input.getCoder());
-
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
-      try {
-        inputCoder.getKeyCoder().verifyDeterministic();
-      } catch (NonDeterministicException e) {
-        runner.recordViewUsesNonDeterministicKeyCoder(this);
+      private StreamingCreatePCollectionView(PCollectionView<ViewT> view) {
+        this.view = view;
       }
 
-      return input
-          .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
-          .apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder())))
-          .apply(View.CreatePCollectionView.<KV<K, V>, Map<K, V>>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsMap";
-    }
-  }
-
-  /**
-   * Specialized expansion for {@link
-   * org.apache.beam.sdk.transforms.View.AsMultimap View.AsMultimap} for the
-   * Dataflow runner in streaming mode.
-   */
-  static class StreamingViewAsMultimap<K, V>
-      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
-    private final DataflowRunner runner;
-
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public StreamingViewAsMultimap(DataflowRunner runner, View.AsMultimap<K, V> transform) {
-      this.runner = runner;
-    }
-
-    @Override
-    public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
-      PCollectionView<Map<K, Iterable<V>>> view =
-          PCollectionViews.multimapView(
-              input.getPipeline(),
-              input.getWindowingStrategy(),
-              input.getCoder());
-
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
-      try {
-        inputCoder.getKeyCoder().verifyDeterministic();
-      } catch (NonDeterministicException e) {
-        runner.recordViewUsesNonDeterministicKeyCoder(this);
+      @Override
+      public PCollectionView<ViewT> expand(PCollection<ElemT> input) {
+        return input
+            .apply(Combine.globally(new Concatenate<ElemT>()).withoutDefaults())
+            .apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder())))
+            .apply(View.CreatePCollectionView.<ElemT, ViewT>of(view));
       }
-
-      return input
-          .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
-          .apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder())))
-          .apply(View.CreatePCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsMultimap";
     }
   }
 
-  /**
-   * Specialized implementation for
-   * {@link org.apache.beam.sdk.transforms.View.AsList View.AsList} for the
-   * Dataflow runner in streaming mode.
-   */
-  static class StreamingViewAsList<T>
-      extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public StreamingViewAsList(DataflowRunner runner, View.AsList<T> transform) {}
+  private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
+    private boolean hasDefaultValue;
+    private T defaultValue;
 
-    @Override
-    public PCollectionView<List<T>> expand(PCollection<T> input) {
-      PCollectionView<List<T>> view =
-          PCollectionViews.listView(
-              input.getPipeline(),
-              input.getWindowingStrategy(),
-              input.getCoder());
-
-      return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
-          .apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder())))
-          .apply(View.CreatePCollectionView.<T, List<T>>of(view));
+    SingletonCombine(boolean hasDefaultValue, T defaultValue) {
+      this.hasDefaultValue = hasDefaultValue;
+      this.defaultValue = defaultValue;
     }
 
     @Override
-    protected String getKindString() {
-      return "StreamingViewAsList";
-    }
-  }
-
-  /**
-   * Specialized implementation for
-   * {@link org.apache.beam.sdk.transforms.View.AsIterable View.AsIterable} for the
-   * Dataflow runner in streaming mode.
-   */
-  static class StreamingViewAsIterable<T>
-      extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public StreamingViewAsIterable(DataflowRunner runner, View.AsIterable<T> transform) { }
-
-    @Override
-    public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
-      PCollectionView<Iterable<T>> view =
-          PCollectionViews.iterableView(
-              input.getPipeline(),
-              input.getWindowingStrategy(),
-              input.getCoder());
-
-      return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
-          .apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder())))
-          .apply(View.CreatePCollectionView.<T, Iterable<T>>of(view));
+    public T apply(T left, T right) {
+      throw new IllegalArgumentException(
+          "PCollection with more than one element "
+              + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
+              + "combine the PCollection into a single value");
     }
 
     @Override
-    protected String getKindString() {
-      return "StreamingViewAsIterable";
-    }
-  }
-
-  /**
-   * Specialized expansion for
-   * {@link org.apache.beam.sdk.transforms.View.AsSingleton View.AsSingleton} for the
-   * Dataflow runner in streaming mode.
-   */
-  static class StreamingViewAsSingleton<T>
-      extends PTransform<PCollection<T>, PCollectionView<T>> {
-    private View.AsSingleton<T> transform;
-
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public StreamingViewAsSingleton(DataflowRunner runner, View.AsSingleton<T> transform) {
-      this.transform = transform;
-    }
-
-    @Override
-    public PCollectionView<T> expand(PCollection<T> input) {
-      Combine.Globally<T, T> combine = Combine.globally(
-          new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
-      if (!transform.hasDefaultValue()) {
-        combine = combine.withoutDefaults();
-      }
-      return input.apply(combine.asSingletonView());
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsSingleton";
-    }
-
-    private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
-      private boolean hasDefaultValue;
-      private T defaultValue;
-
-      SingletonCombine(boolean hasDefaultValue, T defaultValue) {
-        this.hasDefaultValue = hasDefaultValue;
-        this.defaultValue = defaultValue;
-      }
-
-      @Override
-      public T apply(T left, T right) {
-        throw new IllegalArgumentException("PCollection with more than one element "
-            + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
-            + "combine the PCollection into a single value");
-      }
-
-      @Override
-      public T identity() {
-        if (hasDefaultValue) {
-          return defaultValue;
-        } else {
-          throw new IllegalArgumentException(
-              "Empty PCollection accessed as a singleton view. "
-                  + "Consider setting withDefault to provide a default value");
-        }
+    public T identity() {
+      if (hasDefaultValue) {
+        return defaultValue;
+      } else {
+        throw new IllegalArgumentException(
+            "Empty PCollection accessed as a singleton view. "
+                + "Consider setting withDefault to provide a default value");
       }
     }
   }
@@ -306,11 +98,6 @@ class StreamingViewOverrides {
   /**
    * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
    *
-   * <p>For internal use by {@link StreamingViewAsMap}, {@link StreamingViewAsMultimap},
-   * {@link StreamingViewAsList}, {@link StreamingViewAsIterable}.
-   * They require the input {@link PCollection} fits in memory.
-   * For a large {@link PCollection} this is expected to crash!
-   *
    * @param <T> the type of elements to concatenate.
    */
   private static class Concatenate<T> extends CombineFn<T, List<T>, List<T>> {


[2/4] beam git commit: Remove SingletonCombine

Posted by tg...@apache.org.
Remove SingletonCombine

It is no longer used after the update to the streaming view overrides.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/079966ca
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/079966ca
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/079966ca

Branch: refs/heads/master
Commit: 079966cad99442c63e0f3147a6361139bd601c8c
Parents: 7b94c99
Author: Thomas Groh <tg...@google.com>
Authored: Fri Mar 3 10:57:50 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Mar 3 14:59:12 2017 -0800

----------------------------------------------------------------------
 .../dataflow/StreamingViewOverrides.java        | 29 --------------------
 1 file changed, 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/079966ca/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
index 8e005cf..5f0cb26 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
@@ -66,35 +66,6 @@ class StreamingViewOverrides {
     }
   }
 
-  private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
-    private boolean hasDefaultValue;
-    private T defaultValue;
-
-    SingletonCombine(boolean hasDefaultValue, T defaultValue) {
-      this.hasDefaultValue = hasDefaultValue;
-      this.defaultValue = defaultValue;
-    }
-
-    @Override
-    public T apply(T left, T right) {
-      throw new IllegalArgumentException(
-          "PCollection with more than one element "
-              + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
-              + "combine the PCollection into a single value");
-    }
-
-    @Override
-    public T identity() {
-      if (hasDefaultValue) {
-        return defaultValue;
-      } else {
-        throw new IllegalArgumentException(
-            "Empty PCollection accessed as a singleton view. "
-                + "Consider setting withDefault to provide a default value");
-      }
-    }
-  }
-
   /**
    * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
    *


[3/4] beam git commit: Add a Test for windowed CombineGloballyAsSingletonView

Posted by tg...@apache.org.
Add a Test for windowed CombineGloballyAsSingletonView


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3de44a34
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3de44a34
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3de44a34

Branch: refs/heads/master
Commit: 3de44a348e3e0934c644c718255a43b8f42a3534
Parents: 079966c
Author: Thomas Groh <tg...@google.com>
Authored: Fri Mar 3 11:24:14 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Mar 3 14:59:12 2017 -0800

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/CombineTest.java | 46 ++++++++++++++++++++
 1 file changed, 46 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3de44a34/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index 5b18384..6c62d0b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -53,6 +53,7 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
@@ -75,8 +76,10 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.TimestampedValue;
 import org.hamcrest.Matchers;
 import org.joda.time.Duration;
+import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -627,6 +630,49 @@ public class CombineTest implements Serializable {
   }
 
   @Test
+  @Category(RunnableOnService.class)
+  public void testWindowedCombineGloballyAsSingletonView() {
+    FixedWindows windowFn = FixedWindows.of(Duration.standardMinutes(1));
+    final PCollectionView<Integer> view =
+        pipeline
+            .apply(
+                "CreateSideInput",
+                Create.timestamped(
+                    TimestampedValue.of(1, new Instant(100)),
+                    TimestampedValue.of(3, new Instant(100))))
+            .apply("WindowSideInput", Window.<Integer>into(windowFn))
+            .apply("CombineSideInput", Sum.integersGlobally().asSingletonView());
+
+    TimestampedValue<Void> nonEmptyElement = TimestampedValue.of(null, new Instant(100));
+    TimestampedValue<Void> emptyElement = TimestampedValue.atMinimumTimestamp(null);
+    PCollection<Integer> output =
+        pipeline
+            .apply(
+                "CreateMainInput",
+                Create.<Void>timestamped(nonEmptyElement, emptyElement).withCoder(VoidCoder.of()))
+            .apply("WindowMainInput", Window.<Void>into(windowFn))
+            .apply(
+                "OutputSideInput",
+                ParDo.of(
+                        new DoFn<Void, Integer>() {
+                          @ProcessElement
+                          public void processElement(ProcessContext c) {
+                            c.output(c.sideInput(view));
+                          }
+                        })
+                    .withSideInputs(view));
+
+    PAssert.that(output).containsInAnyOrder(4, 0);
+    PAssert.that(output)
+        .inWindow(windowFn.assignWindow(nonEmptyElement.getTimestamp()))
+        .containsInAnyOrder(4);
+    PAssert.that(output)
+        .inWindow(windowFn.assignWindow(emptyElement.getTimestamp()))
+        .containsInAnyOrder(0);
+    pipeline.run();
+  }
+
+  @Test
   public void testCombineGetName() {
     assertEquals("Combine.globally(SumInts)", Combine.globally(new SumInts()).getName());
     assertEquals(