You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/02/08 23:06:45 UTC

[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4709: [HUDI-3338] custom relation instead of HadoopFsRelation

alexeykudinkin commented on a change in pull request #4709:
URL: https://github.com/apache/hudi/pull/4709#discussion_r799755380



##########
File path: hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
##########
@@ -293,4 +293,21 @@ object HoodieSparkUtils extends SparkAdapterSupport {
       s"${tableSchema.fieldNames.mkString(",")}")
     AttributeReference(columnName, field.get.dataType, field.get.nullable)()
   }
+
+  def getRequiredSchema(tableAvroSchema: Schema, requiredColumns: Array[String]): (Schema, StructType) = {
+    // First get the required avro-schema, then convert the avro-schema to spark schema.
+    val name2Fields = tableAvroSchema.getFields.asScala.map(f => f.name() -> f).toMap
+    val requiredFields = requiredColumns.map(c => name2Fields(c))
+      .map(f => new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(), f.order())).toList

Review comment:
       What's the idea behind re-creating the `Schema.Field`?

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyViewRelation.scala
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.TableSchemaResolver
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.execution.datasources.{PartitionedFile, _}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
+import org.apache.spark.sql.types.{BooleanType, StructType}
+
+import scala.util.Try
+
+class BaseFileOnlyViewRelation(
+    val sqlContext: SQLContext,
+    metaClient: HoodieTableMetaClient,
+    optParams: Map[String, String],
+    userSchema: StructType
+  ) extends BaseRelation with PrunedFilteredScan {
+
+  private val sparkSession = sqlContext.sparkSession
+
+  private val tableAvroSchema = {
+    val schemaUtil = new TableSchemaResolver(metaClient)
+    Try (schemaUtil.getTableAvroSchema).getOrElse(SchemaConverters.toAvroType(userSchema))
+  }
+
+  private val tableStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
+
+  private val fileIndex = HoodieFileIndex(sparkSession,
+    metaClient,
+    if (userSchema == null) Option.empty[StructType] else Some(userSchema),

Review comment:
       Let's wrap schema into Option to avoid dealing w/ nulls

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hadoop.fs.FileStatus
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper, SpecificInternalRow, SubqueryExpression, UnsafeProjection}
+import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+object HoodieDataSourceHelper extends PredicateHelper {
+
+  /**
+   * Partition the given condition into two sequence of conjunctive predicates:
+   * - predicates that can be evaluated using metadata only.
+   * - other predicates.
+   */
+  def splitPartitionAndDataPredicates(
+      condition: Expression,
+      partitionColumns: Seq[String],
+      spark: SparkSession): (Seq[Expression], Seq[Expression]) = {
+    splitConjunctivePredicates(condition).partition(
+      isPredicateMetadataOnly(_, partitionColumns, spark))
+  }
+
+  /**
+   * Check if condition can be evaluated using only metadata. In Delta, this means the condition
+   * only references partition columns and involves no subquery.
+   */
+  def isPredicateMetadataOnly(
+      condition: Expression,
+      partitionColumns: Seq[String],
+      spark: SparkSession): Boolean = {
+    isPredicatePartitionColumnsOnly(condition, partitionColumns, spark) &&
+        !containsSubquery(condition)
+  }
+
+  /**
+   * Does the predicate only contains partition columns?
+   */
+  def isPredicatePartitionColumnsOnly(
+      condition: Expression,
+      partitionColumns: Seq[String],
+      spark: SparkSession): Boolean = {
+    val nameEquality = spark.sessionState.analyzer.resolver
+    condition.references.forall { r =>
+      partitionColumns.exists(nameEquality(r.name, _))
+    }
+  }
+
+  /**
+   * Check if condition involves a subquery expression.
+   */
+  def containsSubquery(condition: Expression): Boolean = {
+    SubqueryExpression.hasSubquery(condition)
+  }
+
+  /**
+   * Wrapper `readFunction` to deal with [[ColumnarBatch]] when enable parquet vectorized reader.
+   */
+  def readParquetFile(
+      file: PartitionedFile,
+      readFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] = {
+    val fileIterator = readFunction(file)
+    val rows = fileIterator.flatMap(_ match {
+      case r: InternalRow => Seq(r)
+      case b: ColumnarBatch => b.rowIterator().asScala
+    })
+    rows
+  }
+
+  /**
+   * Extract the required schema from [[InternalRow]]
+   */
+  def extractRequiredSchema(
+      iter: Iterator[InternalRow],
+      requiredSchema: StructType,
+      requiredFieldPos: Seq[Int]): Iterator[InternalRow] = {
+    val unsafeProjection = UnsafeProjection.create(requiredSchema)
+    val rows = iter.map { row =>
+      unsafeProjection(createInternalRowWithSchema(row, requiredSchema, requiredFieldPos))
+    }
+    rows
+  }
+
+  /**
+   * Convert [[InternalRow]] to [[SpecificInternalRow]].
+   */
+  def createInternalRowWithSchema(
+      row: InternalRow,
+      schema: StructType,
+      positions: Seq[Int]): InternalRow = {
+    val rowToReturn = new SpecificInternalRow(schema)
+    var curIndex = 0
+    schema.zip(positions).foreach { case (field, pos) =>
+      val curField = if (row.isNullAt(pos)) {
+        null
+      } else {
+        row.get(pos, field.dataType)
+      }
+      rowToReturn.update(curIndex, curField)
+      curIndex += 1
+    }
+    rowToReturn
+  }
+
+
+  def splitFiles(
+      sparkSession: SparkSession,
+      file: FileStatus,
+      partitionValues: InternalRow): Seq[PartitionedFile] = {
+    val filePath = file.getPath
+    val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
+    (0L until file.getLen by maxSplitBytes).map { offset =>
+      val remaining = file.getLen - offset
+      val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
+      PartitionedFile(partitionValues, filePath.toUri.toString, offset, size)
+    }
+  }
+
+  def getFilePartitions(

Review comment:
       Why can't we actually re-use the `FilePartition.getFilePartitions` methods here?

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hadoop.fs.FileStatus
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper, SpecificInternalRow, SubqueryExpression, UnsafeProjection}
+import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+object HoodieDataSourceHelper extends PredicateHelper {
+
+  /**
+   * Partition the given condition into two sequence of conjunctive predicates:
+   * - predicates that can be evaluated using metadata only.
+   * - other predicates.
+   */
+  def splitPartitionAndDataPredicates(
+      condition: Expression,
+      partitionColumns: Seq[String],
+      spark: SparkSession): (Seq[Expression], Seq[Expression]) = {
+    splitConjunctivePredicates(condition).partition(
+      isPredicateMetadataOnly(_, partitionColumns, spark))
+  }
+
+  /**
+   * Check if condition can be evaluated using only metadata. In Delta, this means the condition
+   * only references partition columns and involves no subquery.
+   */
+  def isPredicateMetadataOnly(
+      condition: Expression,
+      partitionColumns: Seq[String],
+      spark: SparkSession): Boolean = {
+    isPredicatePartitionColumnsOnly(condition, partitionColumns, spark) &&
+        !containsSubquery(condition)
+  }
+
+  /**
+   * Does the predicate only contains partition columns?
+   */
+  def isPredicatePartitionColumnsOnly(
+      condition: Expression,
+      partitionColumns: Seq[String],
+      spark: SparkSession): Boolean = {
+    val nameEquality = spark.sessionState.analyzer.resolver
+    condition.references.forall { r =>
+      partitionColumns.exists(nameEquality(r.name, _))
+    }
+  }
+
+  /**
+   * Check if condition involves a subquery expression.
+   */
+  def containsSubquery(condition: Expression): Boolean = {
+    SubqueryExpression.hasSubquery(condition)
+  }
+
+  /**
+   * Wrapper `readFunction` to deal with [[ColumnarBatch]] when enable parquet vectorized reader.
+   */
+  def readParquetFile(

Review comment:
       Instead of plugging in this function in the RDD, let's actually: 
   
   1. Rename it to something like `unravelColumnarBatch`
   2. Invoke it on original read function to return `Iterator[InternalRow]` in the Relation itself (before passing it to RDD)

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.execution.QueryExecutionException
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile, SchemaColumnConvertNotSupportedException}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Similar to [[org.apache.spark.sql.execution.datasources.FileScanRDD]].
+ *
+ * This class will extract the fields needed according to [[requiredColumns]] and
+ * return iterator of [[org.apache.spark.sql.Row]] directly.
+ */
+class HoodieFileScanRDD(
+    @transient private val sparkSession: SparkSession,
+    requiredColumns: Array[String],
+    schema: StructType,
+    readFunction: PartitionedFile => Iterator[Any],
+    @transient val filePartitions: Seq[FilePartition])
+  extends RDD[Row](sparkSession.sparkContext, Nil) {
+
+  private val requiredSchema = {
+    val nameToStructField = schema.map(field => (field.name, field)).toMap
+    StructType(requiredColumns.map(nameToStructField))
+  }
+
+  private val requiredFieldPos = requiredSchema.map(f => schema.fieldIndex(f.name))

Review comment:
       I think this should be mapping into indexes in the Table's schema, shouldn't it?

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.execution.QueryExecutionException
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile, SchemaColumnConvertNotSupportedException}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Similar to [[org.apache.spark.sql.execution.datasources.FileScanRDD]].
+ *
+ * This class will extract the fields needed according to [[requiredColumns]] and
+ * return iterator of [[org.apache.spark.sql.Row]] directly.
+ */
+class HoodieFileScanRDD(
+    @transient private val sparkSession: SparkSession,
+    requiredColumns: Array[String],
+    schema: StructType,
+    readFunction: PartitionedFile => Iterator[Any],
+    @transient val filePartitions: Seq[FilePartition])
+  extends RDD[Row](sparkSession.sparkContext, Nil) {
+
+  private val requiredSchema = {
+    val nameToStructField = schema.map(field => (field.name, field)).toMap
+    StructType(requiredColumns.map(nameToStructField))
+  }
+
+  private val requiredFieldPos = requiredSchema.map(f => schema.fieldIndex(f.name))

Review comment:
       Let's extract this as utility, something along the line of:
   
   ```
   collectFieldIndexes(projectedSchema, originalSchema): = ...
   ```

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyViewRelation.scala
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.TableSchemaResolver
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.execution.datasources.{PartitionedFile, _}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
+import org.apache.spark.sql.types.{BooleanType, StructType}
+
+import scala.util.Try
+
+class BaseFileOnlyViewRelation(
+    val sqlContext: SQLContext,
+    metaClient: HoodieTableMetaClient,
+    optParams: Map[String, String],
+    userSchema: StructType
+  ) extends BaseRelation with PrunedFilteredScan {
+
+  private val sparkSession = sqlContext.sparkSession
+
+  private val tableAvroSchema = {
+    val schemaUtil = new TableSchemaResolver(metaClient)
+    Try (schemaUtil.getTableAvroSchema).getOrElse(SchemaConverters.toAvroType(userSchema))
+  }
+
+  private val tableStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
+
+  private val fileIndex = HoodieFileIndex(sparkSession,
+    metaClient,
+    if (userSchema == null) Option.empty[StructType] else Some(userSchema),
+    optParams,
+    FileStatusCache.getOrCreate(sqlContext.sparkSession)
+  )
+
+  private val partitionColumns = metaClient.getTableConfig.getPartitionFields.orElse(Array.empty)
+
+  override def schema: StructType =  tableStructSchema
+
+  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {

Review comment:
       There's a heavy overlap w/ MergeOnReadSnapshotRelation let's extract common functionality to a Base class

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hadoop.fs.FileStatus
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper, SpecificInternalRow, SubqueryExpression, UnsafeProjection}
+import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+object HoodieDataSourceHelper extends PredicateHelper {
+
+  /**
+   * Partition the given condition into two sequence of conjunctive predicates:
+   * - predicates that can be evaluated using metadata only.
+   * - other predicates.
+   */
+  def splitPartitionAndDataPredicates(
+      condition: Expression,
+      partitionColumns: Seq[String],
+      spark: SparkSession): (Seq[Expression], Seq[Expression]) = {
+    splitConjunctivePredicates(condition).partition(
+      isPredicateMetadataOnly(_, partitionColumns, spark))
+  }
+
+  /**
+   * Check if condition can be evaluated using only metadata. In Delta, this means the condition
+   * only references partition columns and involves no subquery.
+   */
+  def isPredicateMetadataOnly(
+      condition: Expression,
+      partitionColumns: Seq[String],
+      spark: SparkSession): Boolean = {
+    isPredicatePartitionColumnsOnly(condition, partitionColumns, spark) &&
+        !containsSubquery(condition)
+  }
+
+  /**
+   * Does the predicate only contains partition columns?
+   */
+  def isPredicatePartitionColumnsOnly(
+      condition: Expression,
+      partitionColumns: Seq[String],
+      spark: SparkSession): Boolean = {
+    val nameEquality = spark.sessionState.analyzer.resolver

Review comment:
       Maybe add a comment here that this is not just about comparing the names, but instead resolving the names and make sure these resolve to the same entity 

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hadoop.fs.FileStatus
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper, SpecificInternalRow, SubqueryExpression, UnsafeProjection}
+import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+object HoodieDataSourceHelper extends PredicateHelper {
+
+  /**
+   * Partition the given condition into two sequence of conjunctive predicates:
+   * - predicates that can be evaluated using metadata only.
+   * - other predicates.
+   */
+  def splitPartitionAndDataPredicates(
+      condition: Expression,
+      partitionColumns: Seq[String],
+      spark: SparkSession): (Seq[Expression], Seq[Expression]) = {
+    splitConjunctivePredicates(condition).partition(
+      isPredicateMetadataOnly(_, partitionColumns, spark))
+  }
+
+  /**
+   * Check if condition can be evaluated using only metadata. In Delta, this means the condition
+   * only references partition columns and involves no subquery.
+   */
+  def isPredicateMetadataOnly(
+      condition: Expression,
+      partitionColumns: Seq[String],
+      spark: SparkSession): Boolean = {
+    isPredicatePartitionColumnsOnly(condition, partitionColumns, spark) &&
+        !containsSubquery(condition)
+  }
+
+  /**
+   * Does the predicate only contains partition columns?
+   */
+  def isPredicatePartitionColumnsOnly(
+      condition: Expression,
+      partitionColumns: Seq[String],
+      spark: SparkSession): Boolean = {
+    val nameEquality = spark.sessionState.analyzer.resolver
+    condition.references.forall { r =>
+      partitionColumns.exists(nameEquality(r.name, _))
+    }
+  }
+
+  /**
+   * Check if condition involves a subquery expression.
+   */
+  def containsSubquery(condition: Expression): Boolean = {

Review comment:
       I think we can inline this one as it's just a single call itself

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hadoop.fs.FileStatus
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper, SpecificInternalRow, SubqueryExpression, UnsafeProjection}
+import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+object HoodieDataSourceHelper extends PredicateHelper {
+
+  /**
+   * Partition the given condition into two sequence of conjunctive predicates:
+   * - predicates that can be evaluated using metadata only.
+   * - other predicates.
+   */
+  def splitPartitionAndDataPredicates(
+      condition: Expression,
+      partitionColumns: Seq[String],
+      spark: SparkSession): (Seq[Expression], Seq[Expression]) = {
+    splitConjunctivePredicates(condition).partition(
+      isPredicateMetadataOnly(_, partitionColumns, spark))
+  }
+
+  /**
+   * Check if condition can be evaluated using only metadata. In Delta, this means the condition
+   * only references partition columns and involves no subquery.
+   */
+  def isPredicateMetadataOnly(
+      condition: Expression,
+      partitionColumns: Seq[String],
+      spark: SparkSession): Boolean = {
+    isPredicatePartitionColumnsOnly(condition, partitionColumns, spark) &&
+        !containsSubquery(condition)
+  }
+
+  /**
+   * Does the predicate only contains partition columns?
+   */
+  def isPredicatePartitionColumnsOnly(
+      condition: Expression,
+      partitionColumns: Seq[String],
+      spark: SparkSession): Boolean = {

Review comment:
       `SparkSession` (as well as generally any contextual object) is usually passed as a first arg. Let's make it consistent

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyViewRelation.scala
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.TableSchemaResolver
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.execution.datasources.{PartitionedFile, _}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
+import org.apache.spark.sql.types.{BooleanType, StructType}
+
+import scala.util.Try
+
+class BaseFileOnlyViewRelation(

Review comment:
       Let's add a scala-doc for it

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -61,26 +64,31 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
   } else {
     new Properties()
   }
+
+  private val requiredSchema = tableState.requiredStructSchema
+
+  private val requiredFieldPosition = requiredSchema.map(f => tableState.tableStructSchema.fieldIndex(f.name))
+
   override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
     val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
     val iter = mergeOnReadPartition.split match {
       case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
-        val rows = read(dataFileOnlySplit.dataFile.get, requiredSchemaFileReader)
-        extractRequiredSchema(rows)
+        val rows = readParquetFile(dataFileOnlySplit.dataFile.get, requiredSchemaFileReader)
+        extractRequiredSchema(rows, requiredSchema, requiredFieldPosition)

Review comment:
       This is actually will try to project already projected data, i think we should be able to clean this up.
   
   Projection is actually done by Spark's Parquet reader in `ParquetFileFormat#buildReaderWithPartitionValues` in both Spark 2.4 and 3.2

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.execution.QueryExecutionException
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile, SchemaColumnConvertNotSupportedException}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Similar to [[org.apache.spark.sql.execution.datasources.FileScanRDD]].
+ *
+ * This class will extract the fields needed according to [[requiredColumns]] and
+ * return iterator of [[org.apache.spark.sql.Row]] directly.
+ */
+class HoodieFileScanRDD(
+    @transient private val sparkSession: SparkSession,
+    requiredColumns: Array[String],
+    schema: StructType,
+    readFunction: PartitionedFile => Iterator[Any],
+    @transient val filePartitions: Seq[FilePartition])
+  extends RDD[Row](sparkSession.sparkContext, Nil) {
+
+  private val requiredSchema = {
+    val nameToStructField = schema.map(field => (field.name, field)).toMap
+    StructType(requiredColumns.map(nameToStructField))
+  }
+
+  private val requiredFieldPos = requiredSchema.map(f => schema.fieldIndex(f.name))
+
+  override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
+    val iterator = new Iterator[Object] with AutoCloseable {
+
+      private[this] val files = split.asInstanceOf[FilePartition].files.toIterator
+      private[this] var currentFile: PartitionedFile = null
+      private[this] var currentIterator: Iterator[Object] = null
+
+      override def hasNext: Boolean = {
+        (currentIterator != null && currentIterator.hasNext) || nextIterator()
+      }
+
+      def next(): Object = {
+        currentIterator.next()
+      }
+
+      /** Advances to the next file. Returns true if a new non-empty iterator is available. */
+      private def nextIterator(): Boolean = {
+        if (files.hasNext) {
+          currentFile = files.next()
+
+          logInfo(s"Reading File $currentFile")
+          currentIterator = HoodieDataSourceHelper.readParquetFile(currentFile, readFunction)
+
+          try {
+            hasNext
+          } catch {
+            case e: SchemaColumnConvertNotSupportedException =>

Review comment:
       I don't think it's the right place for this Exception handling. Let's bubble it down into the file reading sequence itself (w/in `readParquetFile`)

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.execution.QueryExecutionException
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile, SchemaColumnConvertNotSupportedException}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Similar to [[org.apache.spark.sql.execution.datasources.FileScanRDD]].
+ *
+ * This class will extract the fields needed according to [[requiredColumns]] and
+ * return iterator of [[org.apache.spark.sql.Row]] directly.
+ */
+class HoodieFileScanRDD(
+    @transient private val sparkSession: SparkSession,
+    requiredColumns: Array[String],
+    schema: StructType,
+    readFunction: PartitionedFile => Iterator[Any],
+    @transient val filePartitions: Seq[FilePartition])
+  extends RDD[Row](sparkSession.sparkContext, Nil) {
+
+  private val requiredSchema = {
+    val nameToStructField = schema.map(field => (field.name, field)).toMap
+    StructType(requiredColumns.map(nameToStructField))
+  }
+
+  private val requiredFieldPos = requiredSchema.map(f => schema.fieldIndex(f.name))
+
+  override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
+    val iterator = new Iterator[Object] with AutoCloseable {
+
+      private[this] val files = split.asInstanceOf[FilePartition].files.toIterator
+      private[this] var currentFile: PartitionedFile = null
+      private[this] var currentIterator: Iterator[Object] = null
+
+      override def hasNext: Boolean = {
+        (currentIterator != null && currentIterator.hasNext) || nextIterator()
+      }
+
+      def next(): Object = {
+        currentIterator.next()
+      }
+
+      /** Advances to the next file. Returns true if a new non-empty iterator is available. */
+      private def nextIterator(): Boolean = {
+        if (files.hasNext) {
+          currentFile = files.next()
+
+          logInfo(s"Reading File $currentFile")
+          currentIterator = HoodieDataSourceHelper.readParquetFile(currentFile, readFunction)
+
+          try {
+            hasNext
+          } catch {
+            case e: SchemaColumnConvertNotSupportedException =>
+              val message = "Parquet column cannot be converted in " +
+                s"file ${currentFile.filePath}. Column: ${e.getColumn}, " +
+                s"Expected: ${e.getLogicalType}, Found: ${e.getPhysicalType}"
+              throw new QueryExecutionException(message, e)
+
+            case e => throw e
+          }
+        } else {
+          currentFile = null
+          false
+        }
+      }
+
+      override def close(): Unit = {}
+    }
+
+    // Register an on-task-completion callback to close the input stream.
+    context.addTaskCompletionListener[Unit](_ => iterator.close())
+
+    // extract required columns from row
+    val iterAfterExtract = HoodieDataSourceHelper.extractRequiredSchema(

Review comment:
       I actually don't think we need this, please check my other comment regarding duplicate projections

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hadoop.fs.FileStatus
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper, SpecificInternalRow, SubqueryExpression, UnsafeProjection}
+import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+object HoodieDataSourceHelper extends PredicateHelper {
+
+  /**
+   * Partition the given condition into two sequence of conjunctive predicates:
+   * - predicates that can be evaluated using metadata only.
+   * - other predicates.
+   */
+  def splitPartitionAndDataPredicates(
+      condition: Expression,
+      partitionColumns: Seq[String],
+      spark: SparkSession): (Seq[Expression], Seq[Expression]) = {
+    splitConjunctivePredicates(condition).partition(
+      isPredicateMetadataOnly(_, partitionColumns, spark))
+  }
+
+  /**
+   * Check if condition can be evaluated using only metadata. In Delta, this means the condition
+   * only references partition columns and involves no subquery.
+   */
+  def isPredicateMetadataOnly(
+      condition: Expression,
+      partitionColumns: Seq[String],
+      spark: SparkSession): Boolean = {
+    isPredicatePartitionColumnsOnly(condition, partitionColumns, spark) &&
+        !containsSubquery(condition)
+  }
+
+  /**
+   * Does the predicate only contains partition columns?
+   */
+  def isPredicatePartitionColumnsOnly(
+      condition: Expression,
+      partitionColumns: Seq[String],
+      spark: SparkSession): Boolean = {

Review comment:
       Would also be great to do it across the board in your change




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org