You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/06/06 20:03:45 UTC

[GitHub] [hudi] vinothchandar commented on a diff in pull request #5737: [HUDI-4178][Stacked on 5733] Addressing performance regressions in Spark DataSourceV2 Integration

vinothchandar commented on code in PR #5737:
URL: https://github.com/apache/hudi/pull/5737#discussion_r890321822


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -142,6 +142,9 @@ object DataSourceReadOptions {
     .key("hoodie.datasource.read.incr.fallback.fulltablescan.enable")
     .defaultValue("false")
     .withDocumentation("When doing an incremental query whether we should fall back to full table scans if file does not exist.")
+
+  val SCHEMA_EVOLUTION_ENABLED: ConfigProperty[Boolean] = HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE

Review Comment:
   We need to move `HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE` into a hudi-common config class so it's shared across writes and queries. 



##########
hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala:
##########
@@ -105,12 +106,30 @@ class HoodieCatalog extends DelegatingCatalogExtension
           case _ =>
             catalogTable0
         }
-        HoodieInternalV2Table(
+
+        val v2Table = HoodieInternalV2Table(
           spark = spark,
           path = catalogTable.location.toString,
           catalogTable = Some(catalogTable),
           tableIdentifier = Some(ident.toString))
-      case o => o
+
+        val schemaEvolutionEnabled: Boolean = spark.sessionState.conf.getConfString(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
+          DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
+
+        // NOTE: PLEASE READ CAREFULLY
+        //
        // Since Hudi relations don't currently implement the DS V2 Read API, we fall back to V1 here by default.
        // Returning the V2 table instead would have a considerable performance impact, therefore it's only done in cases
        // where the V2 API has to be used. Currently, the only such use case is the Schema Evolution feature.
+        //
+        // Check out HUDI-4178 for more details
+        if (schemaEvolutionEnabled) {
+          v2Table
+        } else {
+          v2Table.v1TableWrapper

Review Comment:
   I guess this is the key change.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -122,29 +122,39 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
     optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key)
       .map(HoodieSqlCommonUtils.formatQueryInstant)
 
+  /**
+   * NOTE: Initialization of the following members is coupled on purpose to minimize the amount of I/O
+   *       required to fetch the table's Avro and Internal schemas
+   */
   protected lazy val (tableAvroSchema: Schema, internalSchema: InternalSchema) = {
-    val schemaUtil = new TableSchemaResolver(metaClient)
-    val avroSchema = Try(schemaUtil.getTableAvroSchema) match {
-      case Success(schema) => schema
-      case Failure(e) =>
-        logWarning("Failed to fetch schema from the table", e)
-        // If there is no commit in the table, we can't get the schema
-        // t/h [[TableSchemaResolver]], fallback to the provided [[userSchema]] instead.
-        userSchema match {
-          case Some(s) => convertToAvroSchema(s)
-          case _ => throw new IllegalArgumentException("User-provided schema is required in case the table is empty")
-        }
+    val schemaResolver = new TableSchemaResolver(metaClient)
+    val avroSchema: Schema = schemaSpec.map(convertToAvroSchema).getOrElse {
+      Try(schemaResolver.getTableAvroSchema) match {
+        case Success(schema) => schema
+        case Failure(e) =>
+          logError("Failed to fetch schema from the table", e)
+          throw new HoodieSchemaException("Failed to fetch schema from the table")
+      }
     }
-    // try to find internalSchema
-    val internalSchemaFromMeta = try {
-      schemaUtil.getTableInternalSchemaFromCommitMetadata.orElse(InternalSchema.getEmptyInternalSchema)
-    } catch {
-      case _: Exception => InternalSchema.getEmptyInternalSchema
+
+    val schemaEvolutionEnabled: Boolean = optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,

Review Comment:
   cc @alexeykudinkin to chime in



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org