You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/15 12:40:31 UTC

[beam] 01/03: Serialize windowedValue to byte[] in source to be able to specify a binary dataset schema and deserialize windowedValue from Row to get a dataset<WindowedValue>

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 7ff0a262f2ae4c57ab5e7f5e213ab17317f70a69
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue Jan 15 13:24:09 2019 +0100

    Serialize windowedValue to byte[] in source to be able to specify a binary dataset schema and deserialize windowedValue from Row to get a dataset<WindowedValue>
---
 .../translation/batch/DatasetSourceBatch.java      | 29 ++++++++++++++++------
 .../batch/ReadSourceTranslatorBatch.java           |  9 ++++++-
 2 files changed, 30 insertions(+), 8 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
index d9e1722..c4cfeaf 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 import static com.google.common.base.Preconditions.checkArgument;
 import static scala.collection.JavaConversions.asScalaBuffer;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -30,6 +31,7 @@ import org.apache.beam.runners.core.serialization.Base64Serializer;
 import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
@@ -93,10 +95,11 @@ public class DatasetSourceBatch implements DataSourceV2, ReadSupport {
 
     @Override
     public StructType readSchema() {
+      // TODO: find a way to extend schema with a WindowedValue schema
       StructField[] array = new StructField[1];
-      StructField dummyStructField = StructField
-          .apply("dummyStructField", DataTypes.NullType, true, Metadata.empty());
-      array[0] = dummyStructField;
+      StructField binaryStructField = StructField
+          .apply("binaryStructField", DataTypes.BinaryType, true, Metadata.empty());
+      array[0] = binaryStructField;
       return new StructType(array);
     }
 
@@ -135,11 +138,13 @@ public class DatasetSourceBatch implements DataSourceV2, ReadSupport {
   private static class DatasetPartitionReader<T> implements InputPartitionReader<InternalRow> {
     private boolean started;
     private boolean closed;
+    private BoundedSource<T> source;
     private BoundedReader<T> reader;
 
     DatasetPartitionReader(BoundedSource<T> source, SerializablePipelineOptions serializablePipelineOptions) {
       this.started = false;
       this.closed = false;
+      this.source = source;
       // reader is not serializable so lazy initialize it
       try {
         reader = source
@@ -162,10 +167,20 @@ public class DatasetSourceBatch implements DataSourceV2, ReadSupport {
     @Override
     public InternalRow get() {
       List<Object> list = new ArrayList<>();
-      list.add(
-          WindowedValue.timestampedValueInGlobalWindow(
-              reader.getCurrent(), reader.getCurrentTimestamp()));
-      return InternalRow.apply(asScalaBuffer(list).toList());
+      WindowedValue<T> windowedValue = WindowedValue
+          .timestampedValueInGlobalWindow(reader.getCurrent(), reader.getCurrentTimestamp());
+      //serialize the windowedValue to bytes array to comply with dataset binary schema
+      WindowedValue.FullWindowedValueCoder<T> windowedValueCoder = WindowedValue.FullWindowedValueCoder
+          .of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);
+      ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+      try {
+        windowedValueCoder.encode(windowedValue, byteArrayOutputStream);
+        byte[] bytes = byteArrayOutputStream.toByteArray();
+        list.add(bytes);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      return InternalRow.apply(asScalaBuffer(list).toList());
     }
 
     @Override
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index 8810e21..fec0fd3 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import org.apache.beam.runners.core.construction.PipelineOptionsSerializationUtils;
 import org.apache.beam.runners.core.construction.ReadTranslation;
@@ -26,6 +27,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.Translation
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -64,10 +66,15 @@ class ReadSourceTranslatorBatch<T>
         .option(DatasetSourceBatch.PIPELINE_OPTIONS,
             PipelineOptionsSerializationUtils.serializeToJson(context.getOptions())).load();
 
+    // extract windowedValue from Row
     MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() {
       @Override public WindowedValue call(Row value) throws Exception {
         //there is only one value put in each Row by the InputPartitionReader
-        return value.<WindowedValue>getAs(0);
+        byte[] bytes = (byte[]) value.get(0);
+        WindowedValue.FullWindowedValueCoder<T> windowedValueCoder = WindowedValue.FullWindowedValueCoder
+            .of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);
+        WindowedValue<T> windowedValue = windowedValueCoder.decode(new ByteArrayInputStream(bytes));
+        return windowedValue;
       }
     };
     //TODO: is there a better way than using the raw WindowedValue? Can an Encoder<WindowedValue<T>>