Posted to commits@hudi.apache.org by co...@apache.org on 2022/09/09 03:09:07 UTC
[hudi] branch master updated: [HUDI-4465] Optimizing file-listing sequence of Metadata Table (#6016)
This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4af60dcfba [HUDI-4465] Optimizing file-listing sequence of Metadata Table (#6016)
4af60dcfba is described below
commit 4af60dcfbaeea5e79bc4b9457477e9ee0f9cdb79
Author: Alexey Kudinkin <al...@infinilake.com>
AuthorDate: Thu Sep 8 20:09:00 2022 -0700
[HUDI-4465] Optimizing file-listing sequence of Metadata Table (#6016)
Optimizes the file-listing sequence of the Metadata Table to make sure it is on par with or better than FS-based file-listing
Change log:
- Cleaned up avoidable instantiations of Hadoop's Path
- Replaced new Path with CachingPath.createPathUnsafe where possible (see the sketch after this change log)
- Cached TimestampFormatter and DateFormatter for the timezone
- Avoid loading defaults into the Hadoop conf when initializing the HFile reader
- Avoid re-instantiating BaseTableMetadata twice within BaseHoodieTableFileIndex
- Avoid looking up the FileSystem for every partition when listing a partitioned table; do it just once instead
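For illustration, here is a minimal, self-contained Java sketch (not code from this patch; the class and method names are made up for the example) of two of the patterns above: building a Hadoop Path for a string that is already known to be relative and normalized from a pre-assembled URI, so the parsing done by new Path(String) is skipped (the patch does this via CachingPath.createPathUnsafe), and creating a Hadoop Configuration with loadDefaults=false so the default resource files are not loaded when the configuration is only used as a stub.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import java.net.URI;
import java.net.URISyntaxException;

public class FileListingOptimizationSketch {

  // Build a Path for a path that is already known to be relative and normalized,
  // bypassing the string parsing done by new Path(String). This mirrors the idea
  // behind CachingPath.createPathUnsafe in this patch, as a standalone illustration.
  static Path relativePathUnsafe(String relativePartitionPath) {
    try {
      // URI components are passed explicitly, so no scheme/authority parsing happens
      return new Path(new URI(null, null, relativePartitionPath, null, null));
    } catch (URISyntaxException e) {
      throw new IllegalArgumentException("Invalid relative path: " + relativePartitionPath, e);
    }
  }

  // A Configuration created with loadDefaults=false skips loading core-default.xml
  // and core-site.xml, which is unnecessary when it is only used as a stub
  static Configuration stubConfiguration() {
    return new Configuration(false);
  }

  public static void main(String[] args) {
    Path base = new Path("s3://bucket/warehouse/table");
    Path partition = new Path(base, relativePathUnsafe("2022/09/08"));
    System.out.println(partition);                   // s3://bucket/warehouse/table/2022/09/08
    System.out.println(stubConfiguration().size());  // 0 - no default properties loaded
  }
}

On top of this, the patch's CachingPath also memoizes the parent, file name and string form of each path (see the CachingPath diff below), so repeated getParent()/toString() calls during listing do not re-derive them.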
---
docker/demo/config/dfs-source.properties | 4 +
.../metadata/HoodieBackedTableMetadataWriter.java | 6 +-
.../apache/hudi/keygen/BuiltinKeyGenerator.java | 1 -
.../org/apache/hudi/keygen/SimpleKeyGenerator.java | 7 ++
.../datasources/SparkParsePartitionUtil.scala | 12 +-
.../org/apache/spark/sql/hudi/SparkAdapter.scala | 2 +-
.../functional/TestConsistentBucketIndex.java | 2 +-
.../hudi/table/TestHoodieMergeOnReadTable.java | 26 +++-
.../TestHoodieSparkMergeOnReadTableCompaction.java | 19 ++-
...HoodieSparkMergeOnReadTableIncrementalRead.java | 2 +-
.../TestHoodieSparkMergeOnReadTableRollback.java | 9 +-
.../hudi/testutils/HoodieClientTestHarness.java | 6 +-
.../SparkClientFunctionalTestHarness.java | 12 +-
.../org/apache/hudi/BaseHoodieTableFileIndex.java | 87 +++++++++----
.../java/org/apache/hudi/common/fs/FSUtils.java | 13 +-
.../org/apache/hudi/common/model/BaseFile.java | 9 ++
.../hudi/common/table/HoodieTableMetaClient.java | 2 +-
.../table/log/block/HoodieHFileDataBlock.java | 26 ++--
.../table/view/AbstractTableFileSystemView.java | 13 +-
.../apache/hudi/common/util/CollectionUtils.java | 9 ++
.../java/org/apache/hudi/hadoop/CachingPath.java | 59 ++++++++-
.../org/apache/hudi/hadoop/SerializablePath.java | 9 +-
.../apache/hudi/io/storage/HoodieHFileUtils.java | 4 +-
.../apache/hudi/metadata/BaseTableMetadata.java | 135 ++++++++++++---------
.../hudi/metadata/HoodieBackedTableMetadata.java | 11 +-
.../hudi/metadata/HoodieMetadataPayload.java | 18 ++-
.../apache/hudi/metadata/HoodieTableMetadata.java | 21 +++-
.../hudi/common/testutils/HoodieTestUtils.java | 35 +++---
.../hudi/hadoop/testutils/InputFormatTestUtil.java | 5 +-
.../apache/hudi/SparkHoodieTableFileIndex.scala | 28 +++--
.../hudi/functional/TestTimeTravelQuery.scala | 6 +-
.../apache/spark/sql/adapter/Spark2Adapter.scala | 2 +-
.../datasources/Spark2ParsePartitionUtil.scala | 14 +--
.../apache/hudi/spark3/internal/ReflectUtil.java | 4 +-
.../spark/sql/adapter/BaseSpark3Adapter.scala | 4 +-
.../datasources/Spark3ParsePartitionUtil.scala | 48 ++++----
.../functional/TestHoodieDeltaStreamer.java | 10 +-
.../utilities/sources/TestHoodieIncrSource.java | 4 +-
38 files changed, 451 insertions(+), 233 deletions(-)
diff --git a/docker/demo/config/dfs-source.properties b/docker/demo/config/dfs-source.properties
index ac7080e141..a90629ef8e 100644
--- a/docker/demo/config/dfs-source.properties
+++ b/docker/demo/config/dfs-source.properties
@@ -19,6 +19,10 @@ include=base.properties
# Key fields, for kafka example
hoodie.datasource.write.recordkey.field=key
hoodie.datasource.write.partitionpath.field=date
+# NOTE: We have to duplicate configuration since this is being used
+# w/ both Spark and DeltaStreamer
+hoodie.table.recordkey.fields=key
+hoodie.table.partition.fields=date
# Schema provider props (change to absolute path based on your installation)
hoodie.deltastreamer.schemaprovider.source.schema.file=/var/demo/config/schema.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=/var/demo/config/schema.avsc
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index c7cc50967a..962875fb92 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -1006,21 +1006,21 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
// finish off any pending compactions if any from previous attempt.
writeClient.runAnyPendingCompactions();
- String latestDeltacommitTime = metadataMetaClient.reloadActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant()
+ String latestDeltaCommitTime = metadataMetaClient.reloadActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant()
.get().getTimestamp();
List<HoodieInstant> pendingInstants = dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
.findInstantsBefore(instantTime).getInstants().collect(Collectors.toList());
if (!pendingInstants.isEmpty()) {
LOG.info(String.format("Cannot compact metadata table as there are %d inflight instants before latest deltacommit %s: %s",
- pendingInstants.size(), latestDeltacommitTime, Arrays.toString(pendingInstants.toArray())));
+ pendingInstants.size(), latestDeltaCommitTime, Arrays.toString(pendingInstants.toArray())));
return;
}
// Trigger compaction with suffixes based on the same instant time. This ensures that any future
// delta commits synced over will not have an instant time lesser than the last completed instant on the
// metadata table.
- final String compactionInstantTime = latestDeltacommitTime + "001";
+ final String compactionInstantTime = latestDeltaCommitTime + "001";
if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) {
writeClient.compact(compactionInstantTime);
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
index d7561cc126..ad71b17ce7 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
@@ -77,7 +77,6 @@ public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements Sp
protected static final UTF8String NULL_RECORD_KEY_PLACEHOLDER_UTF8 = UTF8String.fromString(NULL_RECORDKEY_PLACEHOLDER);
protected static final UTF8String EMPTY_RECORD_KEY_PLACEHOLDER_UTF8 = UTF8String.fromString(EMPTY_RECORDKEY_PLACEHOLDER);
-
protected transient volatile SparkRowConverter rowConverter;
protected transient volatile SparkRowAccessor rowAccessor;
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
index dcffdf3cdb..8c43e19baa 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
@@ -20,6 +20,7 @@ package org.apache.hudi.keygen;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -46,6 +47,12 @@ public class SimpleKeyGenerator extends BuiltinKeyGenerator {
SimpleKeyGenerator(TypedProperties props, String recordKeyField, String partitionPathField) {
super(props);
+ // Make sure key-generator is configured properly
+ ValidationUtils.checkArgument(recordKeyField == null || !recordKeyField.isEmpty(),
+ "Record key field has to be non-empty!");
+ ValidationUtils.checkArgument(partitionPathField == null || !partitionPathField.isEmpty(),
+ "Partition path field has to be non-empty!");
+
this.recordKeyFields = recordKeyField == null ? Collections.emptyList() : Collections.singletonList(recordKeyField);
this.partitionPathFields = partitionPathField == null ? Collections.emptyList() : Collections.singletonList(partitionPathField);
this.simpleAvroKeyGenerator = new SimpleAvroKeyGenerator(props, recordKeyField, partitionPathField);
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala
index 626b3c6ef0..2279e5a13f 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala
@@ -26,10 +26,10 @@ import org.apache.spark.sql.types.DataType
trait SparkParsePartitionUtil extends Serializable {
- def parsePartition(
- path: Path,
- typeInference: Boolean,
- basePaths: Set[Path],
- userSpecifiedDataTypes: Map[String, DataType],
- timeZone: TimeZone): InternalRow
+ def parsePartition(path: Path,
+ typeInference: Boolean,
+ basePaths: Set[Path],
+ userSpecifiedDataTypes: Map[String, DataType],
+ timeZone: TimeZone,
+ validatePartitionValues: Boolean = false): InternalRow
}
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index ab7d0164ea..eaad4d471e 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -85,7 +85,7 @@ trait SparkAdapter extends Serializable {
/**
* Create the SparkParsePartitionUtil.
*/
- def createSparkParsePartitionUtil(conf: SQLConf): SparkParsePartitionUtil
+ def getSparkParsePartitionUtil: SparkParsePartitionUtil
/**
* ParserInterface#parseMultipartIdentifier is supported since spark3, for spark2 this should not be called.
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java
index e0bc22f70d..0561730193 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java
@@ -97,7 +97,7 @@ public class TestConsistentBucketIndex extends HoodieClientTestHarness {
initTestDataGenerator(new String[] {""});
}
initFileSystem();
- Properties props = populateMetaFields ? new Properties() : getPropertiesForKeyGen();
+ Properties props = getPropertiesForKeyGen(populateMetaFields);
props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ, props);
config = getConfigBuilder()
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index 0ce6ca0ee9..0b80d20b39 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -36,6 +36,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.Transformations;
+import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -61,7 +62,6 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.storage.StorageLevel;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -87,9 +87,8 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
private HoodieTableMetaClient metaClient;
private HoodieTestDataGenerator dataGen;
- @BeforeEach
- void setUp() throws IOException {
- Properties properties = new Properties();
+ void setUp(Properties props) throws IOException {
+ Properties properties = CollectionUtils.copy(props);
properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString());
metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
dataGen = new HoodieTestDataGenerator();
@@ -99,6 +98,9 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
@Test
public void testMetadataAggregateFromWriteStatus() throws Exception {
HoodieWriteConfig cfg = getConfigBuilder(false).withWriteStatusClass(MetadataMergeWriteStatus.class).build();
+
+ setUp(cfg.getProps());
+
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
String newCommitTime = "001";
@@ -125,6 +127,9 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true);
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
HoodieWriteConfig cfg = cfgBuilder.build();
+
+ setUp(cfg.getProps());
+
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
/**
@@ -213,6 +218,8 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
HoodieWriteConfig config = cfgBuilder.build();
+ setUp(config.getProps());
+
try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config);) {
String newCommitTime = "100";
writeClient.startCommitWithTime(newCommitTime);
@@ -302,6 +309,8 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
HoodieWriteConfig cfg = getConfigBuilder(false, rollbackUsingMarkers, IndexType.INMEMORY)
.withAutoCommit(false).build();
+ setUp(cfg.getProps());
+
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
HoodieTable table = HoodieSparkTable.create(cfg, context(), metaClient);
@@ -381,6 +390,9 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
@Test
public void testRollingStatsWithSmallFileHandling() throws Exception {
HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
+
+ setUp(cfg.getProps());
+
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
Map<String, Long> fileIdToInsertsMap = new HashMap<>();
Map<String, Long> fileIdToUpsertsMap = new HashMap<>();
@@ -497,6 +509,9 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
@Test
public void testHandleUpdateWithMultiplePartitions() throws Exception {
HoodieWriteConfig cfg = getConfig(true);
+
+ setUp(cfg.getProps());
+
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
/**
@@ -578,6 +593,9 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
HoodieWriteConfig.Builder builder = getConfigBuilder(true);
builder.withReleaseResourceEnabled(true);
builder.withAutoCommit(false);
+
+ setUp(builder.build().getProps());
+
/**
* Write 1 (test when RELEASE_RESOURCE_ENABLE is true)
*/
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
index 3b30c5b767..f959a8f0d9 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
@@ -54,6 +54,7 @@ import java.io.IOException;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
+import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -98,8 +99,13 @@ public class TestHoodieSparkMergeOnReadTableCompaction extends SparkClientFuncti
.withLayoutConfig(HoodieLayoutConfig.newBuilder()
.withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
.withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
- .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build()).build();
- metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps());
+ .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build())
+ .build();
+
+ Properties props = getPropertiesForKeyGen(true);
+ props.putAll(config.getProps());
+
+ metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
client = getHoodieWriteClient(config);
// write data and commit
@@ -138,8 +144,13 @@ public class TestHoodieSparkMergeOnReadTableCompaction extends SparkClientFuncti
.withLayoutConfig(HoodieLayoutConfig.newBuilder()
.withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
.withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
- .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build()).build();
- metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps());
+ .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build())
+ .build();
+
+ Properties props = getPropertiesForKeyGen(true);
+ props.putAll(config.getProps());
+
+ metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
client = getHoodieWriteClient(config);
final List<HoodieRecord> records = dataGen.generateInserts("001", 100);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
index 5df7b4daec..275fd32ca7 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
@@ -82,7 +82,7 @@ public class TestHoodieSparkMergeOnReadTableIncrementalRead extends SparkClientF
public void testIncrementalReadsWithCompaction() throws Exception {
final String partitionPath = "2020/02/20"; // use only one partition for this test
final HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(new String[] { partitionPath });
- Properties props = new Properties();
+ Properties props = getPropertiesForKeyGen(true);
props.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieFileFormat.PARQUET.toString());
HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
HoodieWriteConfig cfg = getConfigBuilder(true).build();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
index 2f0e585ec9..0a11425ec5 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
@@ -41,6 +41,7 @@ import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCleanConfig;
@@ -155,7 +156,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
addConfigsForPopulateMetaFields(cfgBuilder, true);
HoodieWriteConfig cfg = cfgBuilder.build();
- Properties properties = new Properties();
+ Properties properties = CollectionUtils.copy(cfg.getProps());
properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString());
HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
@@ -327,7 +328,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
HoodieWriteConfig cfg = cfgBuilder.build();
- Properties properties = populateMetaFields ? new Properties() : getPropertiesForKeyGen();
+ Properties properties = getPropertiesForKeyGen(populateMetaFields);
properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString());
HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
@@ -606,8 +607,10 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
.withMarkersType(MarkerType.DIRECT.name());
HoodieWriteConfig cfg = cfgBuilder.build();
- Properties properties = new Properties();
+ Properties properties = getPropertiesForKeyGen(true);
+ properties.putAll(cfg.getProps());
properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString());
+
HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
try (final SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
index 80d185f62b..468be5fb26 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
@@ -341,8 +341,12 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
}
protected Properties getPropertiesForKeyGen() {
+ return getPropertiesForKeyGen(false);
+ }
+
+ protected Properties getPropertiesForKeyGen(boolean populateMetaFields) {
Properties properties = new Properties();
- properties.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), "false");
+ properties.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), String.valueOf(populateMetaFields));
properties.put("hoodie.datasource.write.recordkey.field", "_row_key");
properties.put("hoodie.datasource.write.partitionpath.field", "partition_path");
properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "_row_key");
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
index c58dd178dc..ba1afbebb2 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
@@ -160,7 +160,7 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe
}
public HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, String basePath) throws IOException {
- return getHoodieMetaClient(hadoopConf, basePath, new Properties());
+ return getHoodieMetaClient(hadoopConf, basePath, getPropertiesForKeyGen(true));
}
@Override
@@ -310,8 +310,12 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe
}
protected Properties getPropertiesForKeyGen() {
+ return getPropertiesForKeyGen(false);
+ }
+
+ protected Properties getPropertiesForKeyGen(boolean populateMetaFields) {
Properties properties = new Properties();
- properties.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), "false");
+ properties.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), String.valueOf(populateMetaFields));
properties.put("hoodie.datasource.write.recordkey.field", "_row_key");
properties.put("hoodie.datasource.write.partitionpath.field", "partition_path");
properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "_row_key");
@@ -321,9 +325,9 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe
}
protected void addConfigsForPopulateMetaFields(HoodieWriteConfig.Builder configBuilder, boolean populateMetaFields) {
+ configBuilder.withProperties(getPropertiesForKeyGen(populateMetaFields));
if (!populateMetaFields) {
- configBuilder.withProperties(getPropertiesForKeyGen())
- .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.SIMPLE).build());
+ configBuilder.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.SIMPLE).build());
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index 0b2c34618e..5910de5f1d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -18,6 +18,8 @@
package org.apache.hudi;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -34,14 +36,14 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
import org.apache.hudi.hadoop.CachingPath;
+import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -53,6 +55,8 @@ import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
+import static org.apache.hudi.hadoop.CachingPath.createPathUnsafe;
+
/**
* Common (engine-agnostic) File Index implementation enabling individual query engines to
* list Hudi Table contents based on the
@@ -63,13 +67,12 @@ import java.util.stream.Collectors;
* <li>Query instant/range</li>
* </ul>
*/
-public abstract class BaseHoodieTableFileIndex {
+public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
private static final Logger LOG = LogManager.getLogger(BaseHoodieTableFileIndex.class);
private final String[] partitionColumns;
- private final FileSystemViewStorageConfig fileSystemStorageConfig;
protected final HoodieMetadataConfig metadataConfig;
private final HoodieTableQueryType queryType;
@@ -80,7 +83,7 @@ public abstract class BaseHoodieTableFileIndex {
private final boolean shouldValidateInstant;
private final HoodieTableType tableType;
- protected final String basePath;
+ protected final Path basePath;
private final HoodieTableMetaClient metaClient;
private final HoodieEngineContext engineContext;
@@ -94,6 +97,8 @@ public abstract class BaseHoodieTableFileIndex {
private transient volatile HoodieTableFileSystemView fileSystemView = null;
+ private transient HoodieTableMetadata tableMetadata = null;
+
/**
* @param engineContext Hudi engine-specific context
* @param metaClient Hudi table's meta-client
@@ -117,9 +122,6 @@ public abstract class BaseHoodieTableFileIndex {
this.partitionColumns = metaClient.getTableConfig().getPartitionFields()
.orElse(new String[0]);
- this.fileSystemStorageConfig = FileSystemViewStorageConfig.newBuilder()
- .fromProperties(configProperties)
- .build();
this.metadataConfig = HoodieMetadataConfig.newBuilder()
.fromProperties(configProperties)
.build();
@@ -131,7 +133,7 @@ public abstract class BaseHoodieTableFileIndex {
this.shouldValidateInstant = shouldValidateInstant;
this.tableType = metaClient.getTableType();
- this.basePath = metaClient.getBasePath();
+ this.basePath = metaClient.getBasePathV2();
this.metaClient = metaClient;
this.engineContext = engineContext;
@@ -153,7 +155,7 @@ public abstract class BaseHoodieTableFileIndex {
* Returns table's base-path
*/
public String getBasePath() {
- return metaClient.getBasePath();
+ return basePath.toString();
}
/**
@@ -172,14 +174,19 @@ public abstract class BaseHoodieTableFileIndex {
.mapToInt(List::size).sum();
}
+ @Override
+ public void close() throws Exception {
+ resetTableMetadata(null);
+ }
+
protected List<PartitionPath> getAllQueryPartitionPaths() {
List<String> queryRelativePartitionPaths = queryPaths.stream()
- .map(path -> FSUtils.getRelativePartitionPath(new Path(basePath), path))
+ .map(path -> FSUtils.getRelativePartitionPath(basePath, path))
.collect(Collectors.toList());
// Load all the partition path from the basePath, and filter by the query partition path.
// TODO load files from the queryRelativePartitionPaths directly.
- List<String> matchedPartitionPaths = FSUtils.getAllPartitionPaths(engineContext, metadataConfig, basePath)
+ List<String> matchedPartitionPaths = getAllPartitionPathsUnchecked()
.stream()
.filter(path -> queryRelativePartitionPaths.stream().anyMatch(path::startsWith))
.collect(Collectors.toList());
@@ -244,12 +251,7 @@ public abstract class BaseHoodieTableFileIndex {
);
fetchedPartitionToFiles =
- FSUtils.getFilesInPartitions(
- engineContext,
- metadataConfig,
- basePath,
- fullPartitionPathsMapToFetch.keySet().toArray(new String[0]),
- fileSystemStorageConfig.getSpillableDir())
+ getAllFilesInPartitionsUnchecked(fullPartitionPathsMapToFetch.keySet())
.entrySet()
.stream()
.collect(Collectors.toMap(e -> fullPartitionPathsMapToFetch.get(e.getKey()), e -> e.getValue()));
@@ -267,6 +269,11 @@ public abstract class BaseHoodieTableFileIndex {
private void doRefresh() {
long startTime = System.currentTimeMillis();
+ HoodieTableMetadata newTableMetadata = HoodieTableMetadata.create(engineContext, metadataConfig, basePath.toString(),
+ FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue());
+
+ resetTableMetadata(newTableMetadata);
+
Map<PartitionPath, FileStatus[]> partitionFiles = loadPartitionPathFiles();
FileStatus[] allFiles = partitionFiles.values().stream().flatMap(Arrays::stream).toArray(FileStatus[]::new);
@@ -278,7 +285,7 @@ public abstract class BaseHoodieTableFileIndex {
// TODO we can optimize the flow by:
// - First fetch list of files from instants of interest
// - Load FileStatus's
- fileSystemView = new HoodieTableFileSystemView(metaClient, activeTimeline, allFiles);
+ this.fileSystemView = new HoodieTableFileSystemView(metaClient, activeTimeline, allFiles);
Option<String> queryInstant =
specifiedQueryInstant.or(() -> latestInstant.map(HoodieInstant::getTimestamp));
@@ -324,6 +331,22 @@ public abstract class BaseHoodieTableFileIndex {
LOG.info(String.format("Refresh table %s, spent: %d ms", metaClient.getTableConfig().getTableName(), duration));
}
+ private Map<String, FileStatus[]> getAllFilesInPartitionsUnchecked(Collection<String> fullPartitionPathsMapToFetch) {
+ try {
+ return tableMetadata.getAllFilesInPartitions(new ArrayList<>(fullPartitionPathsMapToFetch));
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to list partition paths for a table", e);
+ }
+ }
+
+ private List<String> getAllPartitionPathsUnchecked() {
+ try {
+ return isPartitionedTable() ? tableMetadata.getAllPartitionPaths() : Collections.singletonList("");
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to fetch partition paths for a table", e);
+ }
+ }
+
private void validate(HoodieTimeline activeTimeline, Option<String> queryInstant) {
if (shouldValidateInstant) {
if (queryInstant.isPresent() && !activeTimeline.containsInstant(queryInstant.get())) {
@@ -340,7 +363,23 @@ public abstract class BaseHoodieTableFileIndex {
return fileSlice.getBaseFile().map(BaseFile::getFileLen).orElse(0L) + logFileSize;
}
+ private void resetTableMetadata(HoodieTableMetadata newTableMetadata) {
+ if (tableMetadata != null) {
+ try {
+ tableMetadata.close();
+ } catch (Exception e) {
+ throw new HoodieException("Failed to close HoodieTableMetadata instance", e);
+ }
+ }
+ tableMetadata = newTableMetadata;
+ }
+
+ private boolean isPartitionedTable() {
+ return partitionColumns.length > 0 || HoodieTableMetadata.isMetadataTable(basePath.toString());
+ }
+
public static final class PartitionPath {
+
final String path;
final Object[] values;
@@ -353,12 +392,14 @@ public abstract class BaseHoodieTableFileIndex {
return path;
}
- Path fullPartitionPath(String basePath) {
+ Path fullPartitionPath(Path basePath) {
if (!path.isEmpty()) {
- return new CachingPath(basePath, path);
+ // NOTE: Since we know that the path is a proper relative path that doesn't require
+ // normalization, we create Hadoop's Path using the more performant unsafe variant
+ return new CachingPath(basePath, createPathUnsafe(path));
}
- return new CachingPath(basePath);
+ return basePath;
}
@Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index d940f3bb45..fe697197f2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -34,6 +34,7 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.InvalidHoodiePathException;
+import org.apache.hudi.hadoop.CachingPath;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hadoop.conf.Configuration;
@@ -68,6 +69,8 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.hudi.hadoop.CachingPath.getPathWithoutSchemeAndAuthority;
+
/**
* Utility functions related to accessing the file storage.
*/
@@ -216,8 +219,8 @@ public class FSUtils {
* Given a base partition and a partition path, return relative path of partition path to the base path.
*/
public static String getRelativePartitionPath(Path basePath, Path fullPartitionPath) {
- basePath = Path.getPathWithoutSchemeAndAuthority(basePath);
- fullPartitionPath = Path.getPathWithoutSchemeAndAuthority(fullPartitionPath);
+ basePath = getPathWithoutSchemeAndAuthority(basePath);
+ fullPartitionPath = getPathWithoutSchemeAndAuthority(fullPartitionPath);
String fullPartitionPathStr = fullPartitionPath.toString();
@@ -607,12 +610,12 @@ public class FSUtils {
String properPartitionPath = partitionPath.startsWith("/")
? partitionPath.substring(1)
: partitionPath;
- return getPartitionPath(new Path(basePath), properPartitionPath);
+ return getPartitionPath(new CachingPath(basePath), properPartitionPath);
}
public static Path getPartitionPath(Path basePath, String partitionPath) {
- // FOr non-partitioned table, return only base-path
- return StringUtils.isNullOrEmpty(partitionPath) ? basePath : new Path(basePath, partitionPath);
+ // For non-partitioned table, return only base-path
+ return StringUtils.isNullOrEmpty(partitionPath) ? basePath : new CachingPath(basePath, partitionPath);
}
/**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java
index cd35861b74..fe9837e6c6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java
@@ -20,6 +20,7 @@ package org.apache.hudi.common.model;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hudi.hadoop.CachingPath;
import java.io.Serializable;
import java.util.Objects;
@@ -66,6 +67,14 @@ public class BaseFile implements Serializable {
return fullPath;
}
+ public Path getHadoopPath() {
+ if (fileStatus != null) {
+ return fileStatus.getPath();
+ }
+
+ return new CachingPath(fullPath);
+ }
+
public String getFileName() {
return fileName;
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 7ed70fe0f1..caab37162c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -399,7 +399,7 @@ public class HoodieTableMetaClient implements Serializable {
public void validateTableProperties(Properties properties) {
// Once meta fields are disabled, it cant be re-enabled for a given table.
if (!getTableConfig().populateMetaFields()
- && Boolean.parseBoolean((String) properties.getOrDefault(HoodieTableConfig.POPULATE_META_FIELDS.key(), HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()))) {
+ && Boolean.parseBoolean((String) properties.getOrDefault(HoodieTableConfig.POPULATE_META_FIELDS.key(), HoodieTableConfig.POPULATE_META_FIELDS.defaultValue().toString()))) {
throw new HoodieException(HoodieTableConfig.POPULATE_META_FIELDS.key() + " already disabled for the table. Can't be re-enabled back");
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index 72cb3a0ef3..d923c59270 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -18,19 +18,6 @@
package org.apache.hudi.common.table.log.block;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.fs.inline.InLineFSUtils;
-import org.apache.hudi.common.fs.inline.InLineFileSystem;
-import org.apache.hudi.common.util.ClosableIterator;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.io.storage.HoodieHBaseKVComparator;
-import org.apache.hudi.io.storage.HoodieHFileReader;
-
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
@@ -43,6 +30,16 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.fs.inline.InLineFSUtils;
+import org.apache.hudi.common.fs.inline.InLineFileSystem;
+import org.apache.hudi.common.util.ClosableIterator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieHBaseKVComparator;
+import org.apache.hudi.io.storage.HoodieHFileReader;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -167,9 +164,8 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
// Get schema from the header
Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
- FileSystem fs = FSUtils.getFs(pathForReader.toString(), new Configuration());
// Read the content
- HoodieHFileReader<IndexedRecord> reader = new HoodieHFileReader<>(fs, pathForReader, content, Option.of(writerSchema));
+ HoodieHFileReader<IndexedRecord> reader = new HoodieHFileReader<>(null, pathForReader, content, Option.of(writerSchema));
Iterator<IndexedRecord> recordIterator = reader.getRecordIterator(readerSchema);
return new ClosableIterator<IndexedRecord>() {
@Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index dc6fc47b58..5818636cae 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -95,8 +95,8 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
private BootstrapIndex bootstrapIndex;
- private String getPartitionPathFromFilePath(String fullPath) {
- return FSUtils.getRelativePartitionPath(metaClient.getBasePathV2(), new Path(fullPath).getParent());
+ private String getPartitionPathFor(HoodieBaseFile baseFile) {
+ return FSUtils.getRelativePartitionPath(metaClient.getBasePathV2(), baseFile.getHadoopPath().getParent());
}
/**
@@ -166,8 +166,8 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
protected List<HoodieFileGroup> buildFileGroups(Stream<HoodieBaseFile> baseFileStream,
Stream<HoodieLogFile> logFileStream, HoodieTimeline timeline, boolean addPendingCompactionFileSlice) {
Map<Pair<String, String>, List<HoodieBaseFile>> baseFiles =
- baseFileStream.collect(Collectors.groupingBy((baseFile) -> {
- String partitionPathStr = getPartitionPathFromFilePath(baseFile.getPath());
+ baseFileStream.collect(Collectors.groupingBy(baseFile -> {
+ String partitionPathStr = getPartitionPathFor(baseFile);
return Pair.of(partitionPathStr, baseFile.getFileId());
}));
@@ -183,7 +183,8 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
List<HoodieFileGroup> fileGroups = new ArrayList<>();
fileIdSet.forEach(pair -> {
String fileId = pair.getValue();
- HoodieFileGroup group = new HoodieFileGroup(pair.getKey(), fileId, timeline);
+ String partitionPath = pair.getKey();
+ HoodieFileGroup group = new HoodieFileGroup(partitionPath, fileId, timeline);
if (baseFiles.containsKey(pair)) {
baseFiles.get(pair).forEach(group::addBaseFile);
}
@@ -373,7 +374,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
* @param baseFile base File
*/
protected boolean isBaseFileDueToPendingCompaction(HoodieBaseFile baseFile) {
- final String partitionPath = getPartitionPathFromFilePath(baseFile.getPath());
+ final String partitionPath = getPartitionPathFor(baseFile);
Option<Pair<String, CompactionOperation>> compactionWithInstantTime =
getPendingCompactionOperationWithInstant(new HoodieFileGroupId(partitionPath, baseFile.getFileId()));
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java
index 8036995fab..194d67cd0e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java
@@ -54,6 +54,15 @@ public class CollectionUtils {
return !isNullOrEmpty(c);
}
+ /**
+ * Makes a copy of the provided {@link Properties} object
+ */
+ public static Properties copy(Properties props) {
+ Properties copy = new Properties();
+ copy.putAll(props);
+ return copy;
+ }
+
/**
* Returns last element of the array of {@code T}
*/
diff --git a/hudi-common/src/main/java/org/apache/hudi/hadoop/CachingPath.java b/hudi-common/src/main/java/org/apache/hudi/hadoop/CachingPath.java
index 01b3eb9d40..d6e35dbbdc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/hadoop/CachingPath.java
+++ b/hudi-common/src/main/java/org/apache/hudi/hadoop/CachingPath.java
@@ -19,10 +19,11 @@
package org.apache.hudi.hadoop;
import org.apache.hadoop.fs.Path;
+import org.apache.hudi.exception.HoodieException;
import javax.annotation.concurrent.ThreadSafe;
-import java.io.Serializable;
import java.net.URI;
+import java.net.URISyntaxException;
/**
* This is an extension of the {@code Path} class allowing to avoid repetitive
@@ -32,11 +33,12 @@ import java.net.URI;
* NOTE: This class is thread-safe
*/
@ThreadSafe
-public class CachingPath extends Path implements Serializable {
+public class CachingPath extends Path {
// NOTE: `volatile` keyword is redundant here and put mostly for reader notice, since all
// reads/writes to references are always atomic (including 64-bit JVMs)
// https://docs.oracle.com/javase/specs/jls/se8/html/jls-17.html#jls-17.7
+ private volatile Path parent;
private volatile String fileName;
private volatile String fullPathStr;
@@ -74,6 +76,17 @@ public class CachingPath extends Path implements Serializable {
return fileName;
}
+ @Override
+ public Path getParent() {
+ // This value could be overwritten concurrently and that's okay, since
+ // {@code Path} is immutable
+ if (parent == null) {
+ parent = super.getParent();
+ }
+
+ return parent;
+ }
+
@Override
public String toString() {
// This value could be overwritten concurrently and that's okay, since
@@ -83,4 +96,46 @@ public class CachingPath extends Path implements Serializable {
}
return fullPathStr;
}
+
+ public CachingPath subPath(String relativePath) {
+ return new CachingPath(this, createPathUnsafe(relativePath));
+ }
+
+ public static CachingPath wrap(Path path) {
+ if (path instanceof CachingPath) {
+ return (CachingPath) path;
+ }
+
+ return new CachingPath(path.toUri());
+ }
+
+ /**
+ * Creates a path based on the provided *relative* path
+ *
+ * NOTE: This is an unsafe version that relies on the caller knowing what they are doing;
+ * it is not going to work with paths having a scheme (which requires parsing) and is
+ * only meant to work w/ relative paths in a few specific cases.
+ */
+ public static CachingPath createPathUnsafe(String relativePath) {
+ try {
+ // NOTE: {@code normalize} is going to be invoked by {@code Path} ctor, so there's no
+ // point in invoking it here
+ URI uri = new URI(null, null, relativePath, null, null);
+ return new CachingPath(uri);
+ } catch (URISyntaxException e) {
+ throw new HoodieException("Failed to instantiate relative path", e);
+ }
+ }
+
+ /**
+ * This is the {@link Path#getPathWithoutSchemeAndAuthority(Path)} counterpart, instantiating
+ * {@link CachingPath}
+ */
+ public static Path getPathWithoutSchemeAndAuthority(Path path) {
+ // This code depends on Path.toString() to remove the leading slash before
+ // the drive specification on Windows.
+ return path.isUriPathAbsolute()
+ ? createPathUnsafe(path.toUri().getPath())
+ : path;
+ }
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/hadoop/SerializablePath.java b/hudi-common/src/main/java/org/apache/hudi/hadoop/SerializablePath.java
index 5ad2307ef8..796600a7e8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/hadoop/SerializablePath.java
+++ b/hudi-common/src/main/java/org/apache/hudi/hadoop/SerializablePath.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
+import java.net.URI;
import java.util.Objects;
/**
@@ -42,12 +43,12 @@ public class SerializablePath implements Serializable {
}
private void writeObject(ObjectOutputStream out) throws IOException {
- out.writeUTF(path.toString());
+ out.writeObject(path.toUri());
}
- private void readObject(ObjectInputStream in) throws IOException {
- String pathStr = in.readUTF();
- path = new CachingPath(pathStr);
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ URI uri = (URI) in.readObject();
+ path = new CachingPath(uri);
}
@Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileUtils.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileUtils.java
index 3767ea1832..878a3c563b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileUtils.java
@@ -67,7 +67,9 @@ public class HoodieHFileUtils {
*/
public static HFile.Reader createHFileReader(
FileSystem fs, Path dummyPath, byte[] content) throws IOException {
- Configuration conf = new Configuration();
+ // Avoid loading default configs from the FS, since this configuration is mostly
+ // used as a stub to initialize HFile reader
+ Configuration conf = new Configuration(false);
HoodieHFileReader.SeekableByteArrayInputStream bis = new HoodieHFileReader.SeekableByteArrayInputStream(content);
FSDataInputStream fsdis = new FSDataInputStream(bis);
FSDataInputStreamWrapper stream = new FSDataInputStreamWrapper(fsdis);
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index 2036500ac6..37a209b0a8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -19,6 +19,7 @@
package org.apache.hudi.metadata;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.avro.model.HoodieMetadataBloomFilter;
import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
import org.apache.hudi.common.bloom.BloomFilter;
@@ -39,10 +40,13 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.hash.ColumnIndexID;
import org.apache.hudi.common.util.hash.FileIndexID;
import org.apache.hudi.common.util.hash.PartitionIndexID;
+import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hudi.hadoop.CachingPath;
+import org.apache.hudi.hadoop.SerializablePath;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -55,8 +59,10 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
+import java.util.function.Function;
import java.util.stream.Collectors;
public abstract class BaseTableMetadata implements HoodieTableMetadata {
@@ -68,7 +74,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
protected final transient HoodieEngineContext engineContext;
protected final SerializableConfiguration hadoopConf;
- protected final String dataBasePath;
+ protected final SerializablePath dataBasePath;
protected final HoodieTableMetaClient dataMetaClient;
protected final Option<HoodieMetadataMetrics> metrics;
protected final HoodieMetadataConfig metadataConfig;
@@ -83,7 +89,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
String dataBasePath, String spillableMapDirectory) {
this.engineContext = engineContext;
this.hadoopConf = new SerializableConfiguration(engineContext.getHadoopConf());
- this.dataBasePath = dataBasePath;
+ this.dataBasePath = new SerializablePath(new CachingPath(dataBasePath));
this.dataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf.get()).setBasePath(dataBasePath).build();
this.spillableMapDirectory = spillableMapDirectory;
this.metadataConfig = metadataConfig;
@@ -113,7 +119,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
throw new HoodieMetadataException("Failed to retrieve list of partition from metadata", e);
}
}
- return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, dataBasePath,
+ return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, dataBasePath.toString(),
metadataConfig.shouldAssumeDatePartitioning()).getAllPartitionPaths();
}
@@ -138,13 +144,17 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
}
}
- return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, dataBasePath, metadataConfig.shouldAssumeDatePartitioning())
+ return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, dataBasePath.toString(), metadataConfig.shouldAssumeDatePartitioning())
.getAllFilesInPartition(partitionPath);
}
@Override
public Map<String, FileStatus[]> getAllFilesInPartitions(List<String> partitions)
throws IOException {
+ if (partitions.isEmpty()) {
+ return Collections.emptyMap();
+ }
+
if (isMetadataTableEnabled) {
try {
List<Path> partitionPaths = partitions.stream().map(Path::new).collect(Collectors.toList());
@@ -154,7 +164,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
}
}
- return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, dataBasePath, metadataConfig.shouldAssumeDatePartitioning())
+ return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, dataBasePath.toString(), metadataConfig.shouldAssumeDatePartitioning())
.getAllFilesInPartitions(partitions);
}
@@ -278,20 +288,23 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
*/
protected List<String> fetchAllPartitionPaths() {
HoodieTimer timer = new HoodieTimer().startTimer();
- Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getRecordByKey(RECORDKEY_PARTITION_LIST,
+ Option<HoodieRecord<HoodieMetadataPayload>> recordOpt = getRecordByKey(RECORDKEY_PARTITION_LIST,
MetadataPartitionType.FILES.getPartitionPath());
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_PARTITIONS_STR, timer.endTimer()));
- List<String> partitions = Collections.emptyList();
- if (hoodieRecord.isPresent()) {
- handleSpuriousDeletes(hoodieRecord, "\"all partitions\"");
- partitions = hoodieRecord.get().getData().getFilenames();
- // Partition-less tables have a single empty partition
- if (partitions.contains(NON_PARTITIONED_NAME)) {
- partitions.remove(NON_PARTITIONED_NAME);
- partitions.add("");
+ List<String> partitions = recordOpt.map(record -> {
+ HoodieMetadataPayload metadataPayload = record.getData();
+ checkForSpuriousDeletes(metadataPayload, "\"all partitions\"");
+
+ List<String> relativePaths = metadataPayload.getFilenames();
+ // Non-partitioned tables have a single empty partition
+ if (relativePaths.size() == 1 && relativePaths.get(0).equals(NON_PARTITIONED_NAME)) {
+ return Collections.singletonList("");
+ } else {
+ return relativePaths;
}
- }
+ })
+ .orElse(Collections.emptyList());
LOG.info("Listed partitions from metadata: #partitions=" + partitions.size());
return partitions;
@@ -303,75 +316,81 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
* @param partitionPath The absolute path of the partition
*/
FileStatus[] fetchAllFilesInPartition(Path partitionPath) throws IOException {
- String partitionName = FSUtils.getRelativePartitionPath(new Path(dataBasePath), partitionPath);
- if (partitionName.isEmpty()) {
- partitionName = NON_PARTITIONED_NAME;
- }
+ String relativePartitionPath = FSUtils.getRelativePartitionPath(dataBasePath.get(), partitionPath);
+ String recordKey = relativePartitionPath.isEmpty() ? NON_PARTITIONED_NAME : relativePartitionPath;
HoodieTimer timer = new HoodieTimer().startTimer();
- Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getRecordByKey(partitionName,
+ Option<HoodieRecord<HoodieMetadataPayload>> recordOpt = getRecordByKey(recordKey,
MetadataPartitionType.FILES.getPartitionPath());
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_FILES_STR, timer.endTimer()));
- FileStatus[] statuses = {};
- if (hoodieRecord.isPresent()) {
- handleSpuriousDeletes(hoodieRecord, partitionName);
- statuses = hoodieRecord.get().getData().getFileStatuses(hadoopConf.get(), partitionPath);
- }
+ FileStatus[] statuses = recordOpt.map(record -> {
+ HoodieMetadataPayload metadataPayload = record.getData();
+ checkForSpuriousDeletes(metadataPayload, recordKey);
+ try {
+ return metadataPayload.getFileStatuses(hadoopConf.get(), partitionPath);
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to extract file-statuses from the payload", e);
+ }
+ })
+ .orElse(new FileStatus[0]);
- LOG.info("Listed file in partition from metadata: partition=" + partitionName + ", #files=" + statuses.length);
+ LOG.info("Listed file in partition from metadata: partition=" + relativePartitionPath + ", #files=" + statuses.length);
return statuses;
}
Map<String, FileStatus[]> fetchAllFilesInPartitionPaths(List<Path> partitionPaths) throws IOException {
- Map<String, Path> partitionInfo = new HashMap<>();
- boolean foundNonPartitionedPath = false;
- for (Path partitionPath: partitionPaths) {
- String partitionName = FSUtils.getRelativePartitionPath(new Path(dataBasePath), partitionPath);
- if (partitionName.isEmpty()) {
- if (partitionInfo.size() > 1) {
- throw new HoodieMetadataException("Found mix of partitioned and non partitioned paths while fetching data from metadata table");
- }
- partitionInfo.put(NON_PARTITIONED_NAME, partitionPath);
- foundNonPartitionedPath = true;
- } else {
- if (foundNonPartitionedPath) {
- throw new HoodieMetadataException("Found mix of partitioned and non partitioned paths while fetching data from metadata table");
- }
- partitionInfo.put(partitionName, partitionPath);
- }
- }
+ Map<String, Path> partitionIdToPathMap =
+ partitionPaths.parallelStream()
+ .collect(
+ Collectors.toMap(partitionPath -> {
+ String partitionId = FSUtils.getRelativePartitionPath(dataBasePath.get(), partitionPath);
+ return partitionId.isEmpty() ? NON_PARTITIONED_NAME : partitionId;
+ }, Function.identity())
+ );
HoodieTimer timer = new HoodieTimer().startTimer();
- List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> partitionsFileStatus =
- getRecordsByKeys(new ArrayList<>(partitionInfo.keySet()), MetadataPartitionType.FILES.getPartitionPath());
+ List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> partitionIdRecordPairs =
+ getRecordsByKeys(new ArrayList<>(partitionIdToPathMap.keySet()), MetadataPartitionType.FILES.getPartitionPath());
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_FILES_STR, timer.endTimer()));
- Map<String, FileStatus[]> result = new HashMap<>();
- for (Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry: partitionsFileStatus) {
- if (entry.getValue().isPresent()) {
- handleSpuriousDeletes(entry.getValue(), entry.getKey());
- result.put(partitionInfo.get(entry.getKey()).toString(), entry.getValue().get().getData().getFileStatuses(hadoopConf.get(), partitionInfo.get(entry.getKey())));
- }
- }
+ FileSystem fs = partitionPaths.get(0).getFileSystem(hadoopConf.get());
+
+ Map<String, FileStatus[]> partitionPathToFilesMap = partitionIdRecordPairs.parallelStream()
+ .map(pair -> {
+ String partitionId = pair.getKey();
+ Option<HoodieRecord<HoodieMetadataPayload>> recordOpt = pair.getValue();
+
+ Path partitionPath = partitionIdToPathMap.get(partitionId);
+
+ return recordOpt.map(record -> {
+ HoodieMetadataPayload metadataPayload = record.getData();
+ checkForSpuriousDeletes(metadataPayload, partitionId);
+
+ FileStatus[] files = metadataPayload.getFileStatuses(fs, partitionPath);
+ return Pair.of(partitionPath.toString(), files);
+ })
+ .orElse(null);
+ })
+ .filter(Objects::nonNull)
+ .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
LOG.info("Listed files in partitions from metadata: partition list =" + Arrays.toString(partitionPaths.toArray()));
- return result;
+
+ return partitionPathToFilesMap;
}
/**
* Handle spurious deletes. Depending on config, throw an exception or log a warn msg.
- * @param hoodieRecord instance of {@link HoodieRecord} of interest.
- * @param partitionName partition name of interest.
*/
- private void handleSpuriousDeletes(Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord, String partitionName) {
- if (!hoodieRecord.get().getData().getDeletions().isEmpty()) {
+ private void checkForSpuriousDeletes(HoodieMetadataPayload metadataPayload, String partitionName) {
+ if (!metadataPayload.getDeletions().isEmpty()) {
if (metadataConfig.ignoreSpuriousDeletes()) {
LOG.warn("Metadata record for " + partitionName + " encountered some files to be deleted which was not added before. "
+ "Ignoring the spurious deletes as the `" + HoodieMetadataConfig.IGNORE_SPURIOUS_DELETES.key() + "` config is set to true");
} else {
throw new HoodieMetadataException("Metadata record for " + partitionName + " is inconsistent: "
- + hoodieRecord.get().getData());
+ + metadataPayload);
}
}
}
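The reworked fetchAllFilesInPartitionPaths above resolves the Hadoop FileSystem once (from the first partition path) and reuses it while mapping the metadata records in parallel, rather than looking up a FileSystem per partition. A minimal, illustrative Java sketch of that pattern follows; it is not part of this commit, the class and method names are hypothetical, and a plain fs.listStatus stands in for the metadata-payload lookup:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  import java.io.IOException;
  import java.io.UncheckedIOException;
  import java.util.List;
  import java.util.Map;
  import java.util.stream.Collectors;

  class SingleFsLookupSketch {
    static Map<String, FileStatus[]> listAll(List<Path> partitionPaths, Configuration conf) throws IOException {
      // Resolve the FileSystem once; all partition paths of a table share the same scheme/authority
      FileSystem fs = partitionPaths.get(0).getFileSystem(conf);
      return partitionPaths.parallelStream()
          .collect(Collectors.toMap(Path::toString, p -> {
            try {
              return fs.listStatus(p); // reuse the cached handle for every partition
            } catch (IOException e) {
              throw new UncheckedIOException(e);
            }
          }));
    }
  }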
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index f8a0389da3..d4865875b1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -111,7 +111,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
}
private void initIfNeeded() {
- this.metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(dataBasePath);
+ this.metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(dataBasePath.toString());
if (!isMetadataTableEnabled) {
if (!HoodieTableMetadata.isMetadataTable(metadataBasePath)) {
LOG.info("Metadata table is disabled.");
@@ -303,8 +303,6 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
}
}
- List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = new ArrayList<>();
-
HoodieTimer readTimer = new HoodieTimer();
readTimer.startTimer();
@@ -413,7 +411,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
// Open the log record scanner using the log files from the latest file slice
List<HoodieLogFile> logFiles = slice.getLogFiles().collect(Collectors.toList());
Pair<HoodieMetadataMergedLogRecordReader, Long> logRecordScannerOpenTimePair =
- getLogRecordScanner(logFiles, partitionName);
+ getLogRecordScanner(logFiles, partitionName, Option.empty());
HoodieMetadataMergedLogRecordReader logRecordScanner = logRecordScannerOpenTimePair.getKey();
final long logScannerOpenMs = logRecordScannerOpenTimePair.getValue();
@@ -465,11 +463,6 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
return validInstantTimestamps;
}
- public Pair<HoodieMetadataMergedLogRecordReader, Long> getLogRecordScanner(List<HoodieLogFile> logFiles,
- String partitionName) {
- return getLogRecordScanner(logFiles, partitionName, Option.empty());
- }
-
public Pair<HoodieMetadataMergedLogRecordReader, Long> getLogRecordScanner(List<HoodieLogFile> logFiles,
String partitionName,
Option<Boolean> allowFullScanOverride) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index df138cd124..0575177800 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -53,6 +53,7 @@ import org.apache.hudi.common.util.hash.ColumnIndexID;
import org.apache.hudi.common.util.hash.FileIndexID;
import org.apache.hudi.common.util.hash.PartitionIndexID;
import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.hadoop.CachingPath;
import org.apache.hudi.io.storage.HoodieHFileReader;
import org.apache.hudi.util.Lazy;
@@ -80,6 +81,7 @@ import static org.apache.hudi.common.util.DateTimeUtils.microsToInstant;
import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.hadoop.CachingPath.createPathUnsafe;
import static org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_LIST;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionIdentifier;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.tryUpcastDecimal;
@@ -473,10 +475,22 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
*/
public FileStatus[] getFileStatuses(Configuration hadoopConf, Path partitionPath) throws IOException {
FileSystem fs = partitionPath.getFileSystem(hadoopConf);
+ return getFileStatuses(fs, partitionPath);
+ }
+
+ /**
+ * Returns the files added as part of this record.
+ */
+ public FileStatus[] getFileStatuses(FileSystem fs, Path partitionPath) {
long blockSize = fs.getDefaultBlockSize(partitionPath);
return filterFileInfoEntries(false)
- .map(e -> new FileStatus(e.getValue().getSize(), false, 0, blockSize, 0, 0,
- null, null, null, new Path(partitionPath, e.getKey())))
+ .map(e -> {
+ // NOTE: Since we know that the Metadata Table's payload is simply a file name, we're
+ // creating Hadoop's Path using the more performant unsafe variant
+ CachingPath filePath = new CachingPath(partitionPath, createPathUnsafe(e.getKey()));
+ return new FileStatus(e.getValue().getSize(), false, 0, blockSize, 0, 0,
+ null, null, null, filePath);
+ })
.toArray(FileStatus[]::new);
}
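The NOTE above hinges on CachingPath.createPathUnsafe: because the payload key is a bare file name, the child component can skip Hadoop Path's URI parsing and normalization before being joined to the already-resolved partition path. A small sketch of that composition, mirroring the calls in the hunk (the wrapper class name is hypothetical and not part of this commit):

  import org.apache.hadoop.fs.Path;
  import org.apache.hudi.hadoop.CachingPath;

  import static org.apache.hudi.hadoop.CachingPath.createPathUnsafe;

  class UnsafeChildPathSketch {
    // Joins a plain file name (no scheme, no separators to normalize) to a partition path
    static Path fileUnderPartition(Path partitionPath, String fileName) {
      return new CachingPath(partitionPath, createPathUnsafe(fileName));
    }
  }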
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
index ae871e3be0..349c0efb48 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
@@ -29,7 +29,6 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
-
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieMetadataException;
@@ -107,13 +106,27 @@ public interface HoodieTableMetadata extends Serializable, AutoCloseable {
static HoodieTableMetadata create(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig, String datasetBasePath,
String spillableMapPath, boolean reuse) {
if (metadataConfig.enabled()) {
- return new HoodieBackedTableMetadata(engineContext, metadataConfig, datasetBasePath, spillableMapPath, reuse);
+ return createHoodieBackedTableMetadata(engineContext, metadataConfig, datasetBasePath, spillableMapPath, reuse);
} else {
- return new FileSystemBackedTableMetadata(engineContext, new SerializableConfiguration(engineContext.getHadoopConf()),
- datasetBasePath, metadataConfig.shouldAssumeDatePartitioning());
+ return createFSBackedTableMetadata(engineContext, metadataConfig, datasetBasePath);
}
}
+ static FileSystemBackedTableMetadata createFSBackedTableMetadata(HoodieEngineContext engineContext,
+ HoodieMetadataConfig metadataConfig,
+ String datasetBasePath) {
+ return new FileSystemBackedTableMetadata(engineContext, new SerializableConfiguration(engineContext.getHadoopConf()),
+ datasetBasePath, metadataConfig.shouldAssumeDatePartitioning());
+ }
+
+ static HoodieBackedTableMetadata createHoodieBackedTableMetadata(HoodieEngineContext engineContext,
+ HoodieMetadataConfig metadataConfig,
+ String datasetBasePath,
+ String spillableMapPath,
+ boolean reuse) {
+ return new HoodieBackedTableMetadata(engineContext, metadataConfig, datasetBasePath, spillableMapPath, reuse);
+ }
+
/**
* Fetch all the files at the given partition path, per the latest snapshot of the metadata.
*/
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
index 6a2bffd34d..d3c1de5677 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
@@ -40,6 +40,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import java.util.Properties;
import java.util.UUID;
@@ -124,28 +125,28 @@ public class HoodieTestUtils {
}
public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType,
- Properties properties)
- throws IOException {
- properties = HoodieTableMetaClient.withPropertyBuilder()
- .setTableName(RAW_TRIPS_TEST_NAME)
- .setTableType(tableType)
- .setPayloadClass(HoodieAvroPayload.class)
- .fromProperties(properties)
- .build();
- return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, properties);
+ Properties properties) throws IOException {
+ return init(hadoopConf, basePath, tableType, properties, null);
}
public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType,
Properties properties, String databaseName)
throws IOException {
- properties = HoodieTableMetaClient.withPropertyBuilder()
- .setDatabaseName(databaseName)
- .setTableName(RAW_TRIPS_TEST_NAME)
- .setTableType(tableType)
- .setPayloadClass(HoodieAvroPayload.class)
- .fromProperties(properties)
- .build();
- return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, properties);
+ HoodieTableMetaClient.PropertyBuilder builder =
+ HoodieTableMetaClient.withPropertyBuilder()
+ .setDatabaseName(databaseName)
+ .setTableName(RAW_TRIPS_TEST_NAME)
+ .setTableType(tableType)
+ .setPayloadClass(HoodieAvroPayload.class);
+
+ String keyGen = properties.getProperty("hoodie.datasource.write.keygenerator.class");
+ if (!Objects.equals(keyGen, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")) {
+ builder.setPartitionFields("some_nonexistent_field");
+ }
+
+ Properties processedProperties = builder.fromProperties(properties).build();
+
+ return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, processedProperties);
}
public static HoodieTableMetaClient init(String basePath, HoodieTableType tableType, String bootstrapBasePath, HoodieFileFormat baseFileFormat) throws IOException {
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
index db8002cd2d..a4471845c3 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
@@ -86,7 +86,10 @@ public class InputFormatTestUtil {
baseFileFormat);
}
- java.nio.file.Path partitionPath = basePath.resolve(Paths.get("2016", "05", "01"));
+ java.nio.file.Path partitionPath = useNonPartitionedKeyGen
+ ? basePath
+ : basePath.resolve(Paths.get("2016", "05", "01"));
+
setupPartition(basePath, partitionPath);
if (injectData) {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index 4e70ebad75..a9a38f5f82 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -20,12 +20,15 @@ package org.apache.hudi
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath
import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_INCREMENTAL_OPT_VAL, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
-import org.apache.hudi.SparkHoodieTableFileIndex.{deduceQueryType, generateFieldMap, toJavaOption}
+import org.apache.hudi.HoodieConversionUtils.toJavaOption
+import org.apache.hudi.SparkHoodieTableFileIndex.{deduceQueryType, generateFieldMap, shouldValidatePartitionColumns}
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.bootstrap.index.BootstrapIndex
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.model.{FileSlice, HoodieTableQueryType}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.hadoop.CachingPath
+import org.apache.hudi.hadoop.CachingPath.createPathUnsafe
import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.internal.Logging
@@ -39,6 +42,7 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String
import scala.collection.JavaConverters._
+import scala.language.implicitConversions
/**
* Implementation of the [[BaseHoodieTableFileIndex]] for Spark
@@ -79,7 +83,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema)
})
- private lazy val sparkParsePartitionUtil = sparkAdapter.createSparkParsePartitionUtil(spark.sessionState.conf)
+ private lazy val sparkParsePartitionUtil = sparkAdapter.getSparkParsePartitionUtil
/**
* Get the partition schema from the hoodie.properties.
@@ -254,7 +258,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
}
}.mkString("/")
- val pathWithPartitionName = new Path(basePath, partitionWithName)
+ val pathWithPartitionName = new CachingPath(basePath, createPathUnsafe(partitionWithName))
val partitionValues = parsePartitionPath(pathWithPartitionName, partitionSchema)
partitionValues.map(_.asInstanceOf[Object]).toArray
@@ -269,22 +273,17 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
sparkParsePartitionUtil.parsePartition(
partitionPath,
typeInference = false,
- Set(new Path(basePath)),
+ Set(basePath),
partitionDataTypes,
- DateTimeUtils.getTimeZone(timeZoneId)
+ DateTimeUtils.getTimeZone(timeZoneId),
+ validatePartitionValues = shouldValidatePartitionColumns(spark)
)
.toSeq(partitionSchema)
}
}
-object SparkHoodieTableFileIndex {
- implicit def toJavaOption[T](opt: Option[T]): org.apache.hudi.common.util.Option[T] =
- if (opt.isDefined) {
- org.apache.hudi.common.util.Option.of(opt.get)
- } else {
- org.apache.hudi.common.util.Option.empty()
- }
+object SparkHoodieTableFileIndex {
/**
* This method unravels [[StructType]] into a [[Map]] of pairs of dot-path notation with corresponding
@@ -341,4 +340,9 @@ object SparkHoodieTableFileIndex {
override def invalidate(): Unit = cache.invalidateAll()
}
}
+
+ private def shouldValidatePartitionColumns(spark: SparkSession): Boolean = {
+ // NOTE: We can't use the helper method nor the config-entry to stay compatible w/ Spark 2.4
+ spark.sessionState.conf.getConfString("spark.sql.sources.validatePartitionColumns", "true").toBoolean
+ }
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
index 890f8a9019..5a71f0e371 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
@@ -237,7 +237,7 @@ class TestTimeTravelQuery extends HoodieClientTestBase {
df1.write.format("hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
- .option(PARTITIONPATH_FIELD.key, "")
+ .option(PARTITIONPATH_FIELD.key, "name")
.mode(SaveMode.Overwrite)
.save(basePath)
@@ -252,7 +252,7 @@ class TestTimeTravelQuery extends HoodieClientTestBase {
df2.write.format("hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
- .option(PARTITIONPATH_FIELD.key, "")
+ .option(PARTITIONPATH_FIELD.key, "name")
.mode(SaveMode.Append)
.save(basePath)
metaClient.reloadActiveTimeline()
@@ -263,7 +263,7 @@ class TestTimeTravelQuery extends HoodieClientTestBase {
df3.write.format("hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
- .option(PARTITIONPATH_FIELD.key, "")
+ .option(PARTITIONPATH_FIELD.key, "name")
.mode(SaveMode.Append)
.save(basePath)
metaClient.reloadActiveTimeline()
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
index 7cd7271c6b..91ab71ef1c 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
@@ -67,7 +67,7 @@ class Spark2Adapter extends SparkAdapter {
)
}
- override def createSparkParsePartitionUtil(conf: SQLConf): SparkParsePartitionUtil = new Spark2ParsePartitionUtil
+ override def getSparkParsePartitionUtil: SparkParsePartitionUtil = Spark2ParsePartitionUtil
override def parseMultipartIdentifier(parser: ParserInterface, sqlText: String): Seq[String] = {
throw new IllegalStateException(s"Should not call ParserInterface#parseMultipartIdentifier for spark2")
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/Spark2ParsePartitionUtil.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/Spark2ParsePartitionUtil.scala
index c3cbcc4075..fe0bf50e69 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/Spark2ParsePartitionUtil.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/Spark2ParsePartitionUtil.scala
@@ -24,14 +24,14 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.InternalRow
-class Spark2ParsePartitionUtil extends SparkParsePartitionUtil {
+object Spark2ParsePartitionUtil extends SparkParsePartitionUtil {
- override def parsePartition(
- path: Path,
- typeInference: Boolean,
- basePaths: Set[Path],
- userSpecifiedDataTypes: Map[String, DataType],
- timeZone: TimeZone): InternalRow = {
+ override def parsePartition(path: Path,
+ typeInference: Boolean,
+ basePaths: Set[Path],
+ userSpecifiedDataTypes: Map[String, DataType],
+ timeZone: TimeZone,
+ validatePartitionValues: Boolean = false): InternalRow = {
val (partitionValues, _) = PartitioningUtils.parsePartition(path, typeInference,
basePaths, userSpecifiedDataTypes, timeZone)
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/ReflectUtil.java b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/ReflectUtil.java
index 1157a68254..d7a9a1f122 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/ReflectUtil.java
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/ReflectUtil.java
@@ -53,12 +53,12 @@ public class ReflectUtil {
try {
ClassLoader loader = Thread.currentThread().getContextClassLoader();
if (HoodieSparkUtils.gteqSpark3_2()) {
- Class clazz = loader.loadClass(DateFormatter.class.getName());
+ Class<?> clazz = loader.loadClass(DateFormatter.class.getName());
Method applyMethod = clazz.getDeclaredMethod("apply");
applyMethod.setAccessible(true);
return (DateFormatter)applyMethod.invoke(null);
} else {
- Class clazz = loader.loadClass(DateFormatter.class.getName());
+ Class<?> clazz = loader.loadClass(DateFormatter.class.getName());
Method applyMethod = clazz.getDeclaredMethod("apply", ZoneId.class);
applyMethod.setAccessible(true);
return (DateFormatter)applyMethod.invoke(null, zoneId);
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
index 5ba976d362..77df665b98 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
@@ -50,9 +50,7 @@ abstract class BaseSpark3Adapter extends SparkAdapter with Logging {
override def getAvroSchemaConverters: HoodieAvroSchemaConverters = HoodieSparkAvroSchemaConverters
- override def createSparkParsePartitionUtil(conf: SQLConf): SparkParsePartitionUtil = {
- new Spark3ParsePartitionUtil(conf)
- }
+ override def getSparkParsePartitionUtil: SparkParsePartitionUtil = Spark3ParsePartitionUtil
override def parseMultipartIdentifier(parser: ParserInterface, sqlText: String): Seq[String] = {
parser.parseMultipartIdentifier(sqlText)
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala
index f0cbe0530f..ebe92a5a32 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala
@@ -17,57 +17,63 @@
package org.apache.spark.sql.execution.datasources
-import java.lang.{Boolean => JBoolean, Double => JDouble, Long => JLong}
-import java.math.{BigDecimal => JBigDecimal}
-import java.time.ZoneId
-import java.util.{Locale, TimeZone}
-
import org.apache.hadoop.fs.Path
-
import org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH
import org.apache.hudi.spark3.internal.ReflectUtil
-
+import org.apache.hudi.util.JFunction
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.unescapePathName
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
import org.apache.spark.sql.execution.datasources.PartitioningUtils.timestampPartitionPattern
-import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
+import java.lang.{Boolean => JBoolean, Double => JDouble, Long => JLong}
+import java.math.{BigDecimal => JBigDecimal}
+import java.time.ZoneId
+import java.util
+import java.util.concurrent.ConcurrentHashMap
+import java.util.{Locale, TimeZone}
+import scala.collection.convert.Wrappers.JConcurrentMapWrapper
import scala.collection.mutable.ArrayBuffer
import scala.util.Try
import scala.util.control.NonFatal
-class Spark3ParsePartitionUtil(conf: SQLConf) extends SparkParsePartitionUtil {
+object Spark3ParsePartitionUtil extends SparkParsePartitionUtil {
+
+ private val cache = JConcurrentMapWrapper(
+ new ConcurrentHashMap[ZoneId, (DateFormatter, TimestampFormatter)](1))
/**
* The definition of PartitionValues has been changed by SPARK-34314 in Spark3.2.
* To solve the compatibility between 3.1 and 3.2, copy some codes from PartitioningUtils in Spark3.2 here.
* And this method will generate and return `InternalRow` directly instead of `PartitionValues`.
*/
- override def parsePartition(
- path: Path,
- typeInference: Boolean,
- basePaths: Set[Path],
- userSpecifiedDataTypes: Map[String, DataType],
- timeZone: TimeZone): InternalRow = {
- val dateFormatter = ReflectUtil.getDateFormatter(timeZone.toZoneId)
- val timestampFormatter = TimestampFormatter(timestampPartitionPattern,
- timeZone.toZoneId, isParsing = true)
+ override def parsePartition(path: Path,
+ typeInference: Boolean,
+ basePaths: Set[Path],
+ userSpecifiedDataTypes: Map[String, DataType],
+ tz: TimeZone,
+ validatePartitionValues: Boolean = false): InternalRow = {
+ val (dateFormatter, timestampFormatter) = cache.getOrElseUpdate(tz.toZoneId, {
+ val dateFormatter = ReflectUtil.getDateFormatter(tz.toZoneId)
+ val timestampFormatter = TimestampFormatter(timestampPartitionPattern, tz.toZoneId, isParsing = true)
+
+ (dateFormatter, timestampFormatter)
+ })
val (partitionValues, _) = parsePartition(path, typeInference, basePaths, userSpecifiedDataTypes,
- conf.validatePartitionColumns, timeZone.toZoneId, dateFormatter, timestampFormatter)
+ validatePartitionValues, tz.toZoneId, dateFormatter, timestampFormatter)
partitionValues.map {
case PartitionValues(columnNames: Seq[String], typedValues: Seq[TypedPartValue]) =>
val rowValues = columnNames.zip(typedValues).map { case (columnName, typedValue) =>
try {
- castPartValueToDesiredType(typedValue.dataType, typedValue.value, timeZone.toZoneId)
+ castPartValueToDesiredType(typedValue.dataType, typedValue.value, tz.toZoneId)
} catch {
case NonFatal(_) =>
- if (conf.validatePartitionColumns) {
+ if (validatePartitionValues) {
throw new RuntimeException(s"Failed to cast value `${typedValue.value}` to " +
s"`${typedValue.dataType}` for partition column `$columnName`")
} else null
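The Spark3ParsePartitionUtil change above caches one (DateFormatter, TimestampFormatter) pair per time zone in a ConcurrentHashMap instead of rebuilding the formatters for every partition path parsed. A minimal Java sketch of the same caching idea, using a generic java.time formatter rather than Spark's internal ones (names and pattern are illustrative only):

  import java.time.ZoneId;
  import java.time.format.DateTimeFormatter;
  import java.util.concurrent.ConcurrentHashMap;

  class FormatterCacheSketch {
    private static final ConcurrentHashMap<ZoneId, DateTimeFormatter> CACHE = new ConcurrentHashMap<>(1);

    static DateTimeFormatter timestampFormatter(ZoneId zoneId) {
      // computeIfAbsent builds the formatter at most once per zone, even under concurrent callers
      return CACHE.computeIfAbsent(zoneId,
          zid -> DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(zid));
    }
  }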
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 741ff8010a..d94ff1477a 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -1677,7 +1677,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
props.setProperty("include", "base.properties");
props.setProperty("hoodie.embed.timeline.server", "false");
props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
- props.setProperty("hoodie.datasource.write.partitionpath.field", "");
+ props.setProperty("hoodie.datasource.write.partitionpath.field", "driver");
props.setProperty("hoodie.deltastreamer.source.dfs.root", JSON_KAFKA_SOURCE_ROOT);
props.setProperty("hoodie.deltastreamer.source.kafka.topic", topicName);
props.setProperty("hoodie.deltastreamer.source.kafka.checkpoint.type", kafkaCheckpointType);
@@ -1701,7 +1701,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
prepareParquetDFSFiles(parquetRecords, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, true, HoodieTestDataGenerator.TRIP_SCHEMA, HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
prepareParquetDFSSource(true, false, "source_uber.avsc", "target_uber.avsc", PROPS_FILENAME_TEST_PARQUET,
- PARQUET_SOURCE_ROOT, false, "");
+ PARQUET_SOURCE_ROOT, false, "driver");
// delta streamer w/ parquet source
String tableBasePath = dfsBasePath + "/test_dfs_to_kafka" + testNum;
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
@@ -1828,8 +1828,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
private void prepareCsvDFSSource(
boolean hasHeader, char sep, boolean useSchemaProvider, boolean hasTransformer) throws IOException {
String sourceRoot = dfsBasePath + "/csvFiles";
- String recordKeyField = (hasHeader || useSchemaProvider) ? "_row_key" : "_c0";
- String partitionPath = (hasHeader || useSchemaProvider) ? "partition_path" : "";
+ String recordKeyField = (hasHeader || useSchemaProvider) ? "_row_key" : "_c1";
+ String partitionPath = (hasHeader || useSchemaProvider) ? "partition_path" : "_c2";
// Properties used for testing delta-streamer with CSV source
TypedProperties csvProps = new TypedProperties();
@@ -2101,7 +2101,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
@Test
public void testDeletePartitions() throws Exception {
prepareParquetDFSSource(false, false, "source.avsc", "target.avsc",
- PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false, "");
+ PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false, "partition_path");
String tableBasePath = dfsBasePath + "/test_parquet_table" + testNum;
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
index df790cf115..b08438c5a7 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
@@ -143,9 +143,9 @@ public class TestHoodieIncrSource extends SparkClientFunctionalTestHarness {
if (expectedCount == 0) {
assertFalse(batchCheckPoint.getKey().isPresent());
} else {
- assertEquals(batchCheckPoint.getKey().get().count(), expectedCount);
+ assertEquals(expectedCount, batchCheckPoint.getKey().get().count());
}
- Assertions.assertEquals(batchCheckPoint.getRight(), expectedCheckpoint);
+ Assertions.assertEquals(expectedCheckpoint, batchCheckPoint.getRight());
}
private Pair<String, List<HoodieRecord>> writeRecords(SparkRDDWriteClient writeClient, boolean insert, List<HoodieRecord> insertRecords, String commit) throws IOException {