Posted to commits@hudi.apache.org by si...@apache.org on 2022/01/07 18:06:41 UTC
[hudi] 19/21: [HUDI-3168] Fixing null schema with empty commit in incremental relation (#4513)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch release-0.10.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit f39538edca68df2eac2fe9a20b173262ebac17d2
Author: Vinish Reddy <vi...@gmail.com>
AuthorDate: Wed Jan 5 22:13:10 2022 +0530
[HUDI-3168] Fixing null schema with empty commit in incremental relation (#4513)
---
.../org/apache/hudi/IncrementalRelation.scala | 154 +++++++++++----------
.../sources/S3EventsHoodieIncrSource.java | 4 +
2 files changed, 86 insertions(+), 72 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index 958a15e..1907108 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -17,8 +17,9 @@
package org.apache.hudi
-import java.util.stream.Collectors
+import org.apache.avro.Schema
+import java.util.stream.Collectors
import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecord, HoodieReplaceCommitMetadata, HoodieTableType}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
@@ -89,8 +90,13 @@ class IncrementalRelation(val sqlContext: SQLContext,
} else {
schemaResolver.getTableAvroSchemaWithoutMetadataFields()
}
- val dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
- StructType(skeletonSchema.fields ++ dataSchema.fields)
+ if (tableSchema.getType == Schema.Type.NULL) {
+ // if there is only one commit in the table and it is an empty commit without schema, return empty RDD here
+ StructType(Nil)
+ } else {
+ val dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
+ StructType(skeletonSchema.fields ++ dataSchema.fields)
+ }
}
private val filters = optParams.getOrElse(DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS.key,
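For context, the guard in the hunk above reduces to a small, self-contained check. The sketch below is illustrative only: resolveSchema and its convert argument are made-up names, not Hudi API. Schema.create(Schema.Type.NULL) stands in for what TableSchemaResolver can return when the table's only commit wrote no schema:

    import org.apache.avro.Schema
    import org.apache.spark.sql.types.StructType

    // Illustrative standalone sketch of the guard; resolveSchema/convert
    // are hypothetical names. A NULL-type Avro table schema short-circuits
    // to an empty StructType before any Avro-to-Spark conversion runs.
    def resolveSchema(tableSchema: Schema, convert: Schema => StructType): StructType =
      if (tableSchema.getType == Schema.Type.NULL) StructType(Nil) // empty commit, no schema
      else convert(tableSchema)

    // The NULL case never touches the converter.
    assert(resolveSchema(Schema.create(Schema.Type.NULL),
      _ => sys.error("not reached")) == StructType(Nil))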
@@ -99,86 +105,90 @@ class IncrementalRelation(val sqlContext: SQLContext,
override def schema: StructType = usedSchema
override def buildScan(): RDD[Row] = {
- val regularFileIdToFullPath = mutable.HashMap[String, String]()
- var metaBootstrapFileIdToFullPath = mutable.HashMap[String, String]()
-
- // create Replaced file group
- val replacedTimeline = commitsTimelineToReturn.getCompletedReplaceTimeline
- val replacedFile = replacedTimeline.getInstants.collect(Collectors.toList[HoodieInstant]).flatMap { instant =>
- val replaceMetadata = HoodieReplaceCommitMetadata.
- fromBytes(metaClient.getActiveTimeline.getInstantDetails(instant).get, classOf[HoodieReplaceCommitMetadata])
- replaceMetadata.getPartitionToReplaceFileIds.entrySet().flatMap { entry =>
- entry.getValue.map { e =>
- val fullPath = FSUtils.getPartitionPath(basePath, entry.getKey).toString
- (e, fullPath)
+ if (usedSchema == StructType(Nil)) {
+ // if the first commit in a table is an empty commit without schema, return an empty RDD here
+ sqlContext.sparkContext.emptyRDD[Row]
+ } else {
+ val regularFileIdToFullPath = mutable.HashMap[String, String]()
+ var metaBootstrapFileIdToFullPath = mutable.HashMap[String, String]()
+
+ // create Replaced file group
+ val replacedTimeline = commitsTimelineToReturn.getCompletedReplaceTimeline
+ val replacedFile = replacedTimeline.getInstants.collect(Collectors.toList[HoodieInstant]).flatMap { instant =>
+ val replaceMetadata = HoodieReplaceCommitMetadata.
+ fromBytes(metaClient.getActiveTimeline.getInstantDetails(instant).get, classOf[HoodieReplaceCommitMetadata])
+ replaceMetadata.getPartitionToReplaceFileIds.entrySet().flatMap { entry =>
+ entry.getValue.map { e =>
+ val fullPath = FSUtils.getPartitionPath(basePath, entry.getKey).toString
+ (e, fullPath)
+ }
+ }
+ }.toMap
+
+ for (commit <- commitsToReturn) {
+ val metadata: HoodieCommitMetadata = HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit)
+ .get, classOf[HoodieCommitMetadata])
+
+ if (HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS == commit.getTimestamp) {
+ metaBootstrapFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap.filterNot { case (k, v) =>
+ replacedFile.contains(k) && v.startsWith(replacedFile(k))
+ }
+ } else {
+ regularFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap.filterNot { case (k, v) =>
+ replacedFile.contains(k) && v.startsWith(replacedFile(k))
+ }
}
}
- }.toMap
- for (commit <- commitsToReturn) {
- val metadata: HoodieCommitMetadata = HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit)
- .get, classOf[HoodieCommitMetadata])
+ if (metaBootstrapFileIdToFullPath.nonEmpty) {
+ // filter out meta bootstrap files that have had more commits since metadata bootstrap
+ metaBootstrapFileIdToFullPath = metaBootstrapFileIdToFullPath
+ .filterNot(fileIdFullPath => regularFileIdToFullPath.contains(fileIdFullPath._1))
+ }
- if (HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS == commit.getTimestamp) {
- metaBootstrapFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap.filterNot { case (k, v) =>
- replacedFile.contains(k) && v.startsWith(replacedFile(k))
- }
- } else {
- regularFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap.filterNot { case (k, v) =>
- replacedFile.contains(k) && v.startsWith(replacedFile(k))
+ val pathGlobPattern = optParams.getOrElse(
+ DataSourceReadOptions.INCR_PATH_GLOB.key,
+ DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)
+ val (filteredRegularFullPaths, filteredMetaBootstrapFullPaths) = {
+ if (!pathGlobPattern.equals(DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)) {
+ val globMatcher = new GlobPattern("*" + pathGlobPattern)
+ (regularFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values,
+ metaBootstrapFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values)
+ } else {
+ (regularFileIdToFullPath.values, metaBootstrapFileIdToFullPath.values)
}
}
- }
-
- if (metaBootstrapFileIdToFullPath.nonEmpty) {
- // filer out meta bootstrap files that have had more commits since metadata bootstrap
- metaBootstrapFileIdToFullPath = metaBootstrapFileIdToFullPath
- .filterNot(fileIdFullPath => regularFileIdToFullPath.contains(fileIdFullPath._1))
- }
-
- val pathGlobPattern = optParams.getOrElse(
- DataSourceReadOptions.INCR_PATH_GLOB.key,
- DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)
- val (filteredRegularFullPaths, filteredMetaBootstrapFullPaths) = {
- if(!pathGlobPattern.equals(DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)) {
- val globMatcher = new GlobPattern("*" + pathGlobPattern)
- (regularFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values,
- metaBootstrapFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values)
+ // unset the path filter, otherwise if end_instant_time is not the latest instant, path filter set for RO view
+ // will filter out all the files incorrectly.
+ sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class")
+ val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path"))
+ if (filteredRegularFullPaths.isEmpty && filteredMetaBootstrapFullPaths.isEmpty) {
+ sqlContext.sparkContext.emptyRDD[Row]
} else {
- (regularFileIdToFullPath.values, metaBootstrapFileIdToFullPath.values)
- }
- }
- // unset the path filter, otherwise if end_instant_time is not the latest instant, path filter set for RO view
- // will filter out all the files incorrectly.
- sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class")
- val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path"))
- if (filteredRegularFullPaths.isEmpty && filteredMetaBootstrapFullPaths.isEmpty) {
- sqlContext.sparkContext.emptyRDD[Row]
- } else {
- log.info("Additional Filters to be applied to incremental source are :" + filters)
+ log.info("Additional Filters to be applied to incremental source are :" + filters)
- var df: DataFrame = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)
+ var df: DataFrame = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)
- if (metaBootstrapFileIdToFullPath.nonEmpty) {
- df = sqlContext.sparkSession.read
- .format("hudi")
- .schema(usedSchema)
- .option(DataSourceReadOptions.READ_PATHS.key, filteredMetaBootstrapFullPaths.mkString(","))
- .load()
- }
+ if (metaBootstrapFileIdToFullPath.nonEmpty) {
+ df = sqlContext.sparkSession.read
+ .format("hudi")
+ .schema(usedSchema)
+ .option(DataSourceReadOptions.READ_PATHS.key, filteredMetaBootstrapFullPaths.mkString(","))
+ .load()
+ }
- if (regularFileIdToFullPath.nonEmpty)
- {
- df = df.union(sqlContext.read.options(sOpts)
- .schema(usedSchema)
- .parquet(filteredRegularFullPaths.toList: _*)
- .filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
- commitsToReturn.head.getTimestamp))
- .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
- commitsToReturn.last.getTimestamp)))
- }
+ if (regularFileIdToFullPath.nonEmpty) {
+ df = df.union(sqlContext.read.options(sOpts)
+ .schema(usedSchema)
+ .parquet(filteredRegularFullPaths.toList: _*)
+ .filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+ commitsToReturn.head.getTimestamp))
+ .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+ commitsToReturn.last.getTimestamp)))
+ }
- filters.foldLeft(df)((e, f) => e.filter(f)).rdd
+ filters.foldLeft(df)((e, f) => e.filter(f)).rdd
+ }
}
}
}
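End to end, the change means an incremental read over a table whose only commit is empty now returns an empty DataFrame instead of failing during schema conversion. A hypothetical repro, assuming a Hudi table at /tmp/hudi/empty_table in exactly that state (the path is made up; the option keys are Hudi's documented incremental read options):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[*]").appName("hudi-3168-repro").getOrCreate()

    // Incremental query over a table whose only commit is an empty commit.
    val df = spark.read.format("hudi")
      .option("hoodie.datasource.query.type", "incremental")
      .option("hoodie.datasource.read.begin.instanttime", "000")
      .load("/tmp/hudi/empty_table")

    // With this patch: empty schema and zero rows, rather than an error
    // while converting a NULL Avro schema to a StructType.
    assert(df.schema.isEmpty && df.isEmpty)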
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
index ec789ab..f67fbcc 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -105,6 +105,10 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
.option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), instantEndpts.getLeft())
.option(DataSourceReadOptions.END_INSTANTTIME().key(), instantEndpts.getRight());
Dataset<Row> source = metaReader.load(srcPath);
+
+ if (source.isEmpty()) {
+ return Pair.of(Option.empty(), instantEndpts.getRight());
+ }
String filter = "s3.object.size > 0";
if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_KEY_PREFIX))) {
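The Java hunk applies the same idea one level up: if the incremental source yields no rows, the reader returns no data while still carrying the checkpoint (the end instant) forward, so progress is not lost. Sketched in Scala for symmetry with the code above (nextBatch and its tuple shape are illustrative, not the HoodieIncrSource API):

    import org.apache.spark.sql.{Dataset, Row}

    // Illustrative guard: an empty incremental batch produces no data but
    // still reports the end instant as the new checkpoint. The size filter
    // mirrors the "s3.object.size > 0" filter applied above.
    def nextBatch(source: Dataset[Row], endInstant: String): (Option[Dataset[Row]], String) =
      if (source.isEmpty) (None, endInstant)
      else (Some(source.filter("s3.object.size > 0")), endInstant)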