You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/28 19:46:44 UTC
[1/2] beam git commit: This closes #2338
Repository: beam
Updated Branches:
refs/heads/master 0582eb9fc -> caba84171
This closes #2338
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/caba8417
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/caba8417
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/caba8417
Branch: refs/heads/master
Commit: caba841715304458b6b93ccf4e96c67a0e949bdf
Parents: 0582eb9 e6754eb
Author: Thomas Groh <tg...@google.com>
Authored: Tue Mar 28 12:46:33 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 28 12:46:33 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/transforms/Combine.java | 31 ++++++++++++++++++++
.../apache/beam/sdk/transforms/PTransform.java | 14 +++++++++
.../org/apache/beam/sdk/transforms/ParDo.java | 31 ++++++++++++++++++++
3 files changed, 76 insertions(+)
----------------------------------------------------------------------
[2/2] beam git commit: Add PTransform#getAdditionalInputs()
Posted by tg...@apache.org.
Add PTransform#getAdditionalInputs()
This permits all PTransforms to expose any input PValues they receive
via a side channel. This lets the Pipeline include those inputs as input
to the PTransform node even when the actual generic PTransform types do
not include those inputs (e.g. as a Side Input to a ParDo, which is not
represented in the input PCollection type).
Implement getAdditionalInputs in ParDo and Combine.
getAdditionalInputs is currently unused.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e6754ebd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e6754ebd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e6754ebd
Branch: refs/heads/master
Commit: e6754ebdce30a345127068b8b069903515145a39
Parents: 0582eb9
Author: Thomas Groh <tg...@google.com>
Authored: Wed Mar 22 13:48:57 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 28 12:46:33 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/transforms/Combine.java | 31 ++++++++++++++++++++
.../apache/beam/sdk/transforms/PTransform.java | 14 +++++++++
.../org/apache/beam/sdk/transforms/ParDo.java | 31 ++++++++++++++++++++
3 files changed, 76 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/e6754ebd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 7295f63..2c145b4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkState;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import java.io.IOException;
import java.io.InputStream;
@@ -30,6 +31,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
@@ -70,6 +72,7 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -1433,6 +1436,20 @@ public class Combine {
}
/**
+ * Returns the side inputs of this {@link Combine}, tagged with the tag of the
+ * {@link PCollectionView}. The values of the returned map will be equal to the result of
+ * {@link #getSideInputs()}.
+ */
+ @Override
+ public Map<TupleTag<?>, PValue> getAdditionalInputs() {
+ ImmutableMap.Builder<TupleTag<?>, PValue> additionalInputs = ImmutableMap.builder();
+ for (PCollectionView<?> sideInput : sideInputs) {
+ additionalInputs.put(sideInput.getTagInternal(), sideInput);
+ }
+ return additionalInputs.build();
+ }
+
+ /**
* Returns whether or not this transformation applies a default value.
*/
public boolean isInsertDefault() {
@@ -1874,6 +1891,20 @@ public class Combine {
return sideInputs;
}
+ /**
+ * Returns the side inputs of this {@link Combine}, tagged with the tag of the
+ * {@link PCollectionView}. The values of the returned map will be equal to the result of
+ * {@link #getSideInputs()}.
+ */
+ @Override
+ public Map<TupleTag<?>, PValue> getAdditionalInputs() {
+ ImmutableMap.Builder<TupleTag<?>, PValue> additionalInputs = ImmutableMap.builder();
+ for (PCollectionView<?> sideInput : sideInputs) {
+ additionalInputs.put(sideInput.getTagInternal(), sideInput);
+ }
+ return additionalInputs.build();
+ }
+
@Override
public PCollection<KV<K, OutputT>> expand(PCollection<KV<K, InputT>> input) {
return input
http://git-wip-us.apache.org/repos/asf/beam/blob/e6754ebd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
index 27bb219..687938d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
@@ -20,6 +20,8 @@ package org.apache.beam.sdk.transforms;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
+import java.util.Collections;
+import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
@@ -28,6 +30,8 @@ import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.util.NameUtils;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypedPValue;
/**
@@ -192,6 +196,16 @@ public abstract class PTransform<InputT extends PInput, OutputT extends POutput>
public void validate(InputT input) {}
/**
+ * Returns all {@link PValue PValues} that are consumed as inputs to this {@link PTransform} that
+ * are independent of the expansion of the {@link InputT} within {@link #expand(PInput)}.
+ *
+ * <p>For example, this can contain any side input consumed by this {@link PTransform}.
+ */
+ public Map<TupleTag<?>, PValue> getAdditionalInputs() {
+ return Collections.emptyMap();
+ }
+
+ /**
* Returns the transform name.
*
* <p>This name is provided by the transform creator and is not required to be unique.
http://git-wip-us.apache.org/repos/asf/beam/blob/e6754ebd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 76c06b6..7446737 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -20,9 +20,11 @@ package org.apache.beam.sdk.transforms;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
@@ -41,6 +43,7 @@ import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -778,6 +781,20 @@ public class ParDo {
public List<PCollectionView<?>> getSideInputs() {
return sideInputs;
}
+
+ /**
+ * Returns the side inputs of this {@link ParDo}, tagged with the tag of the
+ * {@link PCollectionView}. The values of the returned map will be equal to the result of
+ * {@link #getSideInputs()}.
+ */
+ @Override
+ public Map<TupleTag<?>, PValue> getAdditionalInputs() {
+ ImmutableMap.Builder<TupleTag<?>, PValue> additionalInputs = ImmutableMap.builder();
+ for (PCollectionView<?> sideInput : sideInputs) {
+ additionalInputs.put(sideInput.getTagInternal(), sideInput);
+ }
+ return additionalInputs.build();
+ }
}
/**
@@ -985,6 +1002,20 @@ public class ParDo {
public List<PCollectionView<?>> getSideInputs() {
return sideInputs;
}
+
+ /**
+ * Returns the side inputs of this {@link ParDo}, tagged with the tag of the
+ * {@link PCollectionView}. The values of the returned map will be equal to the result of
+ * {@link #getSideInputs()}.
+ */
+ @Override
+ public Map<TupleTag<?>, PValue> getAdditionalInputs() {
+ ImmutableMap.Builder<TupleTag<?>, PValue> additionalInputs = ImmutableMap.builder();
+ for (PCollectionView<?> sideInput : sideInputs) {
+ additionalInputs.put(sideInput.getTagInternal(), sideInput);
+ }
+ return additionalInputs.build();
+ }
}
private static void populateDisplayData(