Posted to commits@hudi.apache.org by le...@apache.org on 2022/02/21 12:15:31 UTC

[hudi] branch master updated: [HUDI-2732][RFC-38] Spark Datasource V2 Integration (#3964)

This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 76b6ad6  [HUDI-2732][RFC-38] Spark Datasource V2 Integration (#3964)
76b6ad6 is described below

commit 76b6ad649135ec0ed425c3c173d91f1e0ce67c5d
Author: leesf <49...@qq.com>
AuthorDate: Mon Feb 21 20:14:07 2022 +0800

    [HUDI-2732][RFC-38] Spark Datasource V2 Integration (#3964)
---
 rfc/README.md        |   2 +-
 rfc/rfc-38/1.png     | Bin 0 -> 207639 bytes
 rfc/rfc-38/rfc-38.md | 283 +++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 284 insertions(+), 1 deletion(-)

diff --git a/rfc/README.md b/rfc/README.md
index ca9e878..a9587d1 100644
--- a/rfc/README.md
+++ b/rfc/README.md
@@ -61,7 +61,7 @@ The list of all RFCs can be found here.
 | 35 | [Make Flink MOR table writing streaming friendly](https://cwiki.apache.org/confluence/display/HUDI/RFC-35%3A+Make+Flink+MOR+table+writing+streaming+friendly) | `UNDER REVIEW` |
 | 36 | [HUDI Metastore Server](https://cwiki.apache.org/confluence/display/HUDI/%5BWIP%5D+RFC-36%3A+HUDI+Metastore+Server) | `UNDER REVIEW` |
 | 37 | [Hudi Metadata based Bloom Index](rfc-37/rfc-37.md) | `IN PROGRESS` |
-| 38 | [Spark Datasource V2 Integration] | `UNDER REVIEW` |
+| 38 | [Spark Datasource V2 Integration](./rfc-38/rfc-38.md) | `IN PROGRESS` |
 | 39 | [Incremental source for Debezium](./rfc-39/rfc-39.md) | `IN PROGRESS` |
 | 40 | [Hudi Connector for Trino](./rfc-40/rfc-40.md) | `IN PROGRESS` |
 | 41 | [Hudi Snowflake Integration] | `UNDER REVIEW` |
diff --git a/rfc/rfc-38/1.png b/rfc/rfc-38/1.png
new file mode 100644
index 0000000..4423888
Binary files /dev/null and b/rfc/rfc-38/1.png differ
diff --git a/rfc/rfc-38/rfc-38.md b/rfc/rfc-38/rfc-38.md
new file mode 100644
index 0000000..80cb5d7
--- /dev/null
+++ b/rfc/rfc-38/rfc-38.md
@@ -0,0 +1,283 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+# RFC-38: Spark Datasource V2 Integration
+
+## Proposers
+
+- @leesf
+
+## Approvers
+- @vinothchandar
+- @xishiyan
+- @YannByron
+
+## Status
+
+JIRA: https://issues.apache.org/jira/browse/HUDI-1297
+
+## Abstract
+
+Today, Hudi still uses the Spark DataSource V1 API and relies heavily on the RDD API for indexing, repartitioning,
+and similar steps, given the flexibility the RDD API offers. With the V1 API, Hudi provides complete read/write,
+update, and automatic small-file handling capabilities, and all of this works well.
+However, the DataSource V2 API has kept evolving and has now stabilized.
+The V1 API is aging, and the Spark community no longer invests additional resources in maintaining it,
+so we propose migrating to the DataSource V2 API,
+using the richer pushdown interfaces it provides, and
+integrating with [RFC-27](https://cwiki.apache.org/confluence/display/HUDI/RFC-27+Data+skipping+index+to+improve+query+performance)
+to provide more powerful query capabilities. Hudi would also benefit from any further evolution or optimization of the V2 API.
+
+
+## Background
+
+The current Hudi read and write paths use the DataSource V1 API, and the implementation class is `DefaultSource`.
+
+```scala
+/**
+ * Hoodie Spark Datasource, for reading and writing hoodie tables
+ *
+ */
+class DefaultSource extends RelationProvider
+  with SchemaRelationProvider
+  with CreatableRelationProvider
+  with DataSourceRegister
+  with StreamSinkProvider
+  with StreamSourceProvider
+  with Serializable {
+  ...
+}
+```
+
+For batch writes, the following method is called.
+```scala
+override def createRelation(sqlContext: SQLContext,
+                            mode: SaveMode,
+                            optParams: Map[String, String],
+                            df: DataFrame): BaseRelation = {
+  val parameters = HoodieWriterUtils.parametersWithWriteDefaults(optParams)
+  val translatedOptions = DataSourceWriteOptions.translateSqlOptions(parameters)
+  val dfWithoutMetaCols = df.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala:_*)
+
+  if (translatedOptions(OPERATION.key).equals(BOOTSTRAP_OPERATION_OPT_VAL)) {
+    HoodieSparkSqlWriter.bootstrap(sqlContext, mode, translatedOptions, dfWithoutMetaCols)
+  } else {
+    HoodieSparkSqlWriter.write(sqlContext, mode, translatedOptions, dfWithoutMetaCols)
+  }
+  new HoodieEmptyRelation(sqlContext, dfWithoutMetaCols.schema)
+}
+```
+
+For queries, the following method returns a `BaseRelation` when no schema is provided.
+
+```scala
+override def createRelation(sqlContext: SQLContext,
+                            parameters: Map[String, String]): BaseRelation = {
+  createRelation(sqlContext, parameters, null)
+}
+```
+
+For streaming writes and reads, `DefaultSource#createSink` and `DefaultSource#createSource` are called, respectively.
+Version 0.9.0 introduced the bulk_insert row mode to speed up bulk_insert. It implements the `SupportsWrite` V2 API and uses `HoodieDataSourceInternalTable` for writing;
+at present, only the bulk_insert operation is supported on this path.
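+
+As a rough illustration of how a batch write enters the V1 path described above, the sketch below writes a small DataFrame through the `hudi` data source with the bulk_insert row writer enabled. It assumes a Hudi Spark bundle is on the classpath; the table name, base path, and column names are placeholders invented for this example, and the option keys are assumed to match the Hudi release in use.
+
+```scala
+import org.apache.spark.sql.{SaveMode, SparkSession}
+
+object BulkInsertSketch {
+  def main(args: Array[String]): Unit = {
+    val spark = SparkSession.builder()
+      .appName("hudi-bulk-insert-sketch")   // placeholder
+      .master("local[*]")                   // placeholder
+      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .getOrCreate()
+    import spark.implicits._
+
+    // Placeholder data: id is the record key, ts the ordering field, dt the partition column.
+    val df = Seq((1, "a", 1000L, "2022-02-21"), (2, "b", 1001L, "2022-02-21"))
+      .toDF("id", "name", "ts", "dt")
+
+    df.write.format("hudi")
+      .option("hoodie.table.name", "example_table")                 // placeholder
+      .option("hoodie.datasource.write.recordkey.field", "id")
+      .option("hoodie.datasource.write.precombine.field", "ts")
+      .option("hoodie.datasource.write.partitionpath.field", "dt")
+      .option("hoodie.datasource.write.operation", "bulk_insert")
+      .option("hoodie.datasource.write.row.writer.enable", "true")  // row-mode bulk_insert
+      .mode(SaveMode.Append)
+      .save("/tmp/hudi/example_table")                              // placeholder path
+
+    spark.stop()
+  }
+}
+```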
+
+## Implementation
+
+Spark provides a complete V2 API, including `CatalogPlugin`, `SupportsWrite`, `SupportsRead`, and various pushdown interfaces
+such as `SupportsPushDownFilters`, `SupportsPushDownAggregates`, and `SupportsPushDownRequiredColumns`.
+
+We would define a key abstraction called `HoodieInternalV2Table`, which implements the `Table`, `SupportsWrite`, and `SupportsRead`
+interfaces to provide writing and reading capabilities.
+
+### Writing Path
+
+Hudi relies heavily on RDD APIs in the write path, for example for indexing, which determines whether a record is an update or an insert.
+Refactoring this onto the DataSource V2 write path would be a relatively large effort, and may not be feasible at all.
+Instead, we can fall back to the V1 write path, since Spark 3.2.0 provides the `V1Write` interface to bridge the V1 and V2 APIs.
+
+The writing path code snippet is shown below.
+
+```scala
+class HoodieInternalV2Table extends Table with SupportsWrite with V2TableWithV1Fallback {
+
+  override def name(): String = {
+    // get hudi table name
+  }
+
+  override def schema(): StructType = {
+    // get hudi table schema
+  }
+
+  override def partitioning(): Array[Transform] = {
+    // get partitioning of hudi table.
+  }
+
+  override def capabilities(): Set[TableCapability] = {
+    // Set(BATCH_WRITE, BATCH_READ,TRUNCATE,...)
+  }
+
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+    // HoodieV1WriteBuilder
+  }
+}
+```
+
+The definition of `HoodieV1WriteBuilder` is shown below.
+
+```scala
+private class HoodieV1WriteBuilder(writeOptions: CaseInsensitiveStringMap,
+                                   hoodieCatalogTable: HoodieCatalogTable,
+                                   spark: SparkSession)
+  extends SupportsTruncate with SupportsOverwrite with ProvidesHoodieConfig {
+
+  override def truncate(): HoodieV1WriteBuilder = {
+    this
+  }
+
+  override def overwrite(filters: Array[Filter]): WriteBuilder = {
+    this
+  }
+
+  override def build(): V1Write = new V1Write {
+    override def toInsertableRelation: InsertableRelation = {
+      // InsertableRelation
+    }
+  }
+}
+```
+
+### Querying Path
+
+For V2 querying, Spark provides various pushdown interfaces, such as `SupportsPushDownFilters`, `SupportsPushDownAggregates`,
+`SupportsPushDownRequiredColumns`, and `SupportsRuntimeFiltering`, which are clearer and more flexible than the V1 interface.
+The V2 interface can also read columnar file formats such as Parquet and ORC. In addition,
+it makes it possible to control how the input is split and how many partitions are produced, which gives Hudi the opportunity
+to plan more accurate splits and speed up queries.
+However, in the first stage we also fall back to the V1 read path for querying, which means we need to convert
+`DataSourceV2Relation` to `DefaultSource` in the analysis stage to keep the changes well controlled.
+The code snippet is shown below; `HoodieSpark3Analysis` should be injected if the Spark version is 3.2.0 or later.
+
+```scala
+
+case class HoodieSpark3Analysis(sparkSession: SparkSession) extends Rule[LogicalPlan]
+  with SparkAdapterSupport with ProvidesHoodieConfig {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDown {
+    case dsv2@DataSourceV2Relation(d: HoodieInternalV2Table, _, _, _, _) =>
+      val output = dsv2.output
+      val catalogTable = if (d.catalogTable.isDefined) {
+        Some(d.v1Table)
+      } else {
+        None
+      }
+      val relation = new DefaultSource().createRelation(new SQLContext(sparkSession),
+        buildHoodieConfig(d.hoodieCatalogTable))
+      LogicalRelation(relation, output, catalogTable, isStreaming = false)
+  }
+}
+
+```
+In the second stage, we would make use of the V2 reading interface and define `HoodieBatchScanBuilder` to provide querying
+capability. The workflow of the querying process is shown in the figure below.  
+The `PartitionReaderFactory` is located in the driver and the `PartitionReader` is located in the executor.
+
+![](./1.png)
+
+The querying path code sample is shown below.
+
+```scala
+class HoodieBatchScanBuilder extends ScanBuilder with SupportsPushDownFilters with SupportsPushDownRequiredColumns {
+  override def build(): Scan = {
+    // HoodieScan
+  }
+
+  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
+    // record the filters
+  }
+
+  override def pushedFilters(): Array[Filter] = {
+    // pushed filters
+  }
+
+  override def pruneColumns(requiredSchema: StructType): Unit = {
+    // record the pruned columns
+  }
+}
+```
+
+### Table Meta Management
+
+We would implement the `CatalogPlugin` interface to manage the metadata of Hudi tables and
+define a core abstraction called `HoodieCatalog`.
+A code sample is shown below.
+
+```scala
+class HoodieCatalog extends DelegatingCatalogExtension
+  with StagingTableCatalog {
+  override def loadTable(ident: Identifier): Table = {
+    // HoodieDatasouceTable
+  }
+
+  override def createTable(ident: Identifier, 
+                           schema: StructType, 
+                           partitions: Array[Transform], 
+                           properties: util.Map[String, String]): Table = {
+      // create hudi table
+  }
+    
+  override def dropTable(ident: Identifier): Boolean = {
+    // drop hudi table
+  }
+    
+  override def alterTable(ident: Identifier, changes: TableChange*): Table = {
+    // check schema compatibility
+    // HoodieDatasouceTable
+  }
+    
+  override def stageReplace(ident: Identifier, 
+                            schema: StructType, 
+                            partitions: Array[Transform], 
+                            properties: util.Map[String, String]): StagedTable = {
+    // StagedHoodieTable
+  }
+    
+  override def stageCreateOrReplace(ident: Identifier, 
+                                    schema: StructType, 
+                                    partitions: Array[Transform], 
+                                    properties: util.Map[String, String]): StagedTable = {
+    // StagedHoodieTable
+  }
+} 
+```
+
+Users would set the Spark session config `spark.sql.catalog.spark_catalog` to `org.apache.hudi.catalog.HoodieCatalog` to load the `HoodieCatalog` and manage Hudi tables.
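+
+A minimal sketch of that session configuration, assuming a Hudi Spark bundle that ships `HoodieCatalog` under the class name given in this RFC is on the classpath (the application name and local master are placeholders for this example):
+
+```scala
+import org.apache.spark.sql.SparkSession
+
+// Register HoodieCatalog as the session catalog so that Hudi tables resolve through it.
+val spark = SparkSession.builder()
+  .appName("hudi-catalog-sketch")   // placeholder
+  .master("local[*]")               // placeholder
+  .config("spark.sql.catalog.spark_catalog", "org.apache.hudi.catalog.HoodieCatalog")
+  .getOrCreate()
+```
+
+The same key can also be passed with `--conf` when launching `spark-shell` or `spark-submit`.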
+
+## Rollout/Adoption Plan
+
+- What impact (if any) will there be on existing users?
+
+There is no impact on existing users, but users would need to specify the new catalog to manage Hudi tables or other tables.
+
+- If we are changing behavior how will we phase out the older behavior?
+
+We should keep compatibility with the V1 path and make the migration to the V2 API transparent for users.
+
+## Test Plan
+
+- [ ] PoC for catalog plugin
+- [ ] PoC for writing path with UTs
+- [ ] PoC for querying path with UTs
+- [ ] E2E tests
+- [ ] Benchmark for v1 and v2 writing and querying
\ No newline at end of file