You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/04 10:38:47 UTC
[beam] 25/50: Add Flatten transformation translator
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 286d7f36480d79ad54f2e92f0b8af8c4ba716621
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Nov 29 16:02:11 2018 +0100
Add Flatten transformation translator
---
.../translation/TranslationContext.java | 4 +++
...latorBatch.java => FlattenTranslatorBatch.java} | 35 ++++++++++++++++++++--
.../translation/batch/PipelineTranslatorBatch.java | 2 +-
3 files changed, 38 insertions(+), 3 deletions(-)
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index 98f77af..3c29867 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -83,6 +83,10 @@ public class TranslationContext {
// --------------------------------------------------------------------------------------------
// Datasets methods
// --------------------------------------------------------------------------------------------
+ @SuppressWarnings("unchecked") // the cast from the Void-encoded Dataset to Dataset<T> below is unchecked
+ public <T> Dataset<T> emptyDataset() { // returns an empty Dataset obtained from sparkSession.emptyDataset
+ return (Dataset<T>) sparkSession.emptyDataset(Encoders.bean(Void.class)); // NOTE(review): the encoder is a Void bean encoder, not an encoder for T — confirm this is safe wherever the result is consumed
+ }
@SuppressWarnings("unchecked")
public <T> Dataset<WindowedValue<T>> getDataset(PValue value) {
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenPCollectionTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTranslatorBatch.java
similarity index 55%
rename from runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenPCollectionTranslatorBatch.java
rename to runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTranslatorBatch.java
index 87a250e..2739e83 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenPCollectionTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTranslatorBatch.java
@@ -17,16 +17,47 @@
*/
package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.Map;
import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.spark.sql.Dataset;
-class FlattenPCollectionTranslatorBatch<T>
+class FlattenTranslatorBatch<T> // batch translator for Flatten: unions the datasets of all inputs into one output dataset
implements TransformTranslator<PTransform<PCollectionList<T>, PCollection<T>>> {
@Override
public void translateTransform(
- PTransform<PCollectionList<T>, PCollection<T>> transform, TranslationContext context) {}
+ PTransform<PCollectionList<T>, PCollection<T>> transform, TranslationContext context) {
+ Map<TupleTag<?>, PValue> inputs = context.getInputs(); // presumably the inputs of the transform currently being translated — confirm against TranslationContext
+ Dataset<WindowedValue<T>> result = null; // running union; stays null until the first input has been read
+
+ if (inputs.isEmpty()) { // no inputs: the output is the context's empty dataset
+ result = context.emptyDataset();
+ } else {
+ for (PValue pValue : inputs.values()) { // fold every input into result, in the map's iteration order
+ checkArgument( // Guava precondition: throws IllegalArgumentException when the input is not a PCollection
+ pValue instanceof PCollection,
+ "Got non-PCollection input to flatten: %s of type %s",
+ pValue,
+ pValue.getClass().getSimpleName());
+ @SuppressWarnings("unchecked") // the instanceof check above guards the raw type only; the element type T is not verified
+ PCollection<T> pCollection = (PCollection<T>) pValue;
+ Dataset<WindowedValue<T>> current = context.getDataset(pCollection); // dataset the context returns for this input
+ if (result == null) {
+ result = current; // first input seeds the union
+ } else {
+ result = result.union(current); // Spark Dataset.union keeps duplicates (UNION ALL semantics)
+ }
+ }
+ }
+ context.putDataset(context.getOutput(), result); // stores the result in the context under the transform's output
+ }
}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
index 318d74c..26f1b9c 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
@@ -56,7 +56,7 @@ public class PipelineTranslatorBatch extends PipelineTranslator {
TRANSFORM_TRANSLATORS.put(PTransformTranslation.RESHUFFLE_URN, new ReshuffleTranslatorBatch());
TRANSFORM_TRANSLATORS.put(
- PTransformTranslation.FLATTEN_TRANSFORM_URN, new FlattenPCollectionTranslatorBatch());
+ PTransformTranslation.FLATTEN_TRANSFORM_URN, new FlattenTranslatorBatch());
TRANSFORM_TRANSLATORS.put(
PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, new WindowAssignTranslatorBatch());