You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2022/01/07 18:06:41 UTC

[hudi] 19/21: [HUDI-3168] Fixing null schema with empty commit in incremental relation (#4513)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.10.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit f39538edca68df2eac2fe9a20b173262ebac17d2
Author: Vinish Reddy <vi...@gmail.com>
AuthorDate: Wed Jan 5 22:13:10 2022 +0530

    [HUDI-3168] Fixing null schema with empty commit in incremental relation (#4513)
---
 .../org/apache/hudi/IncrementalRelation.scala      | 154 +++++++++++----------
 .../sources/S3EventsHoodieIncrSource.java          |   4 +
 2 files changed, 86 insertions(+), 72 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index 958a15e..1907108 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -17,8 +17,9 @@
 
 package org.apache.hudi
 
-import java.util.stream.Collectors
+import org.apache.avro.Schema
 
+import java.util.stream.Collectors
 import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecord, HoodieReplaceCommitMetadata, HoodieTableType}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
@@ -89,8 +90,13 @@ class IncrementalRelation(val sqlContext: SQLContext,
     } else {
       schemaResolver.getTableAvroSchemaWithoutMetadataFields()
     }
-    val dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
-    StructType(skeletonSchema.fields ++ dataSchema.fields)
+    if (tableSchema.getType == Schema.Type.NULL) {
+      // if there is only one commit in the table and is an empty commit without schema, return empty RDD here
+      StructType(Nil)
+    } else {
+      val dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
+      StructType(skeletonSchema.fields ++ dataSchema.fields)
+    }
   }
 
   private val filters = optParams.getOrElse(DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS.key,
@@ -99,86 +105,90 @@ class IncrementalRelation(val sqlContext: SQLContext,
   override def schema: StructType = usedSchema
 
   override def buildScan(): RDD[Row] = {
-    val regularFileIdToFullPath = mutable.HashMap[String, String]()
-    var metaBootstrapFileIdToFullPath = mutable.HashMap[String, String]()
-
-    // create Replaced file group
-    val replacedTimeline = commitsTimelineToReturn.getCompletedReplaceTimeline
-    val replacedFile = replacedTimeline.getInstants.collect(Collectors.toList[HoodieInstant]).flatMap { instant =>
-      val replaceMetadata = HoodieReplaceCommitMetadata.
-        fromBytes(metaClient.getActiveTimeline.getInstantDetails(instant).get, classOf[HoodieReplaceCommitMetadata])
-      replaceMetadata.getPartitionToReplaceFileIds.entrySet().flatMap { entry =>
-        entry.getValue.map { e =>
-          val fullPath = FSUtils.getPartitionPath(basePath, entry.getKey).toString
-          (e, fullPath)
+    if (usedSchema == StructType(Nil)) {
+      // if first commit in a table is an empty commit without schema, return empty RDD here
+      sqlContext.sparkContext.emptyRDD[Row]
+    } else {
+      val regularFileIdToFullPath = mutable.HashMap[String, String]()
+      var metaBootstrapFileIdToFullPath = mutable.HashMap[String, String]()
+
+      // create Replaced file group
+      val replacedTimeline = commitsTimelineToReturn.getCompletedReplaceTimeline
+      val replacedFile = replacedTimeline.getInstants.collect(Collectors.toList[HoodieInstant]).flatMap { instant =>
+        val replaceMetadata = HoodieReplaceCommitMetadata.
+          fromBytes(metaClient.getActiveTimeline.getInstantDetails(instant).get, classOf[HoodieReplaceCommitMetadata])
+        replaceMetadata.getPartitionToReplaceFileIds.entrySet().flatMap { entry =>
+          entry.getValue.map { e =>
+            val fullPath = FSUtils.getPartitionPath(basePath, entry.getKey).toString
+            (e, fullPath)
+          }
+        }
+      }.toMap
+
+      for (commit <- commitsToReturn) {
+        val metadata: HoodieCommitMetadata = HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit)
+          .get, classOf[HoodieCommitMetadata])
+
+        if (HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS == commit.getTimestamp) {
+          metaBootstrapFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap.filterNot { case (k, v) =>
+            replacedFile.contains(k) && v.startsWith(replacedFile(k))
+          }
+        } else {
+          regularFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap.filterNot { case (k, v) =>
+            replacedFile.contains(k) && v.startsWith(replacedFile(k))
+          }
         }
       }
-    }.toMap
 
-    for (commit <- commitsToReturn) {
-      val metadata: HoodieCommitMetadata = HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit)
-        .get, classOf[HoodieCommitMetadata])
+      if (metaBootstrapFileIdToFullPath.nonEmpty) {
+        // filer out meta bootstrap files that have had more commits since metadata bootstrap
+        metaBootstrapFileIdToFullPath = metaBootstrapFileIdToFullPath
+          .filterNot(fileIdFullPath => regularFileIdToFullPath.contains(fileIdFullPath._1))
+      }
 
-      if (HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS == commit.getTimestamp) {
-        metaBootstrapFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap.filterNot { case (k, v) =>
-          replacedFile.contains(k) && v.startsWith(replacedFile(k))
-        }
-      } else {
-        regularFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap.filterNot { case (k, v) =>
-          replacedFile.contains(k) && v.startsWith(replacedFile(k))
+      val pathGlobPattern = optParams.getOrElse(
+        DataSourceReadOptions.INCR_PATH_GLOB.key,
+        DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)
+      val (filteredRegularFullPaths, filteredMetaBootstrapFullPaths) = {
+        if (!pathGlobPattern.equals(DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)) {
+          val globMatcher = new GlobPattern("*" + pathGlobPattern)
+          (regularFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values,
+            metaBootstrapFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values)
+        } else {
+          (regularFileIdToFullPath.values, metaBootstrapFileIdToFullPath.values)
         }
       }
-    }
-
-    if (metaBootstrapFileIdToFullPath.nonEmpty) {
-      // filer out meta bootstrap files that have had more commits since metadata bootstrap
-      metaBootstrapFileIdToFullPath = metaBootstrapFileIdToFullPath
-        .filterNot(fileIdFullPath => regularFileIdToFullPath.contains(fileIdFullPath._1))
-    }
-
-    val pathGlobPattern = optParams.getOrElse(
-      DataSourceReadOptions.INCR_PATH_GLOB.key,
-      DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)
-    val (filteredRegularFullPaths, filteredMetaBootstrapFullPaths) = {
-      if(!pathGlobPattern.equals(DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)) {
-        val globMatcher = new GlobPattern("*" + pathGlobPattern)
-        (regularFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values,
-          metaBootstrapFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values)
+      // unset the path filter, otherwise if end_instant_time is not the latest instant, path filter set for RO view
+      // will filter out all the files incorrectly.
+      sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class")
+      val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path"))
+      if (filteredRegularFullPaths.isEmpty && filteredMetaBootstrapFullPaths.isEmpty) {
+        sqlContext.sparkContext.emptyRDD[Row]
       } else {
-        (regularFileIdToFullPath.values, metaBootstrapFileIdToFullPath.values)
-      }
-    }
-    // unset the path filter, otherwise if end_instant_time is not the latest instant, path filter set for RO view
-    // will filter out all the files incorrectly.
-    sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class")
-    val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path"))
-    if (filteredRegularFullPaths.isEmpty && filteredMetaBootstrapFullPaths.isEmpty) {
-      sqlContext.sparkContext.emptyRDD[Row]
-    } else {
-      log.info("Additional Filters to be applied to incremental source are :" + filters)
+        log.info("Additional Filters to be applied to incremental source are :" + filters)
 
-      var df: DataFrame = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)
+        var df: DataFrame = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)
 
-      if (metaBootstrapFileIdToFullPath.nonEmpty) {
-        df = sqlContext.sparkSession.read
-               .format("hudi")
-               .schema(usedSchema)
-               .option(DataSourceReadOptions.READ_PATHS.key, filteredMetaBootstrapFullPaths.mkString(","))
-               .load()
-      }
+        if (metaBootstrapFileIdToFullPath.nonEmpty) {
+          df = sqlContext.sparkSession.read
+            .format("hudi")
+            .schema(usedSchema)
+            .option(DataSourceReadOptions.READ_PATHS.key, filteredMetaBootstrapFullPaths.mkString(","))
+            .load()
+        }
 
-      if (regularFileIdToFullPath.nonEmpty)
-      {
-        df = df.union(sqlContext.read.options(sOpts)
-                        .schema(usedSchema)
-                        .parquet(filteredRegularFullPaths.toList: _*)
-                        .filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-                          commitsToReturn.head.getTimestamp))
-                        .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-                          commitsToReturn.last.getTimestamp)))
-      }
+        if (regularFileIdToFullPath.nonEmpty) {
+          df = df.union(sqlContext.read.options(sOpts)
+            .schema(usedSchema)
+            .parquet(filteredRegularFullPaths.toList: _*)
+            .filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+              commitsToReturn.head.getTimestamp))
+            .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+              commitsToReturn.last.getTimestamp)))
+        }
 
-      filters.foldLeft(df)((e, f) => e.filter(f)).rdd
+        filters.foldLeft(df)((e, f) => e.filter(f)).rdd
+      }
     }
   }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
index ec789ab..f67fbcc 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -105,6 +105,10 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
         .option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), instantEndpts.getLeft())
         .option(DataSourceReadOptions.END_INSTANTTIME().key(), instantEndpts.getRight());
     Dataset<Row> source = metaReader.load(srcPath);
+    
+    if (source.isEmpty()) {
+      return Pair.of(Option.empty(), instantEndpts.getRight());
+    }
 
     String filter = "s3.object.size > 0";
     if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_KEY_PREFIX))) {