You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/04 10:39:09 UTC

[beam] 47/50: Refactor DatasetSource fields

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 92c94b123c11eab6e4cfd2441a64463253f2afa2
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Wed Jan 2 16:08:31 2019 +0100

    Refactor DatasetSource fields
---
 .../translation/batch/DatasetSourceBatch.java      | 40 ++++++++++++----------
 1 file changed, 22 insertions(+), 18 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
index 331e397..e19bbdb 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
@@ -49,10 +49,6 @@ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
   static final String BEAM_SOURCE_OPTION = "beam-source";
   static final String DEFAULT_PARALLELISM = "default-parallelism";
   static final String PIPELINE_OPTIONS = "pipeline-options";
-  private int numPartitions;
-  private Long bundleSize;
-  private BoundedSource<T> source;
-  private SparkPipelineOptions sparkPipelineOptions;
 
 
   @SuppressWarnings("unchecked")
@@ -61,31 +57,39 @@ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
     if (!options.get(BEAM_SOURCE_OPTION).isPresent()){
       throw new RuntimeException("Beam source was not set in DataSource options");
     }
-    this.source = Base64Serializer
+    BoundedSource<T> source = Base64Serializer
         .deserializeUnchecked(options.get(BEAM_SOURCE_OPTION).get(), BoundedSource.class);
 
     if (!options.get(DEFAULT_PARALLELISM).isPresent()){
       throw new RuntimeException("Spark default parallelism was not set in DataSource options");
     }
-    if (!options.get(BEAM_SOURCE_OPTION).isPresent()){
-      throw new RuntimeException("Beam source was not set in DataSource options");
-    }
-    this.numPartitions = Integer.valueOf(options.get(DEFAULT_PARALLELISM).get());
-    checkArgument(this.numPartitions > 0, "Number of partitions must be greater than zero.");
+    int numPartitions = Integer.valueOf(options.get(DEFAULT_PARALLELISM).get());
+    checkArgument(numPartitions > 0, "Number of partitions must be greater than zero.");
+
     if (!options.get(PIPELINE_OPTIONS).isPresent()){
       throw new RuntimeException("Beam pipelineOptions were not set in DataSource options");
     }
-    this.sparkPipelineOptions = SerializablePipelineOptions
+    SparkPipelineOptions sparkPipelineOptions = SerializablePipelineOptions
         .deserializeFromJson(options.get(PIPELINE_OPTIONS).get()).as(SparkPipelineOptions.class);
-    this.bundleSize = sparkPipelineOptions.getBundleSize();
-    return new DatasetReader();  }
+    return new DatasetReader(numPartitions, source, sparkPipelineOptions);
+  }
 
   /** This class can be mapped to Beam {@link BoundedSource}. */
   private class DatasetReader implements DataSourceReader {
 
+    private int numPartitions;
+    private BoundedSource<T> source;
+    private SparkPipelineOptions sparkPipelineOptions;
     private Optional<StructType> schema;
     private String checkpointLocation;
 
+    private DatasetReader(int numPartitions, BoundedSource<T> source,
+        SparkPipelineOptions sparkPipelineOptions) {
+      this.numPartitions = numPartitions;
+      this.source = source;
+      this.sparkPipelineOptions = sparkPipelineOptions;
+    }
+
     @Override
     public StructType readSchema() {
       return new StructType();
@@ -97,11 +101,11 @@ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
       long desiredSizeBytes;
       try {
         desiredSizeBytes =
-            (bundleSize == null)
+            (sparkPipelineOptions.getBundleSize() == null)
                 ? source.getEstimatedSizeBytes(sparkPipelineOptions) / numPartitions
-                : bundleSize;
-        List<? extends BoundedSource<T>> sources = source.split(desiredSizeBytes, sparkPipelineOptions);
-        for (BoundedSource<T> source : sources) {
+                : sparkPipelineOptions.getBundleSize();
+        List<? extends BoundedSource<T>> splits = source.split(desiredSizeBytes, sparkPipelineOptions);
+        for (BoundedSource<T> split : splits) {
           result.add(
               new InputPartition<InternalRow>() {
 
@@ -109,7 +113,7 @@ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
                 public InputPartitionReader<InternalRow> createPartitionReader() {
                   BoundedReader<T> reader = null;
                   try {
-                    reader = source.createReader(sparkPipelineOptions);
+                    reader = split.createReader(sparkPipelineOptions);
                   } catch (IOException e) {
                     throw new RuntimeException(
                         "Error creating BoundedReader " + reader.getClass().getCanonicalName(), e);