Posted to commits@beam.apache.org by ec...@apache.org on 2018/12/27 15:42:05 UTC

[beam] branch spark-runner_structured-streaming updated (017dcb9 -> 8591d63)

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a change to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git.


    from 017dcb9  clean deps
     new 87bec8e  Move DatasetSourceMock to proper batch mode
     new 8591d63  Run pipeline in batch mode or in streaming mode

The two revisions listed above as "new" are entirely new to this
repository and are described in the per-commit emails below. Revisions
listed as "add" were already present in the repository and have only
been added to this reference; none appear in this update.


Summary of changes:
 .../translation/TranslationContext.java            |  7 +++-
 .../batch/ReadSourceTranslatorMockBatch.java       |  3 +-
 .../translation/io/DatasetSourceMock.java          | 41 +++++-----------------
 3 files changed, 16 insertions(+), 35 deletions(-)


[beam] 02/02: Run pipeline in batch mode or in streaming mode

Posted by ec...@apache.org.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 8591d635023faca33482c26cc0cb0e550aef81bd
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Dec 27 16:37:42 2018 +0100

    Run pipeline in batch mode or in streaming mode
---
 .../spark/structuredstreaming/translation/TranslationContext.java  | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index fb36b37..82aa80b 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -164,7 +164,12 @@ public class TranslationContext {
     try {
       // to start the pipeline we need to start a DataStreamWriter
       for (Dataset<?> dataset : leaves) {
-        dataset.writeStream().foreach(new NoOpForeachWriter<>()).start().awaitTermination();
+
+        if (options.isStreaming()) {
+          dataset.writeStream().foreach(new NoOpForeachWriter<>()).start().awaitTermination();
+        } else {
+          dataset.write();
+        }
       }
     } catch (StreamingQueryException e) {
       throw new RuntimeException("Pipeline execution failed: " + e);
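
One caveat on the batch branch above: in Spark's public API, dataset.write()
merely returns a DataFrameWriter and does not execute anything by itself; an
action is needed to force the translated pipeline to run. A minimal sketch of
a working version of this loop, reusing the names from the diff (leaves,
options, NoOpForeachWriter) and using count() purely as an illustrative
action:

    try {
      for (Dataset<?> dataset : leaves) {
        if (options.isStreaming()) {
          // streaming: attach a no-op sink and block until the query terminates
          dataset.writeStream().foreach(new NoOpForeachWriter<>()).start().awaitTermination();
        } else {
          // batch: write() alone is lazy; count() is one action that forces
          // the whole dataset, and thus the pipeline, to execute
          dataset.count();
        }
      }
    } catch (StreamingQueryException e) {
      // chaining the cause (instead of concatenating it into the message)
      // preserves the original stack trace
      throw new RuntimeException("Pipeline execution failed", e);
    }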


[beam] 01/02: Move DatasetSourceMock to proper batch mode

Posted by ec...@apache.org.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 87bec8ea5565e97e333d7f761c1df8734f8ea9f0
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Dec 27 16:33:00 2018 +0100

    Move DatasetSourceMock to proper batch mode
---
 .../batch/ReadSourceTranslatorMockBatch.java       |  3 +-
 .../translation/io/DatasetSourceMock.java          | 41 +++++-----------------
 2 files changed, 10 insertions(+), 34 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
index 4a509de..184d24c 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
@@ -46,9 +46,8 @@ class ReadSourceTranslatorMockBatch<T>
   public void translateTransform(
       PTransform<PBegin, PCollection<T>> transform, TranslationContext context) {
     SparkSession sparkSession = context.getSparkSession();
-    DataStreamReader dataStreamReader = sparkSession.readStream().format(SOURCE_PROVIDER_CLASS);
 
-    Dataset<Row> rowDataset = dataStreamReader.load();
+    Dataset<Row> rowDataset = sparkSession.read().format(SOURCE_PROVIDER_CLASS).load();
 
     MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() {
       @Override public WindowedValue call(Row value) throws Exception {
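
The change above moves the mock translator from the streaming entry point to
the batch one: sparkSession.read() returns a DataFrameReader that yields a
bounded Dataset<Row>, whereas sparkSession.readStream() returns a
DataStreamReader for unbounded queries. Both resolve the same DataSourceV2
provider class; only the execution mode differs. Side by side (illustrative
only, reusing SOURCE_PROVIDER_CLASS from the file above):

    // batch: bounded Dataset, evaluated once an action runs on it
    Dataset<Row> bounded = sparkSession.read().format(SOURCE_PROVIDER_CLASS).load();
    // streaming: unbounded Dataset, evaluated by a continuously running query
    Dataset<Row> unbounded = sparkSession.readStream().format(SOURCE_PROVIDER_CLASS).load();
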
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSourceMock.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSourceMock.java
index ec88364..f722377 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSourceMock.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSourceMock.java
@@ -22,52 +22,29 @@ import static scala.collection.JavaConversions.asScalaBuffer;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
-import org.apache.spark.sql.sources.v2.MicroBatchReadSupport;
+import org.apache.spark.sql.sources.v2.ReadSupport;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
 import org.apache.spark.sql.sources.v2.reader.InputPartition;
 import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
-import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
-import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
 import org.apache.spark.sql.types.StructType;
 import org.joda.time.Instant;
 
 /**
  * This is a mock source that gives values between 0 and 999.
  */
-public class DatasetSourceMock implements DataSourceV2, MicroBatchReadSupport {
+public class DatasetSourceMock implements DataSourceV2, ReadSupport {
 
-  @Override public MicroBatchReader createMicroBatchReader(Optional<StructType> schema, String checkpointLocation, DataSourceOptions options) {
-    return new DatasetMicroBatchReader();
+  @Override public DataSourceReader createReader(DataSourceOptions options) {
+    return new DatasetReader();
   }
 
   /** This class can be mapped to Beam {@link BoundedSource}. */
-  private static class DatasetMicroBatchReader implements MicroBatchReader {
-
-    @Override public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
-    }
-
-    @Override public Offset getStartOffset() {
-      return null;
-    }
-
-    @Override public Offset getEndOffset() {
-      return null;
-    }
-
-    @Override public Offset deserializeOffset(String json) {
-      return null;
-    }
-
-    @Override public void commit(Offset end) {
-    }
-
-    @Override public void stop() {
-    }
+  private static class DatasetReader implements DataSourceReader {
 
     @Override public StructType readSchema() {
       return new StructType();
@@ -78,7 +55,7 @@ public class DatasetSourceMock implements DataSourceV2, MicroBatchReadSupport {
       result.add(new InputPartition<InternalRow>() {
 
         @Override public InputPartitionReader<InternalRow> createPartitionReader() {
-          return new DatasetMicroBatchPartitionReaderMock();
+          return new DatasetPartitionReaderMock();
         }
       });
       return result;
@@ -86,12 +63,12 @@ public class DatasetSourceMock implements DataSourceV2, MicroBatchReadSupport {
   }
 
  /** This class is a mocked reader. */
-  private static class DatasetMicroBatchPartitionReaderMock implements InputPartitionReader<InternalRow> {
+  private static class DatasetPartitionReaderMock implements InputPartitionReader<InternalRow> {
 
     private ArrayList<Integer> values;
     private int currentIndex = 0;
 
-    private DatasetMicroBatchPartitionReaderMock() {
+    private DatasetPartitionReaderMock() {
       for (int i = 0; i < 1000; i++){
         values.add(i);
       }
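
Two notes on the mock source above. First, the move to batch replaces
MicroBatchReadSupport and its offset-tracking MicroBatchReader with the much
smaller ReadSupport/DataSourceReader pair, which is all a bounded source
needs. Second, the context lines show that `values` is declared but never
assigned before the constructor calls values.add(i), which would throw a
NullPointerException at runtime. Below is a minimal corrected sketch of the
partition reader; the next()/get()/close() bodies are not visible in the
quoted hunks, so those shown here are illustrative assumptions, with
GenericInternalRow standing in for whatever row encoding the real mock uses:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
    import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;

    /** Mocked partition reader emitting the integers 0..999. */
    class DatasetPartitionReaderMock implements InputPartitionReader<InternalRow> {

      // initialized here; left unassigned in the diff above, which would NPE
      private final List<Integer> values = new ArrayList<>();
      private int currentIndex = 0;

      DatasetPartitionReaderMock() {
        for (int i = 0; i < 1000; i++) {
          values.add(i);
        }
      }

      @Override public boolean next() {
        // advance the cursor and report whether a value is still available
        currentIndex++;
        return currentIndex <= values.size();
      }

      @Override public InternalRow get() {
        // wrap the current value in a single-column internal row (assumed encoding)
        return new GenericInternalRow(new Object[] {values.get(currentIndex - 1)});
      }

      @Override public void close() {
        // nothing to release in this mock
      }
    }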