You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by co...@apache.org on 2022/07/21 09:42:22 UTC

[hudi] branch master updated: [HUDI-3764] Allow loading external configs while querying Hudi tables with Spark (#4915)

This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c7fe3fd01d [HUDI-3764] Allow loading external configs while querying Hudi tables with Spark (#4915)
c7fe3fd01d is described below

commit c7fe3fd01dc0b7b3d1c4c579391337a0f9c609dd
Author: wenningd <we...@gmail.com>
AuthorDate: Thu Jul 21 02:42:17 2022 -0700

    [HUDI-3764] Allow loading external configs while querying Hudi tables with Spark (#4915)
    
    Currently, Hudi queries run through Spark do not load the
    external (global) configurations. For example, if customers
    enable metadata listing in their global config file, their
    queries still run without the metadata feature enabled.
    This PR fixes the issue by loading the global configs
    during the Hudi read phase.
    
    Co-authored-by: Wenning Ding <we...@amazon.com>
---
 .../common/config/DFSPropertiesConfiguration.java  |  6 ++++
 .../scala/org/apache/hudi/DataSourceOptions.scala  |  9 +++---
 .../org/apache/spark/sql/hudi/TestSqlConf.scala    | 37 ++++++++++++++--------
 3 files changed, 35 insertions(+), 17 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/DFSPropertiesConfiguration.java b/hudi-common/src/main/java/org/apache/hudi/common/config/DFSPropertiesConfiguration.java
index f3614a64b7..08cbd568df 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/DFSPropertiesConfiguration.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/DFSPropertiesConfiguration.java
@@ -196,6 +196,12 @@ public class DFSPropertiesConfiguration {
     return globalProps;
   }
 
+  // test only
+  public static TypedProperties addToGlobalProps(String key, String value) {
+    GLOBAL_PROPS.put(key, value);
+    return GLOBAL_PROPS;
+  }
+
   public TypedProperties getProps() {
     return new TypedProperties(hoodieConfig.getProps());
   }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index feeb572126..844e171b08 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -19,7 +19,7 @@ package org.apache.hudi
 
 import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
-import org.apache.hudi.common.config.{ConfigProperty, HoodieCommonConfig, HoodieConfig, TypedProperties}
+import org.apache.hudi.common.config.{ConfigProperty, DFSPropertiesConfiguration, HoodieCommonConfig, HoodieConfig, TypedProperties}
 import org.apache.hudi.common.fs.ConsistencyGuardConfig
 import org.apache.hudi.common.model.{HoodieTableType, WriteOperationType}
 import org.apache.hudi.common.table.HoodieTableConfig
@@ -768,13 +768,14 @@ object DataSourceOptionsHelper {
   def parametersWithReadDefaults(parameters: Map[String, String]): Map[String, String] = {
     // First check if the ConfigUtils.IS_QUERY_AS_RO_TABLE has set by HiveSyncTool,
     // or else use query type from QUERY_TYPE.
-    val queryType = parameters.get(ConfigUtils.IS_QUERY_AS_RO_TABLE)
+    val paramsWithGlobalProps = DFSPropertiesConfiguration.getGlobalProps.asScala.toMap ++ parameters
+    val queryType = paramsWithGlobalProps.get(ConfigUtils.IS_QUERY_AS_RO_TABLE)
       .map(is => if (is.toBoolean) QUERY_TYPE_READ_OPTIMIZED_OPT_VAL else QUERY_TYPE_SNAPSHOT_OPT_VAL)
-      .getOrElse(parameters.getOrElse(QUERY_TYPE.key, QUERY_TYPE.defaultValue()))
+      .getOrElse(paramsWithGlobalProps.getOrElse(QUERY_TYPE.key, QUERY_TYPE.defaultValue()))
 
     Map(
       QUERY_TYPE.key -> queryType
-    ) ++ translateConfigurations(parameters)
+    ) ++ translateConfigurations(paramsWithGlobalProps)
   }
 
   def inferKeyGenClazz(props: TypedProperties): String = {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSqlConf.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSqlConf.scala
index ac3c49efdd..90d0734945 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSqlConf.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSqlConf.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hudi
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.apache.hudi.DataSourceReadOptions._
 import org.apache.hudi.common.config.DFSPropertiesConfiguration
 import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
@@ -60,20 +61,21 @@ class TestSqlConf extends HoodieSparkSqlTestBase with BeforeAndAfter {
            | )
        """.stripMargin)
 
-      // First merge with a extra input field 'flag' (insert a new record)
-      spark.sql(
-        s"""
-           | merge into $tableName
-           | using (
-           |  select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '1' as flag, $partitionVal as year
-           | ) s0
-           | on s0.id = $tableName.id
-           | when matched and flag = '1' then update set
-           | id = s0.id, name = s0.name, price = s0.price, ts = s0.ts, year = s0.year
-           | when not matched and flag = '1' then insert *
-       """.stripMargin)
+      // First insert a new record
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, $partitionVal)")
+
+      val metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(tablePath)
+        .setConf(spark.sessionState.newHadoopConf())
+        .build()
+      val firstCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+
+      // Then insert another new record
+      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000, $partitionVal)")
+
       checkAnswer(s"select id, name, price, ts, year from $tableName")(
-        Seq(1, "a1", 10.0, 1000, partitionVal)
+        Seq(1, "a1", 10.0, 1000, partitionVal),
+        Seq(2, "a2", 10.0, 1000, partitionVal)
       )
 
       // By default, Spark DML would set table type to COW and use Hive style partitioning, here we
@@ -85,6 +87,15 @@ class TestSqlConf extends HoodieSparkSqlTestBase with BeforeAndAfter {
         s"$tablePath/" + HoodieTableMetaClient.METAFOLDER_NAME,
         HoodieTableConfig.PAYLOAD_CLASS_NAME.defaultValue).getTableType)
 
+      // Manually pass incremental configs to global configs to make sure Hudi query is able to load the
+      // global configs
+      DFSPropertiesConfiguration.addToGlobalProps(QUERY_TYPE.key, QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      DFSPropertiesConfiguration.addToGlobalProps(BEGIN_INSTANTTIME.key, firstCommit)
+      spark.catalog.refreshTable(tableName)
+      checkAnswer(s"select id, name, price, ts, year from $tableName")(
+        Seq(2, "a2", 10.0, 1000, partitionVal)
+      )
+
       // delete the record
       spark.sql(s"delete from $tableName where year = $partitionVal")
       val cnt = spark.sql(s"select * from $tableName where year = $partitionVal").count()