Posted to commits@hudi.apache.org by si...@apache.org on 2023/01/29 00:37:32 UTC
[hudi] branch master updated: [HUDI-5628] Fixing log record reader scan V2 config name (#7764)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 3979848a499 [HUDI-5628] Fixing log record reader scan V2 config name (#7764)
3979848a499 is described below
commit 3979848a499131db594bbb49eb9ab160531a729d
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Sat Jan 28 16:37:22 2023 -0800
[HUDI-5628] Fixing log record reader scan V2 config name (#7764)
We introduced a new way to scan log blocks in LogRecordReader and named its config "hoodie.log.record.reader.use.scanV2". This change renames the config to the clearer "hoodie.optimized.log.blocks.scan.enable" and fixes the corresponding metadata table config as well.
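For reference, a minimal sketch of opting into the renamed configs through the builders this commit touches. The two builder calls mirror usages in the diff below; the wrapping class and HoodieMetadataConfig.newBuilder() entry point are assumed for illustration and are not part of this commit.
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
// Illustrative sketch, not part of this commit.
class OptimizedLogBlocksScanExample {
  static void configure() {
    // Compaction side: resolves to key "hoodie.optimized.log.blocks.scan.enable".
    HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder()
        .withEnableOptimizedLogBlocksScan("true")
        .build();
    // Metadata table side: resolves to key "hoodie.metadata.optimized.log.blocks.scan.enable".
    HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
        .withOptimizedLogBlocksScan(true)
        .build();
  }
}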
---
.../hudi/cli/commands/HoodieLogFileCommand.java | 2 +-
.../cli/commands/TestHoodieLogFileCommand.java | 2 +-
.../apache/hudi/config/HoodieCompactionConfig.java | 11 ++--
.../org/apache/hudi/config/HoodieWriteConfig.java | 4 +-
.../metadata/HoodieBackedTableMetadataWriter.java | 2 +-
.../action/compact/CompactionExecutionHelper.java | 4 +-
.../hudi/table/action/compact/HoodieCompactor.java | 2 +-
.../compact/LogCompactionExecutionHelper.java | 2 +-
.../HoodieLogCompactionPlanGenerator.java | 2 +-
.../MultipleSparkJobExecutionStrategy.java | 2 +-
.../TestHoodieClientOnMergeOnReadStorage.java | 6 +-
.../hudi/common/config/HoodieMetadataConfig.java | 17 ++---
.../table/log/AbstractHoodieLogRecordReader.java | 10 +--
.../table/log/HoodieMergedLogRecordScanner.java | 12 ++--
.../table/log/HoodieUnMergedLogRecordScanner.java | 12 ++--
.../hudi/metadata/HoodieBackedTableMetadata.java | 2 +-
.../metadata/HoodieMetadataLogRecordReader.java | 4 +-
.../common/functional/TestHoodieLogFormat.java | 74 +++++++++++-----------
.../realtime/RealtimeCompactedRecordReader.java | 2 +-
.../reader/DFSHoodieDatasetInputReader.java | 2 +-
20 files changed, 88 insertions(+), 86 deletions(-)
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index 9a0e485fc9f..075b809e05c 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -222,7 +222,7 @@ public class HoodieLogFileCommand {
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
- .withUseScanV2(Boolean.parseBoolean(HoodieCompactionConfig.USE_LOG_RECORD_READER_SCAN_V2.defaultValue()))
+ .withOptimizedLogBlocksScan(Boolean.parseBoolean(HoodieCompactionConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()))
.build();
for (HoodieRecord hoodieRecord : scanner) {
Option<HoodieAvroIndexedRecord> record = hoodieRecord.toIndexedRecord(readerSchema, new Properties());
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
index aff12422f6a..261002c9327 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
@@ -233,7 +233,7 @@ public class TestHoodieLogFileCommand extends CLIFunctionalTestHarness {
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
- .withUseScanV2(Boolean.parseBoolean(HoodieCompactionConfig.USE_LOG_RECORD_READER_SCAN_V2.defaultValue()))
+ .withOptimizedLogBlocksScan(Boolean.parseBoolean(HoodieCompactionConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()))
.build();
Iterator<HoodieRecord> records = scanner.iterator();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index e22bf1e43d1..e37ff3c46bf 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
import org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;
@@ -188,11 +189,11 @@ public class HoodieCompactionConfig extends HoodieConfig {
.withDocumentation("Log compaction can be scheduled if the no. of log blocks crosses this threshold value. "
+ "This is effective only when log compaction is enabled via " + INLINE_LOG_COMPACT.key());
- public static final ConfigProperty<String> USE_LOG_RECORD_READER_SCAN_V2 = ConfigProperty
- .key("hoodie.log.record.reader.use.scanV2")
+ public static final ConfigProperty<String> ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN = ConfigProperty
+ .key("hoodie" + HoodieMetadataConfig.OPTIMIZED_LOG_BLOCKS_SCAN)
.defaultValue("false")
.sinceVersion("0.13.0")
- .withDocumentation("ScanV2 logic address all the multiwriter challenges while appending to log files. "
+ .withDocumentation("New optimized scan for log blocks that handles all multi-writer use-cases while appending to log files. "
+ "It also differentiates original blocks written by ingestion writers and compacted blocks written log compaction.");
/** @deprecated Use {@link #INLINE_COMPACT} and its methods instead */
@@ -432,8 +433,8 @@ public class HoodieCompactionConfig extends HoodieConfig {
return this;
}
- public Builder withLogRecordReaderScanV2(String useLogRecordReaderScanV2) {
- compactionConfig.setValue(USE_LOG_RECORD_READER_SCAN_V2, useLogRecordReaderScanV2);
+ public Builder withEnableOptimizedLogBlocksScan(String enableOptimizedLogBlocksScan) {
+ compactionConfig.setValue(ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN, enableOptimizedLogBlocksScan);
return this;
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 228cc60c249..8890ddfdeee 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1306,8 +1306,8 @@ public class HoodieWriteConfig extends HoodieConfig {
return getInt(HoodieCompactionConfig.LOG_COMPACTION_BLOCKS_THRESHOLD);
}
- public boolean useScanV2ForLogRecordReader() {
- return getBoolean(HoodieCompactionConfig.USE_LOG_RECORD_READER_SCAN_V2);
+ public boolean enableOptimizedLogBlocksScan() {
+ return getBoolean(HoodieCompactionConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN);
}
public HoodieCleaningPolicy getCleanerPolicy() {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index bbfa4460af4..a8356ff9c71 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -283,7 +283,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
// by default, the HFile does not keep the metadata fields, set up as false
// to always use the metadata of the new record.
.withPreserveCommitMetadata(false)
- .withLogRecordReaderScanV2(String.valueOf(writeConfig.useScanV2ForLogRecordReader()))
+ .withEnableOptimizedLogBlocksScan(String.valueOf(writeConfig.enableOptimizedLogBlocksScan()))
.build())
.withParallelism(parallelism, parallelism)
.withDeleteParallelism(parallelism)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactionExecutionHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactionExecutionHelper.java
index f402a673598..bdb83616196 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactionExecutionHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactionExecutionHelper.java
@@ -71,8 +71,8 @@ public class CompactionExecutionHelper<T extends HoodieRecordPayload, I, K, O> i
return result;
}
- protected boolean useScanV2(HoodieWriteConfig writeConfig) {
- return writeConfig.useScanV2ForLogRecordReader();
+ protected boolean enableOptimizedLogBlockScan(HoodieWriteConfig writeConfig) {
+ return writeConfig.enableOptimizedLogBlocksScan();
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
index c6a20436c03..0d18a68cbad 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
@@ -199,7 +199,7 @@ public abstract class HoodieCompactor<T, I, K, O> implements Serializable {
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
.withOperationField(config.allowOperationMetadataField())
.withPartition(operation.getPartitionPath())
- .withUseScanV2(executionHelper.useScanV2(config))
+ .withOptimizedLogBlocksScan(executionHelper.enableOptimizedLogBlockScan(config))
.withRecordMerger(config.getRecordMerger())
.build();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/LogCompactionExecutionHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/LogCompactionExecutionHelper.java
index 0e49267507c..8d2b054d09f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/LogCompactionExecutionHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/LogCompactionExecutionHelper.java
@@ -80,7 +80,7 @@ public class LogCompactionExecutionHelper<T extends HoodieRecordPayload, I, K, O
}
@Override
- protected boolean useScanV2(HoodieWriteConfig writeConfig) {
+ protected boolean enableOptimizedLogBlockScan(HoodieWriteConfig writeConfig) {
return true;
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
index 6a5f160f6b0..e7a77002cc5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
@@ -90,7 +90,7 @@ public class HoodieLogCompactionPlanGenerator<T extends HoodieRecordPayload, I,
.collect(Collectors.toList()))
.withLatestInstantTime(maxInstantTime)
.withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
- .withUseScanV2(true)
+ .withOptimizedLogBlocksScan(true)
.withRecordMerger(writeConfig.getRecordMerger())
.build();
scanner.scan(true);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 3c5c4152112..6e981d2823f 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -293,7 +293,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
.withBufferSize(config.getMaxDFSStreamBufferSize())
.withSpillableMapBasePath(config.getSpillableMapBasePath())
.withPartition(clusteringOp.getPartitionPath())
- .withUseScanV2(config.useScanV2ForLogRecordReader())
+ .withOptimizedLogBlocksScan(config.enableOptimizedLogBlocksScan())
.withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
.withRecordMerger(config.getRecordMerger())
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
index 6e2257dcd91..be6a71426b6 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
@@ -200,7 +200,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
public void testLogCompactionOnMORTableWithoutBaseFile() throws Exception {
HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder()
.withLogCompactionBlocksThreshold("1")
- .withLogRecordReaderScanV2("true")
+ .withEnableOptimizedLogBlocksScan("true")
.build();
HoodieWriteConfig config = getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA,
HoodieIndex.IndexType.INMEMORY).withAutoCommit(true).withCompactionConfig(compactionConfig).build();
@@ -447,7 +447,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
.collect(Collectors.toList()))
.withLatestInstantTime(instant)
.withBufferSize(config.getMaxDFSStreamBufferSize())
- .withUseScanV2(true)
+ .withOptimizedLogBlocksScan(true)
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.build();
scanner.scan(true);
@@ -461,7 +461,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
.collect(Collectors.toList()))
.withLatestInstantTime(currentInstant)
.withBufferSize(config.getMaxDFSStreamBufferSize())
- .withUseScanV2(true)
+ .withOptimizedLogBlocksScan(true)
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.build();
scanner2.scan(true);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index 81f2c1daeff..9b84466090d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -42,6 +42,7 @@ import java.util.Properties;
public final class HoodieMetadataConfig extends HoodieConfig {
public static final String METADATA_PREFIX = "hoodie.metadata";
+ public static final String OPTIMIZED_LOG_BLOCKS_SCAN = ".optimized.log.blocks.scan.enable";
// Enable the internal Metadata Table which saves file listings
public static final ConfigProperty<Boolean> ENABLE = ConfigProperty
@@ -237,12 +238,12 @@ public final class HoodieMetadataConfig extends HoodieConfig {
+ "metadata table which are never added before. This config determines how to handle "
+ "such spurious deletes");
- public static final ConfigProperty<Boolean> USE_LOG_RECORD_READER_SCAN_V2 = ConfigProperty
- .key(METADATA_PREFIX + ".log.record.reader.use.scanV2")
+ public static final ConfigProperty<Boolean> ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN = ConfigProperty
+ .key(METADATA_PREFIX + OPTIMIZED_LOG_BLOCKS_SCAN)
.defaultValue(false)
.sinceVersion("0.13.0")
- .withDocumentation("ScanV2 logic address all the multiwriter challenges while appending to log files. "
- + "It also differentiates original blocks written by ingestion writers and compacted blocks written log compaction.");
+ .withDocumentation("Optimized log blocks scanner that addresses all the multiwriter use-cases while appending to log files. "
+ + "It also differentiates original blocks written by ingestion writers and compacted blocks written by log compaction.");
private HoodieMetadataConfig() {
super();
@@ -328,8 +329,8 @@ public final class HoodieMetadataConfig extends HoodieConfig {
return getBoolean(IGNORE_SPURIOUS_DELETES);
}
- public boolean getUseLogRecordReaderScanV2() {
- return getBoolean(USE_LOG_RECORD_READER_SCAN_V2);
+ public boolean doEnableOptimizedLogBlocksScan() {
+ return getBoolean(ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN);
}
/**
@@ -478,8 +479,8 @@ public final class HoodieMetadataConfig extends HoodieConfig {
return this;
}
- public Builder withLogRecordReaderScanV2(boolean useLogRecordReaderScanV2) {
- metadataConfig.setValue(USE_LOG_RECORD_READER_SCAN_V2, String.valueOf(useLogRecordReaderScanV2));
+ public Builder withOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) {
+ metadataConfig.setValue(ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN, String.valueOf(enableOptimizedLogBlocksScan));
return this;
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index 83172ecb7ae..42babc775b9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -149,7 +149,7 @@ public abstract class AbstractHoodieLogRecordReader {
// Collect all the block instants after scanning all the log files.
private final List<String> validBlockInstants = new ArrayList<>();
// Use scanV2 method.
- private final boolean useScanV2;
+ private final boolean enableOptimizedLogBlocksScan;
protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<String> logFilePaths,
Schema readerSchema, String latestInstantTime, boolean readBlocksLazily,
@@ -158,7 +158,7 @@ public abstract class AbstractHoodieLogRecordReader {
Option<String> partitionNameOverride,
InternalSchema internalSchema,
Option<String> keyFieldOverride,
- boolean useScanV2,
+ boolean enableOptimizedLogBlocksScan,
HoodieRecordMerger recordMerger) {
this.readerSchema = readerSchema;
this.latestInstantTime = latestInstantTime;
@@ -184,7 +184,7 @@ public abstract class AbstractHoodieLogRecordReader {
this.withOperationField = withOperationField;
this.forceFullScan = forceFullScan;
this.internalSchema = internalSchema == null ? InternalSchema.getEmptyInternalSchema() : internalSchema;
- this.useScanV2 = useScanV2;
+ this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan;
if (keyFieldOverride.isPresent()) {
// NOTE: This branch specifically is leveraged handling Metadata Table
@@ -217,7 +217,7 @@ public abstract class AbstractHoodieLogRecordReader {
*/
protected final void scanInternal(Option<KeySpec> keySpecOpt, boolean skipProcessingBlocks) {
synchronized (this) {
- if (useScanV2) {
+ if (enableOptimizedLogBlocksScan) {
scanInternalV2(keySpecOpt, skipProcessingBlocks);
} else {
scanInternalV1(keySpecOpt);
@@ -894,7 +894,7 @@ public abstract class AbstractHoodieLogRecordReader {
throw new UnsupportedOperationException();
}
- public Builder withUseScanV2(boolean useScanV2) {
+ public Builder withOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) {
throw new UnsupportedOperationException();
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index c41d78b2808..e5ce343eb39 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -98,9 +98,9 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
Option<String> partitionName,
InternalSchema internalSchema,
Option<String> keyFieldOverride,
- boolean useScanV2, HoodieRecordMerger recordMerger) {
+ boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger) {
super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize,
- instantRange, withOperationField, forceFullScan, partitionName, internalSchema, keyFieldOverride, useScanV2, recordMerger);
+ instantRange, withOperationField, forceFullScan, partitionName, internalSchema, keyFieldOverride, enableOptimizedLogBlocksScan, recordMerger);
try {
this.maxMemorySizeInBytes = maxMemorySizeInBytes;
// Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
@@ -333,7 +333,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
// By default, we're doing a full-scan
private boolean forceFullScan = true;
// Use scanV2 method.
- private boolean useScanV2 = false;
+ private boolean enableOptimizedLogBlocksScan = false;
private HoodieRecordMerger recordMerger;
@Override
@@ -430,8 +430,8 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
}
@Override
- public Builder withUseScanV2(boolean useScanV2) {
- this.useScanV2 = useScanV2;
+ public Builder withOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) {
+ this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan;
return this;
}
@@ -462,7 +462,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
latestInstantTime, maxMemorySizeInBytes, readBlocksLazily, reverseReader,
bufferSize, spillableMapBasePath, instantRange,
diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField, forceFullScan,
- Option.ofNullable(partitionName), internalSchema, Option.ofNullable(keyFieldOverride), useScanV2, recordMerger);
+ Option.ofNullable(partitionName), internalSchema, Option.ofNullable(keyFieldOverride), enableOptimizedLogBlocksScan, recordMerger);
}
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
index 2a7c91641e1..726172e5ee0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
@@ -42,9 +42,9 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordReade
private HoodieUnMergedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, int bufferSize,
LogRecordScannerCallback callback, Option<InstantRange> instantRange, InternalSchema internalSchema,
- boolean useScanV2, HoodieRecordMerger recordMerger) {
+ boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger) {
super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange,
- false, true, Option.empty(), internalSchema, Option.empty(), useScanV2, recordMerger);
+ false, true, Option.empty(), internalSchema, Option.empty(), enableOptimizedLogBlocksScan, recordMerger);
this.callback = callback;
}
@@ -105,7 +105,7 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordReade
private Option<InstantRange> instantRange = Option.empty();
// specific configurations
private LogRecordScannerCallback callback;
- private boolean useScanV2;
+ private boolean enableOptimizedLogBlocksScan;
private HoodieRecordMerger recordMerger;
public Builder withFileSystem(FileSystem fs) {
@@ -167,8 +167,8 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordReade
}
@Override
- public Builder withUseScanV2(boolean useScanV2) {
- this.useScanV2 = useScanV2;
+ public Builder withOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) {
+ this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan;
return this;
}
@@ -184,7 +184,7 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordReade
return new HoodieUnMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema,
latestInstantTime, readBlocksLazily, reverseReader, bufferSize, callback, instantRange,
- internalSchema, useScanV2, recordMerger);
+ internalSchema, enableOptimizedLogBlocksScan, recordMerger);
}
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index ecb0da8792d..0ab11d65e8b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -531,7 +531,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
.withLogBlockTimestamps(validInstantTimestamps)
.enableFullScan(allowFullScan)
.withPartition(partitionName)
- .withUseScanV2(metadataConfig.getUseLogRecordReaderScanV2())
+ .withEnableOptimizedLogBlocksScan(metadataConfig.doEnableOptimizedLogBlocksScan())
.build();
Long logScannerOpenMs = timer.endTimer();
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
index 48b9d66f89b..fe92758945f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
@@ -209,8 +209,8 @@ public class HoodieMetadataLogRecordReader implements Closeable {
return this;
}
- public Builder withUseScanV2(boolean useScanV2) {
- scannerBuilder.withUseScanV2(useScanV2);
+ public Builder withEnableOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) {
+ scannerBuilder.withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan);
return this;
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 9250429b377..af7216d3ce2 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -625,7 +625,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
public void testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled,
boolean readBlocksLazily,
- boolean useScanv2)
+ boolean enableOptimizedLogBlocksScan)
throws IOException, URISyntaxException, InterruptedException {
// Generate 4 delta-log files w/ random records
@@ -652,7 +652,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
- .withUseScanV2(useScanv2)
+ .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.build();
@@ -672,7 +672,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
public void testBasicAppendAndPartialScanning(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled,
boolean readBlocksLazily,
- boolean useScanV2)
+ boolean enableOptimizedLogBlocksScan)
throws IOException, URISyntaxException, InterruptedException {
// Generate 3 delta-log files w/ random records
Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
@@ -699,7 +699,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
- .withUseScanV2(useScanV2)
+ .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.withForceFullScan(false)
.build();
@@ -763,7 +763,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
public void testBasicAppendAndPartialScanningByKeyPrefixes(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled,
boolean readBlocksLazily,
- boolean useScanV2)
+ boolean enableOptimizedLogBlocksScan)
throws IOException, URISyntaxException, InterruptedException {
// Generate 3 delta-log files w/ random records
Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
@@ -790,7 +790,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
- .withUseScanV2(useScanV2)
+ .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.withForceFullScan(false)
.build();
@@ -1049,7 +1049,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
public void testAvroLogRecordReaderBasic(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled,
boolean readBlocksLazily,
- boolean useScanv2)
+ boolean enableOptimizedLogBlocksScan)
throws IOException, URISyntaxException, InterruptedException {
Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
@@ -1097,7 +1097,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
- .withUseScanV2(useScanv2)
+ .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.build();
assertEquals(200, scanner.getTotalLogRecords());
@@ -1117,7 +1117,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
public void testAvroLogRecordReaderWithRollbackTombstone(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled,
boolean readBlocksLazily,
- boolean useScanv2)
+ boolean enableOptimizedLogBlocksScan)
throws IOException, URISyntaxException, InterruptedException {
Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
@@ -1184,7 +1184,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
- .withUseScanV2(useScanv2)
+ .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.build();
assertEquals(200, scanner.getTotalLogRecords(), "We read 200 records from 2 write batches");
@@ -1203,7 +1203,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
@MethodSource("testArguments")
public void testAvroLogRecordReaderWithFailedPartialBlock(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled,
- boolean useScanv2)
+ boolean enableOptimizedLogBlocksScan)
throws IOException, URISyntaxException, InterruptedException {
Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
@@ -1276,7 +1276,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
- .withUseScanV2(useScanv2)
+ .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.build();
assertEquals(200, scanner.getTotalLogRecords(), "We would read 200 records");
@@ -1296,7 +1296,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
public void testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled,
boolean readBlocksLazily,
- boolean useScanv2)
+ boolean enableOptimizedLogBlocksScan)
throws IOException, URISyntaxException, InterruptedException {
Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
@@ -1359,7 +1359,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
- .withUseScanV2(useScanv2)
+ .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.build();
@@ -1560,7 +1560,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
public void testAvroLogRecordReaderWithFailedRollbacks(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled,
boolean readBlocksLazily,
- boolean useScanv2)
+ boolean enableOptimizedLogBlocksScan)
throws IOException, URISyntaxException, InterruptedException {
// Write a Data block and Delete block with same InstantTime (written in same batch)
@@ -1633,7 +1633,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
- .withUseScanV2(useScanv2)
+ .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.build();
assertEquals(0, scanner.getTotalLogRecords(), "We would have scanned 0 records because of rollback");
@@ -1651,7 +1651,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
public void testAvroLogRecordReaderWithInsertDeleteAndRollback(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled,
boolean readBlocksLazily,
- boolean useScanv2)
+ boolean enableOptimizedLogBlocksScan)
throws IOException, URISyntaxException, InterruptedException {
// Write a Data block and Delete block with same InstantTime (written in same batch)
@@ -1707,7 +1707,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
- .withUseScanV2(useScanv2)
+ .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.build();
assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records");
@@ -1721,7 +1721,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
public void testAvroLogRecordReaderWithInvalidRollback(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled,
boolean readBlocksLazily,
- boolean useScanv2)
+ boolean enableOptimizedLogBlocksScan)
throws IOException, URISyntaxException, InterruptedException {
Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
@@ -1764,7 +1764,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
- .withUseScanV2(useScanv2)
+ .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.build();
assertEquals(100, scanner.getTotalLogRecords(), "We still would read 100 records");
@@ -1780,7 +1780,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
public void testAvroLogRecordReaderWithInsertsDeleteAndRollback(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled,
boolean readBlocksLazily,
- boolean useScanv2)
+ boolean enableOptimizedLogBlocksScan)
throws IOException, URISyntaxException, InterruptedException {
// Write 3 Data blocks with same InstantTime (written in same batch)
@@ -1840,7 +1840,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
- .withUseScanV2(useScanv2)
+ .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.build();
assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records");
@@ -1853,7 +1853,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
public void testAvroLogRecordReaderWithMixedInsertsCorruptsAndRollback(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled,
boolean readBlocksLazily,
- boolean useScanv2)
+ boolean enableOptimizedLogBlocksScan)
throws IOException, URISyntaxException, InterruptedException {
// Write 3 Data blocks with same InstantTime (written in same batch)
@@ -1952,7 +1952,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
- .withUseScanV2(useScanv2)
+ .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.build();
assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records");
@@ -1961,7 +1961,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
}
@ParameterizedTest
- @MethodSource("testArgumentsWithoutScanV2Arg")
+ @MethodSource("testArgumentsWithoutOptimizedScanArg")
public void testAvroLogRecordReaderWithMixedInsertsCorruptsRollbackAndMergedLogBlock(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled,
boolean readBlocksLazily)
@@ -2134,7 +2134,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
- .withUseScanV2(true)
+ .withOptimizedLogBlocksScan(true)
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.build();
assertEquals(600, scanner.getTotalLogRecords(), "We would read 600 records from scanner");
@@ -2170,7 +2170,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled,
boolean readBlocksLazily,
- boolean useScanv2) {
+ boolean enableOptimizedLogBlocksScan) {
try {
// Write one Data block with same InstantTime (written in same batch)
Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
@@ -2224,7 +2224,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
- .withUseScanV2(useScanv2)
+ .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.build();
@@ -2241,13 +2241,13 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
public void testAvroLogRecordReaderWithFailedTaskInFirstStageAttempt(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled,
boolean readBlocksLazily,
- boolean useScanv2) {
+ boolean enableOptimizedLogBlocksScan) {
/*
* FIRST_ATTEMPT_FAILED:
* Original task from the stage attempt failed, but subsequent stage retry succeeded.
*/
testAvroLogRecordReaderMergingMultipleLogFiles(77, 100,
- diskMapType, isCompressionEnabled, readBlocksLazily, useScanv2);
+ diskMapType, isCompressionEnabled, readBlocksLazily, enableOptimizedLogBlocksScan);
}
@ParameterizedTest
@@ -2255,13 +2255,13 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
public void testAvroLogRecordReaderWithFailedTaskInSecondStageAttempt(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled,
boolean readBlocksLazily,
- boolean useScanv2) {
+ boolean enableOptimizedLogBlocksScan) {
/*
* SECOND_ATTEMPT_FAILED:
* Original task from stage attempt succeeded, but subsequent retry attempt failed.
*/
testAvroLogRecordReaderMergingMultipleLogFiles(100, 66,
- diskMapType, isCompressionEnabled, readBlocksLazily, useScanv2);
+ diskMapType, isCompressionEnabled, readBlocksLazily, enableOptimizedLogBlocksScan);
}
@ParameterizedTest
@@ -2269,13 +2269,13 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
public void testAvroLogRecordReaderTasksSucceededInBothStageAttempts(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled,
boolean readBlocksLazily,
- boolean useScanv2) {
+ boolean enableOptimizedLogBlocksScan) {
/*
* BOTH_ATTEMPTS_SUCCEEDED:
* Original task from the stage attempt and duplicate task from the stage retry succeeded.
*/
testAvroLogRecordReaderMergingMultipleLogFiles(100, 100,
- diskMapType, isCompressionEnabled, readBlocksLazily, useScanv2);
+ diskMapType, isCompressionEnabled, readBlocksLazily, enableOptimizedLogBlocksScan);
}
@ParameterizedTest
@@ -2593,7 +2593,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
}
private static Stream<Arguments> testArguments() {
- // Arg1: ExternalSpillableMap Type, Arg2: isDiskMapCompressionEnabled, Arg3: readBlocksLazily, Arg4: useScanv2
+ // Arg1: ExternalSpillableMap Type, Arg2: isDiskMapCompressionEnabled, Arg3: readBlocksLazily, Arg4: enableOptimizedLogBlocksScan
return Stream.of(
arguments(ExternalSpillableMap.DiskMapType.BITCASK, false, false, true),
arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false, false, true),
@@ -2614,8 +2614,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
);
}
- private static Stream<Arguments> testArgumentsWithoutScanV2Arg() {
- // Arg1: ExternalSpillableMap Type, Arg2: isDiskMapCompressionEnabled, Arg3: readBlocksLazily, Arg4: useScanv2
+ private static Stream<Arguments> testArgumentsWithoutOptimizedScanArg() {
+ // Arg1: ExternalSpillableMap Type, Arg2: isDiskMapCompressionEnabled, Arg3: readBlocksLazily, Arg4: enableOptimizedLogBlocksScan
return Stream.of(
arguments(ExternalSpillableMap.DiskMapType.BITCASK, false, false),
arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false, false),
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index 56923063478..cf16bf0bd80 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -100,7 +100,7 @@ public class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
.withDiskMapType(jobConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue()))
.withBitCaskDiskMapCompressionEnabled(jobConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
- .withUseScanV2(jobConf.getBoolean(HoodieRealtimeConfig.USE_LOG_RECORD_READER_SCAN_V2, false))
+ .withOptimizedLogBlocksScan(jobConf.getBoolean(HoodieRealtimeConfig.USE_LOG_RECORD_READER_SCAN_V2, false))
.withInternalSchema(schemaEvolutionContext.internalSchemaOption.orElse(InternalSchema.getEmptyInternalSchema()))
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.build();
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index 3ed86c32c94..0e3f27e2e3b 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -290,7 +290,7 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
.withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue())
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
- .withUseScanV2(Boolean.parseBoolean(HoodieCompactionConfig.USE_LOG_RECORD_READER_SCAN_V2.defaultValue()))
+ .withOptimizedLogBlocksScan(Boolean.parseBoolean(HoodieCompactionConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()))
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.build();
// readAvro log files