You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/11 10:25:21 UTC

[beam] 07/08: Simplify beam reader creation as it is created once the source has already been partitioned

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 9fad3d438d13326e88f74a08d8370eeed8288935
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Jan 11 11:23:30 2019 +0100

    Simplify beam reader creation as it is created once the source has already been partitioned
---
 .../translation/batch/DatasetSourceBatch.java       | 21 +++++++++------------
 1 file changed, 9 insertions(+), 12 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
index c35f62e..d9e1722 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static scala.collection.JavaConversions.asScalaBuffer;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.runners.core.construction.PipelineOptionsSerializationUtils;
@@ -63,7 +64,7 @@ public class DatasetSourceBatch implements DataSourceV2, ReadSupport {
   }
 
   /** This class is mapped to Beam {@link BoundedSource}. */
-  private static class DatasetReader<T> implements DataSourceReader {
+  private static class DatasetReader<T> implements DataSourceReader, Serializable {
 
     private int numPartitions;
     private BoundedSource<T> source;
@@ -135,26 +136,22 @@ public class DatasetSourceBatch implements DataSourceV2, ReadSupport {
     private boolean started;
     private boolean closed;
     private BoundedReader<T> reader;
-    private BoundedSource<T> source;
-    private SerializablePipelineOptions serializablePipelineOptions;
 
     DatasetPartitionReader(BoundedSource<T> source, SerializablePipelineOptions serializablePipelineOptions) {
-      this.source = source;
-      this.serializablePipelineOptions = serializablePipelineOptions;
       this.started = false;
       this.closed = false;
+      // reader is not serializable, so create it here once the source has already been partitioned
+      try {
+        reader = source
+            .createReader(serializablePipelineOptions.get().as(SparkPipelineOptions.class));
+      } catch (IOException e) {
+        throw new RuntimeException("Error creating BoundedReader ", e);
+      }
     }
 
     @Override
     public boolean next() throws IOException {
       if (!started) {
-        // reader is not serializable so lazy initialize it
-        try {
-          reader = source
-              .createReader(serializablePipelineOptions.get().as(SparkPipelineOptions.class));
-        } catch (IOException e) {
-          throw new RuntimeException("Error creating BoundedReader ", e);
-        }
         started = true;
         return reader.start();
       } else {