Posted to commits@beam.apache.org by ec...@apache.org on 2018/12/20 09:12:11 UTC
[beam] branch spark-runner_structured-streaming updated: Experiment with using Spark Catalog to pass in a Beam Source through a Spark Table
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
new 44fd6c7 Experiment with using Spark Catalog to pass in a Beam Source through a Spark Table
44fd6c7 is described below
commit 44fd6c7d8ab9f3fce32ae2c48bc596a0e7837bab
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Wed Dec 19 17:08:58 2018 +0100
Experiment with using Spark Catalog to pass in a Beam Source through a Spark Table
---
.../batch/ReadSourceTranslatorBatch.java | 9 +-
.../translation/io/DatasetSource.java | 191 ++++++++++++++++++++-
2 files changed, 190 insertions(+), 10 deletions(-)
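Before the diff itself, one piece of context on why the handoff is awkward at all: .format(providerClassName) makes Spark instantiate the DataSourceV2 provider reflectively through its zero-argument constructor, so the instance the translator could configure and the instance Spark actually uses are different objects. A minimal sketch of that contract, assuming the Spark 2.4 DataSourceV2 API (ToyProvider is a made-up class, not Beam's DatasetSource):

import org.apache.spark.sql.sources.v2.DataSourceV2;

// Spark resolves .format(ToyProvider.class.getName()) by calling this no-arg
// constructor itself, so state set on a provider instance created by the
// caller never reaches the instance Spark builds.
public class ToyProvider implements DataSourceV2 {
  public ToyProvider() {
    // invoked reflectively by Spark; no arguments can be passed here
  }
}

That constraint is what both the Catalog experiment below and the TODO in ReadSourceTranslatorBatch are working around.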
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index 2c1aa93..0b828fb 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -30,11 +30,13 @@ import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalog.Catalog;
+import org.apache.spark.sql.catalyst.catalog.CatalogTable;
import org.apache.spark.sql.streaming.DataStreamReader;
class ReadSourceTranslatorBatch<T>
@@ -58,9 +63,12 @@ class ReadSourceTranslatorBatch<T>
throw new RuntimeException(e);
}
SparkSession sparkSession = context.getSparkSession();
- Dataset<Row> rowDataset = sparkSession.readStream().format(providerClassName).load();
+
+ DataStreamReader dataStreamReader = sparkSession.readStream().format(providerClassName);
+ Dataset<Row> rowDataset = dataStreamReader.load();
+
//TODO initialize source: how to get a reference to the DatasetSource instance that spark
- // instantiates to be able to call DatasetSource.initialize()
+ // instantiates to be able to call DatasetSource.initialize(). How to pass in a DatasetCatalog?
MapFunction<Row, WindowedValue<T>> func = new MapFunction<Row, WindowedValue<T>>() {
@Override public WindowedValue<T> call(Row value) throws Exception {
//there is only one value put in each Row by the InputPartitionReader
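The TODO above is the crux of the experiment: since Spark builds the DatasetSource itself, the translator never holds the instance on which it could call initialize(). The conventional DataSourceV2 escape hatch is to pass everything through string options. A minimal sketch of that route, assuming Beam's Source is Java-serializable (it implements Serializable); the option key and the SourceOptions helper below are hypothetical, not part of this commit:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Base64;

// Hypothetical helper: ferries a Serializable Beam source through the
// string-only DataSourceOptions channel as base64 text.
class SourceOptions {
  // DataSourceOptions lower-cases keys, so keep the option name lower-case.
  static final String BEAM_SOURCE_OPTION = "beam-source";

  static String encode(Serializable source) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(source);
    }
    return Base64.getEncoder().encodeToString(bytes.toByteArray());
  }

  static Object decode(String encoded) throws IOException, ClassNotFoundException {
    byte[] bytes = Base64.getDecoder().decode(encoded);
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return in.readObject();
    }
  }
}

The translator side would then become dataStreamReader.option(SourceOptions.BEAM_SOURCE_OPTION, SourceOptions.encode(source)).load(), and createMicroBatchReader would recover the source from its DataSourceOptions argument via options.get(SourceOptions.BEAM_SOURCE_OPTION) (a java.util.Optional<String>) instead of waiting for an initialize() call that Spark never makes.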
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
index d23ecf3..deacdf4 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
@@ -28,7 +28,16 @@ import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalog.Catalog;
+import org.apache.spark.sql.catalog.Column;
+import org.apache.spark.sql.catalog.Database;
+import org.apache.spark.sql.catalog.Function;
+import org.apache.spark.sql.catalog.Table;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
@@ -39,6 +48,8 @@ import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.storage.StorageLevel;
+import scala.collection.immutable.Map;
/**
* This is a spark structured streaming {@link DataSourceV2} implementation. As Continuous streaming
@@ -53,17 +64,12 @@ public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport{
private BoundedSource<T> source;
- public void initialize(TranslationContext context, BoundedSource<T> source){
- this.context = context;
- this.source = source;
- this.numPartitions = context.getSparkSession().sparkContext().defaultParallelism();
- checkArgument(this.numPartitions > 0, "Number of partitions must be greater than zero.");
- this.bundleSize = context.getOptions().getBundleSize();
- }
-
@Override
public MicroBatchReader createMicroBatchReader(
Optional<StructType> schema, String checkpointLocation, DataSourceOptions options) {
+ this.numPartitions = context.getSparkSession().sparkContext().defaultParallelism();
+ checkArgument(this.numPartitions > 0, "Number of partitions must be greater than zero.");
+ this.bundleSize = context.getOptions().getBundleSize();
return new DatasetMicroBatchReader(schema, checkpointLocation, options);
}
@@ -190,4 +196,173 @@ public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport{
reader.close();
}
}
+
+ private static class DatasetCatalog<T> extends Catalog {
+
+ TranslationContext context;
+ Source<T> source;
+
+ private DatasetCatalog(TranslationContext context, Source<T> source) {
+ this.context = context;
+ this.source = source;
+ }
+
+ @Override public String currentDatabase() {
+ return null;
+ }
+
+ @Override public void setCurrentDatabase(String dbName) {
+
+ }
+
+ @Override public Dataset<Database> listDatabases() {
+ return null;
+ }
+
+ @Override public Dataset<Table> listTables() {
+ return null;
+ }
+
+ @Override public Dataset<Table> listTables(String dbName) throws AnalysisException {
+ return null;
+ }
+
+ @Override public Dataset<Function> listFunctions() {
+ return null;
+ }
+
+ @Override public Dataset<Function> listFunctions(String dbName) throws AnalysisException {
+ return null;
+ }
+
+ @Override public Dataset<Column> listColumns(String tableName) throws AnalysisException {
+ return null;
+ }
+
+ @Override public Dataset<Column> listColumns(String dbName, String tableName)
+ throws AnalysisException {
+ return null;
+ }
+
+ @Override public Database getDatabase(String dbName) throws AnalysisException {
+ return null;
+ }
+
+ @Override public Table getTable(String tableName) throws AnalysisException {
+ return new DatasetTable<>("beam", "beam", "beam fake table to wire up with Beam sources",
+ null, true, source, context);
+ }
+
+ @Override public Table getTable(String dbName, String tableName) throws AnalysisException {
+ return null;
+ }
+
+ @Override public Function getFunction(String functionName) throws AnalysisException {
+ return null;
+ }
+
+ @Override public Function getFunction(String dbName, String functionName)
+ throws AnalysisException {
+ return null;
+ }
+
+ @Override public boolean databaseExists(String dbName) {
+ return false;
+ }
+
+ @Override public boolean tableExists(String tableName) {
+ return false;
+ }
+
+ @Override public boolean tableExists(String dbName, String tableName) {
+ return false;
+ }
+
+ @Override public boolean functionExists(String functionName) {
+ return false;
+ }
+
+ @Override public boolean functionExists(String dbName, String functionName) {
+ return false;
+ }
+
+ @Override public Dataset<Row> createTable(String tableName, String path) {
+ return null;
+ }
+
+ @Override public Dataset<Row> createTable(String tableName, String path, String source) {
+ return null;
+ }
+
+ @Override public Dataset<Row> createTable(String tableName, String source,
+ Map<String, String> options) {
+ return null;
+ }
+
+ @Override public Dataset<Row> createTable(String tableName, String source, StructType schema,
+ Map<String, String> options) {
+ return null;
+ }
+
+ @Override public boolean dropTempView(String viewName) {
+ return false;
+ }
+
+ @Override public boolean dropGlobalTempView(String viewName) {
+ return false;
+ }
+
+ @Override public void recoverPartitions(String tableName) {
+
+ }
+
+ @Override public boolean isCached(String tableName) {
+ return false;
+ }
+
+ @Override public void cacheTable(String tableName) {
+
+ }
+
+ @Override public void cacheTable(String tableName, StorageLevel storageLevel) {
+
+ }
+
+ @Override public void uncacheTable(String tableName) {
+
+ }
+
+ @Override public void clearCache() {
+
+ }
+
+ @Override public void refreshTable(String tableName) {
+
+ }
+
+ @Override public void refreshByPath(String path) {
+
+ }
+
+ private static class DatasetTable<T> extends Table {
+
+ private Source<T> source;
+ private TranslationContext context;
+
+ public DatasetTable(String name, String database, String description, String tableType,
+ boolean isTemporary, Source<T> source, TranslationContext context) {
+ super(name, database, description, tableType, isTemporary);
+ this.source = source;
+ this.context = context;
+ }
+
+ private Source<T> getSource() {
+ return source;
+ }
+
+ private TranslationContext getContext() {
+ return context;
+ }
+ }
+ }
}
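What the experiment leaves open is registration: Spark 2.x exposes no public hook for installing a custom Catalog on a SparkSession (sparkSession.catalog() hands back Spark's own implementation), so DatasetCatalog currently has nowhere to live. If such a hook existed, retrieval inside DatasetSource would reduce to the sketch below; this assumes SparkSession.active() (available since Spark 2.4) and assumes DatasetTable and its accessors were widened from private, so it is hypothetical wiring rather than anything this commit does:

// Hypothetical retrieval from inside DatasetSource, e.g. at the top of
// createMicroBatchReader, once the session's catalog is a DatasetCatalog.
@SuppressWarnings("unchecked")
private void initializeFromCatalog() {
  try {
    Catalog catalog = SparkSession.active().catalog();
    // getTable(String) above ignores its argument and always returns the
    // single fake Beam table, so any name works.
    DatasetCatalog.DatasetTable<T> table =
        (DatasetCatalog.DatasetTable<T>) catalog.getTable("beam");
    this.source = (BoundedSource<T>) table.getSource();
    this.context = table.getContext();
  } catch (AnalysisException e) {
    throw new RuntimeException("Beam table not registered in the session catalog", e);
  }
}

Until that registration problem is solved, the DataSourceOptions route sketched after the first file remains the more conventional way to reach the instance Spark creates.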