Posted to commits@hudi.apache.org by yu...@apache.org on 2022/09/30 06:39:10 UTC

[hudi] 03/10: [HUDI-4936] Fix `as.of.instant` not recognized as hoodie config (#5616)

This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 0f1130d2670c1be1c42badc3dcd8970c7f84fa2f
Author: Leon Tsao <31...@users.noreply.github.com>
AuthorDate: Thu Sep 29 14:18:41 2022 +0800

    [HUDI-4936] Fix `as.of.instant` not recognized as hoodie config (#5616)
    
    
    Co-authored-by: leon <le...@leondeMacBook-Pro.local>
    Co-authored-by: Raymond Xu <27...@users.noreply.github.com>
---
 .../run/strategy/MultipleSparkJobExecutionStrategy.java      |  3 ++-
 .../org/apache/hudi/common/config/HoodieCommonConfig.java    |  5 +++++
 .../java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java |  6 ++++--
 .../src/main/scala/org/apache/hudi/DataSourceOptions.scala   |  6 +-----
 .../org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala     | 12 ++++++++++--
 5 files changed, 22 insertions(+), 10 deletions(-)

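For context, the user-facing query this fix addresses looks like the following (a minimal spark-shell sketch; the table path and instant value are hypothetical placeholders):

    val df = spark.read
      .format("hudi")
      .option("as.of.instant", "20220307091628793") // time travel to a completed instant
      .load("/tmp/hudi_table")                      // hypothetical table path

Before this change, "as.of.instant" could be dropped on the Spark SQL path because option filtering only kept keys with the "hoodie." prefix (see the HoodieSqlCommonUtils hunk below).
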
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 0c52a7cc49..eab98f2f19 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -90,6 +90,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.common.config.HoodieCommonConfig.TIMESTAMP_AS_OF;
 import static org.apache.hudi.common.table.log.HoodieFileSliceReader.getFileSliceReader;
 import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
 
@@ -375,7 +376,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
 
     HashMap<String, String> params = new HashMap<>();
     params.put("hoodie.datasource.query.type", "snapshot");
-    params.put("as.of.instant", instantTime);
+    params.put(TIMESTAMP_AS_OF.key(), instantTime);
 
     Path[] paths;
     if (hasLogFiles) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
index 917cfe621f..00ff7e5683 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
@@ -36,6 +36,11 @@ public class HoodieCommonConfig extends HoodieConfig {
       .defaultValue(false)
       .withDocumentation("Enables support for Schema Evolution feature");
 
+  public static final ConfigProperty<String> TIMESTAMP_AS_OF = ConfigProperty
+      .key("as.of.instant")
+      .noDefaultValue()
+      .withDocumentation("The query instant for time travel. Without specified this option, we query the latest snapshot.");
+
   public static final ConfigProperty<Boolean> RECONCILE_SCHEMA = ConfigProperty
       .key("hoodie.datasource.write.reconcile.schema")
       .defaultValue(false)
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
index 44844a8d47..de1fd0055d 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
@@ -48,6 +48,8 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.config.HoodieCommonConfig.TIMESTAMP_AS_OF;
+
 /**
  * Given a path is a part of - Hoodie table = accepts ONLY the latest version of each path - Non-Hoodie table = then
  * always accept
@@ -183,13 +185,13 @@ public class HoodieROTablePathFilter implements Configurable, PathFilter, Serial
             metaClientCache.put(baseDir.toString(), metaClient);
           }
 
-          if (getConf().get("as.of.instant") != null) {
+          if (getConf().get(TIMESTAMP_AS_OF.key()) != null) {
             // Build FileSystemViewManager with specified time, it's necessary to set this config when you may
             // access old version files. For example, in spark side, using "hoodie.datasource.read.paths"
             // which contains old version files, if not specify this value, these files will be filtered.
             fsView = FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engineContext,
                 metaClient, HoodieInputFormatUtils.buildMetadataConfig(getConf()),
-                metaClient.getActiveTimeline().filterCompletedInstants().findInstantsBeforeOrEquals(getConf().get("as.of.instant")));
+                metaClient.getActiveTimeline().filterCompletedInstants().findInstantsBeforeOrEquals(getConf().get(TIMESTAMP_AS_OF.key())));
           } else {
             fsView = FileSystemViewManager.createInMemoryFileSystemView(engineContext,
                 metaClient, HoodieInputFormatUtils.buildMetadataConfig(getConf()));
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index e8ffb09ff9..6370a0752e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -116,11 +116,7 @@ object DataSourceReadOptions {
     .withDocumentation("For the use-cases like users only want to incremental pull from certain partitions "
       + "instead of the full table. This option allows using glob pattern to directly filter on path.")
 
-  val TIME_TRAVEL_AS_OF_INSTANT: ConfigProperty[String] = ConfigProperty
-    .key("as.of.instant")
-    .noDefaultValue()
-    .withDocumentation("The query instant for time travel. Without specified this option," +
-      " we query the latest snapshot.")
+  val TIME_TRAVEL_AS_OF_INSTANT: ConfigProperty[String] = HoodieCommonConfig.TIMESTAMP_AS_OF
 
   val ENABLE_DATA_SKIPPING: ConfigProperty[Boolean] = ConfigProperty
     .key("hoodie.enable.data.skipping")
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
index 63186c0759..025a224373 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
@@ -26,7 +26,7 @@ import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstantTimeGenerator}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.util.PartitionPathEncodeUtils
-import org.apache.hudi.{AvroConversionUtils, SparkAdapterSupport}
+import org.apache.hudi.{AvroConversionUtils, DataSourceOptionsHelper, DataSourceReadOptions, SparkAdapterSupport}
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.Resolver
@@ -250,9 +250,17 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
                    (baseConfig: Map[String, String] = Map.empty): Map[String, String] = {
     baseConfig ++ DFSPropertiesConfiguration.getGlobalProps.asScala ++ // Table options has the highest priority
       (spark.sessionState.conf.getAllConfs ++ HoodieOptionConfig.mappingSqlOptionToHoodieParam(options))
-        .filterKeys(_.startsWith("hoodie."))
+        .filterKeys(isHoodieConfigKey)
   }
 
+  /**
+   * Checks whether a SQL option key is a Hoodie config key.
+   *
+   * TODO: standardize the key prefix so that we don't need this helper (HUDI-4935)
+   */
+  def isHoodieConfigKey(key: String): Boolean =
+    key.startsWith("hoodie.") || key == DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key
+
   /**
    * Checks whether Spark is using Hive as Session's Catalog
    */