You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/04 10:39:01 UTC
[beam] 39/50: Move DatasetSourceMock to proper batch mode
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 49ee25994ceaf766bca17f5600e72df48583e3f0
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Dec 27 16:33:00 2018 +0100
Move DatasetSourceMock to proper batch mode
---
.../batch/ReadSourceTranslatorMockBatch.java | 3 +-
.../translation/io/DatasetSourceMock.java | 41 +++++-----------------
2 files changed, 10 insertions(+), 34 deletions(-)
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
index 4a509de..184d24c 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
@@ -46,9 +46,8 @@ class ReadSourceTranslatorMockBatch<T>
public void translateTransform(
PTransform<PBegin, PCollection<T>> transform, TranslationContext context) {
SparkSession sparkSession = context.getSparkSession();
- DataStreamReader dataStreamReader = sparkSession.readStream().format(SOURCE_PROVIDER_CLASS);
- Dataset<Row> rowDataset = dataStreamReader.load();
+ Dataset<Row> rowDataset = sparkSession.read().format(SOURCE_PROVIDER_CLASS).load();
MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() {
@Override public WindowedValue call(Row value) throws Exception {
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSourceMock.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSourceMock.java
index ec88364..f722377 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSourceMock.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSourceMock.java
@@ -22,52 +22,29 @@ import static scala.collection.JavaConversions.asScalaBuffer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Optional;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
-import org.apache.spark.sql.sources.v2.MicroBatchReadSupport;
+import org.apache.spark.sql.sources.v2.ReadSupport;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
-import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
-import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
import org.apache.spark.sql.types.StructType;
import org.joda.time.Instant;
/**
* This is a mock source that gives values between 0 and 999.
*/
-public class DatasetSourceMock implements DataSourceV2, MicroBatchReadSupport {
+public class DatasetSourceMock implements DataSourceV2, ReadSupport {
- @Override public MicroBatchReader createMicroBatchReader(Optional<StructType> schema, String checkpointLocation, DataSourceOptions options) {
- return new DatasetMicroBatchReader();
+ @Override public DataSourceReader createReader(DataSourceOptions options) {
+ return new DatasetReader();
}
/** This class can be mapped to Beam {@link BoundedSource}. */
- private static class DatasetMicroBatchReader implements MicroBatchReader {
-
- @Override public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
- }
-
- @Override public Offset getStartOffset() {
- return null;
- }
-
- @Override public Offset getEndOffset() {
- return null;
- }
-
- @Override public Offset deserializeOffset(String json) {
- return null;
- }
-
- @Override public void commit(Offset end) {
- }
-
- @Override public void stop() {
- }
+ private static class DatasetReader implements DataSourceReader {
@Override public StructType readSchema() {
return new StructType();
@@ -78,7 +55,7 @@ public class DatasetSourceMock implements DataSourceV2, MicroBatchReadSupport {
result.add(new InputPartition<InternalRow>() {
@Override public InputPartitionReader<InternalRow> createPartitionReader() {
- return new DatasetMicroBatchPartitionReaderMock();
+ return new DatasetPartitionReaderMock();
}
});
return result;
@@ -86,12 +63,12 @@ public class DatasetSourceMock implements DataSourceV2, MicroBatchReadSupport {
}
/** This class is a mocked reader*/
- private static class DatasetMicroBatchPartitionReaderMock implements InputPartitionReader<InternalRow> {
+ private static class DatasetPartitionReaderMock implements InputPartitionReader<InternalRow> {
private ArrayList<Integer> values;
private int currentIndex = 0;
- private DatasetMicroBatchPartitionReaderMock() {
+ private DatasetPartitionReaderMock() {
for (int i = 0; i < 1000; i++){
values.add(i);
}