You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this text.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/15 12:40:31 UTC
[beam] 01/03: Serialize windowedValue to byte[] in source to be able to specify a binary dataset schema and deserialize windowedValue from Row to get a dataset<WindowedValue>
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 7ff0a262f2ae4c57ab5e7f5e213ab17317f70a69
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue Jan 15 13:24:09 2019 +0100
Serialize windowedValue to byte[] in source to be able to specify a binary dataset schema and deserialize windowedValue from Row to get a dataset<WindowedValue>
---
.../translation/batch/DatasetSourceBatch.java | 29 ++++++++++++++++------
.../batch/ReadSourceTranslatorBatch.java | 9 ++++++-
2 files changed, 30 insertions(+), 8 deletions(-)
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
index d9e1722..c4cfeaf 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
import static com.google.common.base.Preconditions.checkArgument;
import static scala.collection.JavaConversions.asScalaBuffer;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
@@ -30,6 +31,7 @@ import org.apache.beam.runners.core.serialization.Base64Serializer;
import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
@@ -93,10 +95,11 @@ public class DatasetSourceBatch implements DataSourceV2, ReadSupport {
@Override
public StructType readSchema() {
+ // TODO: find a way to extend schema with a WindowedValue schema
StructField[] array = new StructField[1];
- StructField dummyStructField = StructField
- .apply("dummyStructField", DataTypes.NullType, true, Metadata.empty());
- array[0] = dummyStructField;
+ StructField binaryStructField = StructField
+ .apply("binaryStructField", DataTypes.BinaryType, true, Metadata.empty());
+ array[0] = binaryStructField;
return new StructType(array);
}
@@ -135,11 +138,13 @@ public class DatasetSourceBatch implements DataSourceV2, ReadSupport {
private static class DatasetPartitionReader<T> implements InputPartitionReader<InternalRow> {
private boolean started;
private boolean closed;
+ private BoundedSource<T> source;
private BoundedReader<T> reader;
DatasetPartitionReader(BoundedSource<T> source, SerializablePipelineOptions serializablePipelineOptions) {
this.started = false;
this.closed = false;
+ this.source = source;
// reader is not serializable so lazy initialize it
try {
reader = source
@@ -162,10 +167,20 @@ public class DatasetSourceBatch implements DataSourceV2, ReadSupport {
@Override
public InternalRow get() {
List<Object> list = new ArrayList<>();
- list.add(
- WindowedValue.timestampedValueInGlobalWindow(
- reader.getCurrent(), reader.getCurrentTimestamp()));
- return InternalRow.apply(asScalaBuffer(list).toList());
+ WindowedValue<T> windowedValue = WindowedValue
+ .timestampedValueInGlobalWindow(reader.getCurrent(), reader.getCurrentTimestamp());
+ //serialize the windowedValue to bytes array to comply with dataset binary schema
+ WindowedValue.FullWindowedValueCoder<T> windowedValueCoder = WindowedValue.FullWindowedValueCoder
+ .of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ try {
+ windowedValueCoder.encode(windowedValue, byteArrayOutputStream);
+ byte[] bytes = byteArrayOutputStream.toByteArray();
+ list.add(bytes);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+return InternalRow.apply(asScalaBuffer(list).toList());
}
@Override
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index 8810e21..fec0fd3 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
import org.apache.beam.runners.core.construction.PipelineOptionsSerializationUtils;
import org.apache.beam.runners.core.construction.ReadTranslation;
@@ -26,6 +27,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.Translation
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
@@ -64,10 +66,15 @@ class ReadSourceTranslatorBatch<T>
.option(DatasetSourceBatch.PIPELINE_OPTIONS,
PipelineOptionsSerializationUtils.serializeToJson(context.getOptions())).load();
+ // extract windowedValue from Row
MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() {
@Override public WindowedValue call(Row value) throws Exception {
//there is only one value put in each Row by the InputPartitionReader
- return value.<WindowedValue>getAs(0);
+ byte[] bytes = (byte[]) value.get(0);
+ WindowedValue.FullWindowedValueCoder<T> windowedValueCoder = WindowedValue.FullWindowedValueCoder
+ .of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);
+ WindowedValue<T> windowedValue = windowedValueCoder.decode(new ByteArrayInputStream(bytes));
+ return windowedValue;
}
};
//TODO: is there a better way than using the raw WindowedValue? Can an Encoder<WindowedValue<T>>