Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/15 16:30:58 UTC
[beam] 02/04: Cleaning
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit ca88d547d54ec9e1f5831894106dce076205acbd
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue Jan 15 16:42:09 2019 +0100
Cleaning
---
.../spark/structuredstreaming/SparkRunner.java | 4 +-
.../translation/PipelineTranslator.java | 4 +-
.../translation/batch/DatasetSourceBatch.java | 2 +-
.../translation/batch/PipelineTranslatorBatch.java | 5 +-
.../translation/batch/TranslationContextBatch.java | 40 -------
.../batch/mocks/DatasetSourceMockBatch.java | 94 ---------------
.../batch/mocks/ReadSourceTranslatorMockBatch.java | 62 ----------
.../translation/batch/mocks/package-info.java | 20 ----
.../streaming/DatasetSourceStreaming.java | 133 +++------------------
...lator.java => PipelineTranslatorStreaming.java} | 6 +-
.../streaming/StreamingTranslationContext.java | 29 -----
11 files changed, 27 insertions(+), 372 deletions(-)
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
index 97aa4d8..934c6d2 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
@@ -22,7 +22,7 @@ import static org.apache.beam.runners.core.construction.PipelineResources.detect
import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator;
import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
import org.apache.beam.runners.spark.structuredstreaming.translation.batch.PipelineTranslatorBatch;
-import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.StreamingPipelineTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.PipelineTranslatorStreaming;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -124,7 +124,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
PipelineTranslator.prepareFilesToStageForRemoteClusterExecution(options);
PipelineTranslator pipelineTranslator =
options.isStreaming()
- ? new StreamingPipelineTranslator(options)
+ ? new PipelineTranslatorStreaming(options)
: new PipelineTranslatorBatch(options);
pipelineTranslator.translate(pipeline);
return pipelineTranslator.getTranslationContext();
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
index e0924e3..7fbbfe6 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
@@ -21,7 +21,7 @@ import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.PipelineResources;
import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
import org.apache.beam.runners.spark.structuredstreaming.translation.batch.PipelineTranslatorBatch;
-import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.StreamingPipelineTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.PipelineTranslatorStreaming;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.PTransform;
@@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory;
* {@link Pipeline.PipelineVisitor} that translates the Beam operators to their Spark counterparts.
* It also does the pipeline preparation: mode detection, transforms replacement, classpath
* preparation. If we have a streaming job, it is instantiated as a {@link
- * StreamingPipelineTranslator}. If we have a batch job, it is instantiated as a {@link
+ * PipelineTranslatorStreaming}. If we have a batch job, it is instantiated as a {@link
* PipelineTranslatorBatch}.
*/
public abstract class PipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
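For orientation, the translators named above all share one contract: a TransformTranslator receives a PTransform plus the shared TranslationContext and registers the resulting Dataset on the context. A minimal sketch, modeled on the mock translator deleted later in this commit (the no-op body is illustrative only):

import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;

/** Sketch: the shape of the TransformTranslator contract. */
class NoOpReadTranslatorSketch<T>
    implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {

  @Override
  public void translateTransform(
      PTransform<PBegin, PCollection<T>> transform, TranslationContext context) {
    // A real translator builds a Dataset from the transform and registers it on
    // the context, e.g. context.putDatasetRaw(context.getOutput(), dataset),
    // as the deleted ReadSourceTranslatorMockBatch below does.
  }
}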
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
index 2a13d98..d966efb 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
@@ -183,7 +183,7 @@ public class DatasetSourceBatch implements DataSourceV2, ReadSupport {
} catch (IOException e) {
throw new RuntimeException(e);
}
-return InternalRow.apply(asScalaBuffer(list).toList());
+ return InternalRow.apply(asScalaBuffer(list).toList());
}
@Override
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
index 26f1b9c..99d34a6 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
@@ -24,13 +24,14 @@ import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator;
import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.PTransform;
/**
* {@link PipelineTranslator} for executing a {@link Pipeline} in Spark in batch mode. This contains
- * only the components specific to batch: {@link TranslationContextBatch}, registry of batch {@link
+ * only the components specific to batch: registry of batch {@link
* TransformTranslator} and registry lookup code.
*/
public class PipelineTranslatorBatch extends PipelineTranslator {
@@ -69,7 +70,7 @@ public class PipelineTranslatorBatch extends PipelineTranslator {
}
public PipelineTranslatorBatch(SparkPipelineOptions options) {
- translationContext = new TranslationContextBatch(options);
+ translationContext = new TranslationContext(options);
}
/** Returns a translator for the given node, if it is possible, otherwise null. */
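The "registry of batch {@link TransformTranslator} and registry lookup code" referenced in the javadoc is conventionally a URN-keyed map. A hedged sketch of that lookup pattern (TRANSLATORS and the class name are illustrative, not the actual members of PipelineTranslatorBatch):

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.PTransform;

class TranslatorRegistrySketch {

  // Illustrative registry: Beam transform URN -> batch translator.
  private static final Map<String, TransformTranslator> TRANSLATORS = new HashMap<>();

  /** Returns a translator for the given node, if one is registered, otherwise null. */
  static TransformTranslator<?> getTransformTranslator(TransformHierarchy.Node node) {
    PTransform<?, ?> transform = node.getTransform();
    if (transform == null) {
      return null;
    }
    String urn = PTransformTranslation.urnForTransformOrNull(transform);
    return (urn == null) ? null : TRANSLATORS.get(urn);
  }
}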
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/TranslationContextBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/TranslationContextBatch.java
deleted file mode 100644
index e849471..0000000
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/TranslationContextBatch.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.spark.sql.Dataset;
-
-/** This class contains only batch specific context components. */
-public class TranslationContextBatch extends TranslationContext {
-
- /**
- * For keeping track about which DataSets don't have a successor. We need to terminate these with
- * a discarding sink because the Beam model allows dangling operations.
- */
- private final Map<PValue, Dataset<?>> danglingDataSets;
-
- public TranslationContextBatch(SparkPipelineOptions options) {
- super(options);
- this.danglingDataSets = new HashMap<>();
- }
-}
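The comment in the file removed above captures reasoning worth keeping: Beam allows transforms whose output nothing consumes, while Spark only evaluates a Dataset when an action runs, so dangling datasets must be finished with a discarding sink. A hedged sketch of that idea (startDanglingDatasets is a hypothetical method name):

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.values.PValue;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Dataset;

class DanglingDatasetsSketch {

  private final Map<PValue, Dataset<?>> danglingDataSets = new HashMap<>();

  /** Hypothetical: force evaluation of datasets that no downstream transform consumes. */
  void startDanglingDatasets() {
    for (Dataset<?> dataset : danglingDataSets.values()) {
      @SuppressWarnings("unchecked")
      Dataset<Object> raw = (Dataset<Object>) dataset;
      // foreach with a no-op function is a discarding sink: it triggers
      // execution without retaining any rows.
      raw.foreach((ForeachFunction<Object>) value -> {});
    }
  }
}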
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/DatasetSourceMockBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/DatasetSourceMockBatch.java
deleted file mode 100644
index 81aead2..0000000
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/DatasetSourceMockBatch.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.structuredstreaming.translation.batch.mocks;
-
-import static scala.collection.JavaConversions.asScalaBuffer;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.DataSourceV2;
-import org.apache.spark.sql.sources.v2.ReadSupport;
-import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
-import org.apache.spark.sql.sources.v2.reader.InputPartition;
-import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
-import org.apache.spark.sql.types.StructType;
-import org.joda.time.Instant;
-
-/**
- * This is a mock source that gives values between 0 and 999.
- */
-public class DatasetSourceMockBatch implements DataSourceV2, ReadSupport {
-
- private DatasetSourceMockBatch() {
- }
-
- @Override public DataSourceReader createReader(DataSourceOptions options) {
- return new DatasetReader();
- }
-
- /** This class can be mapped to Beam {@link BoundedSource}. */
- private static class DatasetReader implements DataSourceReader {
-
- @Override public StructType readSchema() {
- return new StructType();
- }
-
- @Override public List<InputPartition<InternalRow>> planInputPartitions() {
- List<InputPartition<InternalRow>> result = new ArrayList<>();
- result.add(new InputPartition<InternalRow>() {
-
- @Override public InputPartitionReader<InternalRow> createPartitionReader() {
- return new DatasetPartitionReaderMock();
- }
- });
- return result;
- }
- }
-
- /** This class is a mocked reader. */
- private static class DatasetPartitionReaderMock implements InputPartitionReader<InternalRow> {
-
- private ArrayList<Integer> values;
- private int currentIndex = 0;
-
- private DatasetPartitionReaderMock() {
- for (int i = 0; i < 1000; i++){
- values.add(i);
- }
- }
-
- @Override public boolean next() throws IOException {
- currentIndex++;
- return (currentIndex <= values.size());
- }
-
- @Override public void close() throws IOException {
- }
-
- @Override public InternalRow get() {
- List<Object> list = new ArrayList<>();
- list.add(WindowedValue.timestampedValueInGlobalWindow(values.get(currentIndex), new Instant()));
- return InternalRow.apply(asScalaBuffer(list).toList());
- }
- }
-}
\ No newline at end of file
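Two defects in the deleted mock are worth recording: values was declared but never initialized (the constructor would throw a NullPointerException), and next() increments before the first get(), so index 0 is skipped and the final read lands one past the end. A corrected sketch of the same reader:

import static scala.collection.JavaConversions.asScalaBuffer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
import org.joda.time.Instant;

/** Sketch: the mock partition reader with initialization and indexing fixed. */
class FixedDatasetPartitionReaderMock implements InputPartitionReader<InternalRow> {

  private final List<Integer> values = new ArrayList<>(); // was declared but left null
  private int currentIndex = -1; // start before the first element

  FixedDatasetPartitionReaderMock() {
    for (int i = 0; i < 1000; i++) {
      values.add(i);
    }
  }

  @Override
  public boolean next() throws IOException {
    currentIndex++;
    return currentIndex < values.size(); // was <=, which read one past the end
  }

  @Override
  public InternalRow get() {
    List<Object> list = new ArrayList<>();
    list.add(
        WindowedValue.timestampedValueInGlobalWindow(values.get(currentIndex), new Instant()));
    return InternalRow.apply(asScalaBuffer(list).toList());
  }

  @Override
  public void close() throws IOException {}
}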
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/ReadSourceTranslatorMockBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/ReadSourceTranslatorMockBatch.java
deleted file mode 100644
index 5cfb755..0000000
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/ReadSourceTranslatorMockBatch.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.structuredstreaming.translation.batch.mocks;
-
-import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
-import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
-
-/**
- * Mock translator that generates a source of 0 to 999 and prints it.
- * @param <T>
- */
-public class ReadSourceTranslatorMockBatch<T>
- implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
-
- private static String sourceProviderClass = DatasetSourceMockBatch.class.getCanonicalName();
-
- @SuppressWarnings("unchecked")
- @Override
- public void translateTransform(
- PTransform<PBegin, PCollection<T>> transform, TranslationContext context) {
- SparkSession sparkSession = context.getSparkSession();
-
- Dataset<Row> rowDataset = sparkSession.read().format(sourceProviderClass).load();
-
- MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() {
- @Override public WindowedValue call(Row value) throws Exception {
- //there is only one value put in each Row by the InputPartitionReader
- return value.<WindowedValue>getAs(0);
- }
- };
- //TODO: is there a better way than using the raw WindowedValue? Can an Encoder<WindowedVAlue<T>>
- // be created ?
- Dataset<WindowedValue> dataset = rowDataset.map(func, Encoders.kryo(WindowedValue.class));
-
- PCollection<T> output = (PCollection<T>) context.getOutput();
- context.putDatasetRaw(output, dataset);
- }
-}
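The TODO above points at a real constraint: Spark requires an Encoder for every Dataset element type, and WindowedValue is neither a bean nor a product type, so the mock falls back to Encoders.kryo on the raw class. A hedged sketch of one way to recover a typed Encoder via an unchecked cast (a workaround, not necessarily the runner's eventual answer):

import org.apache.beam.sdk.util.WindowedValue;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

class WindowedValueEncoderSketch {

  /** Kryo serializes the whole WindowedValue as a blob; the type parameter is erased anyway. */
  @SuppressWarnings("unchecked")
  static <T> Encoder<WindowedValue<T>> windowedValueEncoder() {
    return Encoders.kryo((Class<WindowedValue<T>>) (Class<?>) WindowedValue.class);
  }
}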
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/package-info.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/package-info.java
deleted file mode 100644
index 3c00aaf..0000000
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/package-info.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/** Source mocks, only temporary waiting for the proper source to be done. */
-package org.apache.beam.runners.spark.structuredstreaming.translation.batch.mocks;
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java
index 3175aed..69d85d6 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java
@@ -17,25 +17,15 @@
*/
package org.apache.beam.runners.spark.structuredstreaming.translation.streaming;
-import static com.google.common.base.Preconditions.checkArgument;
-import static scala.collection.JavaConversions.asScalaBuffer;
-
-import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
-import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
-import org.apache.beam.sdk.util.WindowedValue;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.MicroBatchReadSupport;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
-import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
import org.apache.spark.sql.types.StructType;
@@ -44,144 +34,53 @@ import org.apache.spark.sql.types.StructType;
* This is a Spark structured streaming {@link DataSourceV2} implementation. As continuous streaming
is tagged experimental in Spark, this class does not implement {@link ContinuousReadSupport}.
*/
-public class DatasetSourceStreaming<T> implements DataSourceV2, MicroBatchReadSupport{
-
- private int numPartitions;
- private Long bundleSize;
- private TranslationContext context;
- private BoundedSource<T> source;
-
+public class DatasetSourceStreaming<T> implements DataSourceV2, MicroBatchReadSupport {
- @Override
- public MicroBatchReader createMicroBatchReader(
- Optional<StructType> schema, String checkpointLocation, DataSourceOptions options) {
- this.numPartitions = context.getSparkSession().sparkContext().defaultParallelism();
- checkArgument(this.numPartitions > 0, "Number of partitions must be greater than zero.");
- this.bundleSize = context.getOptions().getBundleSize();
- return new DatasetMicroBatchReader(schema, checkpointLocation, options);
+ @Override public MicroBatchReader createMicroBatchReader(Optional<StructType> schema,
+ String checkpointLocation, DataSourceOptions options) {
+ return new DatasetMicroBatchReader(checkpointLocation, options);
}
/** This class can be mapped to Beam {@link BoundedSource}. */
- private class DatasetMicroBatchReader implements MicroBatchReader {
+ private static class DatasetMicroBatchReader implements MicroBatchReader {
- private Optional<StructType> schema;
- private String checkpointLocation;
- private DataSourceOptions options;
-
- private DatasetMicroBatchReader(
- Optional<StructType> schema, String checkpointLocation, DataSourceOptions options) {
+ private DatasetMicroBatchReader(String checkpointLocation, DataSourceOptions options) {
//TODO deal with schema and options
}
- @Override
- public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
+ @Override public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
//TODO extension point for SDF
}
- @Override
- public Offset getStartOffset() {
+ @Override public Offset getStartOffset() {
//TODO extension point for SDF
return null;
}
- @Override
- public Offset getEndOffset() {
+ @Override public Offset getEndOffset() {
//TODO extension point for SDF
return null;
}
- @Override
- public Offset deserializeOffset(String json) {
+ @Override public Offset deserializeOffset(String json) {
//TODO extension point for SDF
return null;
}
- @Override
- public void commit(Offset end) {
+ @Override public void commit(Offset end) {
//TODO no more to read after end Offset
}
- @Override
- public void stop() {}
+ @Override public void stop() {
+ }
- @Override
- public StructType readSchema() {
+ @Override public StructType readSchema() {
return null;
}
- @Override
- public List<InputPartition<InternalRow>> planInputPartitions() {
- List<InputPartition<InternalRow>> result = new ArrayList<>();
- long desiredSizeBytes;
- SparkPipelineOptions options = context.getOptions();
- try {
- desiredSizeBytes =
- (bundleSize == null)
- ? source.getEstimatedSizeBytes(options) / numPartitions
- : bundleSize;
- List<? extends BoundedSource<T>> sources = source.split(desiredSizeBytes, options);
- for (BoundedSource<T> source : sources) {
- result.add(
- new InputPartition<InternalRow>() {
-
- @Override
- public InputPartitionReader<InternalRow> createPartitionReader() {
- BoundedReader<T> reader = null;
- try {
- reader = source.createReader(options);
- } catch (IOException e) {
- throw new RuntimeException(
- "Error creating BoundedReader " + reader.getClass().getCanonicalName(), e);
- }
- return new DatasetMicroBatchPartitionReader(reader);
- }
- });
- }
- return result;
-
- } catch (Exception e) {
- throw new RuntimeException(
- "Error in splitting BoundedSource " + source.getClass().getCanonicalName(), e);
- }
+ @Override public List<InputPartition<InternalRow>> planInputPartitions() {
+ return null;
}
}
- /** This class can be mapped to Beam {@link BoundedReader}. */
- private class DatasetMicroBatchPartitionReader implements InputPartitionReader<InternalRow> {
-
- BoundedReader<T> reader;
- private boolean started;
- private boolean closed;
-
- DatasetMicroBatchPartitionReader(BoundedReader<T> reader) {
- this.reader = reader;
- this.started = false;
- this.closed = false;
- }
-
- @Override
- public boolean next() throws IOException {
- if (!started) {
- started = true;
- return reader.start();
- } else {
- return !closed && reader.advance();
- }
- }
-
- @Override
- public InternalRow get() {
- List<Object> list = new ArrayList<>();
- list.add(
- WindowedValue.timestampedValueInGlobalWindow(
- reader.getCurrent(), reader.getCurrentTimestamp()));
- return InternalRow.apply(asScalaBuffer(list).toList());
- }
-
- @Override
- public void close() throws IOException {
- closed = true;
- reader.close();
- }
- }
}
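Once the TODOs above are filled in, a DataSourceV2 implementation like this one is wired into Spark by its fully qualified class name, mirroring what the deleted batch mock translator did with sparkSession.read(). A hedged usage sketch for the streaming side:

import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.DatasetSourceStreaming;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

class StreamingSourceUsageSketch {

  static Dataset<Row> load(SparkSession sparkSession) {
    // Structured streaming resolves DataSourceV2 sources by class name.
    return sparkSession
        .readStream()
        .format(DatasetSourceStreaming.class.getCanonicalName())
        .load();
  }
}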
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingPipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/PipelineTranslatorStreaming.java
similarity index 87%
rename from runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingPipelineTranslator.java
rename to runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/PipelineTranslatorStreaming.java
index 437aa25..20cefed 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingPipelineTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/PipelineTranslatorStreaming.java
@@ -25,12 +25,12 @@ import org.apache.beam.sdk.runners.TransformHierarchy;
/**
* {@link PipelineTranslator} for executing a {@link Pipeline} in Spark in streaming mode. This
- * contains only the components specific to streaming: {@link StreamingTranslationContext}, registry
+ * contains only the components specific to streaming: registry
* of streaming {@link TransformTranslator} and registry lookup code.
*/
-public class StreamingPipelineTranslator extends PipelineTranslator {
+public class PipelineTranslatorStreaming extends PipelineTranslator {
- public StreamingPipelineTranslator(SparkPipelineOptions options) {}
+ public PipelineTranslatorStreaming(SparkPipelineOptions options) {}
@Override
protected TransformTranslator<?> getTransformTranslator(TransformHierarchy.Node node) {
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingTranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingTranslationContext.java
deleted file mode 100644
index f827cc4..0000000
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingTranslationContext.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.structuredstreaming.translation.streaming;
-
-import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
-
-/** This class contains only streaming specific context components. */
-public class StreamingTranslationContext extends TranslationContext {
-
- public StreamingTranslationContext(SparkPipelineOptions options) {
- super(options);
- }
-}