Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/15 16:30:59 UTC

[beam] 03/04: Remove bundleSize parameter and always use spark default parallelism

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 7bb19451dadea0259f6658c7ccc7f157fa0cd576
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue Jan 15 17:06:51 2019 +0100

    Remove bundleSize parameter and always use spark default parallelism
---
 .../spark/structuredstreaming/SparkPipelineOptions.java        | 10 ----------
 .../translation/batch/DatasetSourceBatch.java                  |  5 +----
 2 files changed, 1 insertion(+), 14 deletions(-)
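For context on the diff below: with bundleSize gone, the runner always derives the target split size from the source's estimated size divided by the partition count. A minimal sketch of that policy, using Beam's BoundedSource API (the class and method names SplitSketch and splitForSpark are illustrative, not part of the runner):

    import java.util.List;
    import org.apache.beam.sdk.io.BoundedSource;
    import org.apache.beam.sdk.options.PipelineOptions;

    class SplitSketch {
      // Aim for roughly one split per available Spark partition, mirroring
      // the single remaining code path in DatasetSourceBatch below.
      static <T> List<? extends BoundedSource<T>> splitForSpark(
          BoundedSource<T> source, PipelineOptions options, int numPartitions)
          throws Exception {
        long desiredSizeBytes = source.getEstimatedSizeBytes(options) / numPartitions;
        return source.split(desiredSizeBytes, options);
      }
    }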

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkPipelineOptions.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkPipelineOptions.java
index 2e6653b..442ccf8 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkPipelineOptions.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkPipelineOptions.java
@@ -73,16 +73,6 @@ public interface SparkPipelineOptions
 
   void setCheckpointDurationMillis(Long durationMillis);
 
-  @Description(
-      "If set bundleSize will be used for splitting BoundedSources, otherwise default to "
-          + "splitting BoundedSources on Spark defaultParallelism. Most effective when used with "
-          + "Spark dynamicAllocation.")
-  @Default.Long(0)
-  Long getBundleSize();
-
-  @Experimental
-  void setBundleSize(Long value);
-
   @Description("Enable/disable sending aggregator values to Spark's metric sinks")
   @Default.Boolean(true)
   Boolean getEnableSparkMetricSinks();
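For reference, the removed option followed Beam's standard PipelineOptions bean pattern. A minimal sketch of such a declaration on a PipelineOptions sub-interface (the interface and property names here are purely illustrative):

    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.Description;
    import org.apache.beam.sdk.options.PipelineOptions;

    public interface ExampleOptions extends PipelineOptions {
      // Beam's PipelineOptionsFactory maps --someSize=... to this getter.
      @Description("Example Long-valued option with a default of 0.")
      @Default.Long(0)
      Long getSomeSize();

      void setSomeSize(Long value);
    }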
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
index d966efb..3f6f219 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
@@ -113,10 +113,7 @@ public class DatasetSourceBatch implements DataSourceV2, ReadSupport {
       List<InputPartition<InternalRow>> result = new ArrayList<>();
       long desiredSizeBytes;
       try {
-        desiredSizeBytes =
-            (sparkPipelineOptions.getBundleSize() == null)
-                ? source.getEstimatedSizeBytes(sparkPipelineOptions) / numPartitions
-                : sparkPipelineOptions.getBundleSize();
+        desiredSizeBytes = source.getEstimatedSizeBytes(sparkPipelineOptions) / numPartitions;
         List<? extends BoundedSource<T>> splits = source.split(desiredSizeBytes, sparkPipelineOptions);
         for (BoundedSource<T> split : splits) {
           result.add(
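The numPartitions used in the new code path is supplied by the runner; under this commit's policy it tracks Spark's default parallelism. A hedged sketch of where such a value typically comes from in a Spark job (the sparkSession variable is assumed to be in scope):

    // Spark's default parallelism: the cluster's total core count, or an
    // explicitly configured spark.default.parallelism value.
    int numPartitions = sparkSession.sparkContext().defaultParallelism();

Sizing splits this way targets roughly one bundle per Spark task slot, so source parallelism follows the cluster's configured parallelism rather than a user-supplied fixed byte size.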