Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/04 10:38:50 UTC

[beam] 28/50: Implement read transform

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 4f150da0d1b01cd8a2a0d52142e3f441d40004df
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Mon Dec 3 09:28:11 2018 +0100

    Implement read transform
---
 .../translation/TranslationContext.java            |  19 +++
 .../batch/ReadSourceTranslatorBatch.java           |  32 +++-
 .../translation/io/DatasetSource.java              | 163 +++++++++++++++++++++
 3 files changed, 213 insertions(+), 1 deletion(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index e66bc90..52ed11f 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -33,11 +33,18 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.ForeachWriter;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.execution.datasources.DataSource;
+import org.apache.spark.sql.execution.streaming.Source;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.ReadSupport;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
 import org.apache.spark.sql.streaming.StreamingQueryException;
 
 /**
@@ -73,6 +80,14 @@ public class TranslationContext {
     this.leaves = new HashSet<>();
   }
 
+  public SparkSession getSparkSession() {
+    return sparkSession;
+  }
+
+  public SparkPipelineOptions getOptions() {
+    return options;
+  }
+
   // --------------------------------------------------------------------------------------------
   //  Transforms methods
   // --------------------------------------------------------------------------------------------
@@ -80,6 +95,10 @@ public class TranslationContext {
     this.currentTransform = currentTransform;
   }
 
+  public AppliedPTransform<?, ?, ?> getCurrentTransform() {
+    return currentTransform;
+  }
+
   // --------------------------------------------------------------------------------------------
   //  Datasets methods
   // --------------------------------------------------------------------------------------------
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index d18eb2e..05dc374 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -17,16 +17,46 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
+import java.io.IOException;
+import org.apache.beam.runners.core.construction.ReadTranslation;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
+import org.apache.beam.runners.spark.structuredstreaming.translation.io.DatasetSource;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
 
 class ReadSourceTranslatorBatch<T>
     implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
 
+  @SuppressWarnings("unchecked")
   @Override
   public void translateTransform(
-      PTransform<PBegin, PCollection<T>> transform, TranslationContext context) {}
+      PTransform<PBegin, PCollection<T>> transform, TranslationContext context) {
+    AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> rootTransform =
+        (AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>>)
+            context.getCurrentTransform();
+    BoundedSource<T> source;
+    try {
+      source = ReadTranslation.boundedSourceFromTransform(rootTransform);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    PCollection<T> output = (PCollection<T>) context.getOutput();
+
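+    // Spark instantiates the DataSourceV2 implementation itself, resolving the name passed to
+    // format(...) to a registered short name or a fully qualified class name.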
+    SparkSession sparkSession = context.getSparkSession();
+    DatasetSource<T> datasetSource = new DatasetSource<>(context, source);
+    Dataset<Row> dataset =
+        sparkSession.readStream().format(DatasetSource.class.getCanonicalName()).load();
+
+    context.putDataset(output, dataset);
+  }
 }
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
new file mode 100644
index 0000000..d9d283e
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
@@ -0,0 +1,163 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.io;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static scala.collection.JavaConversions.asScalaBuffer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
+import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.MicroBatchReadSupport;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
+import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A Spark structured streaming {@link DataSourceV2} implementation that mixes in
+ * {@link MicroBatchReadSupport}. As continuous streaming is tagged experimental in Spark, this
+ * class does not implement {@link ContinuousReadSupport}.
+ */
+public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport {
+
+  private final int numPartitions;
+  private final Long bundleSize;
+  private TranslationContext context;
+  private BoundedSource<T> source;
+
+  public DatasetSource(TranslationContext context, BoundedSource<T> source) {
+    this.context = context;
+    this.source = source;
+    this.numPartitions = context.getSparkSession().sparkContext().defaultParallelism();
+    checkArgument(this.numPartitions > 0, "Number of partitions must be greater than zero.");
+    this.bundleSize = context.getOptions().getBundleSize();
+  }
+
+  @Override public MicroBatchReader createMicroBatchReader(Optional<StructType> schema,
+      String checkpointLocation, DataSourceOptions options) {
+    return new DatasetMicroBatchReader(schema, checkpointLocation, options);
+  }
+
+  /**
+   * A {@link MicroBatchReader} that maps to a Beam {@link BoundedSource}.
+   */
+  private class DatasetMicroBatchReader implements MicroBatchReader {
+
+    private Optional<StructType> schema;
+    private String checkpointLocation;
+    private DataSourceOptions options;
+
+    private DatasetMicroBatchReader(Optional<StructType> schema, String checkpointLocation,
+        DataSourceOptions options) {
+      this.schema = schema;
+      this.checkpointLocation = checkpointLocation;
+      this.options = options;
+      //TODO start reading from the source here, inc offset at each element read
+    }
+
+    @Override public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
+      //TODO extension point for SDF
+    }
+
+    @Override public Offset getStartOffset() {
+      //TODO extension point for SDF
+      return null;
+    }
+
+    @Override public Offset getEndOffset() {
+      //TODO extension point for SDF
+      return null;
+    }
+
+    @Override public Offset deserializeOffset(String json) {
+      //TODO extension point for SDF
+      return null;
+    }
+
+    @Override public void commit(Offset end) {
+      //TODO no more to read after end Offset
+    }
+
+    @Override public void stop() {
+    }
+
+    @Override public StructType readSchema() {
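+      //TODO define the schema of the single-column rows produced in get()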
+      return null;
+    }
+
+    @Override public List<InputPartition<InternalRow>> planInputPartitions() {
+      List<InputPartition<InternalRow>> result = new ArrayList<>();
+      long desiredSizeBytes;
+      SparkPipelineOptions options = context.getOptions();
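+      // Split the Beam source into bundles sized either by the configured bundleSize or by an
+      // even share of the estimated source size across the default parallelism.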
+      try {
+        desiredSizeBytes = (bundleSize == null) ?
+            source.getEstimatedSizeBytes(options) / numPartitions :
+            bundleSize;
+        List<? extends BoundedSource<T>> sources = source.split(desiredSizeBytes, options);
+        for (BoundedSource<T> source : sources) {
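+          // Each input partition wraps one split of the Beam source and creates its own reader.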
+          result.add(new InputPartition<InternalRow>() {
+
+            @Override public InputPartitionReader<InternalRow> createPartitionReader() {
+              BoundedReader<T> reader;
+              try {
+                reader = source.createReader(options);
+              } catch (IOException e) {
+                throw new RuntimeException("Error creating reader for source " + source, e);
+              }
+              return new DatasetMicroBatchPartitionReader(reader);
+            }
+          });
+        }
+        return result;
+      } catch (Exception e) {
+        throw new RuntimeException("Error splitting the Beam source into partitions", e);
+      }
+    }
+
+  }
+
+  /**
+   * An {@link InputPartitionReader} that maps to a Beam {@link BoundedReader}.
+   */
+  private class DatasetMicroBatchPartitionReader implements InputPartitionReader<InternalRow> {
+
+    BoundedReader<T> reader;
+    private boolean started;
+    private boolean closed;
+
+    DatasetMicroBatchPartitionReader(BoundedReader<T> reader) {
+      this.reader = reader;
+      this.started = false;
+      this.closed = false;
+    }
+
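+    // Follows the Beam reader contract: start() on the first call, advance() on subsequent calls.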
+    @Override public boolean next() throws IOException {
+      if (!started) {
+        started = true;
+        return reader.start();
+      } else {
+        return !closed && reader.advance();
+      }
+    }
+
+    @Override public InternalRow get() {
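+      // Wrap the current element and its timestamp in a WindowedValue and emit it as a
+      // single-column InternalRow.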
+      List<Object> list = new ArrayList<>();
+      list.add(
+          WindowedValue.timestampedValueInGlobalWindow(
+              reader.getCurrent(), reader.getCurrentTimestamp()));
+      return InternalRow.apply(asScalaBuffer(list).toList());
+    }
+
+    @Override public void close() throws IOException {
+      closed = true;
+      reader.close();
+    }
+  }
+}