You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2018/11/29 15:13:13 UTC

[beam] 01/03: Create PCollections manipulation methods

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 26238ce88d2cdca169587ac55547c146f267a39b
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Nov 29 11:48:20 2018 +0100

    Create PCollections manipulation methods
---
 .../translation/TranslationContext.java            | 56 +++++++++++++++++++++-
 1 file changed, 55 insertions(+), 1 deletion(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index 71ae276..a3276bf 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -17,17 +17,25 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation;
 
+import com.google.common.collect.Iterables;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.HashMap;
-import java.util.LinkedHashSet;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.beam.runners.core.construction.TransformInputs;
 import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.ForeachWriter;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.streaming.StreamingQueryException;
@@ -40,6 +48,7 @@ public class TranslationContext {
 
   private final Map<PValue, Dataset<?>> datasets;
   private final Set<Dataset<?>> leaves;
+
   private final SparkPipelineOptions options;
 
   @SuppressFBWarnings("URF_UNREAD_FIELD") // make findbug happy
@@ -62,10 +71,55 @@ public class TranslationContext {
     this.leaves = new LinkedHashSet<>();
   }
 
+  // --------------------------------------------------------------------------------------------
+  //  Transforms methods
+  // --------------------------------------------------------------------------------------------
   public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
     this.currentTransform = currentTransform;
   }
 
+  // --------------------------------------------------------------------------------------------
+  //  Datasets methods
+  // --------------------------------------------------------------------------------------------
+
+
+  // --------------------------------------------------------------------------------------------
+  //  PCollections methods
+  // --------------------------------------------------------------------------------------------
+  /** Returns the only element of the current transform's non-additional inputs. */
+  public PValue getInput() {
+    return Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(currentTransform));
+  }
+
+  /** Returns the inputs of the current transform, keyed by their tuple tag. */
+  public Map<TupleTag<?>, PValue> getInputs() {
+    return currentTransform.getInputs();
+  }
+
+  /** Returns the only value among the outputs of the current transform. */
+  public PValue getOutput() {
+    return Iterables.getOnlyElement(currentTransform.getOutputs().values());
+  }
+
+  /** Returns the outputs of the current transform, keyed by their tuple tag. */
+  public Map<TupleTag<?>, PValue> getOutputs() {
+    return currentTransform.getOutputs();
+  }
+
+  /** Maps each output tag to its coder; outputs that are not PCollections are left out. */
+  public Map<TupleTag<?>, Coder<?>> getOutputCoders() {
+    return currentTransform.getOutputs().entrySet().stream()
+        .filter(e -> e.getValue() instanceof PCollection)
+        .collect(
+            Collectors.toMap(
+                e -> e.getKey(),
+                e -> ((PCollection<?>) e.getValue()).getCoder()));
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //  Pipeline methods
+  // --------------------------------------------------------------------------------------------
+
   public void startPipeline() {
     try {
       // to start a pipeline we need a DatastreamWriter to start