You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yi...@apache.org on 2023/04/05 23:14:45 UTC
[hudi] branch master updated: [HUDI-5782] Tweak defaults and remove unnecessary configs after config review (#8128)
This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 8906b0dfeea [HUDI-5782] Tweak defaults and remove unnecessary configs after config review (#8128)
8906b0dfeea is described below
commit 8906b0dfeea3decfbfd6c0645c67fac729c24cbb
Author: Lokesh Jain <lj...@apache.org>
AuthorDate: Thu Apr 6 04:44:36 2023 +0530
[HUDI-5782] Tweak defaults and remove unnecessary configs after config review (#8128)
This commit tweaks a few defaults and removes unnecessary configs after config review:
"hoodie.write.lock.dynamodb.table": add default value of hudi_locks
"hoodie.write.lock.dynamodb.table_creation_timeout": default changed from 10 minutes to 2 minutes
"hoodie.clustering.preserve.commit.metadata": removed since this should always be true
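"hoodie.compaction.preserve.commit.metadata": removed, since commit metadata should always be preserved during compaction (except for the metadata table, where preservation remains disabled)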
"hoodie.write.lock.max_wait_time_ms_between_retry": default changed from 5s to 16s
"hoodie.write.commit.callback.http.timeout.seconds": default changed from 3s to 30s
Co-authored-by: Y Ethan Guo <et...@gmail.com>
---
.../hudi/config/DynamoDbBasedLockConfig.java | 4 +--
.../apache/hudi/config/HoodieClusteringConfig.java | 11 -------
.../apache/hudi/config/HoodieCompactionConfig.java | 11 -------
.../org/apache/hudi/config/HoodieLockConfig.java | 2 +-
.../config/HoodieWriteCommitCallbackConfig.java | 4 +--
.../org/apache/hudi/config/HoodieWriteConfig.java | 9 +-----
.../org/apache/hudi/io/HoodieCreateHandle.java | 4 ++-
.../java/org/apache/hudi/io/HoodieMergeHandle.java | 4 ++-
.../metadata/HoodieBackedTableMetadataWriter.java | 3 --
.../PartitionAwareClusteringPlanStrategy.java | 2 +-
.../hudi/client/TestUpdateSchemaEvolution.java | 13 ++++----
.../TestHoodieClientOnCopyOnWriteStorage.java | 35 ++++++++--------------
.../hudi/client/functional/TestHoodieIndex.java | 2 +-
.../hudi/table/TestHoodieMergeOnReadTable.java | 9 +++---
...HoodieSparkMergeOnReadTableIncrementalRead.java | 4 +--
...dieSparkMergeOnReadTableInsertUpdateDelete.java | 4 +--
.../SparkClientFunctionalTestHarness.java | 8 ++---
.../apache/hudi/common/util/ClusteringUtils.java | 1 +
.../apache/hudi/configuration/FlinkOptions.java | 12 ++------
.../hudi/sink/clustering/ClusteringOperator.java | 25 +++++-----------
.../src/test/java/HoodieJavaStreamingApp.java | 6 ++--
.../org/apache/hudi/functional/TestBootstrap.java | 4 +--
.../TestHoodieSparkMergeOnReadTableClustering.java | 28 +++++++----------
.../apache/hudi/functional/TestOrcBootstrap.java | 2 +-
.../deltastreamer/TestHoodieDeltaStreamer.java | 13 ++------
25 files changed, 73 insertions(+), 147 deletions(-)
diff --git a/hudi-aws/src/main/java/org/apache/hudi/config/DynamoDbBasedLockConfig.java b/hudi-aws/src/main/java/org/apache/hudi/config/DynamoDbBasedLockConfig.java
index 15e81bc90e3..4530340537b 100644
--- a/hudi-aws/src/main/java/org/apache/hudi/config/DynamoDbBasedLockConfig.java
+++ b/hudi-aws/src/main/java/org/apache/hudi/config/DynamoDbBasedLockConfig.java
@@ -46,7 +46,7 @@ public class DynamoDbBasedLockConfig extends HoodieConfig {
public static final ConfigProperty<String> DYNAMODB_LOCK_TABLE_NAME = ConfigProperty
.key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "table")
- .noDefaultValue()
+ .defaultValue("hudi_locks")
.sinceVersion("0.10.0")
.withDocumentation("For DynamoDB based lock provider, the name of the DynamoDB table acting as lock table");
@@ -98,7 +98,7 @@ public class DynamoDbBasedLockConfig extends HoodieConfig {
public static final ConfigProperty<String> DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT = ConfigProperty
.key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "table_creation_timeout")
- .defaultValue(String.valueOf(10 * 60 * 1000))
+ .defaultValue(String.valueOf(2 * 60 * 1000))
.sinceVersion("0.10.0")
.withDocumentation("For DynamoDB based lock provider, the maximum number of milliseconds to wait for creating DynamoDB table");
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index e04ce341e46..124d2444705 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -218,12 +218,6 @@ public class HoodieClusteringConfig extends HoodieConfig {
.withDocumentation("Enable running of clustering service, asynchronously as inserts happen on the table.")
.withAlternatives("hoodie.datasource.clustering.async.enable");
- public static final ConfigProperty<Boolean> PRESERVE_COMMIT_METADATA = ConfigProperty
- .key("hoodie.clustering.preserve.commit.metadata")
- .defaultValue(true)
- .sinceVersion("0.9.0")
- .withDocumentation("When rewriting data, preserves existing hoodie_commit_time");
-
/**
* @deprecated this setting has no effect. Please refer to clustering configuration, as well as
* {@link #LAYOUT_OPTIMIZE_STRATEGY} config to enable advanced record layout optimization strategies
@@ -581,11 +575,6 @@ public class HoodieClusteringConfig extends HoodieConfig {
return this;
}
- public Builder withPreserveHoodieCommitMetadata(Boolean preserveHoodieCommitMetadata) {
- clusteringConfig.setValue(PRESERVE_COMMIT_METADATA, String.valueOf(preserveHoodieCommitMetadata));
- return this;
- }
-
public Builder withRollbackPendingClustering(Boolean rollbackPendingClustering) {
clusteringConfig.setValue(ROLLBACK_PENDING_CLUSTERING_ON_CONFLICT, String.valueOf(rollbackPendingClustering));
return this;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index e37ff3c46bf..02d86463d80 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -151,12 +151,6 @@ public class HoodieCompactionConfig extends HoodieConfig {
.withDocumentation("Used by org.apache.hudi.io.compact.strategy.DayBasedCompactionStrategy to denote the number of "
+ "latest partitions to compact during a compaction run.");
- public static final ConfigProperty<Boolean> PRESERVE_COMMIT_METADATA = ConfigProperty
- .key("hoodie.compaction.preserve.commit.metadata")
- .defaultValue(true)
- .sinceVersion("0.11.0")
- .withDocumentation("When rewriting data, preserves existing hoodie_commit_time");
-
/**
* Configs related to specific table types.
*/
@@ -423,11 +417,6 @@ public class HoodieCompactionConfig extends HoodieConfig {
return this;
}
- public Builder withPreserveCommitMetadata(boolean preserveCommitMetadata) {
- compactionConfig.setValue(PRESERVE_COMMIT_METADATA, String.valueOf(preserveCommitMetadata));
- return this;
- }
-
public Builder withLogCompactionBlocksThreshold(String logCompactionBlocksThreshold) {
compactionConfig.setValue(LOG_COMPACTION_BLOCKS_THRESHOLD, logCompactionBlocksThreshold);
return this;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
index f91c5791a03..73ab98f7067 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
@@ -76,7 +76,7 @@ public class HoodieLockConfig extends HoodieConfig {
public static final ConfigProperty<String> LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS = ConfigProperty
.key(LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP_KEY)
- .defaultValue(String.valueOf(5000L))
+ .defaultValue(String.valueOf(16000L))
.sinceVersion("0.8.0")
.withDocumentation("Maximum amount of time to wait between retries by lock provider client. This bounds"
+ " the maximum delay from the exponential backoff. Currently used by ZK based lock provider only.");
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteCommitCallbackConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteCommitCallbackConfig.java
index 8be57c05d1c..d9e0193dc72 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteCommitCallbackConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteCommitCallbackConfig.java
@@ -67,9 +67,9 @@ public class HoodieWriteCommitCallbackConfig extends HoodieConfig {
public static final ConfigProperty<Integer> CALLBACK_HTTP_TIMEOUT_IN_SECONDS = ConfigProperty
.key(CALLBACK_PREFIX + "http.timeout.seconds")
- .defaultValue(3)
+ .defaultValue(30)
.sinceVersion("0.6.0")
- .withDocumentation("Callback timeout in seconds. 3 by default");
+ .withDocumentation("Callback timeout in seconds.");
/**
* @deprecated Use {@link #TURN_CALLBACK_ON} and its methods instead
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index c83792c22de..51539f585d9 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1537,14 +1537,6 @@ public class HoodieWriteConfig extends HoodieConfig {
return getBoolean(HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE);
}
- public boolean isPreserveHoodieCommitMetadataForClustering() {
- return getBoolean(HoodieClusteringConfig.PRESERVE_COMMIT_METADATA);
- }
-
- public boolean isPreserveHoodieCommitMetadataForCompaction() {
- return getBoolean(HoodieCompactionConfig.PRESERVE_COMMIT_METADATA);
- }
-
public boolean isClusteringEnabled() {
// TODO: future support async clustering
return inlineClusteringEnabled() || isAsyncClusteringEnabled();
@@ -2308,6 +2300,7 @@ public class HoodieWriteConfig extends HoodieConfig {
/**
* Hoodie Client Lock Configs.
+ *
* @return
*/
public boolean isAutoAdjustLockConfigs() {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index 23edb15a780..8b20df3f1a5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -35,6 +35,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
+import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
@@ -115,7 +116,8 @@ public class HoodieCreateHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, Map<String, HoodieRecord<T>> recordMap,
TaskContextSupplier taskContextSupplier) {
- this(config, instantTime, hoodieTable, partitionPath, fileId, taskContextSupplier, config.isPreserveHoodieCommitMetadataForCompaction());
+ // preserveMetadata is disabled by default for MDT but enabled otherwise
+ this(config, instantTime, hoodieTable, partitionPath, fileId, taskContextSupplier, !HoodieTableMetadata.isMetadataTable(config.getBasePath()));
this.recordMap = recordMap;
this.useWriterSchema = true;
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 4202e2c590b..073a0e0aad1 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -47,6 +47,7 @@ import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
@@ -140,7 +141,8 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
this.keyToNewRecords = keyToNewRecords;
this.useWriterSchemaForCompaction = true;
- this.preserveMetadata = config.isPreserveHoodieCommitMetadataForCompaction();
+ // preserveMetadata is disabled by default for MDT but enabled otherwise
+ this.preserveMetadata = !HoodieTableMetadata.isMetadataTable(config.getBasePath());
init(fileId, this.partitionPath, dataFileToBeMerged);
validateAndSetAndKeyGenProps(keyGeneratorOpt, config.populateMetaFields());
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index e5065baeb82..9ba192d4a42 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -291,9 +291,6 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withInlineCompaction(false)
.withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax())
- // by default, the HFile does not keep the metadata fields, set up as false
- // to always use the metadata of the new record.
- .withPreserveCommitMetadata(false)
.withEnableOptimizedLogBlocksScan(String.valueOf(writeConfig.enableOptimizedLogBlocksScan()))
.build())
.withParallelism(parallelism, parallelism)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
index 8c0bd0cab1f..5b6ee9075bd 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
@@ -122,7 +122,7 @@ public abstract class PartitionAwareClusteringPlanStrategy<T,I,K,O> extends Clus
.setInputGroups(clusteringGroups)
.setExtraMetadata(getExtraMetadata())
.setVersion(getPlanVersion())
- .setPreserveHoodieMetadata(getWriteConfig().isPreserveHoodieCommitMetadataForClustering())
+ .setPreserveHoodieMetadata(true)
.build());
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
index 8e3252a8462..12106e011fd 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
@@ -30,11 +30,11 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
-import org.apache.hudi.io.HoodieCreateHandle;
+import org.apache.hudi.io.CreateHandleFactory;
import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.testutils.HoodieClientTestHarness;
@@ -80,7 +80,6 @@ public class TestUpdateSchemaEvolution extends HoodieClientTestHarness implement
private WriteStatus prepareFirstRecordCommit(List<String> recordsStrs) throws IOException {
// Create a bunch of records with an old version of schema
final HoodieWriteConfig config = makeHoodieClientConfig("/exampleSchema.avsc");
- config.setValue(HoodieCompactionConfig.PRESERVE_COMMIT_METADATA, "false");
final HoodieSparkTable table = HoodieSparkTable.create(config, context);
final List<WriteStatus> statuses = jsc.parallelize(Arrays.asList(1)).map(x -> {
List<HoodieRecord> insertRecords = new ArrayList<>();
@@ -91,9 +90,11 @@ public class TestUpdateSchemaEvolution extends HoodieClientTestHarness implement
}
Map<String, HoodieRecord> insertRecordMap = insertRecords.stream()
.collect(Collectors.toMap(r -> r.getRecordKey(), Function.identity()));
- HoodieCreateHandle<?,?,?,?> createHandle =
- new HoodieCreateHandle(config, "100", table, insertRecords.get(0).getPartitionPath(), "f1-0", insertRecordMap, supplier);
- createHandle.write();
+ HoodieWriteHandle<?,?,?,?> createHandle = new CreateHandleFactory<>(false)
+ .create(config, "100", table, insertRecords.get(0).getPartitionPath(), "f1-0", supplier);
+ for (HoodieRecord record : insertRecordMap.values()) {
+ createHandle.write(record, createHandle.getWriterSchemaWithMetaFields(), createHandle.getConfig().getProps());
+ }
return createHandle.close().get(0);
}).collect();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 7f552b6da2d..222816ff772 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -199,15 +199,6 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
return Arrays.stream(new Boolean[][] {{true}, {false}}).map(Arguments::of);
}
- private static Stream<Arguments> populateMetaFieldsAndPreserveMetadataParams() {
- return Arrays.stream(new Boolean[][] {
- {true, true},
- {false, true},
- {true, false},
- {false, false}
- }).map(Arguments::of);
- }
-
private static Stream<Arguments> rollbackFailedCommitsParams() {
return Stream.of(
Arguments.of(HoodieFailedWritesCleaningPolicy.LAZY, true),
@@ -1040,8 +1031,6 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
.withCompactionConfig(
HoodieCompactionConfig.newBuilder().compactionSmallFileSize(10000).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType)
- .withBloomIndexUpdatePartitionPath(true)
- .withGlobalSimpleIndexUpdatePartitionPath(true)
.build()).withTimelineLayoutVersion(VERSION_0).build();
HoodieTableMetaClient.withPropertyBuilder()
@@ -1518,12 +1507,12 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
}
@ParameterizedTest
- @MethodSource("populateMetaFieldsAndPreserveMetadataParams")
- public void testSimpleClustering(boolean populateMetaFields, boolean preserveCommitMetadata) throws Exception {
+ @MethodSource("populateMetaFieldsParams")
+ public void testSimpleClustering(boolean populateMetaFields) throws Exception {
// setup clustering config.
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true)
- .withPreserveHoodieCommitMetadata(preserveCommitMetadata).build();
+ .build();
testInsertAndClustering(clusteringConfig, populateMetaFields, true, false, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
}
@@ -1567,7 +1556,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
public void testRolblackOfRegularCommitWithPendingReplaceCommitInTimeline() throws Exception {
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true)
- .withPreserveHoodieCommitMetadata(true).build();
+ .build();
// trigger clustering, but do not complete
testInsertAndClustering(clusteringConfig, true, false, false, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
@@ -1606,7 +1595,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
// setup clustering config.
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(false).withScheduleInlineClustering(scheduleInlineClustering)
- .withPreserveHoodieCommitMetadata(true).build();
+ .build();
HoodieWriteConfig config = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).withAutoCommit(false)
.withClusteringConfig(clusteringConfig)
@@ -1633,26 +1622,26 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
}
@ParameterizedTest
- @MethodSource("populateMetaFieldsAndPreserveMetadataParams")
- public void testClusteringWithSortColumns(boolean populateMetaFields, boolean preserveCommitMetadata) throws Exception {
+ @MethodSource("populateMetaFieldsParams")
+ public void testClusteringWithSortColumns(boolean populateMetaFields) throws Exception {
// setup clustering config.
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringSortColumns(populateMetaFields ? "_hoodie_record_key" : "_row_key")
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true)
- .withPreserveHoodieCommitMetadata(preserveCommitMetadata).build();
+ .build();
testInsertAndClustering(clusteringConfig, populateMetaFields, true, false, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
}
@ParameterizedTest
- @MethodSource("populateMetaFieldsAndPreserveMetadataParams")
- public void testClusteringWithSortOneFilePerGroup(boolean populateMetaFields, boolean preserveCommitMetadata) throws Exception {
+ @MethodSource("populateMetaFieldsParams")
+ public void testClusteringWithSortOneFilePerGroup(boolean populateMetaFields) throws Exception {
// setup clustering config.
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringSortColumns("begin_lat,begin_lon")
.withClusteringPlanStrategyClass(SparkSingleFileSortPlanStrategy.class.getName())
.withClusteringExecutionStrategyClass(SparkSingleFileSortExecutionStrategy.class.getName())
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1)
- .withPreserveHoodieCommitMetadata(preserveCommitMetadata).build();
+ .build();
// note that assertSameFileIds is true for this test because of the plan and execution strategy
testInsertAndClustering(clusteringConfig, populateMetaFields, true, true, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
}
@@ -1923,7 +1912,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
SparkRDDWriteClient client = getHoodieWriteClient(config);
String clusteringCommitTime = client.scheduleClustering(Option.empty()).get().toString();
HoodieWriteMetadata<JavaRDD<WriteStatus>> clusterMetadata = client.cluster(clusteringCommitTime, completeClustering);
- if (config.isPreserveHoodieCommitMetadataForClustering() && config.populateMetaFields()) {
+ if (config.populateMetaFields()) {
verifyRecordsWrittenWithPreservedMetadata(new HashSet<>(allRecords.getRight()), allRecords.getLeft(), clusterMetadata.getWriteStatuses().collect());
} else {
verifyRecordsWritten(clusteringCommitTime, populateMetaFields, allRecords.getLeft(), clusterMetadata.getWriteStatuses().collect(), config);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
index d6d5b2495a3..7745998fd97 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
@@ -20,6 +20,7 @@ package org.apache.hudi.client.functional;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -38,7 +39,6 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieLayoutConfig;
-import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndex.IndexType;
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index 26f2705a4ab..184c7880c9e 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -217,13 +217,12 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
}
// TODO: Enable metadata virtual keys in this test once the feature HUDI-2593 is completed
- @ParameterizedTest
- @ValueSource(booleans = {false, true})
- public void testLogFileCountsAfterCompaction(boolean preserveCommitMeta) throws Exception {
+ @Test
+ public void testLogFileCountsAfterCompaction() throws Exception {
boolean populateMetaFields = true;
// insert 100 records
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true, false, HoodieIndex.IndexType.BLOOM,
- 1024 * 1024 * 1024L, HoodieClusteringConfig.newBuilder().build(), preserveCommitMeta);
+ 1024 * 1024 * 1024L, HoodieClusteringConfig.newBuilder().build());
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
HoodieWriteConfig config = cfgBuilder.build();
@@ -304,7 +303,7 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
List<Row> rows = actual.collectAsList();
assertEquals(updatedRecords.size(), rows.size());
for (Row row : rows) {
- assertEquals(row.getAs(HoodieRecord.COMMIT_TIME_METADATA_FIELD), preserveCommitMeta ? newCommitTime : compactionInstantTime);
+ assertEquals(row.getAs(HoodieRecord.COMMIT_TIME_METADATA_FIELD), newCommitTime);
}
}
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
index 275fd32ca7d..ddf458f9505 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
@@ -170,13 +170,13 @@ public class TestHoodieSparkMergeOnReadTableIncrementalRead extends SparkClientF
// verify new write shows up in snapshot mode after compaction is complete
snapshotROFiles = getROSnapshotFiles(partitionPath);
- validateFiles(partitionPath,2, snapshotROFiles, false, roSnapshotJobConf,400, commitTime1, compactionCommitTime,
+ validateFiles(partitionPath,2, snapshotROFiles, false, roSnapshotJobConf,400, commitTime1, updateTime,
insertsTime);
incrementalROFiles = getROIncrementalFiles(partitionPath, "002", -1, true);
assertTrue(incrementalROFiles.length == 2);
// verify 006 shows up because of pending compaction
- validateFiles(partitionPath, 2, incrementalROFiles, false, roJobConf, 400, commitTime1, compactionCommitTime,
+ validateFiles(partitionPath, 2, incrementalROFiles, false, roJobConf, 400, commitTime1, updateTime,
insertsTime);
}
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
index 73b1da95648..f061c152104 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
@@ -149,7 +149,7 @@ public class TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie
HoodieWriteConfig cfg = getConfigBuilder(false)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
- .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(2).withPreserveCommitMetadata(true).withScheduleInlineCompaction(scheduleInlineCompaction).build())
+ .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(2).withScheduleInlineCompaction(scheduleInlineCompaction).build())
.build();
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
@@ -191,7 +191,7 @@ public class TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie
HoodieWriteConfig cfg = getConfigBuilder(false)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
- .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(2).withPreserveCommitMetadata(true).withScheduleInlineCompaction(scheduleInlineCompaction).build())
+ .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(2).withScheduleInlineCompaction(scheduleInlineCompaction).build())
.build();
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
index 88d108879b4..511613d9044 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
@@ -360,20 +360,20 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe
}
protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, long compactionSmallFileSize, HoodieClusteringConfig clusteringConfig) {
- return getConfigBuilder(autoCommit, false, HoodieIndex.IndexType.BLOOM, compactionSmallFileSize, clusteringConfig, false);
+ return getConfigBuilder(autoCommit, false, HoodieIndex.IndexType.BLOOM, compactionSmallFileSize, clusteringConfig);
}
protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, Boolean rollbackUsingMarkers, HoodieIndex.IndexType indexType) {
- return getConfigBuilder(autoCommit, rollbackUsingMarkers, indexType, 1024 * 1024 * 1024L, HoodieClusteringConfig.newBuilder().build(), false);
+ return getConfigBuilder(autoCommit, rollbackUsingMarkers, indexType, 1024 * 1024 * 1024L, HoodieClusteringConfig.newBuilder().build());
}
protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, Boolean rollbackUsingMarkers, HoodieIndex.IndexType indexType,
- long compactionSmallFileSize, HoodieClusteringConfig clusteringConfig, boolean preserveCommitMetaForCompaction) {
+ long compactionSmallFileSize, HoodieClusteringConfig clusteringConfig) {
return HoodieWriteConfig.newBuilder().withPath(basePath()).withSchema(TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withDeleteParallelism(2)
.withAutoCommit(autoCommit)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(compactionSmallFileSize)
- .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).withPreserveCommitMetadata(preserveCommitMetaForCompaction).build())
+ .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024 * 1024).parquetMaxFileSize(1024 * 1024 * 1024).build())
.withEmbeddedTimelineServerEnabled(true).forTable("test-trip-table")
.withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
index 5d724ff9621..3de4f796e9b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -181,6 +181,7 @@ public class ClusteringUtils {
.setInputGroups(clusteringGroups)
.setExtraMetadata(extraMetadata)
.setStrategy(strategy)
+ .setPreserveHoodieMetadata(true)
.build();
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 2d9243ab5b4..54ee03451cd 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -31,8 +31,6 @@ import org.apache.hudi.common.model.HoodieSyncTableStrategy;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
-import org.apache.hudi.config.HoodieClusteringConfig;
-import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
@@ -303,10 +301,7 @@ public class FlinkOptions extends HoodieConfig {
.booleanType()
.defaultValue(false)// default read as batch
.withDescription("Whether to skip compaction instants and avoid reading compacted base files for streaming read to improve read performance.\n"
- + "There are two cases that this option can be used to avoid reading duplicates:\n"
- + "1) you are definitely sure that the consumer reads [faster than/completes before] any compaction instants "
- + "when " + HoodieCompactionConfig.PRESERVE_COMMIT_METADATA.key() + " is set to false.\n"
- + "2) changelog mode is enabled, this option is a solution to keep data integrity");
+ + "This option can be used to avoid reading duplicates when changelog mode is enabled; it is a solution to keep data integrity.");
// this option is experimental
public static final ConfigOption<Boolean> READ_STREAMING_SKIP_CLUSTERING = ConfigOptions
@@ -314,10 +309,7 @@ public class FlinkOptions extends HoodieConfig {
.booleanType()
.defaultValue(false)
.withDescription("Whether to skip clustering instants to avoid reading base files of clustering operations for streaming read "
- + "to improve read performance.\n"
- + "This option toggled to true to avoid duplicates when: \n"
- + "1) you are definitely sure that the consumer reads [faster than/completes before] any clustering instants "
- + "when " + HoodieClusteringConfig.PRESERVE_COMMIT_METADATA.key() + " is set to false.\n");
+ + "to improve read performance.");
public static final String START_COMMIT_EARLIEST = "earliest";
public static final ConfigOption<String> READ_START_COMMIT = ConfigOptions
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
index bf05e54117e..85a5626e3b8 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
@@ -18,23 +18,21 @@
package org.apache.hudi.sink.clustering;
-import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.ConcatenatingIterator;
+import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.model.ClusteringGroupInfo;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.HoodieFileSliceReader;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
-import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.common.config.HoodieStorageConfig;
-import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
@@ -54,7 +52,6 @@ import org.apache.hudi.util.FlinkWriteClients;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.generic.IndexedRecord;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
@@ -63,8 +60,8 @@ import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
@@ -92,8 +89,6 @@ import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
-import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
-
/**
* Operator to execute the actual clustering task assigned by the clustering plan task.
* In order to execute scalable, the input should shuffle by the clustering event {@link ClusteringPlanEvent}.
@@ -103,7 +98,6 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
private static final Logger LOG = LoggerFactory.getLogger(ClusteringOperator.class);
private final Configuration conf;
- private final boolean preserveHoodieMetadata;
private final RowType rowType;
private int taskID;
private transient HoodieWriteConfig writeConfig;
@@ -133,10 +127,7 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
public ClusteringOperator(Configuration conf, RowType rowType) {
this.conf = conf;
- this.preserveHoodieMetadata = conf.getBoolean(HoodieClusteringConfig.PRESERVE_COMMIT_METADATA.key(), HoodieClusteringConfig.PRESERVE_COMMIT_METADATA.defaultValue());
- this.rowType = this.preserveHoodieMetadata
- ? BulkInsertWriterHelper.addMetadataFields(rowType, false)
- : rowType;
+ this.rowType = BulkInsertWriterHelper.addMetadataFields(rowType, false);
this.asyncClustering = OptionsResolver.needsAsyncClustering(conf);
this.sortClusteringEnabled = OptionsResolver.sortClusteringEnabled(conf);
@@ -161,7 +152,7 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
this.table = writeClient.getHoodieTable();
this.schema = AvroSchemaConverter.convertToSchema(rowType);
- this.readerSchema = this.preserveHoodieMetadata ? this.schema : HoodieAvroUtils.addMetadataFields(this.schema);
+ this.readerSchema = this.schema;
this.requiredPos = getRequiredPositions();
this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(rowType);
@@ -228,7 +219,7 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
BulkInsertWriterHelper writerHelper = new BulkInsertWriterHelper(this.conf, this.table, this.writeConfig,
instantTime, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(),
- this.rowType, this.preserveHoodieMetadata);
+ this.rowType, true);
List<ClusteringOperation> clusteringOps = clusteringGroupInfo.getOperations();
boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> op.getDeltaFilePaths().size() > 0);
@@ -348,9 +339,7 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
* Transform IndexedRecord into HoodieRecord.
*/
private RowData transform(IndexedRecord indexedRecord) {
- GenericRecord record = this.preserveHoodieMetadata
- ? (GenericRecord) indexedRecord
- : buildAvroRecordBySchema(indexedRecord, schema, requiredPos, new GenericRecordBuilder(schema));
+ GenericRecord record = (GenericRecord) indexedRecord;
return (RowData) avroToRowDataConverter.convert(record);
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
index ce5b7e739d2..8e5897d14e1 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
@@ -302,7 +302,7 @@ public class HoodieJavaStreamingApp {
// wait for spark streaming to process one microbatch
Thread.sleep(3000);
waitTillNCommits(fs, numExpCommits, 180, 3);
- commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
+ commitInstantTime2 = HoodieDataSourceHelpers.listCommitsSince(fs, tablePath, commitInstantTime1).stream().sorted().findFirst().get();
LOG.info("Second commit at instant time :" + commitInstantTime2);
}
@@ -312,8 +312,7 @@ public class HoodieJavaStreamingApp {
}
// Wait for compaction to also finish and track latest timestamp as commit timestamp
waitTillNCommits(fs, numExpCommits, 180, 3);
- commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
- LOG.info("Compaction commit at instant time :" + commitInstantTime2);
+ LOG.info("Compaction commit at instant time :" + HoodieDataSourceHelpers.latestCommit(fs, tablePath));
}
/**
@@ -377,7 +376,6 @@ public class HoodieJavaStreamingApp {
.option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1")
.option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE().key(), "true")
.option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key(), "true")
- .option(HoodieCompactionConfig.PRESERVE_COMMIT_METADATA.key(), "false")
.option(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH().key(),"true")
.option(HoodieWriteConfig.TBL_NAME.key(), tableName).option("checkpointLocation", checkpointLocation)
.outputMode(OutputMode.Append());
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
index be93171adc2..63e2f7f58c1 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
@@ -58,8 +58,8 @@ import org.apache.hudi.io.storage.HoodieAvroParquetReader;
import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.table.action.bootstrap.BootstrapUtils;
-import org.apache.hudi.testutils.HoodieSparkClientTestBase;
import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
+import org.apache.hudi.testutils.HoodieSparkClientTestBase;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
@@ -305,7 +305,7 @@ public class TestBootstrap extends HoodieSparkClientTestBase {
client.compact(compactionInstant.get());
checkBootstrapResults(totalRecords, schema, compactionInstant.get(), checkNumRawFiles,
numInstantsAfterBootstrap + 2, 2, updateTimestamp, updateTimestamp, !deltaCommit,
- Arrays.asList(compactionInstant.get()), !config.isPreserveHoodieCommitMetadataForCompaction());
+ Arrays.asList(compactionInstant.get()), false);
}
client.close();
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieSparkMergeOnReadTableClustering.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieSparkMergeOnReadTableClustering.java
index 3a84755248e..c6b0560b87e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieSparkMergeOnReadTableClustering.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieSparkMergeOnReadTableClustering.java
@@ -61,28 +61,20 @@ class TestHoodieSparkMergeOnReadTableClustering extends SparkClientFunctionalTes
private static Stream<Arguments> testClustering() {
- // enableClusteringAsRow, doUpdates, populateMetaFields, preserveCommitMetadata
+ // enableClusteringAsRow, doUpdates, populateMetaFields
return Stream.of(
- Arguments.of(true, true, true, true),
- Arguments.of(true, true, true, false),
- Arguments.of(true, true, false, true),
- Arguments.of(true, true, false, false),
- Arguments.of(true, false, true, true),
- Arguments.of(true, false, true, false),
- Arguments.of(true, false, false, true),
- Arguments.of(true, false, false, false),
- Arguments.of(false, true, true, true),
- Arguments.of(false, true, true, false),
- Arguments.of(false, true, false, true),
- Arguments.of(false, true, false, false),
- Arguments.of(false, false, true, true),
- Arguments.of(false, false, true, false),
- Arguments.of(false, false, false, true),
- Arguments.of(false, false, false, false)
+ Arguments.of(true, true, true),
+ Arguments.of(true, true, false),
+ Arguments.of(true, false, true),
+ Arguments.of(true, false, false),
+ Arguments.of(false, true, true),
+ Arguments.of(false, true, false),
+ Arguments.of(false, false, true),
+ Arguments.of(false, false, false)
);
}
@ParameterizedTest
@MethodSource
- void testClustering(boolean clusteringAsRow, boolean doUpdates, boolean populateMetaFields, boolean preserveCommitMetadata) throws Exception {
+ void testClustering(boolean clusteringAsRow, boolean doUpdates, boolean populateMetaFields) throws Exception {
// set low compaction small File Size to generate more file groups.
HoodieWriteConfig.Builder cfgBuilder = HoodieWriteConfig.newBuilder()
.forTable("test-trip-table")
@@ -106,7 +98,7 @@ class TestHoodieSparkMergeOnReadTableClustering extends SparkClientFunctionalTes
.withClusteringTargetPartitions(0)
.withInlineClustering(true)
.withInlineClusteringNumCommits(1)
- .withPreserveHoodieCommitMetadata(preserveCommitMetadata).build())
+ .build())
.withRollbackUsingMarkers(false);
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
HoodieWriteConfig cfg = cfgBuilder.build();
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
index 77a980f01e5..d442069d50f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
@@ -297,7 +297,7 @@ public class TestOrcBootstrap extends HoodieSparkClientTestBase {
client.compact(compactionInstant.get());
checkBootstrapResults(totalRecords, schema, compactionInstant.get(), checkNumRawFiles,
numInstantsAfterBootstrap + 2, 2, updateTimestamp, updateTimestamp, !deltaCommit,
- Arrays.asList(compactionInstant.get()), !config.isPreserveHoodieCommitMetadataForCompaction());
+ Arrays.asList(compactionInstant.get()), false);
}
client.close();
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 879c220faa2..fe2e1702993 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -974,8 +974,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
}
@ParameterizedTest
- @CsvSource(value = {"true, AVRO", "true, SPARK", "false, AVRO", "false, SPARK"})
- public void testInlineClustering(String preserveCommitMetadata, HoodieRecordType recordType) throws Exception {
+ @CsvSource(value = {"AVRO", "SPARK"})
+ public void testInlineClustering(HoodieRecordType recordType) throws Exception {
String tableBasePath = basePath + "/inlineClustering";
// Keep it higher than batch-size to test continuous mode
int totalRecords = 3000;
@@ -985,7 +985,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
TestHelpers.addRecordMerger(recordType, cfg.configs);
cfg.continuousMode = true;
cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
- cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true", "2", "", "", preserveCommitMetadata));
+ cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true", "2", "", ""));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
deltaStreamerTestRunner(ds, cfg, (r) -> {
TestHelpers.assertAtLeastNCommits(2, tableBasePath, fs);
@@ -1122,13 +1122,6 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
}
- private List<String> getAsyncServicesConfigs(int totalRecords, String autoClean, String inlineCluster, String inlineClusterMaxCommit,
- String asyncCluster, String asyncClusterMaxCommit, String preserveCommitMetadata) {
- List<String> configs = getAsyncServicesConfigs(totalRecords, autoClean, inlineCluster, inlineClusterMaxCommit, asyncCluster, asyncClusterMaxCommit);
- configs.add(String.format("%s=%s", HoodieClusteringConfig.PRESERVE_COMMIT_METADATA.key(), preserveCommitMetadata));
- return configs;
- }
-
private List<String> getAsyncServicesConfigs(int totalRecords, String autoClean, String inlineCluster,
String inlineClusterMaxCommit, String asyncCluster, String asyncClusterMaxCommit) {
List<String> configs = new ArrayList<>();