You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2018/12/28 09:29:48 UTC

[beam] branch spark-runner_structured-streaming updated (8591d63 -> 77cacde)

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a change to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git.


    from 8591d63  Run pipeline in batch mode or in streaming mode
     new d9869c4  Split batch and streaming sources and translators
     new 36a72f7  Use raw Encoder<WindowedValue> also in regular ReadSourceTranslatorBatch
     new 91baa65  Cleaning
     new 77cacde  Add ReadSourceTranslatorStreaming

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../translation/TranslationContext.java            |   1 -
 .../translation/batch/DatasetSourceBatch.java      | 147 +++++++++++++++++
 .../DatasetSourceMockBatch.java}                   |   4 +-
 .../batch/ReadSourceTranslatorBatch.java           |  32 ++--
 .../batch/ReadSourceTranslatorMockBatch.java       |   7 +-
 .../DatasetSourceStreaming.java}                   | 176 +--------------------
 .../ReadSourceTranslatorStreaming.java}            |  35 ++--
 7 files changed, 189 insertions(+), 213 deletions(-)
 create mode 100644 runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
 rename runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/{io/DatasetSourceMock.java => batch/DatasetSourceMockBatch.java} (97%)
 rename runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/{io/DatasetSource.java => streaming/DatasetSourceStreaming.java} (61%)
 copy runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/{batch/ReadSourceTranslatorMockBatch.java => streaming/ReadSourceTranslatorStreaming.java} (65%)


[beam] 03/04: Cleaning

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 91baa6539d669dea46e1ac94716da5babb9a9a06
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Dec 28 10:24:11 2018 +0100

    Cleaning
---
 .../translation/batch/DatasetSourceBatch.java      |   3 +-
 .../streaming/DatasetStreamingSource.java          | 172 +--------------------
 2 files changed, 2 insertions(+), 173 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
index 1ad16eb..f4cd885 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
@@ -41,8 +41,7 @@ import org.apache.spark.sql.types.StructType;
 
 /**
  * This is a spark structured streaming {@link DataSourceV2} implementation. As Continuous streaming
- * is tagged experimental in spark, this class does no implement {@link ContinuousReadSupport}. This
- * class is just a mix-in.
+ * is tagged experimental in spark, this class does no implement {@link ContinuousReadSupport}.
  */
 public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
 
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetStreamingSource.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetStreamingSource.java
index 8701a83..6947b6d 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetStreamingSource.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetStreamingSource.java
@@ -53,8 +53,7 @@ import scala.collection.immutable.Map;
 
 /**
  * This is a spark structured streaming {@link DataSourceV2} implementation. As Continuous streaming
- * is tagged experimental in spark, this class does no implement {@link ContinuousReadSupport}. This
- * class is just a mix-in.
+ * is tagged experimental in spark, this class does no implement {@link ContinuousReadSupport}.
  */
 public class DatasetStreamingSource<T> implements DataSourceV2, MicroBatchReadSupport{
 
@@ -196,173 +195,4 @@ public class DatasetStreamingSource<T> implements DataSourceV2, MicroBatchReadSu
       reader.close();
     }
   }
-
-  private static class DatasetCatalog<T> extends Catalog {
-
-    TranslationContext context;
-    Source<T> source;
-
-    private DatasetCatalog(TranslationContext context, Source<T> source) {
-      this.context = context;
-      this.source = source;
-    }
-
-    @Override public String currentDatabase() {
-      return null;
-    }
-
-    @Override public void setCurrentDatabase(String dbName) {
-
-    }
-
-    @Override public Dataset<Database> listDatabases() {
-      return null;
-    }
-
-    @Override public Dataset<Table> listTables() {
-      return null;
-    }
-
-    @Override public Dataset<Table> listTables(String dbName) throws AnalysisException {
-      return null;
-    }
-
-    @Override public Dataset<Function> listFunctions() {
-      return null;
-    }
-
-    @Override public Dataset<Function> listFunctions(String dbName) throws AnalysisException {
-      return null;
-    }
-
-    @Override public Dataset<Column> listColumns(String tableName) throws AnalysisException {
-      return null;
-    }
-
-    @Override public Dataset<Column> listColumns(String dbName, String tableName)
-        throws AnalysisException {
-      return null;
-    }
-
-    @Override public Database getDatabase(String dbName) throws AnalysisException {
-      return null;
-    }
-
-    @Override public Table getTable(String tableName) throws AnalysisException {
-      return new DatasetTable<>("beam", "beaam", "beam fake table to wire up with Beam sources",
-          null, true, source, context);
-    }
-
-    @Override public Table getTable(String dbName, String tableName) throws AnalysisException {
-      return null;
-    }
-
-    @Override public Function getFunction(String functionName) throws AnalysisException {
-      return null;
-    }
-
-    @Override public Function getFunction(String dbName, String functionName)
-        throws AnalysisException {
-      return null;
-    }
-
-    @Override public boolean databaseExists(String dbName) {
-      return false;
-    }
-
-    @Override public boolean tableExists(String tableName) {
-      return false;
-    }
-
-    @Override public boolean tableExists(String dbName, String tableName) {
-      return false;
-    }
-
-    @Override public boolean functionExists(String functionName) {
-      return false;
-    }
-
-    @Override public boolean functionExists(String dbName, String functionName) {
-      return false;
-    }
-
-    @Override public Dataset<Row> createTable(String tableName, String path) {
-      return null;
-    }
-
-    @Override public Dataset<Row> createTable(String tableName, String path, String source) {
-      return null;
-    }
-
-    @Override public Dataset<Row> createTable(String tableName, String source,
-        Map<String, String> options) {
-      return null;
-    }
-
-    @Override public Dataset<Row> createTable(String tableName, String source, StructType schema,
-        Map<String, String> options) {
-      return null;
-    }
-
-    @Override public boolean dropTempView(String viewName) {
-      return false;
-    }
-
-    @Override public boolean dropGlobalTempView(String viewName) {
-      return false;
-    }
-
-    @Override public void recoverPartitions(String tableName) {
-
-    }
-
-    @Override public boolean isCached(String tableName) {
-      return false;
-    }
-
-    @Override public void cacheTable(String tableName) {
-
-    }
-
-    @Override public void cacheTable(String tableName, StorageLevel storageLevel) {
-
-    }
-
-    @Override public void uncacheTable(String tableName) {
-
-    }
-
-    @Override public void clearCache() {
-
-    }
-
-    @Override public void refreshTable(String tableName) {
-
-    }
-
-    @Override public void refreshByPath(String path) {
-
-    }
-
-    private static class DatasetTable<T> extends Table {
-
-      private Source<T> source;
-      private TranslationContext context;
-
-      public DatasetTable(String name, String database, String description, String tableType,
-          boolean isTemporary, Source<T> source, TranslationContext context) {
-        super(name, database, description, tableType, isTemporary);
-        this.source = source;
-        this.context = context;
-      }
-
-      private Source<T> getSource() {
-        return source;
-      }
-
-      private TranslationContext getContext() {
-        return context;
-      }
-    }
-  }
 }


[beam] 01/04: Split batch and streaming sources and translators

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit d9869c45c7e8885a8d462769951b1ac4f5f499cb
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Dec 27 17:20:21 2018 +0100

    Split batch and streaming sources and translators
---
 .../translation/batch/DatasetSourceBatch.java      | 148 +++++++++++++++++++++
 .../DatasetSourceMockBatch.java}                   |   4 +-
 .../batch/ReadSourceTranslatorBatch.java           |  20 +--
 .../batch/ReadSourceTranslatorMockBatch.java       |   5 +-
 .../DatasetStreamingSource.java}                   |   4 +-
 5 files changed, 158 insertions(+), 23 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
new file mode 100644
index 0000000..1ad16eb
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static scala.collection.JavaConversions.asScalaBuffer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
+import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.ReadSupport;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * This is a spark structured streaming {@link DataSourceV2} implementation. As Continuous streaming
+ * is tagged experimental in spark, this class does no implement {@link ContinuousReadSupport}. This
+ * class is just a mix-in.
+ */
+public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
+
+  private int numPartitions;
+  private Long bundleSize;
+  private TranslationContext context;
+  private BoundedSource<T> source;
+
+
+  @Override public DataSourceReader createReader(DataSourceOptions options) {
+    this.numPartitions = context.getSparkSession().sparkContext().defaultParallelism();
+    checkArgument(this.numPartitions > 0, "Number of partitions must be greater than zero.");
+    this.bundleSize = context.getOptions().getBundleSize();
+    return new DatasetReader();  }
+
+  /** This class can be mapped to Beam {@link BoundedSource}. */
+  private class DatasetReader implements DataSourceReader {
+
+    private Optional<StructType> schema;
+    private String checkpointLocation;
+    private DataSourceOptions options;
+
+    @Override
+    public StructType readSchema() {
+      return new StructType();
+    }
+
+    @Override
+    public List<InputPartition<InternalRow>> planInputPartitions() {
+      List<InputPartition<InternalRow>> result = new ArrayList<>();
+      long desiredSizeBytes;
+      SparkPipelineOptions options = context.getOptions();
+      try {
+        desiredSizeBytes =
+            (bundleSize == null)
+                ? source.getEstimatedSizeBytes(options) / numPartitions
+                : bundleSize;
+        List<? extends BoundedSource<T>> sources = source.split(desiredSizeBytes, options);
+        for (BoundedSource<T> source : sources) {
+          result.add(
+              new InputPartition<InternalRow>() {
+
+                @Override
+                public InputPartitionReader<InternalRow> createPartitionReader() {
+                  BoundedReader<T> reader = null;
+                  try {
+                    reader = source.createReader(options);
+                  } catch (IOException e) {
+                    throw new RuntimeException(
+                        "Error creating BoundedReader " + reader.getClass().getCanonicalName(), e);
+                  }
+                  return new DatasetPartitionReader(reader);
+                }
+              });
+        }
+        return result;
+
+      } catch (Exception e) {
+        throw new RuntimeException(
+            "Error in splitting BoundedSource " + source.getClass().getCanonicalName(), e);
+      }
+    }
+  }
+
+  /** This class can be mapped to Beam {@link BoundedReader} */
+  private class DatasetPartitionReader implements InputPartitionReader<InternalRow> {
+
+    BoundedReader<T> reader;
+    private boolean started;
+    private boolean closed;
+
+    DatasetPartitionReader(BoundedReader<T> reader) {
+      this.reader = reader;
+      this.started = false;
+      this.closed = false;
+    }
+
+    @Override
+    public boolean next() throws IOException {
+      if (!started) {
+        started = true;
+        return reader.start();
+      } else {
+        return !closed && reader.advance();
+      }
+    }
+
+    @Override
+    public InternalRow get() {
+      List<Object> list = new ArrayList<>();
+      list.add(
+          WindowedValue.timestampedValueInGlobalWindow(
+              reader.getCurrent(), reader.getCurrentTimestamp()));
+      return InternalRow.apply(asScalaBuffer(list).toList());
+    }
+
+    @Override
+    public void close() throws IOException {
+      closed = true;
+      reader.close();
+    }
+  }
+}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSourceMock.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceMockBatch.java
similarity index 97%
rename from runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSourceMock.java
rename to runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceMockBatch.java
index f722377..b616a6f 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSourceMock.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceMockBatch.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.spark.structuredstreaming.translation.io;
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
 import static scala.collection.JavaConversions.asScalaBuffer;
 
@@ -37,7 +37,7 @@ import org.joda.time.Instant;
 /**
  * This is a mock source that gives values between 0 and 999.
  */
-public class DatasetSourceMock implements DataSourceV2, ReadSupport {
+public class DatasetSourceMockBatch implements DataSourceV2, ReadSupport {
 
   @Override public DataSourceReader createReader(DataSourceOptions options) {
     return new DatasetReader();
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index aed016a..370e3f4 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -21,32 +21,23 @@ import java.io.IOException;
 import org.apache.beam.runners.core.construction.ReadTranslation;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
-import org.apache.beam.runners.spark.structuredstreaming.translation.io.DatasetSource;
-import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.DatasetStreamingSource;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache;
 import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.scheduler.SparkListener;
-import org.apache.spark.scheduler.SparkListenerApplicationStart;
 import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.catalog.Catalog;
-import org.apache.spark.sql.catalyst.catalog.CatalogTable;
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
 import org.apache.spark.sql.streaming.DataStreamReader;
 
 class ReadSourceTranslatorBatch<T>
     implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
 
-  private String SOURCE_PROVIDER_CLASS = DatasetSource.class.getCanonicalName();
+  private String SOURCE_PROVIDER_CLASS = DatasetSourceBatch.class.getCanonicalName();
 
   @SuppressWarnings("unchecked")
   @Override
@@ -64,12 +55,11 @@ class ReadSourceTranslatorBatch<T>
       throw new RuntimeException(e);
     }
     SparkSession sparkSession = context.getSparkSession();
-    DataStreamReader dataStreamReader = sparkSession.readStream().format(providerClassName);
 
-    Dataset<Row> rowDataset = dataStreamReader.load();
+    Dataset<Row> rowDataset = sparkSession.read().format(providerClassName).load();
 
-    //TODO initialize source : how, to get a reference to the DatasetSource instance that spark
-    // instantiates to be able to call DatasetSource.initialize(). How to pass in a DatasetCatalog?
+    //TODO initialize source : how, to get a reference to the DatasetStreamingSource instance that spark
+    // instantiates to be able to call DatasetStreamingSource.initialize(). How to pass in a DatasetCatalog?
     MapFunction<Row, WindowedValue<T>> func = new MapFunction<Row, WindowedValue<T>>() {
       @Override public WindowedValue<T> call(Row value) throws Exception {
         //there is only one value put in each Row by the InputPartitionReader
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
index 184d24c..758ff1d 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
-import org.apache.beam.runners.spark.structuredstreaming.translation.io.DatasetSourceMock;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PBegin;
@@ -29,8 +28,6 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.streaming.DataStreamReader;
-
 
 /**
  * Mock translator that generates a source of 0 to 999 and prints it.
@@ -39,7 +36,7 @@ import org.apache.spark.sql.streaming.DataStreamReader;
 class ReadSourceTranslatorMockBatch<T>
     implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
 
-  private String SOURCE_PROVIDER_CLASS = DatasetSourceMock.class.getCanonicalName();
+  private String SOURCE_PROVIDER_CLASS = DatasetSourceMockBatch.class.getCanonicalName();
 
   @SuppressWarnings("unchecked")
   @Override
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetStreamingSource.java
similarity index 99%
rename from runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
rename to runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetStreamingSource.java
index deacdf4..8701a83 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetStreamingSource.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.spark.structuredstreaming.translation.io;
+package org.apache.beam.runners.spark.structuredstreaming.translation.streaming;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static scala.collection.JavaConversions.asScalaBuffer;
@@ -56,7 +56,7 @@ import scala.collection.immutable.Map;
  * is tagged experimental in spark, this class does no implement {@link ContinuousReadSupport}. This
  * class is just a mix-in.
  */
-public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport{
+public class DatasetStreamingSource<T> implements DataSourceV2, MicroBatchReadSupport{
 
   private int numPartitions;
   private Long bundleSize;


[beam] 04/04: Add ReadSourceTranslatorStreaming

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 77cacde50180ee07f8838243b6790fc6c4044f95
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Dec 28 10:28:18 2018 +0100

    Add ReadSourceTranslatorStreaming
---
 ...mingSource.java => DatasetSourceStreaming.java} |  2 +-
 .../streaming/ReadSourceTranslatorStreaming.java   | 76 ++++++++++++++++++++++
 2 files changed, 77 insertions(+), 1 deletion(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetStreamingSource.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java
similarity index 99%
rename from runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetStreamingSource.java
rename to runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java
index 6947b6d..fad68d3 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetStreamingSource.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java
@@ -55,7 +55,7 @@ import scala.collection.immutable.Map;
  * This is a spark structured streaming {@link DataSourceV2} implementation. As Continuous streaming
  * is tagged experimental in spark, this class does no implement {@link ContinuousReadSupport}.
  */
-public class DatasetStreamingSource<T> implements DataSourceV2, MicroBatchReadSupport{
+public class DatasetSourceStreaming<T> implements DataSourceV2, MicroBatchReadSupport{
 
   private int numPartitions;
   private Long bundleSize;
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
new file mode 100644
index 0000000..6066822
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.translation.streaming;
+
+import java.io.IOException;
+import org.apache.beam.runners.core.construction.ReadTranslation;
+import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
+import org.apache.beam.runners.spark.structuredstreaming.translation.batch.DatasetSourceBatch;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+class ReadSourceTranslatorStreaming<T>
+    implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
+
+  private String SOURCE_PROVIDER_CLASS = DatasetSourceStreaming.class.getCanonicalName();
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void translateTransform(
+      PTransform<PBegin, PCollection<T>> transform, TranslationContext context) {
+    AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> rootTransform =
+        (AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>>)
+            context.getCurrentTransform();
+
+    UnboundedSource<T, UnboundedSource.CheckpointMark> source;
+    try {
+       source = ReadTranslation
+          .unboundedSourceFromTransform(rootTransform);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    SparkSession sparkSession = context.getSparkSession();
+
+    Dataset<Row> rowDataset = sparkSession.readStream().format(SOURCE_PROVIDER_CLASS).load();
+
+    //TODO pass the source and the translation context serialized as string to the DatasetSource
+    MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() {
+      @Override public WindowedValue call(Row value) throws Exception {
+        //there is only one value put in each Row by the InputPartitionReader
+        return value.<WindowedValue>getAs(0);
+      }
+    };
+    //TODO: is there a better way than using the raw WindowedValue? Can an Encoder<WindowedVAlue<T>>
+    // be created ?
+    Dataset<WindowedValue> dataset = rowDataset.map(func, Encoders.kryo(WindowedValue.class));
+
+    PCollection<T> output = (PCollection<T>) context.getOutput();
+    context.putDatasetRaw(output, dataset);
+  }
+}


[beam] 02/04: Use raw Encoder also in regular ReadSourceTranslatorBatch

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 36a72f72bb0ce30464cd23fc3a5134f97a92a721
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Dec 28 10:16:01 2018 +0100

    Use raw Encoder<WindowedValue> also in regular ReadSourceTranslatorBatch
---
 .../translation/TranslationContext.java            |  1 -
 .../batch/ReadSourceTranslatorBatch.java           | 22 ++++++++++------------
 .../batch/ReadSourceTranslatorMockBatch.java       |  2 ++
 3 files changed, 12 insertions(+), 13 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index 82aa80b..acc49f4 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -115,7 +115,6 @@ public class TranslationContext {
     }
   }
 
-    //TODO: remove. It is just for testing
     public void putDatasetRaw(PValue value, Dataset<WindowedValue> dataset) {
       if (!datasets.containsKey(value)) {
         datasets.put(value, dataset);
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index 370e3f4..d980a52 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import org.apache.beam.runners.core.construction.ReadTranslation;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
-import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.DatasetStreamingSource;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -30,9 +29,9 @@ import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.streaming.DataStreamReader;
 
 class ReadSourceTranslatorBatch<T>
     implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
@@ -47,7 +46,6 @@ class ReadSourceTranslatorBatch<T>
         (AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>>)
             context.getCurrentTransform();
 
-        String providerClassName = SOURCE_PROVIDER_CLASS.substring(0, SOURCE_PROVIDER_CLASS.indexOf("$"));
         BoundedSource<T> source;
     try {
       source = ReadTranslation.boundedSourceFromTransform(rootTransform);
@@ -56,20 +54,20 @@ class ReadSourceTranslatorBatch<T>
     }
     SparkSession sparkSession = context.getSparkSession();
 
-    Dataset<Row> rowDataset = sparkSession.read().format(providerClassName).load();
+    Dataset<Row> rowDataset = sparkSession.read().format(SOURCE_PROVIDER_CLASS).load();
 
-    //TODO initialize source : how, to get a reference to the DatasetStreamingSource instance that spark
-    // instantiates to be able to call DatasetStreamingSource.initialize(). How to pass in a DatasetCatalog?
-    MapFunction<Row, WindowedValue<T>> func = new MapFunction<Row, WindowedValue<T>>() {
-      @Override public WindowedValue<T> call(Row value) throws Exception {
+    //TODO pass the source and the translation context serialized as string to the DatasetSource
+    MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() {
+      @Override public WindowedValue call(Row value) throws Exception {
         //there is only one value put in each Row by the InputPartitionReader
-        return value.<WindowedValue<T>>getAs(0);
+        return value.<WindowedValue>getAs(0);
       }
     };
-    //TODO fix encoder: how to get an Encoder<WindowedValue<T>>
-    Dataset<WindowedValue<T>> dataset = rowDataset.map(func, null);
+    //TODO: is there a better way than using the raw WindowedValue? Can an Encoder<WindowedVAlue<T>>
+    // be created ?
+    Dataset<WindowedValue> dataset = rowDataset.map(func, Encoders.kryo(WindowedValue.class));
 
     PCollection<T> output = (PCollection<T>) context.getOutput();
-    context.putDataset(output, dataset);
+    context.putDatasetRaw(output, dataset);
   }
 }
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
index 758ff1d..d7b9175 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
@@ -52,6 +52,8 @@ class ReadSourceTranslatorMockBatch<T>
         return value.<WindowedValue>getAs(0);
       }
     };
+    //TODO: is there a better way than using the raw WindowedValue? Can an Encoder<WindowedVAlue<T>>
+    // be created ?
     Dataset<WindowedValue> dataset = rowDataset.map(func, Encoders.kryo(WindowedValue.class));
 
     PCollection<T> output = (PCollection<T>) context.getOutput();