Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/04 10:38:58 UTC

[beam] 36/50: fix mock, wire mock in translators and create a main test.

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 8cdc20f7e0a53de18afb70afd31da374dcf6d93e
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Dec 20 17:18:54 2018 +0100

    fix mock, wire mock in translators and create a main test.
---
 .../translation/batch/PipelineTranslatorBatch.java  |  2 +-
 .../batch/ReadSourceTranslatorBatch.java            |  3 ++-
 .../batch/ReadSourceTranslatorMockBatch.java        | 21 +++++++--------------
 .../translation/io/DatasetSourceMock.java           |  6 +++---
 .../spark/structuredstreaming/SourceTest.java       | 16 ++++++++++++++++
 5 files changed, 29 insertions(+), 19 deletions(-)
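
For context on the wiring below: Spark's DataSourceV2 API instantiates a source from the class name handed to format(), via its no-arg constructor, which is why the translators refer to DatasetSourceMock by name instead of constructing it. A minimal sketch of that mechanism, assuming a local SparkSession (the builder settings are illustrative, not taken from this commit):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    // Spark resolves the DataSourceV2 implementation reflectively from the
    // class name passed to format(); the translator never instantiates it.
    SparkSession sparkSession = SparkSession.builder()
        .appName("mock-source-wiring")   // illustrative settings,
        .master("local[2]")              // not part of the commit
        .getOrCreate();
    Dataset<Row> rowDataset = sparkSession.readStream()
        .format(DatasetSourceMock.class.getCanonicalName())
        .load();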

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
index 26f1b9c..9ccc712 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
@@ -65,7 +65,7 @@ public class PipelineTranslatorBatch extends PipelineTranslator {
         PTransformTranslation.PAR_DO_TRANSFORM_URN, new ParDoTranslatorBatch());
 
     TRANSFORM_TRANSLATORS.put(
-        PTransformTranslation.READ_TRANSFORM_URN, new ReadSourceTranslatorBatch());
+        PTransformTranslation.READ_TRANSFORM_URN, new ReadSourceTranslatorMockBatch());
   }
 
   public PipelineTranslatorBatch(SparkPipelineOptions options) {
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index 0b828fb..aed016a 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -40,6 +40,7 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalog.Catalog;
 import org.apache.spark.sql.catalyst.catalog.CatalogTable;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
 import org.apache.spark.sql.streaming.DataStreamReader;
 
 class ReadSourceTranslatorBatch<T>
@@ -63,8 +64,8 @@ class ReadSourceTranslatorBatch<T>
       throw new RuntimeException(e);
     }
     SparkSession sparkSession = context.getSparkSession();
-
     DataStreamReader dataStreamReader = sparkSession.readStream().format(providerClassName);
+
     Dataset<Row> rowDataset = dataStreamReader.load();
 
     //TODO initialize source : how, to get a reference to the DatasetSource instance that spark
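
The TODO above points at a real DataSourceV2 constraint: since Spark instantiates the source reflectively, the translator cannot hand the DatasetSource a BoundedSource reference directly. A common workaround is to push configuration across that boundary as string options; a hedged sketch follows, where the "beam.source" key and the beamSource variable are hypothetical and not part of this commit:

    import java.util.Base64;
    import org.apache.beam.sdk.util.SerializableUtils;

    // Hypothetical: serialize the Beam source so it can cross the reflective
    // instantiation boundary as a plain string option.
    String serializedSource = Base64.getEncoder().encodeToString(
        SerializableUtils.serializeToByteArray(beamSource));
    Dataset<Row> rowDataset = sparkSession.readStream()
        .format(providerClassName)
        .option("beam.source", serializedSource)  // hypothetical option key
        .load();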
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
index 5b1bada..504a64d 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
@@ -17,28 +17,25 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
-import java.io.IOException;
-import org.apache.beam.runners.core.construction.ReadTranslation;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
-import org.apache.beam.runners.spark.structuredstreaming.translation.io.DatasetSource;
 import org.apache.beam.runners.spark.structuredstreaming.translation.io.DatasetSourceMock;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoder;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.streaming.DataStreamReader;
-import org.apache.spark.sql.types.StructType;
-import scala.reflect.ClassTag;
 
+
+/**
+ * Mock translator that reads a mock source producing the integers 0 to 999 and prints the result.
+ * @param <T> the element type of the output {@link PCollection}
+ */
 class ReadSourceTranslatorMockBatch<T>
     implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
 
@@ -48,13 +45,8 @@ class ReadSourceTranslatorMockBatch<T>
   @Override
   public void translateTransform(
       PTransform<PBegin, PCollection<T>> transform, TranslationContext context) {
-    AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> rootTransform =
-        (AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>>)
-            context.getCurrentTransform();
-
-        String providerClassName = SOURCE_PROVIDER_CLASS.substring(0, SOURCE_PROVIDER_CLASS.indexOf("$"));
     SparkSession sparkSession = context.getSparkSession();
-    DataStreamReader dataStreamReader = sparkSession.readStream().format(providerClassName);
+    DataStreamReader dataStreamReader = sparkSession.readStream().format(SOURCE_PROVIDER_CLASS);
 
     Dataset<Row> rowDataset = dataStreamReader.load();
 
@@ -77,5 +69,6 @@ class ReadSourceTranslatorMockBatch<T>
 
     PCollection<T> output = (PCollection<T>) context.getOutput();
     context.putDataset(output, dataset);
+    dataset.show();
   }
 }
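
The hunk above leaves the Row-to-WindowedValue conversion out of view; given the MapFunction and Encoders imports that survive the cleanup, the mapping presumably looks roughly like the sketch below. The single int column and the kryo encoder are assumptions, and the raw WindowedValue type is a simplification:

    import org.apache.beam.sdk.util.WindowedValue;
    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoder;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.Row;

    // Assumed shape of the elided mapping: each mock row carries one int,
    // which gets wrapped into a globally windowed Beam value.
    Encoder<WindowedValue> encoder = Encoders.kryo(WindowedValue.class);
    Dataset<WindowedValue> dataset = rowDataset.map(
        (MapFunction<Row, WindowedValue>)
            row -> WindowedValue.valueInGlobalWindow(row.getInt(0)),
        encoder);

One caveat on the new dataset.show() call: Spark normally rejects actions like show() on a dataset obtained through readStream() ("Queries with streaming sources must be executed with writeStream.start()"), so printing here only works to the extent that the mock is executed batch-style.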
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSourceMock.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSourceMock.java
index fa42fdf..ec88364 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSourceMock.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSourceMock.java
@@ -46,7 +46,7 @@ public class DatasetSourceMock implements DataSourceV2, MicroBatchReadSupport {
   }
 
   /** This class can be mapped to Beam {@link BoundedSource}. */
-  private class DatasetMicroBatchReader implements MicroBatchReader {
+  private static class DatasetMicroBatchReader implements MicroBatchReader {
 
     @Override public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
     }
@@ -70,7 +70,7 @@ public class DatasetSourceMock implements DataSourceV2, MicroBatchReadSupport {
     }
 
     @Override public StructType readSchema() {
-      return null;
+      return new StructType();
     }
 
     @Override public List<InputPartition<InternalRow>> planInputPartitions() {
@@ -86,7 +86,7 @@ public class DatasetSourceMock implements DataSourceV2, MicroBatchReadSupport {
   }
 
  /** This class is a mocked reader. */
-  private class DatasetMicroBatchPartitionReaderMock implements InputPartitionReader<InternalRow> {
+  private static class DatasetMicroBatchPartitionReaderMock implements InputPartitionReader<InternalRow> {
 
     private ArrayList<Integer> values;
     private int currentIndex = 0;
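
The hunk ends at the reader's fields. Given the InputPartitionReader contract (next() advances, get() returns the current element) and the values/currentIndex fields above, the remaining method bodies plausibly look like this sketch, which is assumed rather than taken from the commit:

    import java.util.ArrayList;
    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
    import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;

    // Sketch of the mocked partition reader (nested in DatasetSourceMock);
    // only the two fields are visible in the hunk, the rest is assumed.
    private static class DatasetMicroBatchPartitionReaderMock
        implements InputPartitionReader<InternalRow> {

      private ArrayList<Integer> values;
      private int currentIndex = 0;

      DatasetMicroBatchPartitionReaderMock(ArrayList<Integer> values) {
        this.values = values;
      }

      @Override public boolean next() {
        currentIndex++;
        return currentIndex <= values.size();
      }

      @Override public InternalRow get() {
        // One-column row holding the current integer.
        return new GenericInternalRow(new Object[] {values.get(currentIndex - 1)});
      }

      @Override public void close() {}
    }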
diff --git a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java
new file mode 100644
index 0000000..eea9769
--- /dev/null
+++ b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java
@@ -0,0 +1,16 @@
+package org.apache.beam.runners.spark.structuredstreaming;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+
+public class SourceTest {
+  public static void main(String[] args) {
+    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
+    Pipeline pipeline = Pipeline.create(options);
+    pipeline.apply(Create.of(1));
+    pipeline.run();
+  }
+
+}
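
Since PipelineTranslatorBatch now maps READ_TRANSFORM_URN to the mock translator, the read underlying Create.of(1) should be picked up by ReadSourceTranslatorMockBatch at translation time and replaced with the 0-to-999 mock source, so this main() exercises the whole wiring. Selecting the runner goes through the standard Beam --runner flag; the runner's registered name does not appear in this diff, so the value below is an assumption:

    // Hypothetical invocation args; the runner class name is assumed, since
    // it is not part of this commit.
    String[] args = {"--runner=SparkStructuredStreamingRunner"};
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();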