You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/04 10:38:45 UTC
[beam] 23/50: Create PCollections manipulation methods
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 7a645e1bd44f95cae5108c63d5f49f555c91f7d6
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Nov 29 11:48:20 2018 +0100
Create PCollections manipulation methods
---
.../translation/TranslationContext.java | 56 +++++++++++++++++++++-
1 file changed, 55 insertions(+), 1 deletion(-)
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index 71ae276..a3276bf 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -17,17 +17,25 @@
*/
package org.apache.beam.runners.spark.structuredstreaming.translation;
+import com.google.common.collect.Iterables;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.beam.runners.core.construction.TransformInputs;
import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
+import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;
@@ -40,6 +48,7 @@ public class TranslationContext {
private final Map<PValue, Dataset<?>> datasets;
private final Set<Dataset<?>> leaves;
+
private final SparkPipelineOptions options;
@SuppressFBWarnings("URF_UNREAD_FIELD") // make FindBugs happy
@@ -62,10 +71,55 @@ public class TranslationContext {
this.leaves = new LinkedHashSet<>();
}
+ // --------------------------------------------------------------------------------------------
+ // Transforms methods
+ // --------------------------------------------------------------------------------------------
public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
this.currentTransform = currentTransform;
}
+ // --------------------------------------------------------------------------------------------
+ // Datasets methods
+ // --------------------------------------------------------------------------------------------
+
+
+ // --------------------------------------------------------------------------------------------
+ // PCollections methods
+ // --------------------------------------------------------------------------------------------
+ @SuppressWarnings("unchecked")
+ public PValue getInput() {
+ return Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(currentTransform));
+ }
+
+ @SuppressWarnings("unchecked")
+ public Map<TupleTag<?>, PValue> getInputs() {
+ return currentTransform.getInputs();
+ }
+
+ @SuppressWarnings("unchecked")
+ public PValue getOutput() {
+ return Iterables.getOnlyElement(currentTransform.getOutputs().values());
+ }
+
+ @SuppressWarnings("unchecked")
+ public Map<TupleTag<?>, PValue> getOutputs() {
+ return currentTransform.getOutputs();
+ }
+
+ @SuppressWarnings("unchecked")
+ public Map<TupleTag<?>, Coder<?>> getOutputCoders() {
+ return currentTransform
+ .getOutputs()
+ .entrySet()
+ .stream()
+ .filter(e -> e.getValue() instanceof PCollection)
+ .collect(Collectors.toMap(e -> e.getKey(), e -> ((PCollection) e.getValue()).getCoder()));
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Pipeline methods
+ // --------------------------------------------------------------------------------------------
+
public void startPipeline() {
try {
// to start a pipeline we need a DataStreamWriter to start