Posted to commits@hudi.apache.org by le...@apache.org on 2020/10/10 12:53:53 UTC

[hudi] branch master updated: [HUDI-1301] Support schema version when querying a Hudi dataset in Spark INCREMENTAL mode. (#2125)

This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 585ce00  [HUDI-1301] Support schema version when querying a Hudi dataset in Spark INCREMENTAL mode. (#2125)
585ce00 is described below

commit 585ce0094d6527bab988f7657b4e84d12274ee28
Author: lw0090 <lw...@gmail.com>
AuthorDate: Sat Oct 10 20:53:41 2020 +0800

    [HUDI-1301] Support schema version when querying a Hudi dataset in Spark INCREMENTAL mode. (#2125)
---
 .../hudi/common/table/TableSchemaResolver.java     | 29 ++++++++++++++++++++--
 .../scala/org/apache/hudi/DataSourceOptions.scala  |  9 +++++++
 .../org/apache/hudi/IncrementalRelation.scala      | 23 +++++++++++------
 3 files changed, 51 insertions(+), 10 deletions(-)
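
For context, here is roughly how the new option would be used from Spark. This is a hedged sketch, not part of the commit: the base path and instant timestamps are placeholders, `spark` is an existing SparkSession, and the query-type keys are pre-existing DataSourceReadOptions constants; only the schema option is new in this commit.

    import org.apache.hudi.DataSourceReadOptions

    val df = spark.read.format("hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "20201001000000")  // placeholder begin instant
      .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, "20201009000000")    // placeholder end instant
      // New in this commit: resolve the schema as of the end instant rather than the latest commit.
      .option(DataSourceReadOptions.INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME_OPT_KEY, "true")
      .load("/tmp/hudi_table")                                                    // placeholder base path

Left unset (or "false"), the read keeps the previous behavior of using the latest instant's schema.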

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index db68667..372b393 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -175,20 +175,45 @@ public class TableSchemaResolver {
    * @throws Exception
    */
   public Schema getTableAvroSchemaWithoutMetadataFields() throws Exception {
-    Option<Schema> schemaFromCommitMetadata = getTableSchemaFromCommitMetadata(false);
+    HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+    Option<Schema> schemaFromCommitMetadata = getTableSchemaFromCommitMetadata(timeline.lastInstant().get(), false);
     return schemaFromCommitMetadata.isPresent() ? schemaFromCommitMetadata.get() :
            HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
   }
 
   /**
+   * Gets the user data schema for a hoodie table, in Avro format, as of the given instant.
+   *
+   * @param instant the instant whose commit metadata supplies the schema
+   * @return Avro user data schema
+   * @throws Exception
+   */
+  public Schema getTableAvroSchemaWithoutMetadataFields(HoodieInstant instant) throws Exception {
+    Option<Schema> schemaFromCommitMetadata = getTableSchemaFromCommitMetadata(instant, false);
+    return schemaFromCommitMetadata.isPresent() ? schemaFromCommitMetadata.get() :
+        HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
+  }
+
+  /**
    * Gets the schema for a hoodie table in Avro format from the HoodieCommitMetadata of the last commit.
    *
    * @return Avro schema for this table
    */
   private Option<Schema> getTableSchemaFromCommitMetadata(boolean includeMetadataFields) {
+    HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+    return getTableSchemaFromCommitMetadata(timeline.lastInstant().get(), includeMetadataFields);
+  }
+
+
+  /**
+   * Gets the schema for a hoodie table in Avro format from the HoodieCommitMetadata of the given instant.
+   *
+   * @return Avro schema for this table
+   */
+  private Option<Schema> getTableSchemaFromCommitMetadata(HoodieInstant instant, boolean includeMetadataFields) {
     try {
       HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
-      byte[] data = timeline.getInstantDetails(timeline.lastInstant().get()).get();
+      byte[] data = timeline.getInstantDetails(instant).get();
       HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
       String existingSchemaStr = metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
 
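
A minimal sketch of the resolver API this hunk adds (Scala calling the Java class; `spark` and `basePath` are assumed placeholders, and the timeline calls mirror the ones used in this commit):

    import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}

    val metaClient = new HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, basePath, true)
    val resolver = new TableSchemaResolver(metaClient)
    val timeline = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants
    // Existing behavior: schema from the last completed instant.
    val latestSchema = resolver.getTableAvroSchemaWithoutMetadataFields()
    // New overload: schema as of a specific instant (here the last one, for illustration).
    val asOfSchema = resolver.getTableAvroSchemaWithoutMetadataFields(timeline.lastInstant().get())
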
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index e153e92..d66fa96 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -109,6 +109,15 @@ object DataSourceReadOptions {
   val END_INSTANTTIME_OPT_KEY = "hoodie.datasource.read.end.instanttime"
 
   /**
+    * Whether to use the end instant's schema when incrementally fetching data.
+    *
+    * Default: false (use latest instant schema)
+    *
+    */
+  val INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME_OPT_KEY = "hoodie.datasource.read.schema.use.end.instanttime"
+  val DEFAULT_INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME_OPT_VAL = "false"
+
+  /**
     * For use-cases like DeltaStreamer which reads from Hoodie Incremental table and applies opaque map functions,
     * filters appearing late in the sequence of transformations cannot be automatically pushed down.
     * This option allows setting filters directly on Hoodie Source
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index e113d4a..ff68ef0 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -55,7 +55,6 @@ class IncrementalRelation(val sqlContext: SQLContext,
 
   private val log = LogManager.getLogger(classOf[IncrementalRelation])
 
-
   val skeletonSchema: StructType = HoodieSparkUtils.getMetaSchema
   private val metaClient = new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath, true)
 
@@ -76,6 +75,9 @@ class IncrementalRelation(val sqlContext: SQLContext,
       s"option ${DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY}")
   }
 
+  val useEndInstantSchema = optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME_OPT_KEY,
+    DataSourceReadOptions.DEFAULT_INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME_OPT_VAL).toBoolean
+
   private val lastInstant = commitTimeline.lastInstant().get()
 
   private val commitsToReturn = commitTimeline.findInstantsInRange(
@@ -83,11 +85,16 @@ class IncrementalRelation(val sqlContext: SQLContext,
     optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, lastInstant.getTimestamp))
     .getInstants.iterator().toList
 
-  // use schema from a file produced in the latest instant
-  val latestSchema: StructType = {
+  // use the schema from the end instant if configured, otherwise from the latest instant
+  val usedSchema: StructType = {
     log.info("Inferring schema..")
     val schemaResolver = new TableSchemaResolver(metaClient)
-    val tableSchema = schemaResolver.getTableAvroSchemaWithoutMetadataFields
+    val tableSchema = if (useEndInstantSchema) {
+      if (commitsToReturn.isEmpty) schemaResolver.getTableAvroSchemaWithoutMetadataFields()
+      else schemaResolver.getTableAvroSchemaWithoutMetadataFields(commitsToReturn.last)
+    } else {
+      schemaResolver.getTableAvroSchemaWithoutMetadataFields()
+    }
     val dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
     StructType(skeletonSchema.fields ++ dataSchema.fields)
   }
@@ -104,7 +111,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
     }
   }
 
-  override def schema: StructType = latestSchema
+  override def schema: StructType = usedSchema
 
   override def buildScan(): RDD[Row] = {
     val regularFileIdToFullPath = mutable.HashMap[String, String]()
@@ -148,12 +155,12 @@ class IncrementalRelation(val sqlContext: SQLContext,
     } else {
       log.info("Additional Filters to be applied to incremental source are :" + filters)
 
-      var df: DataFrame = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], latestSchema)
+      var df: DataFrame = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)
 
       if (metaBootstrapFileIdToFullPath.nonEmpty) {
         df = sqlContext.sparkSession.read
                .format("hudi")
-               .schema(latestSchema)
+               .schema(usedSchema)
                .option(DataSourceReadOptions.READ_PATHS_OPT_KEY, filteredMetaBootstrapFullPaths.mkString(","))
                .load()
       }
@@ -161,7 +168,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
       if (regularFileIdToFullPath.nonEmpty)
       {
         df = df.union(sqlContext.read.options(sOpts)
-                        .schema(latestSchema)
+                        .schema(usedSchema)
                         .parquet(filteredRegularFullPaths.toList: _*)
                         .filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
                           commitsToReturn.head.getTimestamp))
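
Net effect on schema selection in IncrementalRelation (a condensed restatement of the code above, not the literal diff):

    val tableSchema =
      if (useEndInstantSchema && commitsToReturn.nonEmpty)
        schemaResolver.getTableAvroSchemaWithoutMetadataFields(commitsToReturn.last)  // schema as of END_INSTANTTIME
      else
        schemaResolver.getTableAvroSchemaWithoutMetadataFields()                      // schema of the latest commit

So the option only changes behavior when the incremental range is non-empty; otherwise the read falls back to the latest instant's schema, matching the default.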