You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/11 10:25:21 UTC
[beam] 07/08: Simplify Beam reader creation, as it is created once the
source has already been partitioned
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 9fad3d438d13326e88f74a08d8370eeed8288935
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Jan 11 11:23:30 2019 +0100
Simplify Beam reader creation, as it is created once the source has already been partitioned
---
.../translation/batch/DatasetSourceBatch.java | 21 +++++++++------------
1 file changed, 9 insertions(+), 12 deletions(-)
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
index c35f62e..d9e1722 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import static scala.collection.JavaConversions.asScalaBuffer;
import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.core.construction.PipelineOptionsSerializationUtils;
@@ -63,7 +64,7 @@ public class DatasetSourceBatch implements DataSourceV2, ReadSupport {
}
/** This class is mapped to Beam {@link BoundedSource}. */
- private static class DatasetReader<T> implements DataSourceReader {
+ private static class DatasetReader<T> implements DataSourceReader, Serializable {
private int numPartitions;
private BoundedSource<T> source;
@@ -135,26 +136,22 @@ public class DatasetSourceBatch implements DataSourceV2, ReadSupport {
private boolean started;
private boolean closed;
private BoundedReader<T> reader;
- private BoundedSource<T> source;
- private SerializablePipelineOptions serializablePipelineOptions;
DatasetPartitionReader(BoundedSource<T> source, SerializablePipelineOptions serializablePipelineOptions) {
- this.source = source;
- this.serializablePipelineOptions = serializablePipelineOptions;
this.started = false;
this.closed = false;
+ // reader is not serializable, so create it here from the source and pipeline options
+ try {
+ reader = source
+ .createReader(serializablePipelineOptions.get().as(SparkPipelineOptions.class));
+ } catch (IOException e) {
+ throw new RuntimeException("Error creating BoundedReader ", e);
+ }
}
@Override
public boolean next() throws IOException {
if (!started) {
- // reader is not serializable so lazy initialize it
- try {
- reader = source
- .createReader(serializablePipelineOptions.get().as(SparkPipelineOptions.class));
- } catch (IOException e) {
- throw new RuntimeException("Error creating BoundedReader ", e);
- }
started = true;
return reader.start();
} else {