Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/04 10:38:50 UTC
[beam] 28/50: Implement read transform
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 4f150da0d1b01cd8a2a0d52142e3f441d40004df
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Mon Dec 3 09:28:11 2018 +0100
Implement read transform
---
.../translation/TranslationContext.java | 19 +++
.../batch/ReadSourceTranslatorBatch.java | 32 +++-
.../translation/io/DatasetSource.java | 163 +++++++++++++++++++++
3 files changed, 213 insertions(+), 1 deletion(-)
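For context, the DatasetSource added below sizes Beam bundles from either the pipeline's bundleSize option or the source's estimated size divided by Spark's default parallelism, and then splits the BoundedSource accordingly. A minimal, self-contained sketch of that sizing rule (names and values are illustrative only, not part of this commit):

public class BundleSizingSketch {
  // Mirrors the rule used in DatasetSource.planInputPartitions(): use the configured
  // bundle size when present, otherwise aim for roughly one bundle per Spark partition.
  static long desiredBundleSizeBytes(long estimatedSizeBytes, int numPartitions, Long bundleSize) {
    return (bundleSize == null) ? estimatedSizeBytes / numPartitions : bundleSize;
  }

  public static void main(String[] args) {
    // A 1 GiB source, default parallelism 8, no explicit bundle size -> 128 MiB bundles.
    System.out.println(desiredBundleSizeBytes(1L << 30, 8, null));      // 134217728
    // An explicit 64 MiB bundle size takes precedence over the estimate.
    System.out.println(desiredBundleSizeBytes(1L << 30, 8, 64L << 20)); // 67108864
  }
}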
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index e66bc90..52ed11f 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -33,11 +33,18 @@ import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.ForeachWriter;
+import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.execution.datasources.DataSource;
+import org.apache.spark.sql.execution.streaming.Source;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.ReadSupport;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.streaming.StreamingQueryException;
/**
@@ -73,6 +80,14 @@ public class TranslationContext {
this.leaves = new HashSet<>();
}
+ public SparkSession getSparkSession() {
+ return sparkSession;
+ }
+
+ public SparkPipelineOptions getOptions() {
+ return options;
+ }
+
// --------------------------------------------------------------------------------------------
// Transforms methods
// --------------------------------------------------------------------------------------------
@@ -80,6 +95,10 @@ public class TranslationContext {
this.currentTransform = currentTransform;
}
+ public AppliedPTransform<?, ?, ?> getCurrentTransform() {
+ return currentTransform;
+ }
+
// --------------------------------------------------------------------------------------------
// Datasets methods
// --------------------------------------------------------------------------------------------
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index d18eb2e..05dc374 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -17,16 +17,46 @@
*/
package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+import java.io.IOException;
+import org.apache.beam.runners.core.construction.ReadTranslation;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
+import org.apache.beam.runners.spark.structuredstreaming.translation.io.DatasetSource;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
class ReadSourceTranslatorBatch<T>
implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
+ @SuppressWarnings("unchecked")
@Override
public void translateTransform(
- PTransform<PBegin, PCollection<T>> transform, TranslationContext context) {}
+ PTransform<PBegin, PCollection<T>> transform, TranslationContext context) {
+ AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> rootTransform =
+ (AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>>)
+ context.getCurrentTransform();
+ BoundedSource<T> source;
+ try {
+ source = ReadTranslation.boundedSourceFromTransform(rootTransform);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ PCollection<T> output = (PCollection<T>) context.getOutput();
+
+ SparkSession sparkSession = context.getSparkSession();
+ // TODO: Spark instantiates the DataSourceV2 class reflectively on load(), so the Beam
+ // source and the context still need to be wired into the instance that Spark creates.
+ DatasetSource<T> datasetSource = new DatasetSource<>(context, source);
+ Dataset<Row> dataset =
+ sparkSession.readStream().format(DatasetSource.class.getCanonicalName()).load();
+
+ context.putDataset(output, dataset);
+ }
}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
new file mode 100644
index 0000000..d9d283e
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
@@ -0,0 +1,163 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.io;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static scala.collection.JavaConversions.asScalaBuffer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
+import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.MicroBatchReadSupport;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
+import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * This is a Spark structured streaming {@link DataSourceV2} implementation. As continuous
+ * streaming is tagged experimental in Spark, this class does not implement
+ * {@link ContinuousReadSupport}; it only mixes in {@link MicroBatchReadSupport}.
+ */
+public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport {
+
+ private final int numPartitions;
+ private final Long bundleSize;
+ private TranslationContext context;
+ private BoundedSource<T> source;
+
+ public DatasetSource(TranslationContext context, BoundedSource<T> source) {
+ this.context = context;
+ this.source = source;
+ this.numPartitions = context.getSparkSession().sparkContext().defaultParallelism();
+ checkArgument(this.numPartitions > 0, "Number of partitions must be greater than zero.");
+ this.bundleSize = context.getOptions().getBundleSize();
+ }
+
+ @Override public MicroBatchReader createMicroBatchReader(Optional<StructType> schema,
+ String checkpointLocation, DataSourceOptions options) {
+ return new DatasetMicroBatchReader(schema, checkpointLocation, options);
+ }
+
+ /**
+ * This class can be mapped to Beam {@link BoundedSource}.
+ */
+ private class DatasetMicroBatchReader implements MicroBatchReader {
+
+ private Optional<StructType> schema;
+ private String checkpointLocation;
+ private DataSourceOptions options;
+
+ private DatasetMicroBatchReader(Optional<StructType> schema, String checkpointLocation,
+ DataSourceOptions options) {
+ this.schema = schema;
+ this.checkpointLocation = checkpointLocation;
+ this.options = options;
+ //TODO start reading from the source here, inc offset at each element read
+ }
+
+ @Override public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
+ //TODO extension point for SDF
+ }
+
+ @Override public Offset getStartOffset() {
+ //TODO extension point for SDF
+ return null;
+ }
+
+ @Override public Offset getEndOffset() {
+ //TODO extension point for SDF
+ return null;
+ }
+
+ @Override public Offset deserializeOffset(String json) {
+ //TODO extension point for SDF
+ return null;
+ }
+
+ @Override public void commit(Offset end) {
+ //TODO no more to read after end Offset
+ }
+
+ @Override public void stop() {
+ }
+
+ @Override public StructType readSchema() {
+ //TODO: define the schema of the rows produced by this source
+ return null;
+ }
+
+ @Override public List<InputPartition<InternalRow>> planInputPartitions() {
+ List<InputPartition<InternalRow>> result = new ArrayList<>();
+ long desiredSizeBytes;
+ SparkPipelineOptions options = context.getOptions();
+ try {
+ desiredSizeBytes = (bundleSize == null) ?
+ source.getEstimatedSizeBytes(options) / numPartitions :
+ bundleSize;
+ List<? extends BoundedSource<T>> sources = source.split(desiredSizeBytes, options);
+ for (BoundedSource<T> source : sources) {
+ result.add(new InputPartition<InternalRow>() {
+
+ @Override public InputPartitionReader<InternalRow> createPartitionReader() {
+ BoundedReader<T> reader;
+ try {
+ reader = source.createReader(options);
+ } catch (IOException e) {
+ throw new RuntimeException("Error creating BoundedReader from the Beam source", e);
+ }
+ return new DatasetMicroBatchPartitionReader(reader);
+ }
+ });
+ }
+ return result;
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Error splitting BoundedSource " + source.getClass().getCanonicalName(), e);
+ }
+ }
+
+ }
+
+ /**
+ * This class can be mapped to Beam {@link BoundedReader}.
+ */
+ private class DatasetMicroBatchPartitionReader implements InputPartitionReader<InternalRow> {
+
+ BoundedReader<T> reader;
+ private boolean started;
+ private boolean closed;
+
+ DatasetMicroBatchPartitionReader(BoundedReader<T> reader) {
+ this.reader = reader;
+ this.started = false;
+ this.closed = false;
+ }
+
+ @Override public boolean next() throws IOException {
+ if (!started) {
+ started = true;
+ return reader.start();
+ } else {
+ return !closed && reader.advance();
+ }
+ }
+
+ @Override public InternalRow get() {
+ List<Object> list = new ArrayList<>();
+ list.add(WindowedValue.timestampedValueInGlobalWindow(reader.getCurrent(), reader.getCurrentTimestamp()));
+ return InternalRow.apply(asScalaBuffer(list).toList());
+ }
+
+ @Override public void close() throws IOException {
+ closed = true;
+ reader.close();
+ }
+ }
+}
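For illustration, the next()/get() contract implemented by DatasetMicroBatchPartitionReader above follows Beam's BoundedReader protocol: start() positions the reader on the first element, advance() moves to the next one, and getCurrent() returns the element at the current position. A minimal, self-contained sketch of that draining loop against a stand-in reader (illustrative names, not part of this commit):

import java.util.Arrays;
import java.util.Iterator;

public class ReaderLoopSketch {

  /** Stand-in for the part of BoundedSource.BoundedReader exercised by the partition reader. */
  interface SimpleReader<T> {
    boolean start();    // position on the first element, true if one exists
    boolean advance();  // move to the next element, true if one exists
    T getCurrent();     // element at the current position
  }

  /** Same shape as the next()/get() calls Spark issues against the partition reader. */
  static <T> void drain(SimpleReader<T> reader) {
    for (boolean more = reader.start(); more; more = reader.advance()) {
      System.out.println(reader.getCurrent());
    }
  }

  public static void main(String[] args) {
    Iterator<String> it = Arrays.asList("a", "b", "c").iterator();
    drain(new SimpleReader<String>() {
      private String current;

      @Override public boolean start() { return advance(); }

      @Override public boolean advance() {
        if (it.hasNext()) {
          current = it.next();
          return true;
        }
        return false;
      }

      @Override public String getCurrent() { return current; }
    });
  }
}

The real partition reader additionally wraps each element in a WindowedValue before converting it to an InternalRow, and its close() delegates to the Beam reader's close().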