You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/15 16:30:58 UTC

[beam] 02/04: Cleaning

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit ca88d547d54ec9e1f5831894106dce076205acbd
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue Jan 15 16:42:09 2019 +0100

    Cleaning
---
 .../spark/structuredstreaming/SparkRunner.java     |   4 +-
 .../translation/PipelineTranslator.java            |   4 +-
 .../translation/batch/DatasetSourceBatch.java      |   2 +-
 .../translation/batch/PipelineTranslatorBatch.java |   5 +-
 .../translation/batch/TranslationContextBatch.java |  40 -------
 .../batch/mocks/DatasetSourceMockBatch.java        |  94 ---------------
 .../batch/mocks/ReadSourceTranslatorMockBatch.java |  62 ----------
 .../translation/batch/mocks/package-info.java      |  20 ----
 .../streaming/DatasetSourceStreaming.java          | 133 +++------------------
 ...lator.java => PipelineTranslatorStreaming.java} |   6 +-
 .../streaming/StreamingTranslationContext.java     |  29 -----
 11 files changed, 27 insertions(+), 372 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
index 97aa4d8..934c6d2 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
@@ -22,7 +22,7 @@ import static org.apache.beam.runners.core.construction.PipelineResources.detect
 import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.runners.spark.structuredstreaming.translation.batch.PipelineTranslatorBatch;
-import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.StreamingPipelineTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.PipelineTranslatorStreaming;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -124,7 +124,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
     PipelineTranslator.prepareFilesToStageForRemoteClusterExecution(options);
     PipelineTranslator pipelineTranslator =
         options.isStreaming()
-            ? new StreamingPipelineTranslator(options)
+            ? new PipelineTranslatorStreaming(options)
             : new PipelineTranslatorBatch(options);
     pipelineTranslator.translate(pipeline);
     return pipelineTranslator.getTranslationContext();
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
index e0924e3..7fbbfe6 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
@@ -21,7 +21,7 @@ import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.PipelineResources;
 import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.translation.batch.PipelineTranslatorBatch;
-import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.StreamingPipelineTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.PipelineTranslatorStreaming;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory;
  * {@link Pipeline.PipelineVisitor} that translates the Beam operators to their Spark counterparts.
  * It also does the pipeline preparation: mode detection, transforms replacement, classpath
  * preparation. If we have a streaming job, it is instantiated as a {@link
- * StreamingPipelineTranslator}. If we have a batch job, it is instantiated as a {@link
+ * PipelineTranslatorStreaming}. If we have a batch job, it is instantiated as a {@link
  * PipelineTranslatorBatch}.
  */
 public abstract class PipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
index 2a13d98..d966efb 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
@@ -183,7 +183,7 @@ public class DatasetSourceBatch implements DataSourceV2, ReadSupport {
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
-return InternalRow.apply(asScalaBuffer(list).toList());
+      return InternalRow.apply(asScalaBuffer(list).toList());
     }
 
     @Override
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
index 26f1b9c..99d34a6 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
@@ -24,13 +24,14 @@ import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.PTransform;
 
 /**
  * {@link PipelineTranslator} for executing a {@link Pipeline} in Spark in batch mode. This contains
- * only the components specific to batch: {@link TranslationContextBatch}, registry of batch {@link
+ * only the components specific to batch: registry of batch {@link
  * TransformTranslator} and registry lookup code.
  */
 public class PipelineTranslatorBatch extends PipelineTranslator {
@@ -69,7 +70,7 @@ public class PipelineTranslatorBatch extends PipelineTranslator {
   }
 
   public PipelineTranslatorBatch(SparkPipelineOptions options) {
-    translationContext = new TranslationContextBatch(options);
+    translationContext = new TranslationContext(options);
   }
 
   /** Returns a translator for the given node, if it is possible, otherwise null. */
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/TranslationContextBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/TranslationContextBatch.java
deleted file mode 100644
index e849471..0000000
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/TranslationContextBatch.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.spark.sql.Dataset;
-
-/** This class contains only batch specific context components. */
-public class TranslationContextBatch extends TranslationContext {
-
-  /**
-   * For keeping track about which DataSets don't have a successor. We need to terminate these with
-   * a discarding sink because the Beam model allows dangling operations.
-   */
-  private final Map<PValue, Dataset<?>> danglingDataSets;
-
-  public TranslationContextBatch(SparkPipelineOptions options) {
-    super(options);
-    this.danglingDataSets = new HashMap<>();
-  }
-}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/DatasetSourceMockBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/DatasetSourceMockBatch.java
deleted file mode 100644
index 81aead2..0000000
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/DatasetSourceMockBatch.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.structuredstreaming.translation.batch.mocks;
-
-import static scala.collection.JavaConversions.asScalaBuffer;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.DataSourceV2;
-import org.apache.spark.sql.sources.v2.ReadSupport;
-import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
-import org.apache.spark.sql.sources.v2.reader.InputPartition;
-import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
-import org.apache.spark.sql.types.StructType;
-import org.joda.time.Instant;
-
-/**
- * This is a mock source that gives values between 0 and 999.
- */
-public class DatasetSourceMockBatch implements DataSourceV2, ReadSupport {
-
-  private DatasetSourceMockBatch() {
-  }
-
-  @Override public DataSourceReader createReader(DataSourceOptions options) {
-    return new DatasetReader();
-  }
-
-  /** This class can be mapped to Beam {@link BoundedSource}. */
-  private static class DatasetReader implements DataSourceReader {
-
-    @Override public StructType readSchema() {
-      return new StructType();
-    }
-
-    @Override public List<InputPartition<InternalRow>> planInputPartitions() {
-      List<InputPartition<InternalRow>> result = new ArrayList<>();
-      result.add(new InputPartition<InternalRow>() {
-
-        @Override public InputPartitionReader<InternalRow> createPartitionReader() {
-          return new DatasetPartitionReaderMock();
-        }
-      });
-      return result;
-    }
-  }
-
-  /** This class is a mocked reader. */
-  private static class DatasetPartitionReaderMock implements InputPartitionReader<InternalRow> {
-
-    private ArrayList<Integer> values;
-    private int currentIndex = 0;
-
-    private DatasetPartitionReaderMock() {
-      for (int i = 0; i < 1000; i++){
-        values.add(i);
-      }
-    }
-
-    @Override public boolean next() throws IOException {
-      currentIndex++;
-      return (currentIndex <= values.size());
-    }
-
-    @Override public void close() throws IOException {
-    }
-
-    @Override public InternalRow get() {
-      List<Object> list = new ArrayList<>();
-      list.add(WindowedValue.timestampedValueInGlobalWindow(values.get(currentIndex), new Instant()));
-      return InternalRow.apply(asScalaBuffer(list).toList());
-    }
-  }
-}
\ No newline at end of file
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/ReadSourceTranslatorMockBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/ReadSourceTranslatorMockBatch.java
deleted file mode 100644
index 5cfb755..0000000
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/ReadSourceTranslatorMockBatch.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.structuredstreaming.translation.batch.mocks;
-
-import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
-import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
-
-/**
- * Mock translator that generates a source of 0 to 999 and prints it.
- * @param <T>
- */
-public class ReadSourceTranslatorMockBatch<T>
-    implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
-
-  private static String sourceProviderClass = DatasetSourceMockBatch.class.getCanonicalName();
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public void translateTransform(
-      PTransform<PBegin, PCollection<T>> transform, TranslationContext context) {
-    SparkSession sparkSession = context.getSparkSession();
-
-    Dataset<Row> rowDataset = sparkSession.read().format(sourceProviderClass).load();
-
-    MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() {
-      @Override public WindowedValue call(Row value) throws Exception {
-        //there is only one value put in each Row by the InputPartitionReader
-        return value.<WindowedValue>getAs(0);
-      }
-    };
-    //TODO: is there a better way than using the raw WindowedValue? Can an Encoder<WindowedVAlue<T>>
-    // be created ?
-    Dataset<WindowedValue> dataset = rowDataset.map(func, Encoders.kryo(WindowedValue.class));
-
-    PCollection<T> output = (PCollection<T>) context.getOutput();
-    context.putDatasetRaw(output, dataset);
-  }
-}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/package-info.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/package-info.java
deleted file mode 100644
index 3c00aaf..0000000
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/package-info.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/** Source mocks, only temporary waiting for the proper source to be done. */
-package org.apache.beam.runners.spark.structuredstreaming.translation.batch.mocks;
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java
index 3175aed..69d85d6 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java
@@ -17,25 +17,15 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation.streaming;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static scala.collection.JavaConversions.asScalaBuffer;
-
-import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
-import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
-import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
 import org.apache.spark.sql.sources.v2.MicroBatchReadSupport;
 import org.apache.spark.sql.sources.v2.reader.InputPartition;
-import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
 import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
 import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
 import org.apache.spark.sql.types.StructType;
@@ -44,144 +34,53 @@ import org.apache.spark.sql.types.StructType;
  * This is a spark structured streaming {@link DataSourceV2} implementation. As Continuous streaming
  * is tagged experimental in spark, this class does not implement {@link ContinuousReadSupport}.
  */
-public class DatasetSourceStreaming<T> implements DataSourceV2, MicroBatchReadSupport{
-
-  private int numPartitions;
-  private Long bundleSize;
-  private TranslationContext context;
-  private BoundedSource<T> source;
-
+public class DatasetSourceStreaming<T> implements DataSourceV2, MicroBatchReadSupport {
 
-  @Override
-  public MicroBatchReader createMicroBatchReader(
-      Optional<StructType> schema, String checkpointLocation, DataSourceOptions options) {
-    this.numPartitions = context.getSparkSession().sparkContext().defaultParallelism();
-    checkArgument(this.numPartitions > 0, "Number of partitions must be greater than zero.");
-    this.bundleSize = context.getOptions().getBundleSize();
-    return new DatasetMicroBatchReader(schema, checkpointLocation, options);
+  @Override public MicroBatchReader createMicroBatchReader(Optional<StructType> schema,
+      String checkpointLocation, DataSourceOptions options) {
+    return new DatasetMicroBatchReader(checkpointLocation, options);
   }
 
   /** This class can be mapped to Beam {@link BoundedSource}. */
-  private class DatasetMicroBatchReader implements MicroBatchReader {
+  private static class DatasetMicroBatchReader implements MicroBatchReader {
 
-    private Optional<StructType> schema;
-    private String checkpointLocation;
-    private DataSourceOptions options;
-
-    private DatasetMicroBatchReader(
-        Optional<StructType> schema, String checkpointLocation, DataSourceOptions options) {
+    private DatasetMicroBatchReader(String checkpointLocation, DataSourceOptions options) {
       //TODO deal with schema and options
     }
 
-    @Override
-    public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
+    @Override public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
       //TODO extension point for SDF
     }
 
-    @Override
-    public Offset getStartOffset() {
+    @Override public Offset getStartOffset() {
       //TODO extension point for SDF
       return null;
     }
 
-    @Override
-    public Offset getEndOffset() {
+    @Override public Offset getEndOffset() {
       //TODO extension point for SDF
       return null;
     }
 
-    @Override
-    public Offset deserializeOffset(String json) {
+    @Override public Offset deserializeOffset(String json) {
       //TODO extension point for SDF
       return null;
     }
 
-    @Override
-    public void commit(Offset end) {
+    @Override public void commit(Offset end) {
       //TODO no more to read after end Offset
     }
 
-    @Override
-    public void stop() {}
+    @Override public void stop() {
+    }
 
-    @Override
-    public StructType readSchema() {
+    @Override public StructType readSchema() {
       return null;
     }
 
-    @Override
-    public List<InputPartition<InternalRow>> planInputPartitions() {
-      List<InputPartition<InternalRow>> result = new ArrayList<>();
-      long desiredSizeBytes;
-      SparkPipelineOptions options = context.getOptions();
-      try {
-        desiredSizeBytes =
-            (bundleSize == null)
-                ? source.getEstimatedSizeBytes(options) / numPartitions
-                : bundleSize;
-        List<? extends BoundedSource<T>> sources = source.split(desiredSizeBytes, options);
-        for (BoundedSource<T> source : sources) {
-          result.add(
-              new InputPartition<InternalRow>() {
-
-                @Override
-                public InputPartitionReader<InternalRow> createPartitionReader() {
-                  BoundedReader<T> reader = null;
-                  try {
-                    reader = source.createReader(options);
-                  } catch (IOException e) {
-                    throw new RuntimeException(
-                        "Error creating BoundedReader " + reader.getClass().getCanonicalName(), e);
-                  }
-                  return new DatasetMicroBatchPartitionReader(reader);
-                }
-              });
-        }
-        return result;
-
-      } catch (Exception e) {
-        throw new RuntimeException(
-            "Error in splitting BoundedSource " + source.getClass().getCanonicalName(), e);
-      }
+    @Override public List<InputPartition<InternalRow>> planInputPartitions() {
+      return null;
     }
   }
 
-  /** This class can be mapped to Beam {@link BoundedReader}. */
-  private class DatasetMicroBatchPartitionReader implements InputPartitionReader<InternalRow> {
-
-    BoundedReader<T> reader;
-    private boolean started;
-    private boolean closed;
-
-    DatasetMicroBatchPartitionReader(BoundedReader<T> reader) {
-      this.reader = reader;
-      this.started = false;
-      this.closed = false;
-    }
-
-    @Override
-    public boolean next() throws IOException {
-      if (!started) {
-        started = true;
-        return reader.start();
-      } else {
-        return !closed && reader.advance();
-      }
-    }
-
-    @Override
-    public InternalRow get() {
-      List<Object> list = new ArrayList<>();
-      list.add(
-          WindowedValue.timestampedValueInGlobalWindow(
-              reader.getCurrent(), reader.getCurrentTimestamp()));
-      return InternalRow.apply(asScalaBuffer(list).toList());
-    }
-
-    @Override
-    public void close() throws IOException {
-      closed = true;
-      reader.close();
-    }
-  }
 }
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingPipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/PipelineTranslatorStreaming.java
similarity index 87%
rename from runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingPipelineTranslator.java
rename to runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/PipelineTranslatorStreaming.java
index 437aa25..20cefed 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingPipelineTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/PipelineTranslatorStreaming.java
@@ -25,12 +25,12 @@ import org.apache.beam.sdk.runners.TransformHierarchy;
 
 /**
  * {@link PipelineTranslator} for executing a {@link Pipeline} in Spark in streaming mode. This
- * contains only the components specific to streaming: {@link StreamingTranslationContext}, registry
+ * contains only the components specific to streaming: registry
  * of streaming {@link TransformTranslator} and registry lookup code.
  */
-public class StreamingPipelineTranslator extends PipelineTranslator {
+public class PipelineTranslatorStreaming extends PipelineTranslator {
 
-  public StreamingPipelineTranslator(SparkPipelineOptions options) {}
+  public PipelineTranslatorStreaming(SparkPipelineOptions options) {}
 
   @Override
   protected TransformTranslator<?> getTransformTranslator(TransformHierarchy.Node node) {
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingTranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingTranslationContext.java
deleted file mode 100644
index f827cc4..0000000
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingTranslationContext.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.structuredstreaming.translation.streaming;
-
-import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
-
-/** This class contains only streaming specific context components. */
-public class StreamingTranslationContext extends TranslationContext {
-
-  public StreamingTranslationContext(SparkPipelineOptions options) {
-    super(options);
-  }
-}