You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by xu...@apache.org on 2022/04/19 10:48:45 UTC

[hudi] branch spark-perf-patch-for-0.11rc3 created (now 60e703564d)

This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a change to branch spark-perf-patch-for-0.11rc3
in repository https://gitbox.apache.org/repos/asf/hudi.git


      at 60e703564d apply spark perf patch

This branch includes the following new commits:

     new 60e703564d apply spark perf patch

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[hudi] 01/01: apply spark perf patch

Posted by xu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch spark-perf-patch-for-0.11rc3
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 60e703564daf2407cba5bab9cc22fa4977e31a46
Author: Raymond Xu <xu...@gmail.com>
AuthorDate: Tue Apr 19 18:48:34 2022 +0800

    apply spark perf patch
    
    https://patch-diff.githubusercontent.com/raw/apache/hudi/pull/5352.patch
---
 .../org/apache/hudi/BaseFileOnlyRelation.scala     | 50 +++++++++++++++++++++-
 .../main/scala/org/apache/hudi/DefaultSource.scala | 36 ++++++++++++++--
 .../scala/org/apache/hudi/HoodieBaseRelation.scala | 15 ++++---
 .../scala/org/apache/hudi/SparkDatasetMixin.scala  | 43 +++++++++++++++++++
 .../apache/hudi/functional/TestCOWDataSource.scala | 19 ++++----
 .../apache/hudi/functional/TestMORDataSource.scala | 20 +++------
 .../functional/TestParquetColumnProjection.scala   |  9 ++--
 7 files changed, 154 insertions(+), 38 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
index f46b31b036..adf94fffde 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
@@ -20,13 +20,15 @@ package org.apache.hudi
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-
 import org.apache.hudi.HoodieBaseRelation.createBaseFileReader
+import org.apache.hudi.common.model.HoodieFileFormat
 import org.apache.hudi.common.table.HoodieTableMetaClient
-
+import org.apache.hudi.hadoop.HoodieROTablePathFilter
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.hive.orc.OrcFileFormat
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.types.StructType
 
@@ -104,4 +106,48 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
     sparkAdapter.getFilePartitions(sparkSession, fileSplits, maxSplitBytes)
       .map(HoodieBaseFileSplit.apply)
   }
+
+  /**
+   * NOTE: We have to fallback to [[HadoopFsRelation]] to make sure that all of the Spark optimizations could be
+   *       equally applied to Hudi tables, since some of those are predicated on the usage of [[HadoopFsRelation]],
+   *       and won't be applicable in case of us using our own custom relations (one of such optimizations is [[SchemaPruning]]
+   *       rule; you can find more details in HUDI-3896)
+   */
+  def toHadoopFsRelation: HadoopFsRelation = {
+    val (tableFileFormat, formatClassName) = metaClient.getTableConfig.getBaseFileFormat match {
+      case HoodieFileFormat.PARQUET => (new ParquetFileFormat, "parquet")
+      case HoodieFileFormat.ORC => (new OrcFileFormat, "orc")
+    }
+
+    if (globPaths.isEmpty) {
+      HadoopFsRelation(
+        location = fileIndex,
+        partitionSchema = fileIndex.partitionSchema,
+        dataSchema = fileIndex.dataSchema,
+        bucketSpec = None,
+        fileFormat = tableFileFormat,
+        optParams)(sparkSession)
+    } else {
+      val readPathsStr = optParams.get(DataSourceReadOptions.READ_PATHS.key)
+      val extraReadPaths = readPathsStr.map(p => p.split(",").toSeq).getOrElse(Seq())
+
+      DataSource.apply(
+        sparkSession = sparkSession,
+        paths = extraReadPaths,
+        userSpecifiedSchema = userSchema,
+        className = formatClassName,
+        // Since we're reading the table as just collection of files we have to make sure
+        // we only read the latest version of every Hudi's file-group, which might be compacted, clustered, etc.
+        // while keeping previous versions of the files around as well.
+        //
+        // We rely on [[HoodieROTablePathFilter]], to do proper filtering to assure that
+        options = optParams ++ Map(
+          "mapreduce.input.pathFilter.class" -> classOf[HoodieROTablePathFilter].getName
+        ),
+        partitionColumns = partitionColumns
+      )
+        .resolveRelation()
+        .asInstanceOf[HadoopFsRelation]
+    }
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 7550ff13fd..c1229d5500 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -21,11 +21,13 @@ import org.apache.hadoop.fs.Path
 import org.apache.hudi.DataSourceReadOptions._
 import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION}
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
+import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
 import org.apache.hudi.common.table.timeline.HoodieInstant
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.config.HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE
 import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.log4j.LogManager
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.hudi.streaming.HoodieStreamSource
@@ -108,7 +110,7 @@ class DefaultSource extends RelationProvider
         case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) |
              (COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) |
              (MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) =>
-          new BaseFileOnlyRelation(sqlContext, metaClient, parameters, userSchema, globPaths)
+          resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema, metaClient, parameters)
         case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
           new IncrementalRelation(sqlContext, parameters, userSchema, metaClient)
 
@@ -141,7 +143,7 @@ class DefaultSource extends RelationProvider
     *
     * TODO: Revisit to return a concrete relation here when we support CREATE TABLE AS for Hudi with DataSource API.
     *       That is the only case where Spark seems to actually need a relation to be returned here
-    *       [[DataSource.writeAndRead()]]
+    *       [[org.apache.spark.sql.execution.datasources.DataSource.writeAndRead()]]
     *
     * @param sqlContext Spark SQL Context
     * @param mode Mode for saving the DataFrame at the destination
@@ -206,4 +208,32 @@ class DefaultSource extends RelationProvider
                             parameters: Map[String, String]): Source = {
     new HoodieStreamSource(sqlContext, metadataPath, schema, parameters)
   }
+
+  private def resolveBaseFileOnlyRelation(sqlContext: SQLContext,
+                                          globPaths: Seq[Path],
+                                          userSchema: Option[StructType],
+                                          metaClient: HoodieTableMetaClient,
+                                          optParams: Map[String, String]) = {
+    val baseRelation = new BaseFileOnlyRelation(sqlContext, metaClient, optParams, userSchema, globPaths)
+    val enableSchemaOnRead: Boolean = !tryFetchInternalSchema(metaClient).isEmptySchema
+
+    // NOTE: We fallback to [[HadoopFsRelation]] in all of the cases except ones requiring usage of
+    //       [[BaseFileOnlyRelation]] to function correctly. This is necessary to maintain performance parity w/
+    //       vanilla Spark, since some of the Spark optimizations are predicated on the using of [[HadoopFsRelation]].
+    //
+    //       You can check out HUDI-3896 for more details
+    if (enableSchemaOnRead) {
+      baseRelation
+    } else {
+      baseRelation.toHadoopFsRelation
+    }
+  }
+
+  private def tryFetchInternalSchema(metaClient: HoodieTableMetaClient) =
+    try {
+      new TableSchemaResolver(metaClient).getTableInternalSchemaFromCommitMetadata
+        .orElse(InternalSchema.getEmptyInternalSchema)
+    } catch {
+      case _: Exception => InternalSchema.getEmptyInternalSchema
+    }
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 769016ca8a..53667f3b88 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -19,12 +19,10 @@ package org.apache.hudi
 
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hbase.io.hfile.CacheConfig
 import org.apache.hadoop.mapred.JobConf
-
 import org.apache.hudi.HoodieBaseRelation.getPartitionPath
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration}
@@ -38,13 +36,13 @@ import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
 import org.apache.hudi.io.storage.HoodieHFileReader
-
 import org.apache.spark.TaskContext
 import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
+import org.apache.spark.sql.execution.FileRelation
 import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile, PartitioningUtils}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
 import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
@@ -54,7 +52,6 @@ import org.apache.spark.unsafe.types.UTF8String
 
 import java.io.Closeable
 import java.net.URI
-
 import scala.collection.JavaConverters._
 import scala.util.Try
 import scala.util.control.NonFatal
@@ -78,7 +75,11 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
                                   val metaClient: HoodieTableMetaClient,
                                   val optParams: Map[String, String],
                                   userSchema: Option[StructType])
-  extends BaseRelation with PrunedFilteredScan with Logging with SparkAdapterSupport {
+  extends BaseRelation
+    with FileRelation
+    with PrunedFilteredScan
+    with SparkAdapterSupport
+    with Logging {
 
   type FileSplit <: HoodieFileSplit
 
@@ -198,6 +199,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
    */
   override final def needConversion: Boolean = false
 
+  override def inputFiles: Array[String] = fileIndex.allFiles.map(_.getPath.toUri.toString).toArray
+
   /**
    * NOTE: DO NOT OVERRIDE THIS METHOD
    */
@@ -255,6 +258,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
       sparkSession.sparkContext.emptyRDD
   }
 
+
+
   /**
    * Composes RDD provided file splits to read from, table and partition schemas, data filters to be applied
    *
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/SparkDatasetMixin.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/SparkDatasetMixin.scala
new file mode 100644
index 0000000000..ee733a86a6
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/SparkDatasetMixin.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.hudi.common.model.{HoodieRecord, HoodieRecordPayload}
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.JavaConversions.collectionAsScalaIterable
+
+trait SparkDatasetMixin {
+
+  def toDataset(spark: SparkSession, records: java.util.List[HoodieRecord[_]]) = {
+    val avroRecords = records.map(
+      _.getData
+        .asInstanceOf[HoodieRecordPayload[_]]
+        .getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA)
+        .get
+        .asInstanceOf[GenericRecord]
+    )
+      .toSeq
+    val rdd: RDD[GenericRecord] = spark.sparkContext.parallelize(avroRecords)
+    AvroConversionUtils.createDataFrame(rdd, HoodieTestDataGenerator.AVRO_SCHEMA.toString, spark)
+  }
+
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 000004ace9..b232ef010f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -37,7 +37,7 @@ import org.joda.time.DateTime
 import org.joda.time.format.DateTimeFormat
 import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue, fail}
 import org.junit.jupiter.api.function.Executable
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
 
@@ -897,6 +897,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
       readResult.sort("_row_key").select("shortDecimal").collect().map(_.getDecimal(0).toPlainString).mkString(","))
   }
 
+  @Disabled("HUDI-3204")
   @Test
   def testHoodieBaseFileOnlyViewRelation(): Unit = {
     val _spark = spark
@@ -918,9 +919,9 @@ class TestCOWDataSource extends HoodieClientTestBase {
       .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key, "ts")
       .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key, "org.apache.hudi.keygen.TimestampBasedKeyGenerator")
       .option(Config.TIMESTAMP_TYPE_FIELD_PROP, "DATE_STRING")
+      .option(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, "yyyy-MM-dd")
       .option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyy/MM/dd")
       .option(Config.TIMESTAMP_TIMEZONE_FORMAT_PROP, "GMT+8:00")
-      .option(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, "yyyy-MM-dd")
       .mode(org.apache.spark.sql.SaveMode.Append)
       .save(basePath)
 
@@ -929,15 +930,13 @@ class TestCOWDataSource extends HoodieClientTestBase {
     assert(res.count() == 2)
 
     // data_date is the partition field. Persist to the parquet file using the origin values, and read it.
-    assertTrue(
-      res.select("data_date").map(_.get(0).toString).collect().sorted.sameElements(
-        Array("2018-09-23", "2018-09-24")
-      )
+    assertEquals(
+      res.select("data_date").map(_.get(0).toString).collect().sorted,
+      Array("2018-09-23", "2018-09-24")
     )
-    assertTrue(
-      res.select("_hoodie_partition_path").map(_.get(0).toString).collect().sorted.sameElements(
-        Array("2018/09/23", "2018/09/24")
-      )
+    assertEquals(
+      res.select("_hoodie_partition_path").map(_.get(0).toString).collect().sorted,
+      Array("2018/09/23", "2018/09/24")
     )
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index d8ebe5cbcd..b57cbb09ed 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -30,7 +30,7 @@ import org.apache.hudi.index.HoodieIndex.IndexType
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
 import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieClientTestBase}
-import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkUtils}
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkUtils, SparkDatasetMixin}
 import org.apache.log4j.LogManager
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
@@ -48,7 +48,7 @@ import scala.collection.JavaConverters._
 /**
  * Tests on Spark DataSource for MOR table.
  */
-class TestMORDataSource extends HoodieClientTestBase {
+class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
 
   var spark: SparkSession = null
   private val log = LogManager.getLogger(classOf[TestMORDataSource])
@@ -356,7 +356,7 @@ class TestMORDataSource extends HoodieClientTestBase {
 
     val hoodieRecords1 = dataGen.generateInserts("001", 100)
 
-    val inputDF1 = toDataset(hoodieRecords1)
+    val inputDF1 = toDataset(spark, hoodieRecords1)
     inputDF1.write.format("org.apache.hudi")
       .options(opts)
       .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
@@ -382,7 +382,7 @@ class TestMORDataSource extends HoodieClientTestBase {
     // Upsert 50 update records
     // Snopshot view should read 100 records
     val records2 = dataGen.generateUniqueUpdates("002", 50)
-    val inputDF2 = toDataset(records2)
+    val inputDF2 = toDataset(spark, records2)
     inputDF2.write.format("org.apache.hudi")
       .options(opts)
       .mode(SaveMode.Append)
@@ -429,7 +429,7 @@ class TestMORDataSource extends HoodieClientTestBase {
     verifyShow(hudiIncDF1Skipmerge)
 
     val record3 = dataGen.generateUpdatesWithTS("003", hoodieRecords1, -1)
-    val inputDF3 = toDataset(record3)
+    val inputDF3 = toDataset(spark, record3)
     inputDF3.write.format("org.apache.hudi").options(opts)
       .mode(SaveMode.Append).save(basePath)
 
@@ -443,16 +443,6 @@ class TestMORDataSource extends HoodieClientTestBase {
     assertEquals(0, hudiSnapshotDF3.filter("rider = 'rider-003'").count())
   }
 
-  private def toDataset(records: util.List[HoodieRecord[_]]) = {
-    val avroRecords = records.map(_.getData
-      .asInstanceOf[HoodieRecordPayload[_]]
-      .getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA)
-      .get
-      .asInstanceOf[GenericRecord])
-    val rdd: RDD[GenericRecord] = spark.sparkContext.parallelize(avroRecords, 2)
-    AvroConversionUtils.createDataFrame(rdd, HoodieTestDataGenerator.AVRO_SCHEMA.toString, spark)
-  }
-
   @Test
   def testVectorizedReader() {
     spark.conf.set("spark.sql.parquet.enableVectorizedReader", true)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
index 2cdd7880bf..f670450c3e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
@@ -31,7 +31,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.{Dataset, HoodieUnsafeRDDUtils, Row, SaveMode}
 import org.junit.jupiter.api.Assertions.{assertEquals, fail}
-import org.junit.jupiter.api.{Tag, Test}
+import org.junit.jupiter.api.{Disabled, Tag, Test}
 
 import scala.collection.JavaConverters._
 
@@ -53,6 +53,7 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
     DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> classOf[NonpartitionedKeyGenerator].getName
   )
 
+  @Disabled("HUDI-3896")
   @Test
   def testBaseFileOnlyViewRelation(): Unit = {
     val tablePath = s"$basePath/cow"
@@ -129,7 +130,8 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
         fail("Only Spark 3 and Spark 2 are currently supported")
 
     // Test MOR / Read Optimized
-    runTest(tableState, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, "null", projectedColumnsReadStatsReadOptimized)
+    // TODO(HUDI-3896) re-enable
+    //runTest(tableState, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, "null", projectedColumnsReadStatsReadOptimized)
   }
 
   @Test
@@ -184,7 +186,8 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
         fail("Only Spark 3 and Spark 2 are currently supported")
 
     // Test MOR / Read Optimized
-    runTest(tableState, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, "null", projectedColumnsReadStatsReadOptimized)
+    // TODO(HUDI-3896) re-enable
+    //runTest(tableState, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, "null", projectedColumnsReadStatsReadOptimized)
   }
 
   @Test