You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yu...@apache.org on 2022/09/30 06:39:10 UTC
[hudi] 03/10: [HUDI-4936] Fix `as.of.instant` not recognized as hoodie config (#5616)
This is an automated email from the ASF dual-hosted git repository.
yuzhaojing pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 0f1130d2670c1be1c42badc3dcd8970c7f84fa2f
Author: Leon Tsao <31...@users.noreply.github.com>
AuthorDate: Thu Sep 29 14:18:41 2022 +0800
[HUDI-4936] Fix `as.of.instant` not recognized as hoodie config (#5616)
Co-authored-by: leon <le...@leondeMacBook-Pro.local>
Co-authored-by: Raymond Xu <27...@users.noreply.github.com>
---
.../run/strategy/MultipleSparkJobExecutionStrategy.java | 3 ++-
.../org/apache/hudi/common/config/HoodieCommonConfig.java | 5 +++++
.../java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java | 6 ++++--
.../src/main/scala/org/apache/hudi/DataSourceOptions.scala | 6 +-----
.../org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala | 12 ++++++++++--
5 files changed, 22 insertions(+), 10 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 0c52a7cc49..eab98f2f19 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -90,6 +90,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.hudi.common.config.HoodieCommonConfig.TIMESTAMP_AS_OF;
import static org.apache.hudi.common.table.log.HoodieFileSliceReader.getFileSliceReader;
import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
@@ -375,7 +376,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
HashMap<String, String> params = new HashMap<>();
params.put("hoodie.datasource.query.type", "snapshot");
- params.put("as.of.instant", instantTime);
+ params.put(TIMESTAMP_AS_OF.key(), instantTime);
Path[] paths;
if (hasLogFiles) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
index 917cfe621f..00ff7e5683 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
@@ -36,6 +36,11 @@ public class HoodieCommonConfig extends HoodieConfig {
.defaultValue(false)
.withDocumentation("Enables support for Schema Evolution feature");
+ public static final ConfigProperty<String> TIMESTAMP_AS_OF = ConfigProperty
+ .key("as.of.instant")
+ .noDefaultValue()
+ .withDocumentation("The query instant for time travel. Without specified this option, we query the latest snapshot.");
+
public static final ConfigProperty<Boolean> RECONCILE_SCHEMA = ConfigProperty
.key("hoodie.datasource.write.reconcile.schema")
.defaultValue(false)
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
index 44844a8d47..de1fd0055d 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
@@ -48,6 +48,8 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
+import static org.apache.hudi.common.config.HoodieCommonConfig.TIMESTAMP_AS_OF;
+
/**
* Given a path is a part of - Hoodie table = accepts ONLY the latest version of each path - Non-Hoodie table = then
* always accept
@@ -183,13 +185,13 @@ public class HoodieROTablePathFilter implements Configurable, PathFilter, Serial
metaClientCache.put(baseDir.toString(), metaClient);
}
- if (getConf().get("as.of.instant") != null) {
+ if (getConf().get(TIMESTAMP_AS_OF.key()) != null) {
// Build FileSystemViewManager with specified time, it's necessary to set this config when you may
// access old version files. For example, in spark side, using "hoodie.datasource.read.paths"
// which contains old version files, if not specify this value, these files will be filtered.
fsView = FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engineContext,
metaClient, HoodieInputFormatUtils.buildMetadataConfig(getConf()),
- metaClient.getActiveTimeline().filterCompletedInstants().findInstantsBeforeOrEquals(getConf().get("as.of.instant")));
+ metaClient.getActiveTimeline().filterCompletedInstants().findInstantsBeforeOrEquals(getConf().get(TIMESTAMP_AS_OF.key())));
} else {
fsView = FileSystemViewManager.createInMemoryFileSystemView(engineContext,
metaClient, HoodieInputFormatUtils.buildMetadataConfig(getConf()));
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index e8ffb09ff9..6370a0752e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -116,11 +116,7 @@ object DataSourceReadOptions {
.withDocumentation("For the use-cases like users only want to incremental pull from certain partitions "
+ "instead of the full table. This option allows using glob pattern to directly filter on path.")
- val TIME_TRAVEL_AS_OF_INSTANT: ConfigProperty[String] = ConfigProperty
- .key("as.of.instant")
- .noDefaultValue()
- .withDocumentation("The query instant for time travel. Without specified this option," +
- " we query the latest snapshot.")
+ val TIME_TRAVEL_AS_OF_INSTANT: ConfigProperty[String] = HoodieCommonConfig.TIMESTAMP_AS_OF
val ENABLE_DATA_SKIPPING: ConfigProperty[Boolean] = ConfigProperty
.key("hoodie.enable.data.skipping")
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
index 63186c0759..025a224373 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
@@ -26,7 +26,7 @@ import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstantTimeGenerator}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.PartitionPathEncodeUtils
-import org.apache.hudi.{AvroConversionUtils, SparkAdapterSupport}
+import org.apache.hudi.{AvroConversionUtils, DataSourceOptionsHelper, DataSourceReadOptions, SparkAdapterSupport}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.Resolver
@@ -250,9 +250,17 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
(baseConfig: Map[String, String] = Map.empty): Map[String, String] = {
baseConfig ++ DFSPropertiesConfiguration.getGlobalProps.asScala ++ // Table options has the highest priority
(spark.sessionState.conf.getAllConfs ++ HoodieOptionConfig.mappingSqlOptionToHoodieParam(options))
- .filterKeys(_.startsWith("hoodie."))
+ .filterKeys(isHoodieConfigKey)
}
+ /**
+ * Checks whether the given SQL option key is a Hoodie config key.
+ *
+ * TODO: standardize the key prefix so that we don't need this helper (HUDI-4935)
+ */
+ def isHoodieConfigKey(key: String): Boolean =
+ key.startsWith("hoodie.") || key == DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key
+
/**
* Checks whether Spark is using Hive as Session's Catalog
*/