Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/04 10:39:01 UTC

[beam] 39/50: Move DatasetSourceMock to proper batch mode

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 49ee25994ceaf766bca17f5600e72df48583e3f0
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Dec 27 16:33:00 2018 +0100

    Move DatasetSourceMock to proper batch mode
---
 .../batch/ReadSourceTranslatorMockBatch.java       |  3 +-
 .../translation/io/DatasetSourceMock.java          | 41 +++++-----------------
 2 files changed, 10 insertions(+), 34 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
index 4a509de..184d24c 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
@@ -46,9 +46,8 @@ class ReadSourceTranslatorMockBatch<T>
   public void translateTransform(
       PTransform<PBegin, PCollection<T>> transform, TranslationContext context) {
     SparkSession sparkSession = context.getSparkSession();
-    DataStreamReader dataStreamReader = sparkSession.readStream().format(SOURCE_PROVIDER_CLASS);
 
-    Dataset<Row> rowDataset = dataStreamReader.load();
+    Dataset<Row> rowDataset = sparkSession.read().format(SOURCE_PROVIDER_CLASS).load();
 
     MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() {
       @Override public WindowedValue call(Row value) throws Exception {
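
Note on the translator hunk above: the mock source is now loaded through Spark's batch entry point (sparkSession.read()) rather than the streaming one (sparkSession.readStream()), so load() returns a bounded Dataset<Row> driven by the DataSource V2 batch read path. A minimal, self-contained sketch of that batch read call follows; the local master, app name, and the fully qualified provider class standing in for SOURCE_PROVIDER_CLASS are assumptions for illustration, not part of the commit.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class BatchReadSketch {
  public static void main(String[] args) {
    // Local session just for the sketch; the runner obtains its session from TranslationContext.
    SparkSession sparkSession =
        SparkSession.builder().master("local[2]").appName("batch-read-sketch").getOrCreate();

    // Batch path: read() instead of readStream(), so the DataSource V2 provider is
    // resolved through ReadSupport/DataSourceReader and load() yields a bounded Dataset.
    Dataset<Row> rowDataset =
        sparkSession
            .read()
            .format(
                "org.apache.beam.runners.spark.structuredstreaming.translation.io.DatasetSourceMock")
            .load();

    rowDataset.show();
  }
}
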
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSourceMock.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSourceMock.java
index ec88364..f722377 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSourceMock.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSourceMock.java
@@ -22,52 +22,29 @@ import static scala.collection.JavaConversions.asScalaBuffer;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
-import org.apache.spark.sql.sources.v2.MicroBatchReadSupport;
+import org.apache.spark.sql.sources.v2.ReadSupport;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
 import org.apache.spark.sql.sources.v2.reader.InputPartition;
 import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
-import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
-import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
 import org.apache.spark.sql.types.StructType;
 import org.joda.time.Instant;
 
 /**
  * This is a mock source that gives values between 0 and 999.
  */
-public class DatasetSourceMock implements DataSourceV2, MicroBatchReadSupport {
+public class DatasetSourceMock implements DataSourceV2, ReadSupport {
 
-  @Override public MicroBatchReader createMicroBatchReader(Optional<StructType> schema, String checkpointLocation, DataSourceOptions options) {
-    return new DatasetMicroBatchReader();
+  @Override public DataSourceReader createReader(DataSourceOptions options) {
+    return new DatasetReader();
   }
 
   /** This class can be mapped to Beam {@link BoundedSource}. */
-  private static class DatasetMicroBatchReader implements MicroBatchReader {
-
-    @Override public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
-    }
-
-    @Override public Offset getStartOffset() {
-      return null;
-    }
-
-    @Override public Offset getEndOffset() {
-      return null;
-    }
-
-    @Override public Offset deserializeOffset(String json) {
-      return null;
-    }
-
-    @Override public void commit(Offset end) {
-    }
-
-    @Override public void stop() {
-    }
+  private static class DatasetReader implements DataSourceReader {
 
     @Override public StructType readSchema() {
       return new StructType();
@@ -78,7 +55,7 @@ public class DatasetSourceMock implements DataSourceV2, MicroBatchReadSupport {
       result.add(new InputPartition<InternalRow>() {
 
         @Override public InputPartitionReader<InternalRow> createPartitionReader() {
-          return new DatasetMicroBatchPartitionReaderMock();
+          return new DatasetPartitionReaderMock();
         }
       });
       return result;
@@ -86,12 +63,12 @@ public class DatasetSourceMock implements DataSourceV2, MicroBatchReadSupport {
   }
 
   /** This class is a mocked reader*/
-  private static class DatasetMicroBatchPartitionReaderMock implements InputPartitionReader<InternalRow> {
+  private static class DatasetPartitionReaderMock implements InputPartitionReader<InternalRow> {
 
     private ArrayList<Integer> values;
     private int currentIndex = 0;
 
-    private DatasetMicroBatchPartitionReaderMock() {
+    private DatasetPartitionReaderMock() {
       for (int i = 0; i < 1000; i++){
         values.add(i);
       }