You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2023/01/11 18:16:13 UTC

[GitHub] [hudi] yihua commented on a diff in pull request #7528: [HUDI-5443] Fixing exception trying to read MOR table after `NestedSchemaPruning` rule has been applied

yihua commented on code in PR #7528:
URL: https://github.com/apache/hudi/pull/7528#discussion_r1067307358


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala:
##########
@@ -138,10 +137,16 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
   override protected def getPartitions: Array[Partition] =
     fileSplits.zipWithIndex.map(file => HoodieMergeOnReadPartition(file._2, file._1)).toArray
 
-  private def getConfig: Configuration = {
-    val conf = confBroadcast.value.value
-    CONFIG_INSTANTIATION_LOCK.synchronized {
-      new Configuration(conf)
-    }
+  private def getHadoopConf: Configuration = {
+    val conf = hadoopConfBroadcast.value.value
+    new Configuration(conf)

Review Comment:
   Do we still need the lock here for concurrency?



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala:
##########
@@ -78,14 +80,52 @@ object HoodieCatalystExpressionUtils {
    * NOTE: Projection of the row from [[StructType]] A to [[StructType]] B is only possible, if
    * B is a subset of A
    */
-  def generateUnsafeProjection(from: StructType, to: StructType): UnsafeProjection = {
-    val attrs = from.toAttributes
-    val attrsMap = attrs.map(attr => (attr.name, attr)).toMap
-    val targetExprs = to.fields.map(f => attrsMap(f.name))
+  def generateUnsafeProjection(sourceStructType: StructType, targetStructType: StructType): UnsafeProjection = {
+    val resolver = SQLConf.get.resolver
+    val attrs = sourceStructType.toAttributes
+    val targetExprs = targetStructType.fields.map { targetField =>
+      val attrRef = attrs.find(attr => resolver(attr.name, targetField.name))
+        .getOrElse(throw new AnalysisException(s"Wasn't able to match target field `${targetField.name}` to any of the source attributes ($attrs)"))
+
+      genProjectingExpression(attrRef, targetField.dataType)
+    }
 
     GenerateUnsafeProjection.generate(targetExprs, attrs)
   }
 
+  private def genProjectingExpression(sourceExpr: Expression,
+                                      targetDataType: DataType): Expression = {
+    checkState(sourceExpr.resolved)
+
+    // TODO support array, map
+    (sourceExpr.dataType, targetDataType) match {
+      case (sdt, tdt) if sdt == tdt =>
+        sourceExpr
+
+      case (sourceType: StructType, targetType: StructType) =>
+        val fieldValueExprs = targetType.fields.map { tf =>

Review Comment:
   Looks like a subset of nested fields may be taken during the projection, e.g., if the source has `a {a.b, a.c, a.d}` and the target has `a.b`, we only keep `a.b` instead of the whole `StructType a`.   Does this happen, or does the caller of this function always make sure the `targetStructType` is properly constructed to preserve the root-level field instead of a subset of nested fields?  Is this a problem for projection, where the parquet and log reader can read files with a schema containing a subset of nested fields?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala:
##########
@@ -67,6 +70,9 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
 
   override lazy val mandatoryFields: Seq[String] = Seq.empty
 
+  override def updatePrunedDataSchema(prunedSchema: StructType): Relation =

Review Comment:
   Does copying this class incur any noticeable overhead?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala:
##########
@@ -106,18 +112,16 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
   }
 
   protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[HoodieBaseFileSplit] = {
-    val partitions = listLatestBaseFiles(globPaths, partitionFilters, dataFilters)
-    val fileSplits = partitions.values.toSeq
-      .flatMap { files =>
-        files.flatMap { file =>
-          // TODO fix, currently assuming parquet as underlying format
-          HoodieDataSourceHelper.splitFiles(
-            sparkSession = sparkSession,
-            file = file,
-            partitionValues = getPartitionColumnsAsInternalRow(file)
-          )
-        }
-      }
+    val fileSlices = listLatestFileSlices(globPaths, partitionFilters, dataFilters)

Review Comment:
   This relation should still use `listLatestBaseFiles()` instead of `listLatestFileSlices()`.  For MOR read-optimized queries, only base files should be read, instead of the latest file slices.



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala:
##########
@@ -78,14 +80,52 @@ object HoodieCatalystExpressionUtils {
    * NOTE: Projection of the row from [[StructType]] A to [[StructType]] B is only possible, if
    * B is a subset of A
    */
-  def generateUnsafeProjection(from: StructType, to: StructType): UnsafeProjection = {
-    val attrs = from.toAttributes
-    val attrsMap = attrs.map(attr => (attr.name, attr)).toMap
-    val targetExprs = to.fields.map(f => attrsMap(f.name))
+  def generateUnsafeProjection(sourceStructType: StructType, targetStructType: StructType): UnsafeProjection = {
+    val resolver = SQLConf.get.resolver
+    val attrs = sourceStructType.toAttributes
+    val targetExprs = targetStructType.fields.map { targetField =>
+      val attrRef = attrs.find(attr => resolver(attr.name, targetField.name))
+        .getOrElse(throw new AnalysisException(s"Wasn't able to match target field `${targetField.name}` to any of the source attributes ($attrs)"))
+
+      genProjectingExpression(attrRef, targetField.dataType)
+    }
 
     GenerateUnsafeProjection.generate(targetExprs, attrs)
   }
 
+  private def genProjectingExpression(sourceExpr: Expression,
+                                      targetDataType: DataType): Expression = {
+    checkState(sourceExpr.resolved)
+
+    // TODO support array, map
+    (sourceExpr.dataType, targetDataType) match {
+      case (sdt, tdt) if sdt == tdt =>
+        sourceExpr
+
+      case (sourceType: StructType, targetType: StructType) =>
+        val fieldValueExprs = targetType.fields.map { tf =>
+          val ord = sourceType.fieldIndex(tf.name)
+          val fieldValExpr = genProjectingExpression(GetStructField(sourceExpr, ord, Some(tf.name)), tf.dataType)
+          Alias(fieldValExpr, tf.name)()
+        }
+
+        CreateStruct(fieldValueExprs)
+
+      case _ => throw new UnsupportedOperationException(s"(${sourceExpr.dataType}, $targetDataType)")
+    }
+  }
+
+  // TODO scala-docs

Review Comment:
   nit: add scala-docs



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org