Posted to commits@beam.apache.org by ec...@apache.org on 2018/12/11 14:22:13 UTC

[beam] branch spark-runner_structured-streaming updated (b2d37bf -> e86247f)

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a change to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git.


    from b2d37bf  Use Iterators.transform() to return Iterable
     new 3e87c5e  Implement read transform
     new f54899b  update TODO
     new 8e08c58  Apply spotless
     new e86247f  start source instanciation

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../translation/TranslationContext.java            |  12 ++
 .../batch/ReadSourceTranslatorBatch.java           |  45 ++++-
 .../translation/io/DatasetSource.java              | 192 +++++++++++++++++++++
 3 files changed, 248 insertions(+), 1 deletion(-)
 create mode 100644 runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java


[beam] 01/04: Implement read transform

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 3e87c5eb28cde60cb374505f6d1b09efd54491d8
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Mon Dec 3 09:28:11 2018 +0100

    Implement read transform
---
 .../translation/TranslationContext.java            |  19 +++
 .../batch/ReadSourceTranslatorBatch.java           |  32 +++-
 .../translation/io/DatasetSource.java              | 163 +++++++++++++++++++++
 3 files changed, 213 insertions(+), 1 deletion(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index e66bc90..52ed11f 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -33,11 +33,18 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.ForeachWriter;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.execution.datasources.DataSource;
+import org.apache.spark.sql.execution.streaming.Source;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.ReadSupport;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
 import org.apache.spark.sql.streaming.StreamingQueryException;
 
 /**
@@ -73,6 +80,14 @@ public class TranslationContext {
     this.leaves = new HashSet<>();
   }
 
+  public SparkSession getSparkSession() {
+    return sparkSession;
+  }
+
+  public SparkPipelineOptions getOptions() {
+    return options;
+  }
+
   // --------------------------------------------------------------------------------------------
   //  Transforms methods
   // --------------------------------------------------------------------------------------------
@@ -80,6 +95,10 @@ public class TranslationContext {
     this.currentTransform = currentTransform;
   }
 
+  public AppliedPTransform<?, ?, ?> getCurrentTransform() {
+    return currentTransform;
+  }
+
   // --------------------------------------------------------------------------------------------
   //  Datasets methods
   // --------------------------------------------------------------------------------------------
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index d18eb2e..05dc374 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -17,16 +17,46 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
+import java.io.IOException;
+import org.apache.beam.runners.core.construction.ReadTranslation;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
+import org.apache.beam.runners.spark.structuredstreaming.translation.io.DatasetSource;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
 
 class ReadSourceTranslatorBatch<T>
     implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
 
+  @SuppressWarnings("unchecked")
   @Override
   public void translateTransform(
-      PTransform<PBegin, PCollection<T>> transform, TranslationContext context) {}
+      PTransform<PBegin, PCollection<T>> transform, TranslationContext context) {
+    AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> rootTransform =
+        (AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>>)
+            context.getCurrentTransform();
+    BoundedSource<T> source;
+    try {
+      source = ReadTranslation.boundedSourceFromTransform(rootTransform);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    PCollection<T> output = (PCollection<T>) context.getOutput();
+
+    SparkSession sparkSession = context.getSparkSession();
+    DatasetSource datasetSource = new DatasetSource(context, source);
+    Dataset<Row> dataset = sparkSession.readStream().format("DatasetSource").load();
+
+    context.putDataset(output, dataset);
+  }
+
+
 }
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
new file mode 100644
index 0000000..d9d283e
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
@@ -0,0 +1,163 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.io;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static scala.collection.JavaConversions.asScalaBuffer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
+import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.MicroBatchReadSupport;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
+import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * This is a spark structured streaming {@link DataSourceV2} implementation. As Continuous streaming
+ * is tagged experimental in spark, this class does no implement {@link ContinuousReadSupport}.
+ * This class is just a mix-in.
+ */
+public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport {
+
+  private final int numPartitions;
+  private final Long bundleSize;
+  private TranslationContext context;
+  private BoundedSource<T> source;
+
+  public DatasetSource(TranslationContext context, BoundedSource<T> source) {
+    this.context = context;
+    this.source = source;
+    this.numPartitions = context.getSparkSession().sparkContext().defaultParallelism();
+    checkArgument(this.numPartitions > 0, "Number of partitions must be greater than zero.");
+    this.bundleSize = context.getOptions().getBundleSize();
+
+  }
+
+  @Override public MicroBatchReader createMicroBatchReader(Optional<StructType> schema,
+      String checkpointLocation, DataSourceOptions options) {
+    return new DatasetMicroBatchReader(schema, checkpointLocation, options);
+  }
+
+  /**
+   * This class can be mapped to Beam {@link BoundedSource}.
+   */
+  private class DatasetMicroBatchReader implements MicroBatchReader {
+
+    private Optional<StructType> schema;
+    private String checkpointLocation;
+    private DataSourceOptions options;
+
+    private DatasetMicroBatchReader(Optional<StructType> schema, String checkpointLocation,
+        DataSourceOptions options) {
+      //TODO start reading from the source here, inc offset at each element read
+    }
+
+    @Override public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
+      //TODO extension point for SDF
+    }
+
+    @Override public Offset getStartOffset() {
+      //TODO extension point for SDF
+      return null;
+    }
+
+    @Override public Offset getEndOffset() {
+      //TODO extension point for SDF
+      return null;
+    }
+
+    @Override public Offset deserializeOffset(String json) {
+      //TODO extension point for SDF
+      return null;
+    }
+
+    @Override public void commit(Offset end) {
+      //TODO no more to read after end Offset
+    }
+
+    @Override public void stop() {
+    }
+
+    @Override public StructType readSchema() {
+      return null;
+    }
+
+    @Override public List<InputPartition<InternalRow>> planInputPartitions() {
+      List<InputPartition<InternalRow>> result = new ArrayList<>();
+      long desiredSizeBytes;
+      SparkPipelineOptions options = context.getOptions();
+      try {
+        desiredSizeBytes = (bundleSize == null) ?
+            source.getEstimatedSizeBytes(options) / numPartitions :
+            bundleSize;
+        List<? extends BoundedSource<T>> sources = source.split(desiredSizeBytes, options);
+        for (BoundedSource<T> source : sources) {
+          result.add(new InputPartition<InternalRow>() {
+
+            @Override public InputPartitionReader<InternalRow> createPartitionReader() {
+              BoundedReader<T> reader = null;
+              try {
+                reader = source.createReader(options);
+              } catch (IOException e) {
+              }
+              return new DatasetMicroBatchPartitionReader(reader);
+            }
+          });
+        }
+        return result;
+
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+      return result;
+    }
+
+  }
+
+  /**
+   * This class can be mapped to Beam {@link BoundedReader}
+   */
+  private class DatasetMicroBatchPartitionReader implements InputPartitionReader<InternalRow> {
+
+    BoundedReader<T> reader;
+    private boolean started;
+    private boolean closed;
+
+    DatasetMicroBatchPartitionReader(BoundedReader<T> reader) {
+      this.reader = reader;
+      this.started = false;
+      this.closed = false;
+    }
+
+    @Override public boolean next() throws IOException {
+      if (!started) {
+        started = true;
+        return reader.start();
+      } else {
+        return !closed && reader.advance();
+      }
+    }
+
+    @Override public InternalRow get() {
+      List<Object> list = new ArrayList<>();
+      list.add(WindowedValue.timestampedValueInGlobalWindow(reader.getCurrent(), reader.getCurrentTimestamp()));
+      return InternalRow.apply(asScalaBuffer(list).toList());
+    }
+
+    @Override public void close() throws IOException {
+      closed = true;
+      reader.close();
+    }
+  }
+}

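A note on how Spark resolves the string passed to readStream().format(): it is looked up either as a fully qualified class name of a DataSourceV2 implementation or as a short name advertised by a DataSourceRegister discovered through ServiceLoader, which is why the literal "DatasetSource" used above is replaced by the canonical class name in commit 04/04 below. A minimal sketch of the short-name route follows; the class name DatasetSourceRegisterSketch, the "beam" short name and the services entry are assumptions for illustration, not part of these commits.

    // Sketch only: advertising a short format name for the source. Spark discovers
    // DataSourceRegister implementations via ServiceLoader, so the jar also needs a
    // META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file listing
    // this class. A real provider would additionally implement MicroBatchReadSupport,
    // as DatasetSource does above.
    package org.apache.beam.runners.spark.structuredstreaming.translation.io;

    import org.apache.spark.sql.sources.DataSourceRegister;
    import org.apache.spark.sql.sources.v2.DataSourceV2;

    public class DatasetSourceRegisterSketch implements DataSourceV2, DataSourceRegister {

      // Lets a pipeline use sparkSession.readStream().format("beam").load() instead of
      // spelling out the fully qualified class name.
      @Override
      public String shortName() {
        return "beam";
      }
    }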

[beam] 02/04: update TODO

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit f54899ba3a9e41e9c1719972577ecf878285c374
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Dec 6 17:28:57 2018 +0100

    update TODO
---
 .../runners/spark/structuredstreaming/translation/io/DatasetSource.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
index d9d283e..60bdab6 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
@@ -60,7 +60,7 @@ public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport {
 
     private DatasetMicroBatchReader(Optional<StructType> schema, String checkpointLocation,
         DataSourceOptions options) {
-      //TODO start reading from the source here, inc offset at each element read
+      //TODO deal with schema and options
     }
 
     @Override public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {


[beam] 03/04: Apply spotless

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 8e08c5856adf16e45dbd51cb2284593439d54714
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Dec 7 12:08:51 2018 +0100

    Apply spotless
---
 .../translation/TranslationContext.java            |   7 --
 .../batch/ReadSourceTranslatorBatch.java           |   4 -
 .../translation/io/DatasetSource.java              | 109 +++++++++++++--------
 3 files changed, 68 insertions(+), 52 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index 52ed11f..0f2493d 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -33,18 +33,11 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.ForeachWriter;
-import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.execution.datasources.DataSource;
-import org.apache.spark.sql.execution.streaming.Source;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.ReadSupport;
-import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
 import org.apache.spark.sql.streaming.StreamingQueryException;
 
 /**
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index 05dc374..63f2fdf 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -19,14 +19,12 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
 import java.io.IOException;
 import org.apache.beam.runners.core.construction.ReadTranslation;
-import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.runners.spark.structuredstreaming.translation.io.DatasetSource;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.spark.sql.Dataset;
@@ -57,6 +55,4 @@ class ReadSourceTranslatorBatch<T>
 
     context.putDataset(output, dataset);
   }
-
-
 }
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
index 60bdab6..f230a70 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.runners.spark.structuredstreaming.translation.io;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -25,8 +42,8 @@ import org.apache.spark.sql.types.StructType;
 
 /**
  * This is a spark structured streaming {@link DataSourceV2} implementation. As Continuous streaming
- * is tagged experimental in spark, this class does no implement {@link ContinuousReadSupport}.
- * This class is just a mix-in.
+ * is tagged experimental in spark, this class does no implement {@link ContinuousReadSupport}. This
+ * class is just a mix-in.
  */
 public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport {
 
@@ -41,79 +58,87 @@ public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport {
     this.numPartitions = context.getSparkSession().sparkContext().defaultParallelism();
     checkArgument(this.numPartitions > 0, "Number of partitions must be greater than zero.");
     this.bundleSize = context.getOptions().getBundleSize();
-
   }
 
-  @Override public MicroBatchReader createMicroBatchReader(Optional<StructType> schema,
-      String checkpointLocation, DataSourceOptions options) {
+  @Override
+  public MicroBatchReader createMicroBatchReader(
+      Optional<StructType> schema, String checkpointLocation, DataSourceOptions options) {
     return new DatasetMicroBatchReader(schema, checkpointLocation, options);
   }
 
-  /**
-   * This class can be mapped to Beam {@link BoundedSource}.
-   */
+  /** This class can be mapped to Beam {@link BoundedSource}. */
   private class DatasetMicroBatchReader implements MicroBatchReader {
 
     private Optional<StructType> schema;
     private String checkpointLocation;
     private DataSourceOptions options;
 
-    private DatasetMicroBatchReader(Optional<StructType> schema, String checkpointLocation,
-        DataSourceOptions options) {
+    private DatasetMicroBatchReader(
+        Optional<StructType> schema, String checkpointLocation, DataSourceOptions options) {
       //TODO deal with schema and options
     }
 
-    @Override public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
+    @Override
+    public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
       //TODO extension point for SDF
     }
 
-    @Override public Offset getStartOffset() {
+    @Override
+    public Offset getStartOffset() {
       //TODO extension point for SDF
       return null;
     }
 
-    @Override public Offset getEndOffset() {
+    @Override
+    public Offset getEndOffset() {
       //TODO extension point for SDF
       return null;
     }
 
-    @Override public Offset deserializeOffset(String json) {
+    @Override
+    public Offset deserializeOffset(String json) {
       //TODO extension point for SDF
       return null;
     }
 
-    @Override public void commit(Offset end) {
+    @Override
+    public void commit(Offset end) {
       //TODO no more to read after end Offset
     }
 
-    @Override public void stop() {
-    }
+    @Override
+    public void stop() {}
 
-    @Override public StructType readSchema() {
+    @Override
+    public StructType readSchema() {
       return null;
     }
 
-    @Override public List<InputPartition<InternalRow>> planInputPartitions() {
+    @Override
+    public List<InputPartition<InternalRow>> planInputPartitions() {
       List<InputPartition<InternalRow>> result = new ArrayList<>();
       long desiredSizeBytes;
       SparkPipelineOptions options = context.getOptions();
       try {
-        desiredSizeBytes = (bundleSize == null) ?
-            source.getEstimatedSizeBytes(options) / numPartitions :
-            bundleSize;
+        desiredSizeBytes =
+            (bundleSize == null)
+                ? source.getEstimatedSizeBytes(options) / numPartitions
+                : bundleSize;
         List<? extends BoundedSource<T>> sources = source.split(desiredSizeBytes, options);
         for (BoundedSource<T> source : sources) {
-          result.add(new InputPartition<InternalRow>() {
-
-            @Override public InputPartitionReader<InternalRow> createPartitionReader() {
-              BoundedReader<T> reader = null;
-              try {
-                reader = source.createReader(options);
-              } catch (IOException e) {
-              }
-              return new DatasetMicroBatchPartitionReader(reader);
-            }
-          });
+          result.add(
+              new InputPartition<InternalRow>() {
+
+                @Override
+                public InputPartitionReader<InternalRow> createPartitionReader() {
+                  BoundedReader<T> reader = null;
+                  try {
+                    reader = source.createReader(options);
+                  } catch (IOException e) {
+                  }
+                  return new DatasetMicroBatchPartitionReader(reader);
+                }
+              });
         }
         return result;
 
@@ -122,12 +147,9 @@ public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport {
       }
       return result;
     }
-
   }
 
-  /**
-   * This class can be mapped to Beam {@link BoundedReader}
-   */
+  /** This class can be mapped to Beam {@link BoundedReader} */
   private class DatasetMicroBatchPartitionReader implements InputPartitionReader<InternalRow> {
 
     BoundedReader<T> reader;
@@ -140,7 +162,8 @@ public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport {
       this.closed = false;
     }
 
-    @Override public boolean next() throws IOException {
+    @Override
+    public boolean next() throws IOException {
       if (!started) {
         started = true;
         return reader.start();
@@ -149,13 +172,17 @@ public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport {
       }
     }
 
-    @Override public InternalRow get() {
+    @Override
+    public InternalRow get() {
       List<Object> list = new ArrayList<>();
-      list.add(WindowedValue.timestampedValueInGlobalWindow(reader.getCurrent(), reader.getCurrentTimestamp()));
+      list.add(
+          WindowedValue.timestampedValueInGlobalWindow(
+              reader.getCurrent(), reader.getCurrentTimestamp()));
       return InternalRow.apply(asScalaBuffer(list).toList());
     }
 
-    @Override public void close() throws IOException {
+    @Override
+    public void close() throws IOException {
       closed = true;
       reader.close();
     }


[beam] 04/04: start source instanciation

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit e86247f88aafd9d32abef9eb4b897d393317d264
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Mon Dec 10 15:27:49 2018 +0100

    start source instanciation
---
 .../batch/ReadSourceTranslatorBatch.java           | 27 ++++++++++++++++++----
 .../translation/io/DatasetSource.java              | 10 ++++----
 2 files changed, 28 insertions(+), 9 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index 63f2fdf..a75730a 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -22,18 +22,25 @@ import org.apache.beam.runners.core.construction.ReadTranslation;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.runners.spark.structuredstreaming.translation.io.DatasetSource;
+import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.DataStreamReader;
 
 class ReadSourceTranslatorBatch<T>
     implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
 
+  private String SOURCE_PROVIDER_CLASS = DatasetSource.class.getCanonicalName();
+
   @SuppressWarnings("unchecked")
   @Override
   public void translateTransform(
@@ -41,18 +48,28 @@ class ReadSourceTranslatorBatch<T>
     AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> rootTransform =
         (AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>>)
             context.getCurrentTransform();
-    BoundedSource<T> source;
+
+        String providerClassName = SOURCE_PROVIDER_CLASS.substring(0, SOURCE_PROVIDER_CLASS.indexOf("$"));
+        BoundedSource<T> source;
     try {
       source = ReadTranslation.boundedSourceFromTransform(rootTransform);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
-    PCollection<T> output = (PCollection<T>) context.getOutput();
-
     SparkSession sparkSession = context.getSparkSession();
-    DatasetSource datasetSource = new DatasetSource(context, source);
-    Dataset<Row> dataset = sparkSession.readStream().format("DatasetSource").load();
+    Dataset<Row> rowDataset = sparkSession.readStream().format(providerClassName).load();
+    //TODO initialize source : how, to get a reference to the DatasetSource instance that spark
+    // instantiates to be able to call DatasetSource.initialize()
+    MapFunction<Row, WindowedValue<T>> func = new MapFunction<Row, WindowedValue<T>>() {
+      @Override public WindowedValue<T> call(Row value) throws Exception {
+        //TODO fix row content extraction: I guess cast is not enough
+        return (WindowedValue<T>) value.get(0);
+      }
+    };
+    //TODO fix encoder
+    Dataset<WindowedValue<T>> dataset = rowDataset.map(func, null);
 
+    PCollection<T> output = (PCollection<T>) context.getOutput();
     context.putDataset(output, dataset);
   }
 }
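The two TODOs above ("fix row content extraction" and "fix encoder") are the open ends of this commit. A minimal sketch of one possible stopgap for the encoder, assuming no coder-aware Encoder exists yet: Spark's generic Kryo encoder gives map() a usable Encoder<WindowedValue<T>> even though it ignores the Beam coder attached to the PCollection. The helper name toWindowedValues is made up for illustration, and the row.get(0) cast still carries the extraction caveat noted in the TODO.

    import org.apache.beam.sdk.util.WindowedValue;
    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoder;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.Row;

    class EncoderSketch {
      // Sketch: replace the null encoder in rowDataset.map(func, null) with a generic
      // Kryo-based encoder so the call typechecks and runs, pending a coder-aware one.
      @SuppressWarnings("unchecked")
      static <T> Dataset<WindowedValue<T>> toWindowedValues(Dataset<Row> rowDataset) {
        MapFunction<Row, WindowedValue<T>> func =
            row -> (WindowedValue<T>) row.get(0); // same extraction assumption as the TODO
        Encoder<WindowedValue<T>> encoder =
            Encoders.kryo((Class<WindowedValue<T>>) (Class<?>) WindowedValue.class);
        return rowDataset.map(func, encoder);
      }
    }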
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
index f230a70..75cdd5d 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.DataSourceRegister;
 import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
@@ -45,14 +46,15 @@ import org.apache.spark.sql.types.StructType;
  * is tagged experimental in spark, this class does no implement {@link ContinuousReadSupport}. This
  * class is just a mix-in.
  */
-public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport {
+public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport{
 
-  private final int numPartitions;
-  private final Long bundleSize;
+  private int numPartitions;
+  private Long bundleSize;
   private TranslationContext context;
   private BoundedSource<T> source;
 
-  public DatasetSource(TranslationContext context, BoundedSource<T> source) {
+
+  public void initialize(TranslationContext context, BoundedSource<T> source){
     this.context = context;
     this.source = source;
     this.numPartitions = context.getSparkSession().sparkContext().defaultParallelism();
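On the TODO "how to get a reference to the DatasetSource instance that spark instantiates": because Spark builds the DataSourceV2 by reflection with a no-argument constructor, the only state that reaches it from the translator is the string map behind readStream().option(...). A minimal sketch of that route follows, serializing the BoundedSource into an option and rebuilding it inside createMicroBatchReader() instead of relying on initialize(); the "beam-source" option key and the Base64 wire format are assumptions for illustration, not part of this commit.

    import java.util.Base64;
    import java.util.Optional;
    import org.apache.beam.sdk.io.BoundedSource;
    import org.apache.beam.sdk.util.SerializableUtils;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.sources.v2.DataSourceOptions;
    import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
    import org.apache.spark.sql.types.StructType;

    // In ReadSourceTranslatorBatch.translateTransform(): pass the serialized source
    // along with the load() call.
    String serializedSource =
        Base64.getEncoder().encodeToString(SerializableUtils.serializeToByteArray(source));
    Dataset<Row> rowDataset =
        sparkSession
            .readStream()
            .format(providerClassName)
            .option("beam-source", serializedSource)
            .load();

    // In DatasetSource: rebuild the source from the options that Spark hands back,
    // populating the existing field instead of waiting for an initialize() call.
    @Override
    public MicroBatchReader createMicroBatchReader(
        Optional<StructType> schema, String checkpointLocation, DataSourceOptions options) {
      String encoded =
          options.get("beam-source")
              .orElseThrow(() -> new IllegalStateException("beam-source option is missing"));
      @SuppressWarnings("unchecked")
      BoundedSource<T> source =
          (BoundedSource<T>)
              SerializableUtils.deserializeFromByteArray(
                  Base64.getDecoder().decode(encoded), "BoundedSource");
      this.source = source;
      return new DatasetMicroBatchReader(schema, checkpointLocation, options);
    }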