You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2018/12/20 09:12:11 UTC

[beam] branch spark-runner_structured-streaming updated: Experiment over using spark Catalog to pass in Beam Source through spark Table

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
     new 44fd6c7  Experiment over using spark Catalog to pass in Beam Source through spark Table
44fd6c7 is described below

commit 44fd6c7d8ab9f3fce32ae2c48bc596a0e7837bab
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Wed Dec 19 17:08:58 2018 +0100

    Experiment over using spark Catalog to pass in Beam Source through spark Table
---
 .../batch/ReadSourceTranslatorBatch.java           |  12 +-
 .../translation/io/DatasetSource.java              | 191 ++++++++++++++++++++-
 2 files changed, 193 insertions(+), 10 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index 2c1aa93..0b828fb 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -30,11 +30,16 @@ import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache;
 import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.scheduler.SparkListener;
+import org.apache.spark.scheduler.SparkListenerApplicationStart;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalog.Catalog;
+import org.apache.spark.sql.catalyst.catalog.CatalogTable;
 import org.apache.spark.sql.streaming.DataStreamReader;
 
 class ReadSourceTranslatorBatch<T>
@@ -58,9 +63,12 @@ class ReadSourceTranslatorBatch<T>
       throw new RuntimeException(e);
     }
     SparkSession sparkSession = context.getSparkSession();
-    Dataset<Row> rowDataset = sparkSession.readStream().format(providerClassName).load();
+
+    DataStreamReader dataStreamReader = sparkSession.readStream().format(providerClassName);
+    Dataset<Row> rowDataset = dataStreamReader.load();
+
     //TODO initialize source : how, to get a reference to the DatasetSource instance that spark
-    // instantiates to be able to call DatasetSource.initialize()
+    // instantiates to be able to call DatasetSource.initialize(). How to pass in a DatasetCatalog?
     MapFunction<Row, WindowedValue<T>> func = new MapFunction<Row, WindowedValue<T>>() {
       @Override public WindowedValue<T> call(Row value) throws Exception {
         //there is only one value put in each Row by the InputPartitionReader
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
index d23ecf3..deacdf4 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
@@ -28,7 +28,16 @@ import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalog.Catalog;
+import org.apache.spark.sql.catalog.Column;
+import org.apache.spark.sql.catalog.Database;
+import org.apache.spark.sql.catalog.Function;
+import org.apache.spark.sql.catalog.Table;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
@@ -39,6 +48,8 @@ import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
 import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
 import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.storage.StorageLevel;
+import scala.collection.immutable.Map;
 
 /**
  * This is a spark structured streaming {@link DataSourceV2} implementation. As Continuous streaming
@@ -53,17 +64,12 @@ public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport{
   private BoundedSource<T> source;
 
 
-  public void initialize(TranslationContext context, BoundedSource<T> source){
-    this.context = context;
-    this.source = source;
-    this.numPartitions = context.getSparkSession().sparkContext().defaultParallelism();
-    checkArgument(this.numPartitions > 0, "Number of partitions must be greater than zero.");
-    this.bundleSize = context.getOptions().getBundleSize();
-  }
-
   @Override
   public MicroBatchReader createMicroBatchReader(
       Optional<StructType> schema, String checkpointLocation, DataSourceOptions options) {
+    this.numPartitions = context.getSparkSession().sparkContext().defaultParallelism();
+    checkArgument(this.numPartitions > 0, "Number of partitions must be greater than zero.");
+    this.bundleSize = context.getOptions().getBundleSize();
     return new DatasetMicroBatchReader(schema, checkpointLocation, options);
   }
 
@@ -190,4 +196,173 @@ public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport{
       reader.close();
     }
   }
+
+  private static class DatasetCatalog<T> extends Catalog {
+
+    TranslationContext context;
+    Source<T> source;
+
+    private DatasetCatalog(TranslationContext context, Source<T> source) {
+      this.context = context;
+      this.source = source;
+    }
+
+    @Override public String currentDatabase() {
+      return null;
+    }
+
+    @Override public void setCurrentDatabase(String dbName) {
+
+    }
+
+    @Override public Dataset<Database> listDatabases() {
+      return null;
+    }
+
+    @Override public Dataset<Table> listTables() {
+      return null;
+    }
+
+    @Override public Dataset<Table> listTables(String dbName) throws AnalysisException {
+      return null;
+    }
+
+    @Override public Dataset<Function> listFunctions() {
+      return null;
+    }
+
+    @Override public Dataset<Function> listFunctions(String dbName) throws AnalysisException {
+      return null;
+    }
+
+    @Override public Dataset<Column> listColumns(String tableName) throws AnalysisException {
+      return null;
+    }
+
+    @Override public Dataset<Column> listColumns(String dbName, String tableName)
+        throws AnalysisException {
+      return null;
+    }
+
+    @Override public Database getDatabase(String dbName) throws AnalysisException {
+      return null;
+    }
+
+    @Override public Table getTable(String tableName) throws AnalysisException {
+      return new DatasetTable<>("beam", "beaam", "beam fake table to wire up with Beam sources",
+          null, true, source, context);
+    }
+
+    @Override public Table getTable(String dbName, String tableName) throws AnalysisException {
+      return null;
+    }
+
+    @Override public Function getFunction(String functionName) throws AnalysisException {
+      return null;
+    }
+
+    @Override public Function getFunction(String dbName, String functionName)
+        throws AnalysisException {
+      return null;
+    }
+
+    @Override public boolean databaseExists(String dbName) {
+      return false;
+    }
+
+    @Override public boolean tableExists(String tableName) {
+      return false;
+    }
+
+    @Override public boolean tableExists(String dbName, String tableName) {
+      return false;
+    }
+
+    @Override public boolean functionExists(String functionName) {
+      return false;
+    }
+
+    @Override public boolean functionExists(String dbName, String functionName) {
+      return false;
+    }
+
+    @Override public Dataset<Row> createTable(String tableName, String path) {
+      return null;
+    }
+
+    @Override public Dataset<Row> createTable(String tableName, String path, String source) {
+      return null;
+    }
+
+    @Override public Dataset<Row> createTable(String tableName, String source,
+        Map<String, String> options) {
+      return null;
+    }
+
+    @Override public Dataset<Row> createTable(String tableName, String source, StructType schema,
+        Map<String, String> options) {
+      return null;
+    }
+
+    @Override public boolean dropTempView(String viewName) {
+      return false;
+    }
+
+    @Override public boolean dropGlobalTempView(String viewName) {
+      return false;
+    }
+
+    @Override public void recoverPartitions(String tableName) {
+
+    }
+
+    @Override public boolean isCached(String tableName) {
+      return false;
+    }
+
+    @Override public void cacheTable(String tableName) {
+
+    }
+
+    @Override public void cacheTable(String tableName, StorageLevel storageLevel) {
+
+    }
+
+    @Override public void uncacheTable(String tableName) {
+
+    }
+
+    @Override public void clearCache() {
+
+    }
+
+    @Override public void refreshTable(String tableName) {
+
+    }
+
+    @Override public void refreshByPath(String path) {
+
+    }
+
+    private static class DatasetTable<T> extends Table {
+
+      private Source<T> source;
+      private TranslationContext context;
+
+      public DatasetTable(String name, String database, String description, String tableType,
+          boolean isTemporary, Source<T> source, TranslationContext context) {
+        super(name, database, description, tableType, isTemporary);
+        this.source = source;
+        this.context = context;
+      }
+
+      private Source<T> getSource() {
+        return source;
+      }
+
+      private TranslationContext getContext() {
+        return context;
+      }
+    }
+  }
 }