You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/28 19:46:44 UTC

[1/2] beam git commit: This closes #2338

Repository: beam
Updated Branches:
  refs/heads/master 0582eb9fc -> caba84171


This closes #2338


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/caba8417
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/caba8417
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/caba8417

Branch: refs/heads/master
Commit: caba841715304458b6b93ccf4e96c67a0e949bdf
Parents: 0582eb9 e6754eb
Author: Thomas Groh <tg...@google.com>
Authored: Tue Mar 28 12:46:33 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 28 12:46:33 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/Combine.java | 31 ++++++++++++++++++++
 .../apache/beam/sdk/transforms/PTransform.java  | 14 +++++++++
 .../org/apache/beam/sdk/transforms/ParDo.java   | 31 ++++++++++++++++++++
 3 files changed, 76 insertions(+)
----------------------------------------------------------------------



[2/2] beam git commit: Add PTransform#getAdditionalInputs()

Posted by tg...@apache.org.
Add PTransform#getAdditionalInputs()

This permits all PTransforms to expose any input PValues they receive
via a side channel. This lets the Pipeline record those PValues as inputs
to the PTransform node even when the PTransform's generic input type does
not include them (e.g. a side input to a ParDo, which is not represented
in the input PCollection type).

Implement getAdditionalInputs in ParDo and Combine.

getAdditionalInputs is currently unused.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e6754ebd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e6754ebd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e6754ebd

Branch: refs/heads/master
Commit: e6754ebdce30a345127068b8b069903515145a39
Parents: 0582eb9
Author: Thomas Groh <tg...@google.com>
Authored: Wed Mar 22 13:48:57 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 28 12:46:33 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/Combine.java | 31 ++++++++++++++++++++
 .../apache/beam/sdk/transforms/PTransform.java  | 14 +++++++++
 .../org/apache/beam/sdk/transforms/ParDo.java   | 31 ++++++++++++++++++++
 3 files changed, 76 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e6754ebd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 7295f63..2c145b4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkState;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import java.io.IOException;
 import java.io.InputStream;
@@ -30,6 +31,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
@@ -70,6 +72,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -1433,6 +1436,20 @@ public class Combine {
     }
 
     /**
+     * Returns the side inputs of this {@link Combine}, tagged with the tag of the
+     * {@link PCollectionView}. The values of the returned map will be equal to the result of
+     * {@link #getSideInputs()}.
+     */
+    @Override
+    public Map<TupleTag<?>, PValue> getAdditionalInputs() {
+      ImmutableMap.Builder<TupleTag<?>, PValue> additionalInputs = ImmutableMap.builder();
+      for (PCollectionView<?> sideInput : sideInputs) {
+        additionalInputs.put(sideInput.getTagInternal(), sideInput);
+      }
+      return additionalInputs.build();
+    }
+
+    /**
      * Returns whether or not this transformation applies a default value.
      */
     public boolean isInsertDefault() {
@@ -1874,6 +1891,20 @@ public class Combine {
       return sideInputs;
     }
 
+    /**
+     * Returns the side inputs of this {@link Combine}, tagged with the tag of the
+     * {@link PCollectionView}. The values of the returned map will be equal to the result of
+     * {@link #getSideInputs()}.
+     */
+    @Override
+    public Map<TupleTag<?>, PValue> getAdditionalInputs() {
+      ImmutableMap.Builder<TupleTag<?>, PValue> additionalInputs = ImmutableMap.builder();
+      for (PCollectionView<?> sideInput : sideInputs) {
+        additionalInputs.put(sideInput.getTagInternal(), sideInput);
+      }
+      return additionalInputs.build();
+    }
+
     @Override
     public PCollection<KV<K, OutputT>> expand(PCollection<KV<K, InputT>> input) {
       return input

http://git-wip-us.apache.org/repos/asf/beam/blob/e6754ebd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
index 27bb219..687938d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
@@ -20,6 +20,8 @@ package org.apache.beam.sdk.transforms;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
+import java.util.Collections;
+import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
@@ -28,6 +30,8 @@ import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.util.NameUtils;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TypedPValue;
 
 /**
@@ -192,6 +196,16 @@ public abstract class PTransform<InputT extends PInput, OutputT extends POutput>
   public void validate(InputT input) {}
 
   /**
+   * Returns all {@link PValue PValues} that are consumed as inputs to this {@link PTransform} that
+   * are independent of the expansion of the {@link InputT} within {@link #expand(PInput)}.
+   *
+   * <p>For example, this can contain any side input consumed by this {@link PTransform}.
+   */
+  public Map<TupleTag<?>, PValue> getAdditionalInputs() {
+    return Collections.emptyMap();
+  }
+
+  /**
    * Returns the transform name.
    *
    * <p>This name is provided by the transform creator and is not required to be unique.

http://git-wip-us.apache.org/repos/asf/beam/blob/e6754ebd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 76c06b6..7446737 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -20,9 +20,11 @@ package org.apache.beam.sdk.transforms;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
@@ -41,6 +43,7 @@ import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -778,6 +781,20 @@ public class ParDo {
     public List<PCollectionView<?>> getSideInputs() {
       return sideInputs;
     }
+
+    /**
+     * Returns the side inputs of this {@link ParDo}, tagged with the tag of the
+     * {@link PCollectionView}. The values of the returned map will be equal to the result of
+     * {@link #getSideInputs()}.
+     */
+    @Override
+    public Map<TupleTag<?>, PValue> getAdditionalInputs() {
+      ImmutableMap.Builder<TupleTag<?>, PValue> additionalInputs = ImmutableMap.builder();
+      for (PCollectionView<?> sideInput : sideInputs) {
+        additionalInputs.put(sideInput.getTagInternal(), sideInput);
+      }
+      return additionalInputs.build();
+    }
   }
 
   /**
@@ -985,6 +1002,20 @@ public class ParDo {
     public List<PCollectionView<?>> getSideInputs() {
       return sideInputs;
     }
+
+    /**
+     * Returns the side inputs of this {@link ParDo}, tagged with the tag of the
+     * {@link PCollectionView}. The values of the returned map will be equal to the result of
+     * {@link #getSideInputs()}.
+     */
+    @Override
+    public Map<TupleTag<?>, PValue> getAdditionalInputs() {
+      ImmutableMap.Builder<TupleTag<?>, PValue> additionalInputs = ImmutableMap.builder();
+      for (PCollectionView<?> sideInput : sideInputs) {
+        additionalInputs.put(sideInput.getTagInternal(), sideInput);
+      }
+      return additionalInputs.build();
+    }
   }
 
   private static void populateDisplayData(