Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/11 10:25:15 UTC

[beam] 01/08: Fix serialization issues

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit bab9027f9fd6d7bf38134518b98d942fc75ec16a
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Jan 10 17:22:13 2019 +0100

    Fix serialization issues
---
 .../translation/batch/DatasetSourceBatch.java      | 83 +++++++++++-----------
 1 file changed, 42 insertions(+), 41 deletions(-)
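
The heart of the fix is a standard Beam pattern for shipping pipeline
options to Spark workers: PipelineOptions proxies are not Serializable,
so the diff wraps them in SerializablePipelineOptions (which serializes
them to JSON) and rehydrates them on the worker. A minimal sketch of
that pattern, for orientation only; OptionsHolder is a hypothetical
class, not part of this commit:

    import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptions;

    // Hypothetical holder illustrating the wrap/unwrap pattern the diff
    // below applies to DatasetReader and DatasetPartitionReader.
    class OptionsHolder implements java.io.Serializable {
      // The wrapper serializes the options to JSON on write and
      // re-creates the proxy on read, so this field survives being
      // shipped to Spark executors.
      private final SerializablePipelineOptions serializableOptions;

      OptionsHolder(PipelineOptions options) {
        this.serializableOptions = new SerializablePipelineOptions(options);
      }

      PipelineOptions options() {
        // Rehydrate on the worker after deserialization.
        return serializableOptions.get();
      }
    }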

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
index 421a3f9..c35f62e 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
@@ -23,8 +23,8 @@ import static scala.collection.JavaConversions.asScalaBuffer;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
 import org.apache.beam.runners.core.construction.PipelineOptionsSerializationUtils;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.core.serialization.Base64Serializer;
 import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
 import org.apache.beam.sdk.io.BoundedSource;
@@ -47,7 +47,7 @@ import org.apache.spark.sql.types.StructType;
  * This is a Spark structured streaming {@link DataSourceV2} implementation. As continuous streaming
  * is tagged experimental in Spark, this class does not implement {@link ContinuousReadSupport}.
  */
-public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
+public class DatasetSourceBatch implements DataSourceV2, ReadSupport {
 
   static final String BEAM_SOURCE_OPTION = "beam-source";
   static final String DEFAULT_PARALLELISM = "default-parallelism";
@@ -59,38 +59,35 @@ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
   @SuppressWarnings("unchecked")
   @Override
   public DataSourceReader createReader(DataSourceOptions options) {
-    if (!options.get(BEAM_SOURCE_OPTION).isPresent()){
-      throw new RuntimeException("Beam source was not set in DataSource options");
-    }
-    BoundedSource<T> source = Base64Serializer
-        .deserializeUnchecked(options.get(BEAM_SOURCE_OPTION).get(), BoundedSource.class);
-
-    if (!options.get(DEFAULT_PARALLELISM).isPresent()){
-      throw new RuntimeException("Spark default parallelism was not set in DataSource options");
-    }
-    int numPartitions = Integer.parseInt(options.get(DEFAULT_PARALLELISM).get());
-    checkArgument(numPartitions > 0, "Number of partitions must be greater than zero.");
-
-    if (!options.get(PIPELINE_OPTIONS).isPresent()){
-      throw new RuntimeException("Beam pipelineOptions were not set in DataSource options");
-    }
-    SparkPipelineOptions sparkPipelineOptions = PipelineOptionsSerializationUtils
-        .deserializeFromJson(options.get(PIPELINE_OPTIONS).get()).as(SparkPipelineOptions.class);
-    return new DatasetReader(numPartitions, source, sparkPipelineOptions);
+    return new DatasetReader<>(options);
   }
 
-  /** This class can be mapped to Beam {@link BoundedSource}. */
-  private class DatasetReader implements DataSourceReader {
+  /** This class is mapped to Beam {@link BoundedSource}. */
+  private static class DatasetReader<T> implements DataSourceReader {
 
     private int numPartitions;
     private BoundedSource<T> source;
-    private SparkPipelineOptions sparkPipelineOptions;
+    private SerializablePipelineOptions serializablePipelineOptions;
 
-    private DatasetReader(int numPartitions, BoundedSource<T> source,
-        SparkPipelineOptions sparkPipelineOptions) {
-      this.numPartitions = numPartitions;
-      this.source = source;
-      this.sparkPipelineOptions = sparkPipelineOptions;
+    private DatasetReader(DataSourceOptions options) {
+      if (!options.get(BEAM_SOURCE_OPTION).isPresent()){
+        throw new RuntimeException("Beam source was not set in DataSource options");
+      }
+      this.source = Base64Serializer
+          .deserializeUnchecked(options.get(BEAM_SOURCE_OPTION).get(), BoundedSource.class);
+
+      if (!options.get(DEFAULT_PARALLELISM).isPresent()){
+        throw new RuntimeException("Spark default parallelism was not set in DataSource options");
+      }
+      this.numPartitions = Integer.parseInt(options.get(DEFAULT_PARALLELISM).get());
+      checkArgument(numPartitions > 0, "Number of partitions must be greater than zero.");
+
+      if (!options.get(PIPELINE_OPTIONS).isPresent()){
+        throw new RuntimeException("Beam pipelineOptions were not set in DataSource options");
+      }
+      SparkPipelineOptions sparkPipelineOptions = PipelineOptionsSerializationUtils
+          .deserializeFromJson(options.get(PIPELINE_OPTIONS).get()).as(SparkPipelineOptions.class);
+      this.serializablePipelineOptions = new SerializablePipelineOptions(sparkPipelineOptions);
     }
 
     @Override
@@ -104,6 +101,8 @@ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
 
     @Override
     public List<InputPartition<InternalRow>> planInputPartitions() {
+      SparkPipelineOptions sparkPipelineOptions = serializablePipelineOptions.get()
+          .as(SparkPipelineOptions.class);
       List<InputPartition<InternalRow>> result = new ArrayList<>();
       long desiredSizeBytes;
       try {
@@ -118,14 +117,7 @@ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
 
                 @Override
                 public InputPartitionReader<InternalRow> createPartitionReader() {
-                  BoundedReader<T> reader = null;
-                  try {
-                    reader = split.createReader(sparkPipelineOptions);
-                  } catch (IOException e) {
-                    throw new RuntimeException(
-                        "Error creating BoundedReader " + reader.getClass().getCanonicalName(), e);
-                  }
-                  return new DatasetPartitionReader(reader);
+                  return new DatasetPartitionReader<>(source, serializablePipelineOptions);
                 }
               });
         }
@@ -139,14 +131,16 @@ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
   }
 
   /** This class can be mapped to Beam {@link BoundedReader}. */
-  private class DatasetPartitionReader implements InputPartitionReader<InternalRow> {
-
-    BoundedReader<T> reader;
+  private static class DatasetPartitionReader<T> implements InputPartitionReader<InternalRow> {
     private boolean started;
     private boolean closed;
+    private BoundedReader<T> reader;
+    private BoundedSource<T> source;
+    private SerializablePipelineOptions serializablePipelineOptions;
 
-    DatasetPartitionReader(BoundedReader<T> reader) {
-      this.reader = reader;
+    DatasetPartitionReader(BoundedSource<T> source, SerializablePipelineOptions serializablePipelineOptions) {
+      this.source = source;
+      this.serializablePipelineOptions = serializablePipelineOptions;
       this.started = false;
       this.closed = false;
     }
@@ -154,6 +148,13 @@ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
     @Override
     public boolean next() throws IOException {
       if (!started) {
+        // the reader is not serializable, so initialize it lazily
+        try {
+          reader = source
+              .createReader(serializablePipelineOptions.get().as(SparkPipelineOptions.class));
+        } catch (IOException e) {
+          throw new RuntimeException("Error creating BoundedReader", e);
+        }
         started = true;
         return reader.start();
       } else {
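
For context, the DataSourceOptions consumed by DatasetReader's
constructor are populated on the driver side by the translator, which
is not part of this commit. A hedged sketch of what that call site
could look like; the variable names and the use of the fully qualified
class name as the format string are assumptions, not code from this
patch:

    // Hypothetical driver-side code: populating the three options that
    // DatasetReader reads back in its constructor.
    String serializedSource = Base64Serializer.serializeUnchecked(boundedSource);
    Dataset<Row> rows =
        sparkSession
            .read()
            .format(DatasetSourceBatch.class.getCanonicalName())
            .option(DatasetSourceBatch.BEAM_SOURCE_OPTION, serializedSource)
            .option(DatasetSourceBatch.DEFAULT_PARALLELISM,
                String.valueOf(sparkSession.sparkContext().defaultParallelism()))
            .option(DatasetSourceBatch.PIPELINE_OPTIONS,
                PipelineOptionsSerializationUtils.serializeToJson(pipelineOptions))
            .load();

Only strings cross this boundary, which is why the reader side
deserializes the source with Base64Serializer and the options with
PipelineOptionsSerializationUtils, as shown in the hunks above.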