You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/30 21:31:11 UTC

[09/50] beam git commit: Add utility to expand list of PCollectionViews

Add utility to expand list of PCollectionViews


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/58fba590
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/58fba590
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/58fba590

Branch: refs/heads/gearpump-runner
Commit: 58fba590ddc554a343036a7beeffe9caa319aa81
Parents: e5929bd
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Jun 27 14:35:00 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Jun 27 20:45:40 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/values/PCollectionViews.java  | 14 ++++++++++++++
 1 file changed, 14 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/58fba590/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
index 0c04370..e17e146 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
@@ -21,6 +21,7 @@ import com.google.common.base.Function;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
 import java.io.IOException;
@@ -38,6 +39,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.transforms.Materialization;
 import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
@@ -139,6 +141,18 @@ public class PCollectionViews {
   }
 
   /**
+   * Converts the given {@link PCollectionView}s into the map shape needed for
+   * {@link PTransform#getAdditionalInputs()}: internal tag to {@code getPCollection()} value.
+   */
+  public static Map<TupleTag<?>, PValue> toAdditionalInputs(Iterable<PCollectionView<?>> views) {
+    ImmutableMap.Builder<TupleTag<?>, PValue> inputsByTag = ImmutableMap.builder();
+    for (PCollectionView<?> sideInput : views) {
+      inputsByTag.put(sideInput.getTagInternal(), sideInput.getPCollection());
+    }
+    return inputsByTag.build();
+  }
+
+  /**
    * Implementation of conversion of singleton {@code Iterable<WindowedValue<T>>} to {@code T}.
    *
    * <p>For internal use only.