You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/04 10:39:09 UTC
[beam] 47/50: Refactor DatasetSource fields
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 92c94b123c11eab6e4cfd2441a64463253f2afa2
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Wed Jan 2 16:08:31 2019 +0100
Refactor DatasetSource fields
---
.../translation/batch/DatasetSourceBatch.java | 40 ++++++++++++----------
1 file changed, 22 insertions(+), 18 deletions(-)
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
index 331e397..e19bbdb 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
@@ -49,10 +49,6 @@ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
static final String BEAM_SOURCE_OPTION = "beam-source";
static final String DEFAULT_PARALLELISM = "default-parallelism";
static final String PIPELINE_OPTIONS = "pipeline-options";
- private int numPartitions;
- private Long bundleSize;
- private BoundedSource<T> source;
- private SparkPipelineOptions sparkPipelineOptions;
@SuppressWarnings("unchecked")
@@ -61,31 +57,39 @@ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
if (!options.get(BEAM_SOURCE_OPTION).isPresent()){
throw new RuntimeException("Beam source was not set in DataSource options");
}
- this.source = Base64Serializer
+ BoundedSource<T> source = Base64Serializer
.deserializeUnchecked(options.get(BEAM_SOURCE_OPTION).get(), BoundedSource.class);
if (!options.get(DEFAULT_PARALLELISM).isPresent()){
throw new RuntimeException("Spark default parallelism was not set in DataSource options");
}
- if (!options.get(BEAM_SOURCE_OPTION).isPresent()){
- throw new RuntimeException("Beam source was not set in DataSource options");
- }
- this.numPartitions = Integer.valueOf(options.get(DEFAULT_PARALLELISM).get());
- checkArgument(this.numPartitions > 0, "Number of partitions must be greater than zero.");
+ int numPartitions = Integer.valueOf(options.get(DEFAULT_PARALLELISM).get());
+ checkArgument(numPartitions > 0, "Number of partitions must be greater than zero.");
+
if (!options.get(PIPELINE_OPTIONS).isPresent()){
throw new RuntimeException("Beam pipelineOptions were not set in DataSource options");
}
- this.sparkPipelineOptions = SerializablePipelineOptions
+ SparkPipelineOptions sparkPipelineOptions = SerializablePipelineOptions
.deserializeFromJson(options.get(PIPELINE_OPTIONS).get()).as(SparkPipelineOptions.class);
- this.bundleSize = sparkPipelineOptions.getBundleSize();
- return new DatasetReader(); }
+ return new DatasetReader(numPartitions, source, sparkPipelineOptions);
+ }
/** This class can be mapped to Beam {@link BoundedSource}. */
private class DatasetReader implements DataSourceReader {
+ private int numPartitions;
+ private BoundedSource<T> source;
+ private SparkPipelineOptions sparkPipelineOptions;
private Optional<StructType> schema;
private String checkpointLocation;
+ private DatasetReader(int numPartitions, BoundedSource<T> source,
+ SparkPipelineOptions sparkPipelineOptions) {
+ this.numPartitions = numPartitions;
+ this.source = source;
+ this.sparkPipelineOptions = sparkPipelineOptions;
+ }
+
@Override
public StructType readSchema() {
return new StructType();
@@ -97,11 +101,11 @@ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
long desiredSizeBytes;
try {
desiredSizeBytes =
- (bundleSize == null)
+ (sparkPipelineOptions.getBundleSize() == null)
? source.getEstimatedSizeBytes(sparkPipelineOptions) / numPartitions
- : bundleSize;
- List<? extends BoundedSource<T>> sources = source.split(desiredSizeBytes, sparkPipelineOptions);
- for (BoundedSource<T> source : sources) {
+ : sparkPipelineOptions.getBundleSize();
+ List<? extends BoundedSource<T>> splits = source.split(desiredSizeBytes, sparkPipelineOptions);
+ for (BoundedSource<T> split : splits) {
result.add(
new InputPartition<InternalRow>() {
@@ -109,7 +113,7 @@ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
public InputPartitionReader<InternalRow> createPartitionReader() {
BoundedReader<T> reader = null;
try {
- reader = source.createReader(sparkPipelineOptions);
+ reader = split.createReader(sparkPipelineOptions);
} catch (IOException e) {
throw new RuntimeException(
"Error creating BoundedReader " + reader.getClass().getCanonicalName(), e);