You are viewing a plain text version of this content. The canonical link for it is available in the original HTML version of this page.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/04 10:38:58 UTC
[beam] 36/50: fix mock,
wire mock in translators and create a main test.
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 8cdc20f7e0a53de18afb70afd31da374dcf6d93e
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Dec 20 17:18:54 2018 +0100
fix mock, wire mock in translators and create a main test.
---
.../translation/batch/PipelineTranslatorBatch.java | 2 +-
.../batch/ReadSourceTranslatorBatch.java | 3 ++-
.../batch/ReadSourceTranslatorMockBatch.java | 21 +++++++--------------
.../translation/io/DatasetSourceMock.java | 6 +++---
.../spark/structuredstreaming/SourceTest.java | 16 ++++++++++++++++
5 files changed, 29 insertions(+), 19 deletions(-)
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
index 26f1b9c..9ccc712 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
@@ -65,7 +65,7 @@ public class PipelineTranslatorBatch extends PipelineTranslator {
PTransformTranslation.PAR_DO_TRANSFORM_URN, new ParDoTranslatorBatch());
TRANSFORM_TRANSLATORS.put(
- PTransformTranslation.READ_TRANSFORM_URN, new ReadSourceTranslatorBatch());
+ PTransformTranslation.READ_TRANSFORM_URN, new ReadSourceTranslatorMockBatch());
}
public PipelineTranslatorBatch(SparkPipelineOptions options) {
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index 0b828fb..aed016a 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -40,6 +40,7 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalog.Catalog;
import org.apache.spark.sql.catalyst.catalog.CatalogTable;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
import org.apache.spark.sql.streaming.DataStreamReader;
class ReadSourceTranslatorBatch<T>
@@ -63,8 +64,8 @@ class ReadSourceTranslatorBatch<T>
throw new RuntimeException(e);
}
SparkSession sparkSession = context.getSparkSession();
-
DataStreamReader dataStreamReader = sparkSession.readStream().format(providerClassName);
+
Dataset<Row> rowDataset = dataStreamReader.load();
//TODO initialize source : how, to get a reference to the DatasetSource instance that spark
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
index 5b1bada..504a64d 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
@@ -17,28 +17,25 @@
*/
package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
-import java.io.IOException;
-import org.apache.beam.runners.core.construction.ReadTranslation;
import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
-import org.apache.beam.runners.spark.structuredstreaming.translation.io.DatasetSource;
import org.apache.beam.runners.spark.structuredstreaming.translation.io.DatasetSourceMock;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamReader;
-import org.apache.spark.sql.types.StructType;
-import scala.reflect.ClassTag;
+
+/**
+ * Mock translator that generates a source of 0 to 999 and prints it.
+ * @param <T>
+ */
class ReadSourceTranslatorMockBatch<T>
implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
@@ -48,13 +45,8 @@ class ReadSourceTranslatorMockBatch<T>
@Override
public void translateTransform(
PTransform<PBegin, PCollection<T>> transform, TranslationContext context) {
- AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> rootTransform =
- (AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>>)
- context.getCurrentTransform();
-
- String providerClassName = SOURCE_PROVIDER_CLASS.substring(0, SOURCE_PROVIDER_CLASS.indexOf("$"));
SparkSession sparkSession = context.getSparkSession();
- DataStreamReader dataStreamReader = sparkSession.readStream().format(providerClassName);
+ DataStreamReader dataStreamReader = sparkSession.readStream().format(SOURCE_PROVIDER_CLASS);
Dataset<Row> rowDataset = dataStreamReader.load();
@@ -77,5 +69,6 @@ class ReadSourceTranslatorMockBatch<T>
PCollection<T> output = (PCollection<T>) context.getOutput();
context.putDataset(output, dataset);
+ dataset.show();
}
}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSourceMock.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSourceMock.java
index fa42fdf..ec88364 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSourceMock.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSourceMock.java
@@ -46,7 +46,7 @@ public class DatasetSourceMock implements DataSourceV2, MicroBatchReadSupport {
}
/** This class can be mapped to Beam {@link BoundedSource}. */
- private class DatasetMicroBatchReader implements MicroBatchReader {
+ private static class DatasetMicroBatchReader implements MicroBatchReader {
@Override public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
}
@@ -70,7 +70,7 @@ public class DatasetSourceMock implements DataSourceV2, MicroBatchReadSupport {
}
@Override public StructType readSchema() {
- return null;
+ return new StructType();
}
@Override public List<InputPartition<InternalRow>> planInputPartitions() {
@@ -86,7 +86,7 @@ public class DatasetSourceMock implements DataSourceV2, MicroBatchReadSupport {
}
/** This class is a mocked reader*/
- private class DatasetMicroBatchPartitionReaderMock implements InputPartitionReader<InternalRow> {
+ private static class DatasetMicroBatchPartitionReaderMock implements InputPartitionReader<InternalRow> {
private ArrayList<Integer> values;
private int currentIndex = 0;
diff --git a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java
new file mode 100644
index 0000000..eea9769
--- /dev/null
+++ b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java
@@ -0,0 +1,16 @@
+package org.apache.beam.runners.spark.structuredstreaming;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+
+public class SourceTest {
+ public static void main(String[] args) {
+ PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
+ Pipeline pipeline = Pipeline.create(options);
+ pipeline.apply(Create.of(1));
+ pipeline.run();
+ }
+
+}