You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/30 21:31:11 UTC
[09/50] beam git commit: Add utility to expand list of PCollectionViews
Add utility to expand list of PCollectionViews
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/58fba590
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/58fba590
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/58fba590
Branch: refs/heads/gearpump-runner
Commit: 58fba590ddc554a343036a7beeffe9caa319aa81
Parents: e5929bd
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Jun 27 14:35:00 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Jun 27 20:45:40 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/values/PCollectionViews.java | 14 ++++++++++++++
1 file changed, 14 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/58fba590/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
index 0c04370..e17e146 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
@@ -21,6 +21,7 @@ import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import java.io.IOException;
@@ -38,6 +39,7 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.transforms.Materialization;
import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
@@ -139,6 +141,18 @@ public class PCollectionViews {
}
/**
+ * Expands a list of {@link PCollectionView} into the form needed for
+ * {@link PTransform#getAdditionalInputs()}.
+ *
+ * <p>Each view contributes one entry: the result of its {@code getTagInternal()} call is the key
+ * and the result of its {@code getPCollection()} call is the value.
+ *
+ * @param views the views to expand; iterated once, in order
+ * @return an immutable map holding one tag-to-value entry per supplied view
+ * @throws IllegalArgumentException if two of the views produce equal tags, because
+ *     {@link ImmutableMap.Builder#build()} rejects duplicate keys
+ */
+ public static Map<TupleTag<?>, PValue> toAdditionalInputs(Iterable<PCollectionView<?>> views) {
+ ImmutableMap.Builder<TupleTag<?>, PValue> additionalInputs = ImmutableMap.builder();
+ for (PCollectionView<?> view : views) {
+ additionalInputs.put(view.getTagInternal(), view.getPCollection());
+ }
+ return additionalInputs.build();
+ }
+
+ /**
* Implementation of conversion of singleton {@code Iterable<WindowedValue<T>>} to {@code T}.
*
* <p>For internal use only.