Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/15 16:30:56 UTC

[beam] branch spark-runner_structured-streaming updated (f5fd012 -> 707470b)

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a change to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git.


    from f5fd012  Fix errorprone
     new f19721f  Fix testMode output to comply with new binary schema
     new ca88d54  Cleaning
     new 7bb1945  Remove bundleSize parameter and always use spark default parallelism
     new 707470b  Fix split bug

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../structuredstreaming/SparkPipelineOptions.java  |  10 --
 .../spark/structuredstreaming/SparkRunner.java     |   4 +-
 .../translation/PipelineTranslator.java            |   4 +-
 .../translation/TranslationContext.java            |   7 +-
 .../translation/batch/DatasetSourceBatch.java      |   9 +-
 .../translation/batch/PipelineTranslatorBatch.java |   5 +-
 .../translation/batch/TranslationContextBatch.java |  40 -------
 .../batch/mocks/DatasetSourceMockBatch.java        |  94 ---------------
 .../batch/mocks/ReadSourceTranslatorMockBatch.java |  62 ----------
 .../translation/batch/mocks/package-info.java      |  20 ----
 .../streaming/DatasetSourceStreaming.java          | 133 +++------------------
 ...lator.java => PipelineTranslatorStreaming.java} |   6 +-
 .../streaming/StreamingTranslationContext.java     |  29 -----
 13 files changed, 35 insertions(+), 388 deletions(-)
 delete mode 100644 runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/TranslationContextBatch.java
 delete mode 100644 runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/DatasetSourceMockBatch.java
 delete mode 100644 runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/ReadSourceTranslatorMockBatch.java
 delete mode 100644 runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/package-info.java
 rename runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/{StreamingPipelineTranslator.java => PipelineTranslatorStreaming.java} (87%)
 delete mode 100644 runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingTranslationContext.java


[beam] 03/04: Remove bundleSize parameter and always use spark default parallelism

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 7bb19451dadea0259f6658c7ccc7f157fa0cd576
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue Jan 15 17:06:51 2019 +0100

    Remove bundleSize parameter and always use spark default parallelism
---
 .../spark/structuredstreaming/SparkPipelineOptions.java        | 10 ----------
 .../translation/batch/DatasetSourceBatch.java                  |  5 +----
 2 files changed, 1 insertion(+), 14 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkPipelineOptions.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkPipelineOptions.java
index 2e6653b..442ccf8 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkPipelineOptions.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkPipelineOptions.java
@@ -73,16 +73,6 @@ public interface SparkPipelineOptions
 
   void setCheckpointDurationMillis(Long durationMillis);
 
-  @Description(
-      "If set bundleSize will be used for splitting BoundedSources, otherwise default to "
-          + "splitting BoundedSources on Spark defaultParallelism. Most effective when used with "
-          + "Spark dynamicAllocation.")
-  @Default.Long(0)
-  Long getBundleSize();
-
-  @Experimental
-  void setBundleSize(Long value);
-
   @Description("Enable/disable sending aggregator values to Spark's metric sinks")
   @Default.Boolean(true)
   Boolean getEnableSparkMetricSinks();
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
index d966efb..3f6f219 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
@@ -113,10 +113,7 @@ public class DatasetSourceBatch implements DataSourceV2, ReadSupport {
       List<InputPartition<InternalRow>> result = new ArrayList<>();
       long desiredSizeBytes;
       try {
-        desiredSizeBytes =
-            (sparkPipelineOptions.getBundleSize() == null)
-                ? source.getEstimatedSizeBytes(sparkPipelineOptions) / numPartitions
-                : sparkPipelineOptions.getBundleSize();
+        desiredSizeBytes = source.getEstimatedSizeBytes(sparkPipelineOptions) / numPartitions;
         List<? extends BoundedSource<T>> splits = source.split(desiredSizeBytes, sparkPipelineOptions);
         for (BoundedSource<T> split : splits) {
           result.add(
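
A minimal standalone sketch of the sizing logic after this change (plain Java; estimatedSizeBytes and defaultParallelism stand in for source.getEstimatedSizeBytes(options) and sparkContext.defaultParallelism(), and the class and method names are illustrative, not part of the commit):

    // Illustrative sketch: with bundleSize removed, the target split size is
    // simply the source's estimated size divided by Spark's defaultParallelism,
    // so each executor slot gets roughly one partition.
    final class SplitSizing {
      static long desiredSizeBytes(long estimatedSizeBytes, int defaultParallelism) {
        return estimatedSizeBytes / defaultParallelism;
      }

      public static void main(String[] args) {
        // e.g. a 1 GiB source on a cluster with defaultParallelism = 8
        // yields roughly 128 MiB per split.
        System.out.println(desiredSizeBytes(1L << 30, 8));
      }
    }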


[beam] 04/04: Fix split bug

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 707470b0469fbcf63efd985faf6185be295e7c6d
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue Jan 15 17:30:29 2019 +0100

    Fix split bug
---
 .../spark/structuredstreaming/translation/batch/DatasetSourceBatch.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
index 3f6f219..8f22bc7 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
@@ -121,7 +121,7 @@ public class DatasetSourceBatch implements DataSourceV2, ReadSupport {
 
                 @Override
                 public InputPartitionReader<InternalRow> createPartitionReader() {
-                  return new DatasetPartitionReader<>(source, serializablePipelineOptions);
+                  return new DatasetPartitionReader<>(split, serializablePipelineOptions);
                 }
               });
         }
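
For context, a minimal sketch of the pattern this fix restores: each partition reader is built from its own split rather than from the parent source (Split and Reader here are illustrative stand-ins for BoundedSource<T> and InputPartitionReader<InternalRow>; none of these names come from the commit):

    // Illustrative sketch: one reader per split. Before the fix, every
    // partition was effectively handed the parent source, so each partition
    // re-read the entire input instead of its assigned shard.
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.function.Function;

    final class OneReaderPerSplit {
      static <S, R> List<R> planReaders(List<S> splits, Function<S, R> createReader) {
        List<R> readers = new ArrayList<>();
        for (S split : splits) {
          // Reader is created from the split, not from the enclosing source.
          readers.add(createReader.apply(split));
        }
        return readers;
      }

      public static void main(String[] args) {
        List<String> splits = Arrays.asList("split-0", "split-1", "split-2");
        System.out.println(planReaders(splits, s -> "reader(" + s + ")"));
      }
    }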


[beam] 02/04: Cleaning

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit ca88d547d54ec9e1f5831894106dce076205acbd
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue Jan 15 16:42:09 2019 +0100

    Cleaning
---
 .../spark/structuredstreaming/SparkRunner.java     |   4 +-
 .../translation/PipelineTranslator.java            |   4 +-
 .../translation/batch/DatasetSourceBatch.java      |   2 +-
 .../translation/batch/PipelineTranslatorBatch.java |   5 +-
 .../translation/batch/TranslationContextBatch.java |  40 -------
 .../batch/mocks/DatasetSourceMockBatch.java        |  94 ---------------
 .../batch/mocks/ReadSourceTranslatorMockBatch.java |  62 ----------
 .../translation/batch/mocks/package-info.java      |  20 ----
 .../streaming/DatasetSourceStreaming.java          | 133 +++------------------
 ...lator.java => PipelineTranslatorStreaming.java} |   6 +-
 .../streaming/StreamingTranslationContext.java     |  29 -----
 11 files changed, 27 insertions(+), 372 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
index 97aa4d8..934c6d2 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
@@ -22,7 +22,7 @@ import static org.apache.beam.runners.core.construction.PipelineResources.detect
 import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.runners.spark.structuredstreaming.translation.batch.PipelineTranslatorBatch;
-import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.StreamingPipelineTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.PipelineTranslatorStreaming;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -124,7 +124,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
     PipelineTranslator.prepareFilesToStageForRemoteClusterExecution(options);
     PipelineTranslator pipelineTranslator =
         options.isStreaming()
-            ? new StreamingPipelineTranslator(options)
+            ? new PipelineTranslatorStreaming(options)
             : new PipelineTranslatorBatch(options);
     pipelineTranslator.translate(pipeline);
     return pipelineTranslator.getTranslationContext();
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
index e0924e3..7fbbfe6 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
@@ -21,7 +21,7 @@ import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.PipelineResources;
 import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.translation.batch.PipelineTranslatorBatch;
-import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.StreamingPipelineTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.PipelineTranslatorStreaming;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory;
  * {@link Pipeline.PipelineVisitor} that translates the Beam operators to their Spark counterparts.
  * It also does the pipeline preparation: mode detection, transforms replacement, classpath
  * preparation. If we have a streaming job, it is instantiated as a {@link
- * StreamingPipelineTranslator}. If we have a batch job, it is instantiated as a {@link
+ * PipelineTranslatorStreaming}. If we have a batch job, it is instantiated as a {@link
  * PipelineTranslatorBatch}.
  */
 public abstract class PipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
index 2a13d98..d966efb 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
@@ -183,7 +183,7 @@ public class DatasetSourceBatch implements DataSourceV2, ReadSupport {
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
-return InternalRow.apply(asScalaBuffer(list).toList());
+      return InternalRow.apply(asScalaBuffer(list).toList());
     }
 
     @Override
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
index 26f1b9c..99d34a6 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
@@ -24,13 +24,14 @@ import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.PTransform;
 
 /**
  * {@link PipelineTranslator} for executing a {@link Pipeline} in Spark in batch mode. This contains
- * only the components specific to batch: {@link TranslationContextBatch}, registry of batch {@link
+ * only the components specific to batch: registry of batch {@link
  * TransformTranslator} and registry lookup code.
  */
 public class PipelineTranslatorBatch extends PipelineTranslator {
@@ -69,7 +70,7 @@ public class PipelineTranslatorBatch extends PipelineTranslator {
   }
 
   public PipelineTranslatorBatch(SparkPipelineOptions options) {
-    translationContext = new TranslationContextBatch(options);
+    translationContext = new TranslationContext(options);
   }
 
   /** Returns a translator for the given node, if it is possible, otherwise null. */
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/TranslationContextBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/TranslationContextBatch.java
deleted file mode 100644
index e849471..0000000
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/TranslationContextBatch.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.spark.sql.Dataset;
-
-/** This class contains only batch specific context components. */
-public class TranslationContextBatch extends TranslationContext {
-
-  /**
-   * For keeping track about which DataSets don't have a successor. We need to terminate these with
-   * a discarding sink because the Beam model allows dangling operations.
-   */
-  private final Map<PValue, Dataset<?>> danglingDataSets;
-
-  public TranslationContextBatch(SparkPipelineOptions options) {
-    super(options);
-    this.danglingDataSets = new HashMap<>();
-  }
-}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/DatasetSourceMockBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/DatasetSourceMockBatch.java
deleted file mode 100644
index 81aead2..0000000
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/DatasetSourceMockBatch.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.structuredstreaming.translation.batch.mocks;
-
-import static scala.collection.JavaConversions.asScalaBuffer;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.DataSourceV2;
-import org.apache.spark.sql.sources.v2.ReadSupport;
-import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
-import org.apache.spark.sql.sources.v2.reader.InputPartition;
-import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
-import org.apache.spark.sql.types.StructType;
-import org.joda.time.Instant;
-
-/**
- * This is a mock source that gives values between 0 and 999.
- */
-public class DatasetSourceMockBatch implements DataSourceV2, ReadSupport {
-
-  private DatasetSourceMockBatch() {
-  }
-
-  @Override public DataSourceReader createReader(DataSourceOptions options) {
-    return new DatasetReader();
-  }
-
-  /** This class can be mapped to Beam {@link BoundedSource}. */
-  private static class DatasetReader implements DataSourceReader {
-
-    @Override public StructType readSchema() {
-      return new StructType();
-    }
-
-    @Override public List<InputPartition<InternalRow>> planInputPartitions() {
-      List<InputPartition<InternalRow>> result = new ArrayList<>();
-      result.add(new InputPartition<InternalRow>() {
-
-        @Override public InputPartitionReader<InternalRow> createPartitionReader() {
-          return new DatasetPartitionReaderMock();
-        }
-      });
-      return result;
-    }
-  }
-
-  /** This class is a mocked reader. */
-  private static class DatasetPartitionReaderMock implements InputPartitionReader<InternalRow> {
-
-    private ArrayList<Integer> values;
-    private int currentIndex = 0;
-
-    private DatasetPartitionReaderMock() {
-      for (int i = 0; i < 1000; i++){
-        values.add(i);
-      }
-    }
-
-    @Override public boolean next() throws IOException {
-      currentIndex++;
-      return (currentIndex <= values.size());
-    }
-
-    @Override public void close() throws IOException {
-    }
-
-    @Override public InternalRow get() {
-      List<Object> list = new ArrayList<>();
-      list.add(WindowedValue.timestampedValueInGlobalWindow(values.get(currentIndex), new Instant()));
-      return InternalRow.apply(asScalaBuffer(list).toList());
-    }
-  }
-}
\ No newline at end of file
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/ReadSourceTranslatorMockBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/ReadSourceTranslatorMockBatch.java
deleted file mode 100644
index 5cfb755..0000000
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/ReadSourceTranslatorMockBatch.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.structuredstreaming.translation.batch.mocks;
-
-import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
-import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
-
-/**
- * Mock translator that generates a source of 0 to 999 and prints it.
- * @param <T>
- */
-public class ReadSourceTranslatorMockBatch<T>
-    implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
-
-  private static String sourceProviderClass = DatasetSourceMockBatch.class.getCanonicalName();
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public void translateTransform(
-      PTransform<PBegin, PCollection<T>> transform, TranslationContext context) {
-    SparkSession sparkSession = context.getSparkSession();
-
-    Dataset<Row> rowDataset = sparkSession.read().format(sourceProviderClass).load();
-
-    MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() {
-      @Override public WindowedValue call(Row value) throws Exception {
-        //there is only one value put in each Row by the InputPartitionReader
-        return value.<WindowedValue>getAs(0);
-      }
-    };
-    //TODO: is there a better way than using the raw WindowedValue? Can an Encoder<WindowedVAlue<T>>
-    // be created ?
-    Dataset<WindowedValue> dataset = rowDataset.map(func, Encoders.kryo(WindowedValue.class));
-
-    PCollection<T> output = (PCollection<T>) context.getOutput();
-    context.putDatasetRaw(output, dataset);
-  }
-}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/package-info.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/package-info.java
deleted file mode 100644
index 3c00aaf..0000000
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/mocks/package-info.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/** Source mocks, only temporary waiting for the proper source to be done. */
-package org.apache.beam.runners.spark.structuredstreaming.translation.batch.mocks;
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java
index 3175aed..69d85d6 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java
@@ -17,25 +17,15 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation.streaming;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static scala.collection.JavaConversions.asScalaBuffer;
-
-import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
-import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
-import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
 import org.apache.spark.sql.sources.v2.MicroBatchReadSupport;
 import org.apache.spark.sql.sources.v2.reader.InputPartition;
-import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
 import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
 import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
 import org.apache.spark.sql.types.StructType;
@@ -44,144 +34,53 @@ import org.apache.spark.sql.types.StructType;
  * This is a spark structured streaming {@link DataSourceV2} implementation. As Continuous streaming
  * is tagged experimental in spark, this class does no implement {@link ContinuousReadSupport}.
  */
-public class DatasetSourceStreaming<T> implements DataSourceV2, MicroBatchReadSupport{
-
-  private int numPartitions;
-  private Long bundleSize;
-  private TranslationContext context;
-  private BoundedSource<T> source;
-
+public class DatasetSourceStreaming<T> implements DataSourceV2, MicroBatchReadSupport {
 
-  @Override
-  public MicroBatchReader createMicroBatchReader(
-      Optional<StructType> schema, String checkpointLocation, DataSourceOptions options) {
-    this.numPartitions = context.getSparkSession().sparkContext().defaultParallelism();
-    checkArgument(this.numPartitions > 0, "Number of partitions must be greater than zero.");
-    this.bundleSize = context.getOptions().getBundleSize();
-    return new DatasetMicroBatchReader(schema, checkpointLocation, options);
+  @Override public MicroBatchReader createMicroBatchReader(Optional<StructType> schema,
+      String checkpointLocation, DataSourceOptions options) {
+    return new DatasetMicroBatchReader(checkpointLocation, options);
   }
 
   /** This class can be mapped to Beam {@link BoundedSource}. */
-  private class DatasetMicroBatchReader implements MicroBatchReader {
+  private static class DatasetMicroBatchReader implements MicroBatchReader {
 
-    private Optional<StructType> schema;
-    private String checkpointLocation;
-    private DataSourceOptions options;
-
-    private DatasetMicroBatchReader(
-        Optional<StructType> schema, String checkpointLocation, DataSourceOptions options) {
+    private DatasetMicroBatchReader(String checkpointLocation, DataSourceOptions options) {
       //TODO deal with schema and options
     }
 
-    @Override
-    public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
+    @Override public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
       //TODO extension point for SDF
     }
 
-    @Override
-    public Offset getStartOffset() {
+    @Override public Offset getStartOffset() {
       //TODO extension point for SDF
       return null;
     }
 
-    @Override
-    public Offset getEndOffset() {
+    @Override public Offset getEndOffset() {
       //TODO extension point for SDF
       return null;
     }
 
-    @Override
-    public Offset deserializeOffset(String json) {
+    @Override public Offset deserializeOffset(String json) {
       //TODO extension point for SDF
       return null;
     }
 
-    @Override
-    public void commit(Offset end) {
+    @Override public void commit(Offset end) {
       //TODO no more to read after end Offset
     }
 
-    @Override
-    public void stop() {}
+    @Override public void stop() {
+    }
 
-    @Override
-    public StructType readSchema() {
+    @Override public StructType readSchema() {
       return null;
     }
 
-    @Override
-    public List<InputPartition<InternalRow>> planInputPartitions() {
-      List<InputPartition<InternalRow>> result = new ArrayList<>();
-      long desiredSizeBytes;
-      SparkPipelineOptions options = context.getOptions();
-      try {
-        desiredSizeBytes =
-            (bundleSize == null)
-                ? source.getEstimatedSizeBytes(options) / numPartitions
-                : bundleSize;
-        List<? extends BoundedSource<T>> sources = source.split(desiredSizeBytes, options);
-        for (BoundedSource<T> source : sources) {
-          result.add(
-              new InputPartition<InternalRow>() {
-
-                @Override
-                public InputPartitionReader<InternalRow> createPartitionReader() {
-                  BoundedReader<T> reader = null;
-                  try {
-                    reader = source.createReader(options);
-                  } catch (IOException e) {
-                    throw new RuntimeException(
-                        "Error creating BoundedReader " + reader.getClass().getCanonicalName(), e);
-                  }
-                  return new DatasetMicroBatchPartitionReader(reader);
-                }
-              });
-        }
-        return result;
-
-      } catch (Exception e) {
-        throw new RuntimeException(
-            "Error in splitting BoundedSource " + source.getClass().getCanonicalName(), e);
-      }
+    @Override public List<InputPartition<InternalRow>> planInputPartitions() {
+      return null;
     }
   }
 
-  /** This class can be mapped to Beam {@link BoundedReader}. */
-  private class DatasetMicroBatchPartitionReader implements InputPartitionReader<InternalRow> {
-
-    BoundedReader<T> reader;
-    private boolean started;
-    private boolean closed;
-
-    DatasetMicroBatchPartitionReader(BoundedReader<T> reader) {
-      this.reader = reader;
-      this.started = false;
-      this.closed = false;
-    }
-
-    @Override
-    public boolean next() throws IOException {
-      if (!started) {
-        started = true;
-        return reader.start();
-      } else {
-        return !closed && reader.advance();
-      }
-    }
-
-    @Override
-    public InternalRow get() {
-      List<Object> list = new ArrayList<>();
-      list.add(
-          WindowedValue.timestampedValueInGlobalWindow(
-              reader.getCurrent(), reader.getCurrentTimestamp()));
-      return InternalRow.apply(asScalaBuffer(list).toList());
-    }
-
-    @Override
-    public void close() throws IOException {
-      closed = true;
-      reader.close();
-    }
-  }
 }
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingPipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/PipelineTranslatorStreaming.java
similarity index 87%
rename from runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingPipelineTranslator.java
rename to runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/PipelineTranslatorStreaming.java
index 437aa25..20cefed 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingPipelineTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/PipelineTranslatorStreaming.java
@@ -25,12 +25,12 @@ import org.apache.beam.sdk.runners.TransformHierarchy;
 
 /**
  * {@link PipelineTranslator} for executing a {@link Pipeline} in Spark in streaming mode. This
- * contains only the components specific to streaming: {@link StreamingTranslationContext}, registry
+ * contains only the components specific to streaming: registry
  * of batch {@link TransformTranslator} and registry lookup code.
  */
-public class StreamingPipelineTranslator extends PipelineTranslator {
+public class PipelineTranslatorStreaming extends PipelineTranslator {
 
-  public StreamingPipelineTranslator(SparkPipelineOptions options) {}
+  public PipelineTranslatorStreaming(SparkPipelineOptions options) {}
 
   @Override
   protected TransformTranslator<?> getTransformTranslator(TransformHierarchy.Node node) {
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingTranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingTranslationContext.java
deleted file mode 100644
index f827cc4..0000000
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingTranslationContext.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.structuredstreaming.translation.streaming;
-
-import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
-
-/** This class contains only streaming specific context components. */
-public class StreamingTranslationContext extends TranslationContext {
-
-  public StreamingTranslationContext(SparkPipelineOptions options) {
-    super(options);
-  }
-}


[beam] 01/04: Fix testMode output to comply with new binary schema

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit f19721f35b9249c1db712dc4c72a588105019726
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue Jan 15 16:00:55 2019 +0100

    Fix testMode output to comply with new binary schema
---
 .../spark/structuredstreaming/translation/TranslationContext.java  | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index 33706bd..0f20663 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -21,6 +21,7 @@ import com.google.common.collect.Iterables;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -180,7 +181,11 @@ public class TranslationContext {
           dataset.writeStream().foreach(new NoOpForeachWriter<>()).start().awaitTermination();
         } else {
           if (testMode){
-            dataset.show();
+            // cannot use dataset.show because dataset schema is binary so it will print binary code.
+            List<WindowedValue> windowedValues = ((Dataset<WindowedValue>)dataset).collectAsList();
+            for (WindowedValue windowedValue : windowedValues){
+              System.out.println(windowedValue);
+            }
           } else {
             // apply a dummy fn just to apply forech action that will trigger the pipeline run in spark
             dataset.foreachPartition(t -> {