Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/15 16:30:59 UTC
[beam] 03/04: Remove bundleSize parameter and always use spark default parallelism
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 7bb19451dadea0259f6658c7ccc7f157fa0cd576
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue Jan 15 17:06:51 2019 +0100
Remove bundleSize parameter and always use spark default parallelism
---
.../spark/structuredstreaming/SparkPipelineOptions.java | 10 ----------
.../translation/batch/DatasetSourceBatch.java | 5 +----
2 files changed, 1 insertion(+), 14 deletions(-)
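Context for the diff below: with this change the runner always sizes bundles from the source's estimated size divided by the partition count, instead of honoring a user-supplied bundleSize. A minimal sketch of that logic, using Beam's BoundedSource API; the SourceSplitSketch class and its split method are illustrative names only, not part of this commit:

    import java.util.List;
    import org.apache.beam.sdk.io.BoundedSource;
    import org.apache.beam.sdk.options.PipelineOptions;

    /** Illustrative sketch only: split a BoundedSource into roughly one bundle per partition. */
    class SourceSplitSketch {
      static <T> List<? extends BoundedSource<T>> split(
          BoundedSource<T> source, PipelineOptions options, int numPartitions) throws Exception {
        // With the bundleSize option removed, the desired bundle size is always
        // derived from the estimated source size divided by the partition count.
        long desiredSizeBytes = source.getEstimatedSizeBytes(options) / numPartitions;
        return source.split(desiredSizeBytes, options);
      }
    }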
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkPipelineOptions.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkPipelineOptions.java
index 2e6653b..442ccf8 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkPipelineOptions.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkPipelineOptions.java
@@ -73,16 +73,6 @@ public interface SparkPipelineOptions
void setCheckpointDurationMillis(Long durationMillis);
- @Description(
- "If set bundleSize will be used for splitting BoundedSources, otherwise default to "
- + "splitting BoundedSources on Spark defaultParallelism. Most effective when used with "
- + "Spark dynamicAllocation.")
- @Default.Long(0)
- Long getBundleSize();
-
- @Experimental
- void setBundleSize(Long value);
-
@Description("Enable/disable sending aggregator values to Spark's metric sinks")
@Default.Boolean(true)
Boolean getEnableSparkMetricSinks();
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
index d966efb..3f6f219 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
@@ -113,10 +113,7 @@ public class DatasetSourceBatch implements DataSourceV2, ReadSupport {
List<InputPartition<InternalRow>> result = new ArrayList<>();
long desiredSizeBytes;
try {
- desiredSizeBytes =
- (sparkPipelineOptions.getBundleSize() == null)
- ? source.getEstimatedSizeBytes(sparkPipelineOptions) / numPartitions
- : sparkPipelineOptions.getBundleSize();
+ desiredSizeBytes = source.getEstimatedSizeBytes(sparkPipelineOptions) / numPartitions;
List<? extends BoundedSource<T>> splits = source.split(desiredSizeBytes, sparkPipelineOptions);
for (BoundedSource<T> split : splits) {
result.add(
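
Not part of this commit, but for context: per the commit message, the numPartitions used above is expected to follow Spark's default parallelism (the value governed by spark.default.parallelism, or derived from the available cores when that setting is absent). A hypothetical sketch of reading that value from a SparkSession; the helper class and method names are invented for illustration:

    import org.apache.spark.sql.SparkSession;

    /** Hypothetical helper, not taken from the Beam code base. */
    class DefaultParallelismSketch {
      // Reads Spark's default parallelism for use as the partition count.
      static int numPartitions(SparkSession sparkSession) {
        return sparkSession.sparkContext().defaultParallelism();
      }
    }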