You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/08 10:58:27 UTC
[beam] branch spark-runner_structured-streaming updated (852058d -> ff79b8f)
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a change to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git.
from 852058d Fix checkstyle
new 57edb20 Add a dummy schema for reader
new 0ed1c60 Add empty 0-arg constructor for mock source
new ff79b8f Clean
The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../translation/batch/DatasetSourceBatch.java | 9 ++++++++-
.../translation/batch/ReadSourceTranslatorBatch.java | 1 -
.../translation/batch/mocks/DatasetSourceMockBatch.java | 3 +++
3 files changed, 11 insertions(+), 2 deletions(-)
[beam] 01/03: Add a dummy schema for reader
Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 57edb20619dc223a2b528d2fe7ac94f3df2d957d
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue Jan 8 10:27:28 2019 +0100
Add a dummy schema for reader
---
.../translation/batch/DatasetSourceBatch.java | 9 ++++++++-
1 file changed, 8 insertions(+), 1 deletion(-)
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
index 1c8a888..c1a93e3 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
@@ -38,6 +38,9 @@ import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
/**
@@ -94,7 +97,11 @@ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
@Override
public StructType readSchema() {
- return new StructType();
+ StructField[] array = new StructField[1];
+ StructField dummyStructField = StructField
+ .apply("dummyStructField", DataTypes.NullType, true, Metadata.empty());
+ array[0] = dummyStructField;
+ return new StructType(array);
}
@Override
[beam] 03/03: Clean
Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit ff79b8f9b138a2620bc19740c72c1cf54c640be6
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue Jan 8 11:07:10 2019 +0100
Clean
---
.../structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java | 1 -
1 file changed, 1 deletion(-)
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index 68a9de9..48d1646 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -68,7 +68,6 @@ class ReadSourceTranslatorBatch<T>
Dataset<Row> rowDataset = sparkSession.read().format(SOURCEPROVIDERCLASS).options(datasetSourceOptions)
.load();
- //TODO pass the source and the translation context serialized as string to the DatasetSource
MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() {
@Override public WindowedValue call(Row value) throws Exception {
//there is only one value put in each Row by the InputPartitionReader
[beam] 02/03: Add empty 0-arg constructor for mock source
Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 0ed1c60dda511e0d5dbb3a8566f1e2c3e5b346e2
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue Jan 8 10:27:57 2019 +0100
Add empty 0-arg constructor for mock source
---
.../translation/batch/mocks/DatasetSourceMockBatch.java | 3 +++
1 file changed, 3 insertions(+)
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/DatasetSourceMockBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/DatasetSourceMockBatch.java
index 5a93928..81aead2 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/DatasetSourceMockBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/DatasetSourceMockBatch.java
@@ -39,6 +39,9 @@ import org.joda.time.Instant;
*/
public class DatasetSourceMockBatch implements DataSourceV2, ReadSupport {
+ private DatasetSourceMockBatch() {
+ }
+
@Override public DataSourceReader createReader(DataSourceOptions options) {
return new DatasetReader();
}