You are viewing a plain-text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/11 10:25:15 UTC
[beam] 01/08: Fix serialization issues
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit bab9027f9fd6d7bf38134518b98d942fc75ec16a
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Jan 10 17:22:13 2019 +0100
Fix serialization issues
---
.../translation/batch/DatasetSourceBatch.java | 83 +++++++++++-----------
1 file changed, 42 insertions(+), 41 deletions(-)
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
index 421a3f9..c35f62e 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
@@ -23,8 +23,8 @@ import static scala.collection.JavaConversions.asScalaBuffer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Optional;
import org.apache.beam.runners.core.construction.PipelineOptionsSerializationUtils;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.serialization.Base64Serializer;
import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
import org.apache.beam.sdk.io.BoundedSource;
@@ -47,7 +47,7 @@ import org.apache.spark.sql.types.StructType;
* This is a spark structured streaming {@link DataSourceV2} implementation. As Continuous streaming
* is tagged experimental in spark, this class does no implement {@link ContinuousReadSupport}.
*/
-public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
+public class DatasetSourceBatch implements DataSourceV2, ReadSupport {
static final String BEAM_SOURCE_OPTION = "beam-source";
static final String DEFAULT_PARALLELISM = "default-parallelism";
@@ -59,38 +59,35 @@ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
@SuppressWarnings("unchecked")
@Override
public DataSourceReader createReader(DataSourceOptions options) {
- if (!options.get(BEAM_SOURCE_OPTION).isPresent()){
- throw new RuntimeException("Beam source was not set in DataSource options");
- }
- BoundedSource<T> source = Base64Serializer
- .deserializeUnchecked(options.get(BEAM_SOURCE_OPTION).get(), BoundedSource.class);
-
- if (!options.get(DEFAULT_PARALLELISM).isPresent()){
- throw new RuntimeException("Spark default parallelism was not set in DataSource options");
- }
- int numPartitions = Integer.parseInt(options.get(DEFAULT_PARALLELISM).get());
- checkArgument(numPartitions > 0, "Number of partitions must be greater than zero.");
-
- if (!options.get(PIPELINE_OPTIONS).isPresent()){
- throw new RuntimeException("Beam pipelineOptions were not set in DataSource options");
- }
- SparkPipelineOptions sparkPipelineOptions = PipelineOptionsSerializationUtils
- .deserializeFromJson(options.get(PIPELINE_OPTIONS).get()).as(SparkPipelineOptions.class);
- return new DatasetReader(numPartitions, source, sparkPipelineOptions);
+ return new DatasetReader<>(options);
}
- /** This class can be mapped to Beam {@link BoundedSource}. */
- private class DatasetReader implements DataSourceReader {
+ /** This class is mapped to Beam {@link BoundedSource}. */
+ private static class DatasetReader<T> implements DataSourceReader {
private int numPartitions;
private BoundedSource<T> source;
- private SparkPipelineOptions sparkPipelineOptions;
+ private SerializablePipelineOptions serializablePipelineOptions;
- private DatasetReader(int numPartitions, BoundedSource<T> source,
- SparkPipelineOptions sparkPipelineOptions) {
- this.numPartitions = numPartitions;
- this.source = source;
- this.sparkPipelineOptions = sparkPipelineOptions;
+ private DatasetReader(DataSourceOptions options) {
+ if (!options.get(BEAM_SOURCE_OPTION).isPresent()){
+ throw new RuntimeException("Beam source was not set in DataSource options");
+ }
+ this.source = Base64Serializer
+ .deserializeUnchecked(options.get(BEAM_SOURCE_OPTION).get(), BoundedSource.class);
+
+ if (!options.get(DEFAULT_PARALLELISM).isPresent()){
+ throw new RuntimeException("Spark default parallelism was not set in DataSource options");
+ }
+ this.numPartitions = Integer.parseInt(options.get(DEFAULT_PARALLELISM).get());
+ checkArgument(numPartitions > 0, "Number of partitions must be greater than zero.");
+
+ if (!options.get(PIPELINE_OPTIONS).isPresent()){
+ throw new RuntimeException("Beam pipelineOptions were not set in DataSource options");
+ }
+ SparkPipelineOptions sparkPipelineOptions = PipelineOptionsSerializationUtils
+ .deserializeFromJson(options.get(PIPELINE_OPTIONS).get()).as(SparkPipelineOptions.class);
+ this.serializablePipelineOptions = new SerializablePipelineOptions(sparkPipelineOptions);
}
@Override
@@ -104,6 +101,8 @@ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
@Override
public List<InputPartition<InternalRow>> planInputPartitions() {
+ SparkPipelineOptions sparkPipelineOptions = serializablePipelineOptions.get()
+ .as(SparkPipelineOptions.class);
List<InputPartition<InternalRow>> result = new ArrayList<>();
long desiredSizeBytes;
try {
@@ -118,14 +117,7 @@ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
@Override
public InputPartitionReader<InternalRow> createPartitionReader() {
- BoundedReader<T> reader = null;
- try {
- reader = split.createReader(sparkPipelineOptions);
- } catch (IOException e) {
- throw new RuntimeException(
- "Error creating BoundedReader " + reader.getClass().getCanonicalName(), e);
- }
- return new DatasetPartitionReader(reader);
+ return new DatasetPartitionReader<>(source, serializablePipelineOptions);
}
});
}
@@ -139,14 +131,16 @@ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
}
/** This class can be mapped to Beam {@link BoundedReader}. */
- private class DatasetPartitionReader implements InputPartitionReader<InternalRow> {
-
- BoundedReader<T> reader;
+ private static class DatasetPartitionReader<T> implements InputPartitionReader<InternalRow> {
private boolean started;
private boolean closed;
+ private BoundedReader<T> reader;
+ private BoundedSource<T> source;
+ private SerializablePipelineOptions serializablePipelineOptions;
- DatasetPartitionReader(BoundedReader<T> reader) {
- this.reader = reader;
+ DatasetPartitionReader(BoundedSource<T> source, SerializablePipelineOptions serializablePipelineOptions) {
+ this.source = source;
+ this.serializablePipelineOptions = serializablePipelineOptions;
this.started = false;
this.closed = false;
}
@@ -154,6 +148,13 @@ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
@Override
public boolean next() throws IOException {
if (!started) {
+ // reader is not serializable so lazy initialize it
+ try {
+ reader = source
+ .createReader(serializablePipelineOptions.get().as(SparkPipelineOptions.class));
+ } catch (IOException e) {
+ throw new RuntimeException("Error creating BoundedReader ", e);
+ }
started = true;
return reader.start();
} else {