Posted to commits@hudi.apache.org by ak...@apache.org on 2023/01/19 17:10:23 UTC
[hudi] branch master updated: [HUDI-4911][HUDI-3301] Fixing `HoodieMetadataLogRecordReader` to avoid flushing cache for every lookup (#6782)
This is an automated email from the ASF dual-hosted git repository.
akudinkin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f8028a400eb [HUDI-4911][HUDI-3301] Fixing `HoodieMetadataLogRecordReader` to avoid flushing cache for every lookup (#6782)
f8028a400eb is described below
commit f8028a400ebe7a64d7745fae90e769306a8a07a7
Author: Alexey Kudinkin <al...@gmail.com>
AuthorDate: Thu Jan 19 09:10:13 2023 -0800
[HUDI-4911][HUDI-3301] Fixing `HoodieMetadataLogRecordReader` to avoid flushing cache for every lookup (#6782)
Currently, `HoodieMetadataLogRecordReader` flushes its cache for every lookup it performs, which has led to multiple occasions of poor performance: the Metadata Table (MT) had to be re-scanned over and over again, even though nothing had actually changed.
This PR rectifies that. Additionally, it:
- Makes sure `LogRecordScanner` is not extended for the MT (there's no reason for that; instead, its configuration is simply expanded to accommodate the MT use-case)
- Refines and cleans up the API provided by `HoodieMetadataLogRecordReader`
- Avoids unnecessary locking
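For illustration, here is a minimal, self-contained Java sketch of the caching strategy described above (this is not Hudi code; the class and method names are hypothetical stand-ins): lookups only trigger a scan for keys or key-prefixes that are not already materialized in the cache, so repeated lookups no longer force a re-scan of the log files.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    class CachingPrefixReader {

      // Stands in for the spillable map of materialized records, keyed by record key
      private final Map<String, String> records = new HashMap<>();

      // Prefixes already scanned; lets us skip prefixes covered by earlier scans
      private final Set<String> scannedPrefixes = new HashSet<>();

      void lookupByFullKeys(List<String> keys) {
        List<String> missing = new ArrayList<>();
        for (String key : keys) {
          if (!records.containsKey(key)) {
            missing.add(key); // only cache misses hit the delta-log files
          }
        }
        if (!missing.isEmpty()) {
          scanLogFiles(missing);
        }
      }

      void lookupByKeyPrefixes(List<String> keyPrefixes) {
        List<String> missing = new ArrayList<>();
        for (String prefix : keyPrefixes) {
          // A prefix is covered if it extends (or equals) an already-scanned prefix
          boolean covered = scannedPrefixes.stream().anyMatch(prefix::startsWith);
          if (!covered) {
            missing.add(prefix);
          }
        }
        if (!missing.isEmpty()) {
          scanLogFiles(missing);
          scannedPrefixes.addAll(missing);
        }
      }

      private void scanLogFiles(List<String> keysOrPrefixes) {
        // Placeholder: the real reader scans log blocks for the given keys/prefixes
        // and merges the results into `records`
      }
    }

Note the asymmetry: full-key misses can be detected from the cached records alone, while prefix coverage needs its own bookkeeping (the scannedPrefixes set), since the cache by itself cannot tell whether an entire prefix range was scanned or only a few individual keys under it were looked up.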
---
.../cli/commands/TestHoodieLogFileCommand.java | 3 +-
.../hudi/cli/integ/ITTestRepairsCommand.java | 5 +-
.../common/table/log/HoodieFileSliceReader.java | 2 +-
.../java/org/apache/hudi/io/HoodieMergeHandle.java | 6 +-
.../apache/hudi/io/HoodieSortedMergeHandle.java | 3 +-
.../HoodieLogCompactionPlanGenerator.java | 3 +-
.../functional/TestHoodieBackedMetadata.java | 15 +-
.../functional/TestHoodieBackedTableMetadata.java | 15 +-
.../TestHoodieClientOnMergeOnReadStorage.java | 4 +-
.../hudi/testutils/HoodieClientTestHarness.java | 2 +-
.../org/apache/hudi/BaseHoodieTableFileIndex.java | 3 +-
.../table/log/AbstractHoodieLogRecordReader.java | 204 +++++++------
.../table/log/HoodieMergedLogRecordScanner.java | 206 ++++++++++----
.../table/log/HoodieUnMergedLogRecordScanner.java | 13 +-
.../apache/hudi/common/util/CollectionUtils.java | 4 +-
.../util/collection/ExternalSpillableMap.java | 2 +
.../hudi/metadata/HoodieBackedTableMetadata.java | 75 +++--
.../metadata/HoodieMetadataLogRecordReader.java | 238 ++++++++++++++++
.../HoodieMetadataMergedLogRecordReader.java | 254 -----------------
.../common/functional/TestHoodieLogFormat.java | 315 ++++++++++++++++++---
.../apache/hudi/common/model/TestHoodieRecord.java | 3 +-
.../hudi/common/testutils/SchemaTestUtil.java | 25 +-
.../common/util/collection/TestBitCaskDiskMap.java | 26 +-
.../util/collection/TestExternalSpillableMap.java | 27 +-
.../util/collection/TestRocksDbBasedMap.java | 3 +-
.../common/util/collection/TestRocksDbDiskMap.java | 14 +-
.../src/main/scala/org/apache/hudi/Iterators.scala | 55 ++--
.../sql/hudi/procedure/TestRepairsProcedure.scala | 5 +-
28 files changed, 950 insertions(+), 580 deletions(-)
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
index 21e6218dbe2..aff12422f6a 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
@@ -194,7 +194,8 @@ public class TestHoodieLogFileCommand extends CLIFunctionalTestHarness {
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
.withFileId("test-log-fileid1").overBaseCommit(INSTANT_TIME).withFs(fs).withSizeThreshold(500).build();
- List<HoodieRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100).stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<HoodieRecord> records1 = testUtil.generateHoodieTestRecords(0, 100).stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, INSTANT_TIME);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
index 69db47136e9..a95ed9ff778 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java
@@ -86,8 +86,9 @@ public class ITTestRepairsCommand extends HoodieCLIIntegrationTestBase {
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
// generate 200 records
- HoodieRecord[] hoodieRecords1 = SchemaTestUtil.generateHoodieTestRecords(0, 100, schema).toArray(new HoodieRecord[100]);
- HoodieRecord[] hoodieRecords2 = SchemaTestUtil.generateHoodieTestRecords(100, 100, schema).toArray(new HoodieRecord[100]);
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ HoodieRecord[] hoodieRecords1 = testUtil.generateHoodieTestRecords(0, 100, schema).toArray(new HoodieRecord[100]);
+ HoodieRecord[] hoodieRecords2 = testUtil.generateHoodieTestRecords(100, 100, schema).toArray(new HoodieRecord[100]);
// generate duplicates
HoodieRecord[] dupRecords = Arrays.copyOf(hoodieRecords1, 10);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java
index 35ca3d6d5ad..eb4e18366d7 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java
@@ -43,7 +43,7 @@ public class HoodieFileSliceReader<T> implements Iterator<HoodieRecord<T>> {
Iterator<HoodieRecord> baseIterator = baseFileReader.get().getRecordIterator(schema);
while (baseIterator.hasNext()) {
scanner.processNextRecord(baseIterator.next().wrapIntoHoodieRecordPayloadWithParams(schema, props,
- simpleKeyGenFieldsOpt, scanner.isWithOperationField(), scanner.getPartitionName(), false));
+ simpleKeyGenFieldsOpt, scanner.isWithOperationField(), scanner.getPartitionNameOverride(), false));
}
}
return new HoodieFileSliceReader(scanner.iterator());
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index b6364aede14..0460f88101c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -410,10 +410,10 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
if (keyToNewRecords instanceof ExternalSpillableMap) {
((ExternalSpillableMap) keyToNewRecords).close();
- } else {
- keyToNewRecords.clear();
}
- writtenRecordKeys.clear();
+
+ keyToNewRecords = null;
+ writtenRecordKeys = null;
if (fileWriter != null) {
fileWriter.close();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
index 18fe6a344db..3d3a7308bb3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
@@ -117,13 +117,14 @@ public class HoodieSortedMergeHandle<T, I, K, O> extends HoodieMergeHandle<T, I,
writeRecord(hoodieRecord, Option.of(hoodieRecord), writeSchema, config.getProps());
}
insertRecordsWritten++;
+ writtenRecordKeys.add(hoodieRecord.getRecordKey());
}
} catch (IOException e) {
throw new HoodieUpsertException("Failed to close UpdateHandle", e);
}
}
+
newRecordKeysSorted.clear();
- keyToNewRecords.clear();
return super.close();
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
index e2ed1a06ac9..6a5f160f6b0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
@@ -29,7 +29,6 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
import org.apache.hudi.common.util.CompactionUtils;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.compact.LogCompactionExecutionHelper;
@@ -94,7 +93,7 @@ public class HoodieLogCompactionPlanGenerator<T extends HoodieRecordPayload, I,
.withUseScanV2(true)
.withRecordMerger(writeConfig.getRecordMerger())
.build();
- scanner.scanInternal(Option.empty(), true);
+ scanner.scan(true);
int totalBlocks = scanner.getCurrentInstantLogBlocks().size();
LOG.info("Total blocks seen are " + totalBlocks);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index de27d68e3f5..84e6d342883 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -87,7 +87,7 @@ import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.io.storage.HoodieAvroHFileReader;
import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
-import org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader;
+import org.apache.hudi.metadata.HoodieMetadataLogRecordReader;
import org.apache.hudi.metadata.HoodieMetadataMetrics;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieTableMetadata;
@@ -1099,7 +1099,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
if (enableMetaFields) {
schema = HoodieAvroUtils.addMetadataFields(schema);
}
- HoodieMetadataMergedLogRecordReader logRecordReader = HoodieMetadataMergedLogRecordReader.newBuilder()
+ HoodieMetadataLogRecordReader logRecordReader = HoodieMetadataLogRecordReader.newBuilder()
.withFileSystem(metadataMetaClient.getFs())
.withBasePath(metadataMetaClient.getBasePath())
.withLogFilePaths(logFilePaths)
@@ -1112,14 +1112,9 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
.withDiskMapType(ExternalSpillableMap.DiskMapType.BITCASK)
.build();
- assertDoesNotThrow(() -> {
- logRecordReader.scan();
- }, "Metadata log records materialization failed");
-
- for (Map.Entry<String, HoodieRecord> entry : logRecordReader.getRecords().entrySet()) {
- assertFalse(entry.getKey().isEmpty());
- assertFalse(entry.getValue().getRecordKey().isEmpty());
- assertEquals(entry.getKey(), entry.getValue().getRecordKey());
+ for (HoodieRecord<? extends HoodieRecordPayload> entry : logRecordReader.getRecords()) {
+ assertFalse(entry.getRecordKey().isEmpty());
+ assertEquals(entry.getKey().getRecordKey(), entry.getRecordKey());
}
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
index 3675f7e8f71..b1348509dfe 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
@@ -39,7 +39,7 @@ import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.storage.HoodieAvroHFileReader;
import org.apache.hudi.metadata.HoodieBackedTableMetadata;
-import org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader;
+import org.apache.hudi.metadata.HoodieMetadataLogRecordReader;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieTableMetadataKeyGenerator;
import org.apache.hudi.metadata.MetadataPartitionType;
@@ -379,7 +379,7 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase {
*/
private void verifyMetadataMergedRecords(HoodieTableMetaClient metadataMetaClient, List<String> logFilePaths, String latestCommitTimestamp) {
Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
- HoodieMetadataMergedLogRecordReader logRecordReader = HoodieMetadataMergedLogRecordReader.newBuilder()
+ HoodieMetadataLogRecordReader logRecordReader = HoodieMetadataLogRecordReader.newBuilder()
.withFileSystem(metadataMetaClient.getFs())
.withBasePath(metadataMetaClient.getBasePath())
.withLogFilePaths(logFilePaths)
@@ -392,14 +392,9 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase {
.withDiskMapType(ExternalSpillableMap.DiskMapType.BITCASK)
.build();
- assertDoesNotThrow(() -> {
- logRecordReader.scan();
- }, "Metadata log records materialization failed");
-
- for (Map.Entry<String, HoodieRecord> entry : logRecordReader.getRecords().entrySet()) {
- assertFalse(entry.getKey().isEmpty());
- assertFalse(entry.getValue().getRecordKey().isEmpty());
- assertEquals(entry.getKey(), entry.getValue().getRecordKey());
+ for (HoodieRecord<?> entry : logRecordReader.getRecords()) {
+ assertFalse(entry.getRecordKey().isEmpty());
+ assertEquals(entry.getKey().getRecordKey(), entry.getRecordKey());
}
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
index 32a7fbd7100..c623e13bddb 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
@@ -447,7 +447,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
.withUseScanV2(true)
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.build();
- scanner.scanInternal(Option.empty(), true);
+ scanner.scan(true);
List<String> prevInstants = scanner.getValidBlockInstants();
HoodieUnMergedLogRecordScanner scanner2 = HoodieUnMergedLogRecordScanner.newBuilder()
.withFileSystem(metaClient.getFs())
@@ -461,7 +461,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
.withUseScanV2(true)
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.build();
- scanner2.scanInternal(Option.empty(), true);
+ scanner2.scan(true);
List<String> currentInstants = scanner2.getValidBlockInstants();
assertEquals(prevInstants, currentInstants);
});
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
index 9d305777adf..7cebf894a2a 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
@@ -228,11 +228,11 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness {
LOG.info("Clearing sql context cache of spark-session used in previous test-case");
sqlContext.clearCache();
sqlContext = null;
+ sparkSession = null;
}
if (jsc != null) {
LOG.info("Closing spark context used in previous test-case");
- jsc.close();
jsc.stop();
jsc = null;
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index cef86109ff8..c00bf7ea997 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -51,6 +51,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -342,7 +343,7 @@ public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
.collect(Collectors.toMap(Pair::getKey, p -> p.getRight().get()));
Set<Path> missingPartitionPaths =
- CollectionUtils.diff(partitionPaths, cachedPartitionPaths.keySet());
+ CollectionUtils.diffSet(new HashSet<>(partitionPaths), cachedPartitionPaths.keySet());
// NOTE: We're constructing a mapping of absolute form of the partition-path into
// its relative one, such that we don't need to reconstruct these again later on
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index ca53802a3af..83172ecb7ae 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -28,19 +28,15 @@ import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
-import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
-import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.ClosableIteratorWithSchema;
import org.apache.hudi.common.util.InternalSchemaCache;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
@@ -77,6 +73,7 @@ import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetada
import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME;
import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.COMMAND_BLOCK;
import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.CORRUPT_BLOCK;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
/**
* Implements logic to scan log blocks and expose valid and deleted log records to subclass implementation. Subclass is
@@ -102,13 +99,16 @@ public abstract class AbstractHoodieLogRecordReader {
protected final HoodieTableMetaClient hoodieTableMetaClient;
// Merge strategy to use when combining records from log
private final String payloadClassFQN;
- // preCombine field
+ // Record's key/partition-path fields
+ private final String recordKeyField;
+ private final Option<String> partitionPathFieldOpt;
+ // Partition name override
+ private final Option<String> partitionNameOverrideOpt;
+ // Pre-combining field
protected final String preCombineField;
// Stateless component for merging records
protected final HoodieRecordMerger recordMerger;
private final TypedProperties payloadProps;
- // simple key gen fields
- private Option<Pair<String, String>> simpleKeyGenFields = Option.empty();
// Log File Paths
protected final List<String> logFilePaths;
// Read Lazily flag
@@ -127,9 +127,7 @@ public abstract class AbstractHoodieLogRecordReader {
// Total log files read - for metrics
private AtomicLong totalLogFiles = new AtomicLong(0);
// Internal schema, used to support full schema evolution.
- private InternalSchema internalSchema;
- // Hoodie table path.
- private final String path;
+ private final InternalSchema internalSchema;
// Total log blocks read - for metrics
private AtomicLong totalLogBlocks = new AtomicLong(0);
// Total log records read - for metrics
@@ -142,35 +140,26 @@ public abstract class AbstractHoodieLogRecordReader {
private Deque<HoodieLogBlock> currentInstantLogBlocks = new ArrayDeque<>();
// Enables full scan of log records
protected final boolean forceFullScan;
- private int totalScannedLogFiles;
// Progress
private float progress = 0.0f;
- // Partition name
- private Option<String> partitionName;
// Populate meta fields for the records
- private boolean populateMetaFields = true;
+ private final boolean populateMetaFields;
// Record type read from log block
protected final HoodieRecordType recordType;
// Collect all the block instants after scanning all the log files.
- private List<String> validBlockInstants = new ArrayList<>();
+ private final List<String> validBlockInstants = new ArrayList<>();
// Use scanV2 method.
- private boolean useScanV2;
-
- protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<String> logFilePaths,
- Schema readerSchema,
- String latestInstantTime, boolean readBlocksLazily, boolean reverseReader,
- int bufferSize, Option<InstantRange> instantRange,
- boolean withOperationField, HoodieRecordMerger recordMerger) {
- this(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize,
- instantRange, withOperationField, true, Option.empty(), InternalSchema.getEmptyInternalSchema(), false, recordMerger);
- }
+ private final boolean useScanV2;
protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<String> logFilePaths,
Schema readerSchema, String latestInstantTime, boolean readBlocksLazily,
boolean reverseReader, int bufferSize, Option<InstantRange> instantRange,
boolean withOperationField, boolean forceFullScan,
- Option<String> partitionName, InternalSchema internalSchema,
- boolean useScanV2, HoodieRecordMerger recordMerger) {
+ Option<String> partitionNameOverride,
+ InternalSchema internalSchema,
+ Option<String> keyFieldOverride,
+ boolean useScanV2,
+ HoodieRecordMerger recordMerger) {
this.readerSchema = readerSchema;
this.latestInstantTime = latestInstantTime;
this.hoodieTableMetaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).build();
@@ -195,44 +184,48 @@ public abstract class AbstractHoodieLogRecordReader {
this.withOperationField = withOperationField;
this.forceFullScan = forceFullScan;
this.internalSchema = internalSchema == null ? InternalSchema.getEmptyInternalSchema() : internalSchema;
- this.path = basePath;
this.useScanV2 = useScanV2;
- // Key fields when populate meta fields is disabled (that is, virtual keys enabled)
- if (!tableConfig.populateMetaFields()) {
- this.populateMetaFields = false;
- this.simpleKeyGenFields = Option.of(
- Pair.of(tableConfig.getRecordKeyFieldProp(), tableConfig.getPartitionFieldProp()));
- }
- this.partitionName = partitionName;
- this.recordType = recordMerger.getRecordType();
- }
+ if (keyFieldOverride.isPresent()) {
+ // NOTE: This branch specifically is leveraged handling Metadata Table
+ // log-block merging sequence. Here we do
+ // - Override the record-key field (which isn't configured t/h table-config)
+ // - Override partition-path value w/ static "partition-name" (in MT all partitions
+ // are static, like "files", "col_stats", etc)
+ checkState(partitionNameOverride.isPresent());
- protected String getKeyField() {
- if (this.populateMetaFields) {
- return HoodieRecord.RECORD_KEY_METADATA_FIELD;
+ this.populateMetaFields = false;
+ this.recordKeyField = keyFieldOverride.get();
+ this.partitionPathFieldOpt = Option.empty();
+ } else if (tableConfig.populateMetaFields()) {
+ this.populateMetaFields = true;
+ this.recordKeyField = HoodieRecord.RECORD_KEY_METADATA_FIELD;
+ this.partitionPathFieldOpt = Option.of(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
+ } else {
+ this.populateMetaFields = false;
+ this.recordKeyField = tableConfig.getRecordKeyFieldProp();
+ this.partitionPathFieldOpt = Option.of(tableConfig.getPartitionFieldProp());
}
- ValidationUtils.checkState(this.simpleKeyGenFields.isPresent());
- return this.simpleKeyGenFields.get().getKey();
- }
-
- public synchronized void scan() {
- scanInternal(Option.empty(), false);
- }
- public synchronized void scan(List<String> keys) {
- scanInternal(Option.of(new KeySpec(keys, true)), false);
+ this.partitionNameOverrideOpt = partitionNameOverride;
+ this.recordType = recordMerger.getRecordType();
}
- public synchronized void scanInternal(Option<KeySpec> keySpecOpt, boolean skipProcessingBlocks) {
- if (useScanV2) {
- scanInternalV2(keySpecOpt, skipProcessingBlocks);
- } else {
- scanInternal(keySpecOpt);
+ /**
+ * @param keySpecOpt specifies target set of keys to be scanned
+ * @param skipProcessingBlocks controls, whether (delta) blocks have to actually be processed
+ */
+ protected final void scanInternal(Option<KeySpec> keySpecOpt, boolean skipProcessingBlocks) {
+ synchronized (this) {
+ if (useScanV2) {
+ scanInternalV2(keySpecOpt, skipProcessingBlocks);
+ } else {
+ scanInternalV1(keySpecOpt);
+ }
}
}
- private synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
+ private void scanInternalV1(Option<KeySpec> keySpecOpt) {
currentInstantLogBlocks = new ArrayDeque<>();
progress = 0.0f;
totalLogFiles = new AtomicLong(0);
@@ -245,15 +238,10 @@ public abstract class AbstractHoodieLogRecordReader {
HoodieTimeline completedInstantsTimeline = commitsTimeline.filterCompletedInstants();
HoodieTimeline inflightInstantsTimeline = commitsTimeline.filterInflights();
try {
- // Get the key field based on populate meta fields config
- // and the table type
- final String keyField = getKeyField();
-
// Iterate over the paths
- boolean enableRecordLookups = !forceFullScan;
logFormatReaderWrapper = new HoodieLogFormatReader(fs,
logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()),
- readerSchema, readBlocksLazily, reverseReader, bufferSize, enableRecordLookups, keyField, internalSchema);
+ readerSchema, readBlocksLazily, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema);
Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
while (logFormatReaderWrapper.hasNext()) {
@@ -398,7 +386,7 @@ public abstract class AbstractHoodieLogRecordReader {
}
}
- private synchronized void scanInternalV2(Option<KeySpec> keySpecOption, boolean skipProcessingBlocks) {
+ private void scanInternalV2(Option<KeySpec> keySpecOption, boolean skipProcessingBlocks) {
currentInstantLogBlocks = new ArrayDeque<>();
progress = 0.0f;
totalLogFiles = new AtomicLong(0);
@@ -411,16 +399,10 @@ public abstract class AbstractHoodieLogRecordReader {
HoodieTimeline completedInstantsTimeline = commitsTimeline.filterCompletedInstants();
HoodieTimeline inflightInstantsTimeline = commitsTimeline.filterInflights();
try {
-
- // Get the key field based on populate meta fields config
- // and the table type
- final String keyField = getKeyField();
-
- boolean enableRecordLookups = !forceFullScan;
// Iterate over the paths
logFormatReaderWrapper = new HoodieLogFormatReader(fs,
logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()),
- readerSchema, readBlocksLazily, reverseReader, bufferSize, enableRecordLookups, keyField, internalSchema);
+ readerSchema, readBlocksLazily, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema);
/**
* Scanning log blocks and placing the compacted blocks at the right place require two traversals.
@@ -638,25 +620,28 @@ public abstract class AbstractHoodieLogRecordReader {
* handle it.
*/
private void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) throws Exception {
+ checkState(partitionNameOverrideOpt.isPresent() || partitionPathFieldOpt.isPresent(),
+ "Either partition-name override or partition-path field had to be present");
+
+ Option<Pair<String, String>> recordKeyPartitionPathFieldPair = populateMetaFields
+ ? Option.empty()
+ : Option.of(Pair.of(recordKeyField, partitionPathFieldOpt.orElse(null)));
+
try (ClosableIteratorWithSchema<HoodieRecord> recordIterator = getRecordsIterator(dataBlock, keySpecOpt)) {
while (recordIterator.hasNext()) {
HoodieRecord completedRecord = recordIterator.next()
.wrapIntoHoodieRecordPayloadWithParams(recordIterator.getSchema(),
hoodieTableMetaClient.getTableConfig().getProps(),
- this.simpleKeyGenFields,
+ recordKeyPartitionPathFieldPair,
this.withOperationField,
- this.partitionName,
- getPopulateMetaFields());
+ this.partitionNameOverrideOpt,
+ populateMetaFields);
processNextRecord(completedRecord);
totalLogRecords.incrementAndGet();
}
}
}
- protected boolean getPopulateMetaFields() {
- return this.populateMetaFields;
- }
-
/**
* Process next record.
*
@@ -682,13 +667,9 @@ public abstract class AbstractHoodieLogRecordReader {
HoodieLogBlock lastBlock = logBlocks.pollLast();
switch (lastBlock.getBlockType()) {
case AVRO_DATA_BLOCK:
- processDataBlock((HoodieAvroDataBlock) lastBlock, keySpecOpt);
- break;
case HFILE_DATA_BLOCK:
- processDataBlock((HoodieHFileDataBlock) lastBlock, keySpecOpt);
- break;
case PARQUET_DATA_BLOCK:
- processDataBlock((HoodieParquetDataBlock) lastBlock, keySpecOpt);
+ processDataBlock((HoodieDataBlock) lastBlock, keySpecOpt);
break;
case DELETE_BLOCK:
Arrays.stream(((HoodieDeleteBlock) lastBlock).getRecordsToDelete()).forEach(this::processNextDeletedRecord);
@@ -704,6 +685,12 @@ public abstract class AbstractHoodieLogRecordReader {
progress = (numLogFilesSeen - 1) / logFilePaths.size();
}
+ private boolean shouldLookupRecords() {
+ // NOTE: Point-wise record lookups are only enabled when scanner is not in
+ // a full-scan mode
+ return !forceFullScan;
+ }
+
/**
* Return progress of scanning as a float between 0.0 to 1.0.
*/
@@ -727,8 +714,8 @@ public abstract class AbstractHoodieLogRecordReader {
return payloadClassFQN;
}
- public Option<String> getPartitionName() {
- return partitionName;
+ public Option<String> getPartitionNameOverride() {
+ return partitionNameOverrideOpt;
}
public long getTotalRollbacks() {
@@ -750,13 +737,52 @@ public abstract class AbstractHoodieLogRecordReader {
/**
* Key specification with a list of column names.
*/
- protected static class KeySpec {
- private final List<String> keys;
- private final boolean fullKey;
+ protected interface KeySpec {
+ List<String> getKeys();
+
+ boolean isFullKey();
- public KeySpec(List<String> keys, boolean fullKey) {
+ static KeySpec fullKeySpec(List<String> keys) {
+ return new FullKeySpec(keys);
+ }
+
+ static KeySpec prefixKeySpec(List<String> keyPrefixes) {
+ return new PrefixKeySpec(keyPrefixes);
+ }
+ }
+
+ private static class FullKeySpec implements KeySpec {
+ private final List<String> keys;
+ private FullKeySpec(List<String> keys) {
this.keys = keys;
- this.fullKey = fullKey;
+ }
+
+ @Override
+ public List<String> getKeys() {
+ return keys;
+ }
+
+ @Override
+ public boolean isFullKey() {
+ return true;
+ }
+ }
+
+ private static class PrefixKeySpec implements KeySpec {
+ private final List<String> keysPrefixes;
+
+ private PrefixKeySpec(List<String> keysPrefixes) {
+ this.keysPrefixes = keysPrefixes;
+ }
+
+ @Override
+ public List<String> getKeys() {
+ return keysPrefixes;
+ }
+
+ @Override
+ public boolean isFullKey() {
+ return false;
}
}
@@ -774,7 +800,7 @@ public abstract class AbstractHoodieLogRecordReader {
if (keySpecOpt.isPresent()) {
KeySpec keySpec = keySpecOpt.get();
blockRecordsIterator = (ClosableIterator) dataBlock
- .getRecordIterator(keySpec.keys, keySpec.fullKey, recordType);
+ .getRecordIterator(keySpec.getKeys(), keySpec.isFullKey(), recordType);
} else {
blockRecordsIterator = (ClosableIterator) dataBlock.getRecordIterator(recordType);
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index 3ff1432f640..c41d78b2808 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -22,8 +22,8 @@ import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.HoodieEmptyRecord;
import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.util.CollectionUtils;
@@ -44,14 +44,19 @@ import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import javax.annotation.concurrent.NotThreadSafe;
+import java.io.Closeable;
import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
-import static org.apache.hudi.common.util.ValidationUtils.checkState;
/**
* Scans through all the blocks in a list of HoodieLogFile and builds up a compacted/merged list of records which will
@@ -65,40 +70,43 @@ import static org.apache.hudi.common.util.ValidationUtils.checkState;
* <p>
* This results in two I/O passes over the log file.
*/
-
+@NotThreadSafe
public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
- implements Iterable<HoodieRecord> {
+ implements Iterable<HoodieRecord>, Closeable {
private static final Logger LOG = LogManager.getLogger(HoodieMergedLogRecordScanner.class);
// A timer for calculating elapsed time in millis
public final HoodieTimer timer = new HoodieTimer();
- // Final map of compacted/merged records
- protected final ExternalSpillableMap<String, HoodieRecord> records;
+ // Map of compacted/merged records
+ private final ExternalSpillableMap<String, HoodieRecord> records;
+ // Set of already scanned prefixes allowing us to avoid scanning same prefixes again
+ private final Set<String> scannedPrefixes;
// count of merged records in log
private long numMergedRecordsInLog;
- private long maxMemorySizeInBytes;
+ private final long maxMemorySizeInBytes;
// Stores the total time taken to perform reading and merging of log blocks
private long totalTimeTakenToReadAndMergeBlocks;
@SuppressWarnings("unchecked")
- protected HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
- String latestInstantTime, Long maxMemorySizeInBytes, boolean readBlocksLazily,
- boolean reverseReader, int bufferSize, String spillableMapBasePath,
- Option<InstantRange> instantRange,
- ExternalSpillableMap.DiskMapType diskMapType,
- boolean isBitCaskDiskMapCompressionEnabled,
- boolean withOperationField, boolean forceFullScan,
- Option<String> partitionName, InternalSchema internalSchema,
- boolean useScanV2, HoodieRecordMerger recordMerger) {
+ private HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
+ String latestInstantTime, Long maxMemorySizeInBytes, boolean readBlocksLazily,
+ boolean reverseReader, int bufferSize, String spillableMapBasePath,
+ Option<InstantRange> instantRange,
+ ExternalSpillableMap.DiskMapType diskMapType,
+ boolean isBitCaskDiskMapCompressionEnabled,
+ boolean withOperationField, boolean forceFullScan,
+ Option<String> partitionName,
+ InternalSchema internalSchema,
+ Option<String> keyFieldOverride,
+ boolean useScanV2, HoodieRecordMerger recordMerger) {
super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize,
- instantRange, withOperationField,
- forceFullScan, partitionName, internalSchema, useScanV2, recordMerger);
+ instantRange, withOperationField, forceFullScan, partitionName, internalSchema, keyFieldOverride, useScanV2, recordMerger);
try {
+ this.maxMemorySizeInBytes = maxMemorySizeInBytes;
// Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
new HoodieRecordSizeEstimator(readerSchema), diskMapType, isBitCaskDiskMapCompressionEnabled);
-
- this.maxMemorySizeInBytes = maxMemorySizeInBytes;
+ this.scannedPrefixes = new HashSet<>();
} catch (IOException e) {
throw new HoodieIOException("IOException when creating ExternalSpillableMap at " + spillableMapBasePath, e);
}
@@ -108,30 +116,106 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
}
}
- protected void performScan() {
+ /**
+ * Scans delta-log files processing blocks
+ */
+ public final void scan() {
+ scan(false);
+ }
+
+ public final void scan(boolean skipProcessingBlocks) {
+ if (forceFullScan) {
+ // NOTE: When full-scan is enforced, scanning is invoked upfront (during initialization)
+ return;
+ }
+
+ scanInternal(Option.empty(), skipProcessingBlocks);
+ }
+
+ /**
+ * Provides incremental scanning capability where only provided keys will be looked
+ * up in the delta-log files, scanned and subsequently materialized into the internal
+ * cache
+ *
+ * @param keys to be looked up
+ */
+ public void scanByFullKeys(List<String> keys) {
+ // We can skip scanning in case reader is in full-scan mode, in which case all blocks
+ // are processed upfront (no additional scanning is necessary)
+ if (forceFullScan) {
+ return; // no-op
+ }
+
+ List<String> missingKeys = keys.stream()
+ .filter(key -> !records.containsKey(key))
+ .collect(Collectors.toList());
+
+ if (missingKeys.isEmpty()) {
+ // All the required records are already fetched, no-op
+ return;
+ }
+
+ scanInternal(Option.of(KeySpec.fullKeySpec(missingKeys)), false);
+ }
+
+ /**
+ * Provides incremental scanning capability where only keys matching provided key-prefixes
+ * will be looked up in the delta-log files, scanned and subsequently materialized into
+ * the internal cache
+ *
+ * @param keyPrefixes to be looked up
+ */
+ public void scanByKeyPrefixes(List<String> keyPrefixes) {
+ // We can skip scanning in case reader is in full-scan mode, in which case all blocks
+ // are processed upfront (no additional scanning is necessary)
+ if (forceFullScan) {
+ return;
+ }
+
+ List<String> missingKeyPrefixes = keyPrefixes.stream()
+ .filter(keyPrefix ->
+ // NOTE: We can skip scanning the prefixes that have already
+ // been covered by the previous scans
+ scannedPrefixes.stream().noneMatch(keyPrefix::startsWith))
+ .collect(Collectors.toList());
+
+ if (missingKeyPrefixes.isEmpty()) {
+ // All the required records are already fetched, no-op
+ return;
+ }
+
+ // NOTE: When looking up by key-prefixes unfortunately we can't short-circuit
+ // and will have to scan every time as we can't know (based on just
+ // the records cached) whether particular prefix was scanned or just records
+ // matching the prefix looked up (by [[scanByFullKeys]] API)
+ scanInternal(Option.of(KeySpec.prefixKeySpec(missingKeyPrefixes)), false);
+ scannedPrefixes.addAll(missingKeyPrefixes);
+ }
+
+ private void performScan() {
// Do the scan and merge
timer.startTimer();
- scan();
+
+ scanInternal(Option.empty(), false);
+
this.totalTimeTakenToReadAndMergeBlocks = timer.endTimer();
this.numMergedRecordsInLog = records.size();
+
LOG.info("Number of log files scanned => " + logFilePaths.size());
LOG.info("MaxMemoryInBytes allowed for compaction => " + maxMemorySizeInBytes);
LOG.info("Number of entries in MemoryBasedMap in ExternalSpillableMap => " + records.getInMemoryMapNumEntries());
- LOG.info(
- "Total size in bytes of MemoryBasedMap in ExternalSpillableMap => " + records.getCurrentInMemoryMapSize());
+ LOG.info("Total size in bytes of MemoryBasedMap in ExternalSpillableMap => " + records.getCurrentInMemoryMapSize());
LOG.info("Number of entries in BitCaskDiskMap in ExternalSpillableMap => " + records.getDiskBasedMapNumEntries());
LOG.info("Size of file spilled to disk => " + records.getSizeOfFileOnDiskInBytes());
}
@Override
public Iterator<HoodieRecord> iterator() {
- checkState(forceFullScan, "Record reader has to be in full-scan mode to use this API");
return records.iterator();
}
public Map<String, HoodieRecord> getRecords() {
- checkState(forceFullScan, "Record reader has to be in full-scan mode to use this API");
- return records;
+ return Collections.unmodifiableMap(records);
}
public HoodieRecordType getRecordType() {
@@ -152,20 +236,20 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
@Override
protected <T> void processNextRecord(HoodieRecord<T> newRecord) throws IOException {
String key = newRecord.getRecordKey();
- if (records.containsKey(key)) {
- // Merge and store the merged record. The HoodieRecordPayload implementation is free to decide what should be
- // done when a DELETE (empty payload) is encountered before or after an insert/update.
-
- HoodieRecord<T> oldRecord = records.get(key);
- T oldValue = oldRecord.getData();
- HoodieRecord<T> combinedRecord = (HoodieRecord<T>) recordMerger.merge(oldRecord, readerSchema,
+ HoodieRecord<T> prevRecord = records.get(key);
+ if (prevRecord != null) {
+ // Merge and store the combined record
+ HoodieRecord<T> combinedRecord = (HoodieRecord<T>) recordMerger.merge(prevRecord, readerSchema,
newRecord, readerSchema, this.getPayloadProps()).get().getLeft();
- // If combinedValue is oldValue, no need rePut oldRecord
- if (combinedRecord.getData() != oldValue) {
- HoodieRecord latestHoodieRecord = combinedRecord.newInstance(new HoodieKey(key, newRecord.getPartitionPath()), newRecord.getOperation());
+ // If pre-combine returns existing record, no need to update it
+ if (combinedRecord.getData() != prevRecord.getData()) {
+ HoodieRecord latestHoodieRecord =
+ combinedRecord.newInstance(new HoodieKey(key, newRecord.getPartitionPath()), newRecord.getOperation());
+
latestHoodieRecord.unseal();
latestHoodieRecord.setCurrentLocation(newRecord.getCurrentLocation());
latestHoodieRecord.seal();
+
// NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
// payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
// it since these records will be put into records(Map).
@@ -215,6 +299,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
return totalTimeTakenToReadAndMergeBlocks;
}
+ @Override
public void close() {
if (records != null) {
records.close();
@@ -225,27 +310,28 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
* Builder used to build {@code HoodieUnMergedLogRecordScanner}.
*/
public static class Builder extends AbstractHoodieLogRecordReader.Builder {
- protected FileSystem fs;
- protected String basePath;
- protected List<String> logFilePaths;
- protected Schema readerSchema;
+ private FileSystem fs;
+ private String basePath;
+ private List<String> logFilePaths;
+ private Schema readerSchema;
private InternalSchema internalSchema = InternalSchema.getEmptyInternalSchema();
- protected String latestInstantTime;
- protected boolean readBlocksLazily;
- protected boolean reverseReader;
- protected int bufferSize;
+ private String latestInstantTime;
+ private boolean readBlocksLazily;
+ private boolean reverseReader;
+ private int bufferSize;
// specific configurations
- protected Long maxMemorySizeInBytes;
- protected String spillableMapBasePath;
- protected ExternalSpillableMap.DiskMapType diskMapType = HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue();
- protected boolean isBitCaskDiskMapCompressionEnabled = HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue();
+ private Long maxMemorySizeInBytes;
+ private String spillableMapBasePath;
+ private ExternalSpillableMap.DiskMapType diskMapType = HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue();
+ private boolean isBitCaskDiskMapCompressionEnabled = HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue();
// incremental filtering
- protected Option<InstantRange> instantRange = Option.empty();
- protected String partitionName;
- // auto scan default true
- private boolean autoScan = true;
+ private Option<InstantRange> instantRange = Option.empty();
+ private String partitionName;
// operation field default false
private boolean withOperationField = false;
+ private String keyFieldOverride;
+ // By default, we're doing a full-scan
+ private boolean forceFullScan = true;
// Use scanV2 method.
private boolean useScanV2 = false;
private HoodieRecordMerger recordMerger;
@@ -355,6 +441,16 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
return this;
}
+ public Builder withKeyFiledOverride(String keyFieldOverride) {
+ this.keyFieldOverride = Objects.requireNonNull(keyFieldOverride);
+ return this;
+ }
+
+ public Builder withForceFullScan(boolean forceFullScan) {
+ this.forceFullScan = forceFullScan;
+ return this;
+ }
+
@Override
public HoodieMergedLogRecordScanner build() {
if (this.partitionName == null && CollectionUtils.nonEmpty(this.logFilePaths)) {
@@ -365,8 +461,8 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
return new HoodieMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema,
latestInstantTime, maxMemorySizeInBytes, readBlocksLazily, reverseReader,
bufferSize, spillableMapBasePath, instantRange,
- diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField, true,
- Option.ofNullable(partitionName), internalSchema, useScanV2, recordMerger);
+ diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField, forceFullScan,
+ Option.ofNullable(partitionName), internalSchema, Option.ofNullable(keyFieldOverride), useScanV2, recordMerger);
}
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
index c5468f00512..2a7c91641e1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
@@ -44,10 +44,21 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordReade
LogRecordScannerCallback callback, Option<InstantRange> instantRange, InternalSchema internalSchema,
boolean useScanV2, HoodieRecordMerger recordMerger) {
super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange,
- false, true, Option.empty(), internalSchema, useScanV2, recordMerger);
+ false, true, Option.empty(), internalSchema, Option.empty(), useScanV2, recordMerger);
this.callback = callback;
}
+ /**
+ * Scans delta-log files processing blocks
+ */
+ public final void scan() {
+ scan(false);
+ }
+
+ public final void scan(boolean skipProcessingBlocks) {
+ scanInternal(Option.empty(), skipProcessingBlocks);
+ }
+
/**
* Returns the builder for {@code HoodieUnMergedLogRecordScanner}.
*/
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java
index f491adc6cab..3faddb91b4d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java
@@ -166,7 +166,7 @@ public class CollectionUtils {
/**
* Returns difference b/w {@code one} {@link Set} of elements and {@code another}
*/
- public static <E> Set<E> diff(Collection<E> one, Collection<E> another) {
+ public static <E> Set<E> diffSet(Set<E> one, Set<E> another) {
Set<E> diff = new HashSet<>(one);
diff.removeAll(another);
return diff;
@@ -178,7 +178,7 @@ public class CollectionUtils {
* NOTE: This is less optimal counterpart to {@link #diff(Collection, Collection)}, accepting {@link List}
* as a holding collection to support duplicate elements use-cases
*/
- public static <E> List<E> diff(List<E> one, List<E> another) {
+ public static <E> List<E> diff(Collection<E> one, Collection<E> another) {
List<E> diff = new ArrayList<>(one);
diff.removeAll(another);
return diff;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
index 218f0d9f16e..ee930e588d0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
@@ -24,6 +24,7 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import javax.annotation.concurrent.NotThreadSafe;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
@@ -51,6 +52,7 @@ import java.util.stream.Stream;
* map may occupy more memory than is available, resulting in OOM. However, if the spill threshold is too low, we spill
* frequently and incur unnecessary disk writes.
*/
+@NotThreadSafe
public class ExternalSpillableMap<T extends Serializable, R extends Serializable> implements Map<T, R>, Serializable {
// Find the actual estimated payload size after inserting N records
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 2721795081c..26780265d03 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -46,6 +46,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.ClosableIterator;
+import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SpillableMapUtils;
@@ -75,7 +76,6 @@ import java.util.stream.Collectors;
import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
import static org.apache.hudi.common.util.CollectionUtils.toStream;
-import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_FILES;
@@ -99,7 +99,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
private final boolean reuse;
// Readers for the latest file slice corresponding to file groups in the metadata partition
- private final Map<Pair<String, String>, Pair<HoodieSeekingFileReader<?>, HoodieMetadataMergedLogRecordReader>> partitionReaders =
+ private final Map<Pair<String, String>, Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader>> partitionReaders =
new ConcurrentHashMap<>();
public HoodieBackedTableMetadata(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig,
@@ -177,13 +177,13 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
(SerializableFunction<FileSlice, Iterator<HoodieRecord<HoodieMetadataPayload>>>) fileSlice -> {
// NOTE: Since this will be executed by executors, we can't access previously cached
// readers, and therefore have to always open new ones
- Pair<HoodieSeekingFileReader<?>, HoodieMetadataMergedLogRecordReader> readers =
+ Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers =
openReaders(partitionName, fileSlice);
try {
List<Long> timings = new ArrayList<>();
HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
- HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight();
+ HoodieMetadataLogRecordReader logRecordScanner = readers.getRight();
if (baseFileReader == null && logRecordScanner == null) {
// TODO: what do we do if both does not exist? should we throw an exception and let caller do the fallback ?
@@ -223,12 +223,12 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = new ArrayList<>();
AtomicInteger fileSlicesKeysCount = new AtomicInteger();
partitionFileSliceToKeysMap.forEach((partitionFileSlicePair, fileSliceKeys) -> {
- Pair<HoodieSeekingFileReader<?>, HoodieMetadataMergedLogRecordReader> readers =
+ Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers =
getOrCreateReaders(partitionName, partitionFileSlicePair.getRight());
try {
List<Long> timings = new ArrayList<>();
HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
- HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight();
+ HoodieMetadataLogRecordReader logRecordScanner = readers.getRight();
if (baseFileReader == null && logRecordScanner == null) {
return;
}
@@ -255,38 +255,35 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
return result;
}
- private Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> readLogRecords(HoodieMetadataMergedLogRecordReader logRecordScanner,
+ private Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> readLogRecords(HoodieMetadataLogRecordReader logRecordReader,
List<String> keys,
boolean fullKey,
List<Long> timings) {
HoodieTimer timer = HoodieTimer.start();
- if (logRecordScanner == null) {
+ if (logRecordReader == null) {
timings.add(timer.endTimer());
return Collections.emptyMap();
}
- String partitionName = logRecordScanner.getPartitionName().get();
+ Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = new HashMap<>(keys.size());
- Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = new HashMap<>();
- if (isFullScanAllowedForPartition(partitionName)) {
- checkArgument(fullKey, "If full-scan is required, only full keys could be used!");
- // Path which does full scan of log files
- for (String key : keys) {
- logRecords.put(key, logRecordScanner.getRecordByKey(key).get(0).getValue());
- }
- } else {
- // This path will do seeks pertaining to the keys passed in
- List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> logRecordsList =
- fullKey ? logRecordScanner.getRecordsByKeys(keys)
- : logRecordScanner.getRecordsByKeyPrefixes(keys)
- .stream()
- .map(record -> Pair.of(record.getRecordKey(), Option.of(record)))
- .collect(Collectors.toList());
-
- for (Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : logRecordsList) {
- logRecords.put(entry.getKey(), entry.getValue());
- }
+ // First, fetch the keys being looked up
+ List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> logRecordsList =
+ (fullKey ? logRecordReader.getRecordsByKeys(keys) : logRecordReader.getRecordsByKeyPrefixes(keys))
+ .stream()
+ .map(record -> Pair.of(record.getRecordKey(), Option.of(record)))
+ .collect(Collectors.toList());
+
+ for (Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : logRecordsList) {
+ logRecords.put(entry.getKey(), entry.getValue());
+ }
+
+ // Second, back-fill keys not present in the log-blocks (such that map holds
+ // a record for every key being looked up)
+ List<String> missingKeys = CollectionUtils.diff(keys, logRecords.keySet());
+ for (String key : missingKeys) {
+ logRecords.put(key, Option.empty());
 }
timings.add(timer.endTimer());
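
The rewritten readLogRecords above always returns a map entry for every requested key: keys found in the log-blocks map to their record, and absent keys are back-filled with Option.empty(). A minimal, self-contained sketch of that lookup-plus-back-fill pattern, using plain JDK Optional in place of Hudi's Option (names and types below are illustrative only):

  import java.util.*;

  class LookupBackfillSketch {
    // Returns a map holding an entry for every requested key:
    // found keys map to their value, missing keys map to Optional.empty()
    static Map<String, Optional<String>> lookup(List<String> keys, Map<String, String> store) {
      Map<String, Optional<String>> result = new HashMap<>(keys.size());
      // First, fetch the keys that are actually present
      for (String key : keys) {
        String value = store.get(key);
        if (value != null) {
          result.put(key, Optional.of(value));
        }
      }
      // Second, back-fill missing keys so callers can distinguish
      // "looked up but absent" from "never looked up"
      for (String key : keys) {
        result.putIfAbsent(key, Optional.empty());
      }
      return result;
    }
  }
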
@@ -415,7 +412,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
* @param slice - The file slice to open readers for
* @return File reader and the record scanner pair for the requested file slice
*/
- private Pair<HoodieSeekingFileReader<?>, HoodieMetadataMergedLogRecordReader> getOrCreateReaders(String partitionName, FileSlice slice) {
+ private Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> getOrCreateReaders(String partitionName, FileSlice slice) {
if (reuse) {
Pair<String, String> key = Pair.of(partitionName, slice.getFileId());
return partitionReaders.computeIfAbsent(key, ignored -> openReaders(partitionName, slice));
@@ -424,7 +421,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
}
}
- private Pair<HoodieSeekingFileReader<?>, HoodieMetadataMergedLogRecordReader> openReaders(String partitionName, FileSlice slice) {
+ private Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> openReaders(String partitionName, FileSlice slice) {
try {
HoodieTimer timer = HoodieTimer.start();
// Open base file reader
@@ -434,9 +431,9 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
// Open the log record scanner using the log files from the latest file slice
List<HoodieLogFile> logFiles = slice.getLogFiles().collect(Collectors.toList());
- Pair<HoodieMetadataMergedLogRecordReader, Long> logRecordScannerOpenTimePair =
+ Pair<HoodieMetadataLogRecordReader, Long> logRecordScannerOpenTimePair =
getLogRecordScanner(logFiles, partitionName, Option.empty());
- HoodieMetadataMergedLogRecordReader logRecordScanner = logRecordScannerOpenTimePair.getKey();
+ HoodieMetadataLogRecordReader logRecordScanner = logRecordScannerOpenTimePair.getKey();
final long logScannerOpenMs = logRecordScannerOpenTimePair.getValue();
metrics.ifPresent(metrics -> metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR,
@@ -489,9 +486,9 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
return validInstantTimestamps;
}
- public Pair<HoodieMetadataMergedLogRecordReader, Long> getLogRecordScanner(List<HoodieLogFile> logFiles,
- String partitionName,
- Option<Boolean> allowFullScanOverride) {
+ public Pair<HoodieMetadataLogRecordReader, Long> getLogRecordScanner(List<HoodieLogFile> logFiles,
+ String partitionName,
+ Option<Boolean> allowFullScanOverride) {
HoodieTimer timer = HoodieTimer.start();
List<String> sortedLogFilePaths = logFiles.stream()
.sorted(HoodieLogFile.getLogFileComparator())
@@ -510,7 +507,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
// Load the schema
Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder().fromProperties(metadataConfig.getProps()).build();
- HoodieMetadataMergedLogRecordReader logRecordScanner = HoodieMetadataMergedLogRecordReader.newBuilder()
+ HoodieMetadataLogRecordReader logRecordScanner = HoodieMetadataLogRecordReader.newBuilder()
.withFileSystem(metadataMetaClient.getFs())
.withBasePath(metadataBasePath)
.withLogFilePaths(sortedLogFilePaths)
@@ -522,7 +519,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
.withDiskMapType(commonConfig.getSpillableDiskMapType())
.withBitCaskDiskMapCompressionEnabled(commonConfig.isBitCaskDiskMapCompressionEnabled())
.withLogBlockTimestamps(validInstantTimestamps)
- .allowFullScan(allowFullScan)
+ .enableFullScan(allowFullScan)
.withPartition(partitionName)
.withUseScanV2(metadataConfig.getUseLogRecordReaderScanV2())
.build();
@@ -588,7 +585,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
* @param partitionFileSlicePair - Partition and FileSlice
*/
private synchronized void close(Pair<String, String> partitionFileSlicePair) {
- Pair<HoodieSeekingFileReader<?>, HoodieMetadataMergedLogRecordReader> readers =
+ Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers =
partitionReaders.remove(partitionFileSlicePair);
closeReader(readers);
}
@@ -603,7 +600,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
partitionReaders.clear();
}
- private void closeReader(Pair<HoodieSeekingFileReader<?>, HoodieMetadataMergedLogRecordReader> readers) {
+ private void closeReader(Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers) {
if (readers != null) {
try {
if (readers.getKey() != null) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
new file mode 100644
index 00000000000..48b9d66f89b
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.common.model.HoodieAvroRecordMerger;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.log.InstantRange;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+/**
+ * Metadata log-block record reading implementation, internally relying on
+ * {@link HoodieMergedLogRecordScanner} to merge the corresponding sequence of the
+ * Metadata Table's delta log-blocks.
+ */
+@ThreadSafe
+public class HoodieMetadataLogRecordReader implements Closeable {
+
+ private final HoodieMergedLogRecordScanner logRecordScanner;
+
+ private HoodieMetadataLogRecordReader(HoodieMergedLogRecordScanner logRecordScanner) {
+ this.logRecordScanner = logRecordScanner;
+ }
+
+ /**
+ * Returns the builder for {@code HoodieMetadataLogRecordReader}.
+ */
+ public static HoodieMetadataLogRecordReader.Builder newBuilder() {
+ return new HoodieMetadataLogRecordReader.Builder();
+ }
+
+ @SuppressWarnings("unchecked")
+ public List<HoodieRecord<HoodieMetadataPayload>> getRecords() {
+ // NOTE: Locking is necessary since we're accessing [[HoodieMetadataLogRecordReader]]'s
+ // materialized state and need to make sure there's no concurrent access
+ synchronized (this) {
+ logRecordScanner.scan();
+ return logRecordScanner.getRecords().values()
+ .stream()
+ .map(record -> (HoodieRecord<HoodieMetadataPayload>) record)
+ .collect(Collectors.toList());
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public List<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes) {
+ if (keyPrefixes.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ // NOTE: Locking is necessary since we're accessing [[HoodieMetadataLogRecordReader]]'s
+ // materialized state and need to make sure there's no concurrent access
+ synchronized (this) {
+ logRecordScanner.scanByKeyPrefixes(keyPrefixes);
+ Map<String, HoodieRecord> allRecords = logRecordScanner.getRecords();
+
+ Predicate<String> p = createPrefixMatchingPredicate(keyPrefixes);
+ return allRecords.entrySet()
+ .stream()
+ .filter(r -> r != null && p.test(r.getKey()))
+ .map(r -> (HoodieRecord<HoodieMetadataPayload>) r.getValue())
+ .collect(Collectors.toList());
+ }
+ }
+
+ /**
+ * Fetches the records identified by the provided list of keys, if they are present in
+ * the delta-log blocks.
+ */
+ @SuppressWarnings("unchecked")
+ public List<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeys(List<String> keys) {
+ if (keys.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ // NOTE: Locking is necessary since we're accessing [[HoodieMetadataLogRecordReader]]'s
+ // materialized state and need to make sure there's no concurrent access
+ synchronized (this) {
+ logRecordScanner.scanByFullKeys(keys);
+ Map<String, HoodieRecord> allRecords = logRecordScanner.getRecords();
+ return keys.stream()
+ .map(key -> (HoodieRecord<HoodieMetadataPayload>) allRecords.get(key))
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ logRecordScanner.close();
+ }
+
+ private static Predicate<String> createPrefixMatchingPredicate(List<String> keyPrefixes) {
+ if (keyPrefixes.size() == 1) {
+ String keyPrefix = keyPrefixes.get(0);
+ return key -> key.startsWith(keyPrefix);
+ }
+
+ return key -> keyPrefixes.stream().anyMatch(key::startsWith);
+ }
+
+ /**
+ * Builder used to build {@code HoodieMetadataLogRecordReader}.
+ */
+ public static class Builder {
+ private final HoodieMergedLogRecordScanner.Builder scannerBuilder =
+ new HoodieMergedLogRecordScanner.Builder()
+ .withKeyFiledOverride(HoodieMetadataPayload.KEY_FIELD_NAME)
+ // NOTE: Merging of Metadata Table's records is currently handled using {@code HoodieAvroRecordMerger}
+ // for compatibility purposes; in the future, {@code HoodieMetadataPayload}'s semantics
+ // will be migrated to its own custom instance of {@code RecordMerger}
+ .withRecordMerger(new HoodieAvroRecordMerger())
+ .withReadBlocksLazily(true)
+ .withReverseReader(false)
+ .withOperationField(false);
+
+ public Builder withFileSystem(FileSystem fs) {
+ scannerBuilder.withFileSystem(fs);
+ return this;
+ }
+
+ public Builder withBasePath(String basePath) {
+ scannerBuilder.withBasePath(basePath);
+ return this;
+ }
+
+ public Builder withLogFilePaths(List<String> logFilePaths) {
+ scannerBuilder.withLogFilePaths(logFilePaths);
+ return this;
+ }
+
+ public Builder withReaderSchema(Schema schema) {
+ scannerBuilder.withReaderSchema(schema);
+ return this;
+ }
+
+ public Builder withLatestInstantTime(String latestInstantTime) {
+ scannerBuilder.withLatestInstantTime(latestInstantTime);
+ return this;
+ }
+
+ public Builder withBufferSize(int bufferSize) {
+ scannerBuilder.withBufferSize(bufferSize);
+ return this;
+ }
+
+ public Builder withPartition(String partitionName) {
+ scannerBuilder.withPartition(partitionName);
+ return this;
+ }
+
+ public Builder withMaxMemorySizeInBytes(Long maxMemorySizeInBytes) {
+ scannerBuilder.withMaxMemorySizeInBytes(maxMemorySizeInBytes);
+ return this;
+ }
+
+ public Builder withSpillableMapBasePath(String spillableMapBasePath) {
+ scannerBuilder.withSpillableMapBasePath(spillableMapBasePath);
+ return this;
+ }
+
+ public Builder withDiskMapType(ExternalSpillableMap.DiskMapType diskMapType) {
+ scannerBuilder.withDiskMapType(diskMapType);
+ return this;
+ }
+
+ public Builder withBitCaskDiskMapCompressionEnabled(boolean isBitCaskDiskMapCompressionEnabled) {
+ scannerBuilder.withBitCaskDiskMapCompressionEnabled(isBitCaskDiskMapCompressionEnabled);
+ return this;
+ }
+
+ public Builder withLogBlockTimestamps(Set<String> validLogBlockTimestamps) {
+ scannerBuilder.withInstantRange(Option.of(new ExplicitMatchRange(validLogBlockTimestamps)));
+ return this;
+ }
+
+ public Builder enableFullScan(boolean enableFullScan) {
+ scannerBuilder.withForceFullScan(enableFullScan);
+ return this;
+ }
+
+ public Builder withUseScanV2(boolean useScanV2) {
+ scannerBuilder.withUseScanV2(useScanV2);
+ return this;
+ }
+
+ public HoodieMetadataLogRecordReader build() {
+ return new HoodieMetadataLogRecordReader(scannerBuilder.build());
+ }
+ }
+
+ /**
+ * Class to assist in checking if an instant is part of a set of instants.
+ */
+ private static class ExplicitMatchRange extends InstantRange {
+ Set<String> instants;
+
+ public ExplicitMatchRange(Set<String> instants) {
+ super(Collections.min(instants), Collections.max(instants));
+ this.instants = instants;
+ }
+
+ @Override
+ public boolean isInRange(String instant) {
+ return this.instants.contains(instant);
+ }
+ }
+}
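
For illustration, a hypothetical caller of the new reader. The builder methods below are the ones defined above; the base path, instant time, partition, and keys are placeholders, and several required spill/buffer options shown in the HoodieBackedTableMetadata diff earlier (max memory size, spillable map path, disk map type, log-block timestamps) are omitted here for brevity:

  import java.util.Arrays;
  import java.util.List;
  import org.apache.avro.Schema;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hudi.common.model.HoodieRecord;
  import org.apache.hudi.metadata.HoodieMetadataLogRecordReader;
  import org.apache.hudi.metadata.HoodieMetadataPayload;

  class MetadataReaderUsageSketch {
    static List<HoodieRecord<HoodieMetadataPayload>> lookup(FileSystem fs, Schema schema,
                                                            List<String> logFilePaths) throws Exception {
      HoodieMetadataLogRecordReader reader = HoodieMetadataLogRecordReader.newBuilder()
          .withFileSystem(fs)
          .withBasePath("/tmp/hoodie/.hoodie/metadata")   // hypothetical base path
          .withLogFilePaths(logFilePaths)
          .withReaderSchema(schema)
          .withLatestInstantTime("100")                   // hypothetical instant time
          .withPartition("files")                         // hypothetical partition
          .enableFullScan(false)
          .build();
      try {
        // Repeated point lookups reuse the scanner's materialized state
        // instead of flushing and re-scanning the log blocks on every call
        reader.getRecordsByKeys(Arrays.asList("key1"));
        return reader.getRecordsByKeys(Arrays.asList("key1", "key2"));
      } finally {
        reader.close();
      }
    }
  }
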
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java
deleted file mode 100644
index aec9877f07d..00000000000
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.metadata;
-
-import org.apache.avro.Schema;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.model.HoodieAvroRecordMerger;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
-import org.apache.hudi.common.table.log.InstantRange;
-import org.apache.hudi.common.util.HoodieRecordUtils;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.ExternalSpillableMap;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.internal.schema.InternalSchema;
-
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import static org.apache.hudi.common.util.ValidationUtils.checkState;
-
-/**
- * A {@code HoodieMergedLogRecordScanner} implementation which only merged records matching providing keys. This is
- * useful in limiting memory usage when only a small subset of updates records are to be read.
- */
-public class HoodieMetadataMergedLogRecordReader extends HoodieMergedLogRecordScanner {
-
- private static final Logger LOG = LogManager.getLogger(HoodieMetadataMergedLogRecordReader.class);
-
- private HoodieMetadataMergedLogRecordReader(FileSystem fs, String basePath, String partitionName,
- List<String> logFilePaths,
- Schema readerSchema, String latestInstantTime,
- Long maxMemorySizeInBytes, int bufferSize,
- String spillableMapBasePath,
- ExternalSpillableMap.DiskMapType diskMapType,
- boolean isBitCaskDiskMapCompressionEnabled,
- Option<InstantRange> instantRange, boolean allowFullScan, boolean useScanV2) {
- super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, maxMemorySizeInBytes, true, false, bufferSize,
- spillableMapBasePath, instantRange, diskMapType, isBitCaskDiskMapCompressionEnabled, false, allowFullScan,
- Option.of(partitionName), InternalSchema.getEmptyInternalSchema(), useScanV2, HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()));
- }
-
- /**
- * Returns the builder for {@code HoodieMetadataMergedLogRecordScanner}.
- */
- public static HoodieMetadataMergedLogRecordReader.Builder newBuilder() {
- return new HoodieMetadataMergedLogRecordReader.Builder();
- }
-
- /**
- * Retrieve a record given its key.
- *
- * @param key Key of the record to retrieve
- * @return {@code HoodieRecord} if key was found else {@code Option.empty()}
- */
- public synchronized List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordByKey(String key) {
- checkState(forceFullScan, "Record reader has to be in full-scan mode to use this API");
- return Collections.singletonList(Pair.of(key, Option.ofNullable((HoodieRecord) records.get(key))));
- }
-
- @SuppressWarnings("unchecked")
- public List<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes) {
- // Following operations have to be atomic, otherwise concurrent
- // readers would race with each other and could crash when
- // processing log block records as part of scan.
- synchronized (this) {
- records.clear();
- scanInternal(Option.of(new KeySpec(keyPrefixes, false)), false);
- return records.values().stream()
- .filter(Objects::nonNull)
- .map(record -> (HoodieRecord<HoodieMetadataPayload>) record)
- .collect(Collectors.toList());
- }
- }
-
- @SuppressWarnings("unchecked")
- public synchronized List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> keys) {
- // Following operations have to be atomic, otherwise concurrent
- // readers would race with each other and could crash when
- // processing log block records as part of scan.
- synchronized (this) {
- records.clear();
- scan(keys);
- return keys.stream()
- .map(key -> Pair.of(key, Option.ofNullable((HoodieRecord<HoodieMetadataPayload>) records.get(key))))
- .collect(Collectors.toList());
- }
- }
-
- @Override
- protected boolean getPopulateMetaFields() {
- return this.hoodieTableMetaClient.getTableConfig().populateMetaFields() && super.getPopulateMetaFields();
- }
-
- @Override
- protected String getKeyField() {
- return HoodieMetadataPayload.KEY_FIELD_NAME;
- }
-
- /**
- * Builder used to build {@code HoodieMetadataMergedLogRecordScanner}.
- */
- public static class Builder extends HoodieMergedLogRecordScanner.Builder {
-
- private boolean allowFullScan = HoodieMetadataConfig.ENABLE_FULL_SCAN_LOG_FILES.defaultValue();
-
- // Use scanV2 method.
- private boolean useScanV2 = false;
-
- @Override
- public Builder withFileSystem(FileSystem fs) {
- this.fs = fs;
- return this;
- }
-
- @Override
- public Builder withBasePath(String basePath) {
- this.basePath = basePath;
- return this;
- }
-
- @Override
- public Builder withLogFilePaths(List<String> logFilePaths) {
- this.logFilePaths = logFilePaths;
- return this;
- }
-
- @Override
- public Builder withReaderSchema(Schema schema) {
- this.readerSchema = schema;
- return this;
- }
-
- @Override
- public Builder withInternalSchema(InternalSchema internalSchema) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Builder withLatestInstantTime(String latestInstantTime) {
- this.latestInstantTime = latestInstantTime;
- return this;
- }
-
- @Override
- public Builder withReadBlocksLazily(boolean readBlocksLazily) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Builder withReverseReader(boolean reverseReader) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Builder withBufferSize(int bufferSize) {
- this.bufferSize = bufferSize;
- return this;
- }
-
- @Override
- public Builder withPartition(String partitionName) {
- this.partitionName = partitionName;
- return this;
- }
-
- @Override
- public Builder withMaxMemorySizeInBytes(Long maxMemorySizeInBytes) {
- this.maxMemorySizeInBytes = maxMemorySizeInBytes;
- return this;
- }
-
- @Override
- public Builder withSpillableMapBasePath(String spillableMapBasePath) {
- this.spillableMapBasePath = spillableMapBasePath;
- return this;
- }
-
- @Override
- public Builder withDiskMapType(ExternalSpillableMap.DiskMapType diskMapType) {
- this.diskMapType = diskMapType;
- return this;
- }
-
- @Override
- public Builder withBitCaskDiskMapCompressionEnabled(boolean isBitCaskDiskMapCompressionEnabled) {
- this.isBitCaskDiskMapCompressionEnabled = isBitCaskDiskMapCompressionEnabled;
- return this;
- }
-
- public Builder withLogBlockTimestamps(Set<String> validLogBlockTimestamps) {
- withInstantRange(Option.of(new ExplicitMatchRange(validLogBlockTimestamps)));
- return this;
- }
-
- public Builder allowFullScan(boolean enableFullScan) {
- this.allowFullScan = enableFullScan;
- return this;
- }
-
- @Override
- public Builder withUseScanV2(boolean useScanV2) {
- this.useScanV2 = useScanV2;
- return this;
- }
-
- @Override
- public HoodieMetadataMergedLogRecordReader build() {
- return new HoodieMetadataMergedLogRecordReader(fs, basePath, partitionName, logFilePaths, readerSchema,
- latestInstantTime, maxMemorySizeInBytes, bufferSize, spillableMapBasePath,
- diskMapType, isBitCaskDiskMapCompressionEnabled, instantRange, allowFullScan, useScanV2);
- }
- }
-
- /**
- * Class to assist in checking if an instant is part of a set of instants.
- */
- private static class ExplicitMatchRange extends InstantRange {
- Set<String> instants;
-
- public ExplicitMatchRange(Set<String> instants) {
- super(Collections.min(instants), Collections.max(instants));
- this.instants = instants;
- }
-
- @Override
- public boolean isInRange(String instant) {
- return this.instants.contains(instant);
- }
- }
-}
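
The behavioral change is visible by comparing the two getRecordsByKeys implementations: the deleted reader cleared its records map and re-scanned the log blocks on every lookup, whereas the new reader delegates to scanByFullKeys and keeps previously materialized records. A minimal sketch of that incremental-scan idea, assuming the per-key scan is the expensive operation (names here are illustrative only, not Hudi APIs):

  import java.util.*;
  import java.util.function.Function;

  class IncrementalKeyScanSketch {
    private final Map<String, String> materialized = new HashMap<>();
    private final Set<String> scannedKeys = new HashSet<>();
    private final Function<String, String> expensiveScan;

    IncrementalKeyScanSketch(Function<String, String> expensiveScan) {
      this.expensiveScan = expensiveScan;
    }

    // Unlike a "clear everything, re-scan everything" lookup, only keys
    // that have not been scanned yet trigger the expensive scan
    synchronized List<String> getRecordsByKeys(List<String> keys) {
      for (String key : keys) {
        if (scannedKeys.add(key)) {                // first time seeing this key
          String value = expensiveScan.apply(key); // scan once, cache the result
          if (value != null) {
            materialized.put(key, value);
          }
        }
      }
      List<String> result = new ArrayList<>();
      for (String key : keys) {
        String value = materialized.get(key);
        if (value != null) {
          result.add(value);
        }
      }
      return result;
    }
  }
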
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 1d9c5f58240..66b1c25cef7 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -93,6 +93,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -107,6 +108,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.params.provider.Arguments.arguments;
@@ -622,28 +624,14 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
boolean readBlocksLazily,
boolean useScanv2)
throws IOException, URISyntaxException, InterruptedException {
- Writer writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
- .withSizeThreshold(1024).withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
+
+ // Generate 4 delta-log files w/ random records
Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
- Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
- header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
- header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<IndexedRecord> genRecords = testUtil.generateHoodieTestRecords(0, 400);
+
+ Set<HoodieLogFile> logFiles = writeLogFiles(partitionPath, schema, genRecords, 4);
- Set<HoodieLogFile> logFiles = new HashSet<>();
- List<List<IndexedRecord>> allRecords = new ArrayList<>();
- // create 4 log files
- while (writer.getLogFile().getLogVersion() != 4) {
- logFiles.add(writer.getLogFile());
- List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
- List<IndexedRecord> copyOfRecords1 = records1.stream()
- .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
- allRecords.add(copyOfRecords1);
- header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
- HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records1, header);
- writer.appendBlock(dataBlock);
- }
- writer.close();
FileCreateUtils.createDeltaCommit(basePath, "100", fs);
// scan all log blocks (across multiple log files)
HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
@@ -671,11 +659,190 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
((HoodieAvroRecord) record).getData().getInsertValue(schema).get());
}
- assertEquals(scannedRecords.size(), allRecords.stream().mapToLong(Collection::size).sum(),
+ assertEquals(sort(genRecords), sort(scannedRecords),
"Scanner records count should be the same as appended records");
scanner.close();
}
+ @ParameterizedTest
+ @MethodSource("testArguments")
+ public void testBasicAppendAndPartialScanning(ExternalSpillableMap.DiskMapType diskMapType,
+ boolean isCompressionEnabled,
+ boolean readBlocksLazily,
+ boolean useScanV2)
+ throws IOException, URISyntaxException, InterruptedException {
+ // Generate 3 delta-log files w/ random records
+ Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<IndexedRecord> genRecords = testUtil.generateHoodieTestRecords(0, 300);
+
+ Set<HoodieLogFile> logFiles = writeLogFiles(partitionPath, schema, genRecords, 3);
+
+ FileCreateUtils.createDeltaCommit(basePath, "100", fs);
+
+ // Set up a scanner over the log files without forcing a full scan
+ HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+ .withFileSystem(fs)
+ .withBasePath(basePath)
+ .withLogFilePaths(
+ logFiles.stream()
+ .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList()))
+ .withReaderSchema(schema)
+ .withLatestInstantTime("100")
+ .withMaxMemorySizeInBytes(1024L)
+ .withReadBlocksLazily(readBlocksLazily)
+ .withReverseReader(false)
+ .withBufferSize(BUFFER_SIZE)
+ .withSpillableMapBasePath(spillableBasePath)
+ .withDiskMapType(diskMapType)
+ .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
+ .withUseScanV2(useScanV2)
+ .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
+ .withForceFullScan(false)
+ .build();
+
+ List<String> sampledRecordKeys = Arrays.asList(
+ "b190b1fb-392b-4ceb-932d-a72c906127c2",
+ "409e9ad3-5def-45e7-9180-ef579c1c220b",
+ "e6b31f1c-60a8-4577-acf5-7e8ea318b08b",
+ "0c477a9e-e602-4642-8e96-1cfd357b4ba0",
+ "ea076c17-32ae-4659-8caf-6ad538b4dd8d",
+ "7a943e09-3856-4874-83a1-8ee93e158f94",
+ "9cbff584-d8a4-4b05-868b-dc917d6cf841",
+ "bda0b0d8-0c56-43b0-89f9-e090d924586b",
+ "ee118fb3-69cb-4705-a8c4-88a18e8aa1b7",
+ "cb1fbe4d-06c3-4c9c-aea7-2665ffa8b205"
+ );
+
+ List<IndexedRecord> sampledRecords = genRecords.stream()
+ .filter(r -> sampledRecordKeys.contains(((GenericRecord) r).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()))
+ .collect(Collectors.toList());
+
+ //
+ // Step 1: Scan by a list of keys
+ //
+
+ scanner.scanByFullKeys(sampledRecordKeys);
+
+ List<HoodieRecord> scannedHoodieRecords = new ArrayList<>();
+ List<IndexedRecord> scannedAvroRecords = new ArrayList<>();
+ for (HoodieRecord record : scanner) {
+ scannedHoodieRecords.add(record);
+ scannedAvroRecords.add((IndexedRecord)
+ ((HoodieAvroRecord) record).getData().getInsertValue(schema).get());
+ }
+
+ assertEquals(sort(sampledRecords), sort(scannedAvroRecords));
+
+ //
+ // Step 2: Scan by the same list of keys (no new scanning should be performed
+ // in this case, and the same _objects_ have to be returned)
+ //
+
+ scanner.scanByFullKeys(sampledRecordKeys);
+
+ List<HoodieRecord> newScannedHoodieRecords = new ArrayList<>();
+ for (HoodieRecord record : scanner) {
+ newScannedHoodieRecords.add(record);
+ }
+
+ assertEquals(scannedHoodieRecords.size(), newScannedHoodieRecords.size());
+
+ for (int i = 0; i < scannedHoodieRecords.size(); ++i) {
+ assertSame(scannedHoodieRecords.get(i), newScannedHoodieRecords.get(i), "Objects have to be identical");
+ }
+
+ scanner.close();
+ }
+
+ @ParameterizedTest
+ @MethodSource("testArguments")
+ public void testBasicAppendAndPartialScanningByKeyPrefixes(ExternalSpillableMap.DiskMapType diskMapType,
+ boolean isCompressionEnabled,
+ boolean readBlocksLazily,
+ boolean useScanV2)
+ throws IOException, URISyntaxException, InterruptedException {
+ // Generate 3 delta-log files w/ random records
+ Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<IndexedRecord> genRecords = testUtil.generateHoodieTestRecords(0, 300);
+
+ Set<HoodieLogFile> logFiles = writeLogFiles(partitionPath, schema, genRecords, 3);
+
+ FileCreateUtils.createDeltaCommit(basePath, "100", fs);
+
+ // Set up a scanner over the log files without forcing a full scan
+ HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+ .withFileSystem(fs)
+ .withBasePath(basePath)
+ .withLogFilePaths(
+ logFiles.stream()
+ .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList()))
+ .withReaderSchema(schema)
+ .withLatestInstantTime("100")
+ .withMaxMemorySizeInBytes(1024L)
+ .withReadBlocksLazily(readBlocksLazily)
+ .withReverseReader(false)
+ .withBufferSize(BUFFER_SIZE)
+ .withSpillableMapBasePath(spillableBasePath)
+ .withDiskMapType(diskMapType)
+ .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
+ .withUseScanV2(useScanV2)
+ .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
+ .withForceFullScan(false)
+ .build();
+
+ List<String> sampledRecordKeys = Arrays.asList(
+ "00509b14-3d1a-4283-9a8c-c72b971a9d06",
+ "006b2f57-9bf7-4634-910c-c91542ea61e5",
+ "007fc45d-7ce2-45be-8765-0b9082412518",
+ "00826e50-73b4-4cb0-9d5a-375554d5e0f7"
+ );
+
+ List<IndexedRecord> sampledRecords = genRecords.stream()
+ .filter(r -> sampledRecordKeys.contains(((GenericRecord) r).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()))
+ .collect(Collectors.toList());
+
+ List<String> sampledKeyPrefixes = Collections.singletonList("00");
+
+ //
+ // Step 1: Scan by a list of key prefixes
+ //
+
+ scanner.scanByKeyPrefixes(sampledKeyPrefixes);
+
+ List<HoodieRecord> scannedHoodieRecords = new ArrayList<>();
+ List<IndexedRecord> scannedAvroRecords = new ArrayList<>();
+ for (HoodieRecord record : scanner) {
+ scannedHoodieRecords.add(record);
+ scannedAvroRecords.add((IndexedRecord)
+ ((HoodieAvroRecord) record).getData().getInsertValue(schema).get());
+ }
+
+ assertEquals(sort(sampledRecords), sort(scannedAvroRecords));
+
+ //
+ // Step 2: Scan by the same list of key prefixes (no new scanning should be performed
+ // in this case, and the same _objects_ have to be returned)
+ //
+
+ scanner.scanByKeyPrefixes(sampledKeyPrefixes);
+
+ List<HoodieRecord> newScannedHoodieRecords = new ArrayList<>();
+ for (HoodieRecord record : scanner) {
+ newScannedHoodieRecords.add(record);
+ }
+
+ assertEquals(scannedHoodieRecords.size(), newScannedHoodieRecords.size());
+
+ for (int i = 0; i < scannedHoodieRecords.size(); ++i) {
+ assertSame(scannedHoodieRecords.get(i), newScannedHoodieRecords.get(i), "Objects have to be identical");
+ }
+
+ scanner.close();
+ }
+
@Test
public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxException, InterruptedException {
HoodieLogFile logFile = addValidBlock("test-fileId1", "100", 100);
@@ -858,8 +1025,10 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
Writer writer =
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).withSizeThreshold(500).build();
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+
// Write 1
- List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+ List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords1 = records1.stream()
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
@@ -870,7 +1039,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
writer.appendBlock(dataBlock);
// Write 2
- List<IndexedRecord> records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+ List<IndexedRecord> records2 = testUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords2 = records2.stream()
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
@@ -926,7 +1095,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
// Write 1
- List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords1 = records1.stream()
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
@@ -939,7 +1109,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
// Write 2
header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
- List<IndexedRecord> records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+ List<IndexedRecord> records2 = testUtil.generateHoodieTestRecords(0, 100);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records2, header);
writer.appendBlock(dataBlock);
@@ -956,7 +1126,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
// Write 3
header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
- List<IndexedRecord> records3 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+ List<IndexedRecord> records3 = testUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords3 = records3.stream()
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
@@ -1011,7 +1181,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
// Write 1
- List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords1 = records1.stream()
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
@@ -1046,7 +1217,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
// Write 3
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103");
- List<IndexedRecord> records3 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+ List<IndexedRecord> records3 = testUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords3 = records3.stream()
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
@@ -1103,7 +1274,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
// Write 1
- List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords1 = records1.stream()
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
@@ -1114,7 +1286,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
// Write 2
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
- List<IndexedRecord> records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+ List<IndexedRecord> records2 = testUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords2 = records2.stream()
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records2, header);
@@ -1244,7 +1416,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
// Write 1
- List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords1 = records1.stream()
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
@@ -1255,7 +1428,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
// Write 2
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
- List<IndexedRecord> records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+ List<IndexedRecord> records2 = testUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords2 = records2.stream()
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records2, header);
@@ -1367,7 +1540,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
// Write 1
- List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords1 = records1.stream()
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
@@ -1379,7 +1553,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
writer.appendBlock(dataBlock);
// Write 2
- List<IndexedRecord> records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+ List<IndexedRecord> records2 = testUtil.generateHoodieTestRecords(0, 100);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records2, header);
writer.appendBlock(dataBlock);
@@ -1457,7 +1631,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
// Write 1
- List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords1 = records1.stream()
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
@@ -1524,7 +1699,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
// Write 1
- List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
@@ -1584,7 +1760,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
// Write 1
- List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords1 = records1.stream()
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
@@ -1656,7 +1833,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
// Write 1
- List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
@@ -1770,7 +1948,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
// Write 1st data blocks multiple times.
- List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
Set<String> recordKeysOfFirstTwoBatches = records1.stream()
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)
.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()).collect(Collectors.toSet());
@@ -1783,7 +1962,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
FileCreateUtils.createDeltaCommit(basePath, "100", fs);
// Write 2nd data block
- List<IndexedRecord> records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+ List<IndexedRecord> records2 = testUtil.generateHoodieTestRecords(0, 100);
recordKeysOfFirstTwoBatches.addAll(records2.stream()
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)
.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()).collect(Collectors.toList()));
@@ -1796,7 +1975,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
FileCreateUtils.createDeltaCommit(basePath, "101", fs);
// Write 3rd data block
- List<IndexedRecord> records3 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+ List<IndexedRecord> records3 = testUtil.generateHoodieTestRecords(0, 100);
Set<String> recordKeysOfFirstThreeBatches = new HashSet<>(recordKeysOfFirstTwoBatches);
recordKeysOfFirstThreeBatches.addAll(records3.stream()
.map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)
@@ -1865,7 +2044,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
FileCreateUtils.createDeltaCommit(basePath, "104", fs);
// Write 6th data block
- List<IndexedRecord> records6 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+ List<IndexedRecord> records6 = testUtil.generateHoodieTestRecords(0, 100);
header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "105");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
@@ -1875,7 +2054,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
FileCreateUtils.createDeltaCommit(basePath, "105", fs);
// Write 7th data block
- List<IndexedRecord> records7 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+ List<IndexedRecord> records7 = testUtil.generateHoodieTestRecords(0, 100);
header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "106");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
@@ -1885,7 +2064,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
FileCreateUtils.createDeltaCommit(basePath, "106", fs);
// Write 8th data block
- List<IndexedRecord> records8 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+ List<IndexedRecord> records8 = testUtil.generateHoodieTestRecords(0, 100);
header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "107");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
@@ -1964,7 +2143,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
try {
// Write one Data block with same InstantTime (written in same batch)
Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
- List<IndexedRecord> records = SchemaTestUtil.generateHoodieTestRecords(0, 101);
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<IndexedRecord> records = testUtil.generateHoodieTestRecords(0, 101);
List<IndexedRecord> records2 = new ArrayList<>(records);
// Write1 with numRecordsInLog1 records written to log.1
@@ -2360,12 +2540,12 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
}
}
- private HoodieDataBlock getDataBlock(HoodieLogBlockType dataBlockType, List<IndexedRecord> records,
+ private static HoodieDataBlock getDataBlock(HoodieLogBlockType dataBlockType, List<IndexedRecord> records,
Map<HeaderMetadataType, String> header) {
return getDataBlock(dataBlockType, records.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList()), header, new Path("dummy_path"));
}
- private HoodieDataBlock getDataBlock(HoodieLogBlockType dataBlockType, List<HoodieRecord> records,
+ private static HoodieDataBlock getDataBlock(HoodieLogBlockType dataBlockType, List<HoodieRecord> records,
Map<HeaderMetadataType, String> header, Path pathForReader) {
switch (dataBlockType) {
case CDC_DATA_BLOCK:
@@ -2417,6 +2597,41 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
);
}
+ private static Set<HoodieLogFile> writeLogFiles(Path partitionPath,
+ Schema schema,
+ List<IndexedRecord> records,
+ int numFiles) throws IOException, InterruptedException {
+ Writer writer =
+ HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+ .withSizeThreshold(1024).withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
+ Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+ header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+ header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
+
+ Set<HoodieLogFile> logFiles = new HashSet<>();
+
+ // Create log files
+ int recordsPerFile = records.size() / numFiles;
+ int filesWritten = 0;
+
+ while (filesWritten < numFiles) {
+ int targetRecordsCount = filesWritten == numFiles - 1
+ ? recordsPerFile + (records.size() % numFiles)
+ : recordsPerFile;
+ int offset = filesWritten * recordsPerFile;
+ List<IndexedRecord> targetRecords = records.subList(offset, offset + targetRecordsCount);
+
+ logFiles.add(writer.getLogFile());
+ writer.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, targetRecords, header));
+
+ filesWritten++;
+ }
+
+ writer.close();
+
+ return logFiles;
+ }
+
/**
* Utility to convert the given iterator to a List.
*/
@@ -2427,4 +2642,10 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
itr.forEachRemaining(r -> elements.add(r.getData()));
return elements;
}
+
+ private static List<IndexedRecord> sort(List<IndexedRecord> records) {
+ List<IndexedRecord> copy = new ArrayList<>(records);
+ copy.sort(Comparator.comparing(r -> ((GenericRecord) r).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()));
+ return copy;
+ }
}
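
As a quick worked check of the record-splitting arithmetic in the new writeLogFiles helper: each of the first numFiles - 1 files receives records.size() / numFiles records, and the last file additionally absorbs the remainder. A runnable sketch with hypothetical inputs (11 records across 4 files; the tests above use evenly divisible counts like 400/4 and 300/3):

  public class SplitArithmeticSketch {
    public static void main(String[] args) {
      int totalRecords = 11;                              // hypothetical inputs
      int numFiles = 4;
      int recordsPerFile = totalRecords / numFiles;       // 2
      for (int i = 0; i < numFiles; i++) {
        int count = (i == numFiles - 1)
            ? recordsPerFile + (totalRecords % numFiles)  // last file: 2 + 3 = 5
            : recordsPerFile;                             // 2
        int offset = i * recordsPerFile;
        System.out.printf("file %d: records [%d, %d)%n", i, offset, offset + count);
      }
      // Prints [0,2), [2,4), [4,6), [6,11): all 11 records, each exactly once
    }
  }
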
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieRecord.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieRecord.java
index b6bbc34cc3d..1b4774b2160 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieRecord.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieRecord.java
@@ -42,7 +42,8 @@ public class TestHoodieRecord {
@BeforeEach
public void setUp() throws Exception {
- final List<IndexedRecord> indexedRecords = SchemaTestUtil.generateHoodieTestRecords(0, 1);
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ final List<IndexedRecord> indexedRecords = testUtil.generateHoodieTestRecords(0, 1);
final List<HoodieRecord> hoodieRecords =
indexedRecords.stream().map(r -> new HoodieAvroRecord(new HoodieKey(UUID.randomUUID().toString(), "0000/00/00"),
new AvroBinaryTestPayload(Option.of((GenericRecord) r)))).collect(Collectors.toList());
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
index 70d5a1bb3e9..69a2881f400 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
@@ -53,10 +53,11 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
-import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.genPseudoRandomUUID;
+
/**
* A utility class for testing schema.
*/
@@ -64,6 +65,10 @@ public final class SchemaTestUtil {
private static final String RESOURCE_SAMPLE_DATA = "/sample.data";
+ private final Random random = new Random(0xDEED);
+
+ public SchemaTestUtil() {}
+
public static Schema getSimpleSchema() throws IOException {
return new Schema.Parser().parse(SchemaTestUtil.class.getResourceAsStream("/simple-test.avsc"));
}
@@ -131,25 +136,24 @@ public final class SchemaTestUtil {
return fs.getPath(array[1]);
}
- public static List<IndexedRecord> generateHoodieTestRecords(int from, int limit)
+ public List<IndexedRecord> generateHoodieTestRecords(int from, int limit)
throws IOException, URISyntaxException {
List<IndexedRecord> records = generateTestRecords(from, limit);
String instantTime = HoodieActiveTimeline.createNewInstantTime();
Schema hoodieFieldsSchema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
return records.stream().map(s -> HoodieAvroUtils.rewriteRecord((GenericRecord) s, hoodieFieldsSchema)).map(p -> {
- p.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, UUID.randomUUID().toString());
+ p.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, genRandomUUID());
p.put(HoodieRecord.PARTITION_PATH_METADATA_FIELD, "0000/00/00");
p.put(HoodieRecord.COMMIT_TIME_METADATA_FIELD, instantTime);
return p;
}).collect(Collectors.toList());
-
}
- public static List<HoodieRecord> generateHoodieTestRecords(int from, int limit, Schema schema)
+ public List<HoodieRecord> generateHoodieTestRecords(int from, int limit, Schema schema)
throws IOException, URISyntaxException {
List<IndexedRecord> records = generateTestRecords(from, limit);
return records.stream().map(s -> HoodieAvroUtils.rewriteRecord((GenericRecord) s, schema))
- .map(p -> convertToHoodieRecords(p, UUID.randomUUID().toString(), "000/00/00")).collect(Collectors.toList());
+ .map(p -> convertToHoodieRecords(p, genRandomUUID(), "000/00/00")).collect(Collectors.toList());
}
private static HoodieRecord convertToHoodieRecords(IndexedRecord iRecord, String key, String partitionPath) {
@@ -169,11 +173,10 @@ public final class SchemaTestUtil {
}
- public static List<HoodieRecord> generateHoodieTestRecordsWithoutHoodieMetadata(int from, int limit)
+ public List<HoodieRecord> generateHoodieTestRecordsWithoutHoodieMetadata(int from, int limit)
throws IOException, URISyntaxException {
-
List<IndexedRecord> iRecords = generateTestRecords(from, limit);
- return iRecords.stream().map(r -> new HoodieAvroRecord<>(new HoodieKey(UUID.randomUUID().toString(), "0000/00/00"),
+ return iRecords.stream().map(r -> new HoodieAvroRecord<>(new HoodieKey(genRandomUUID(), "0000/00/00"),
new HoodieAvroPayload(Option.of((GenericRecord) r)))).collect(Collectors.toList());
}
@@ -358,4 +361,8 @@ public final class SchemaTestUtil {
};
}
}
+
+ private String genRandomUUID() {
+ return genPseudoRandomUUID(random).toString();
+ }
}
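
The switch in SchemaTestUtil from UUID.randomUUID() to a seeded generator is what makes the generated record keys reproducible across runs, which the new partial-scan tests rely on when they hard-code sampled keys. A minimal sketch of deterministic UUID generation from a seeded Random (the actual genPseudoRandomUUID helper in HoodieTestDataGenerator may differ, e.g. by setting UUID version/variant bits):

  import java.util.Random;
  import java.util.UUID;

  class DeterministicUuidSketch {
    public static void main(String[] args) {
      // Same seed => same sequence of UUIDs on every run
      Random random = new Random(0xDEED);
      for (int i = 0; i < 3; i++) {
        UUID uuid = new UUID(random.nextLong(), random.nextLong());
        System.out.println(uuid);
      }
    }
  }
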
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestBitCaskDiskMap.java b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestBitCaskDiskMap.java
index 9bbe4277162..ae62a718400 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestBitCaskDiskMap.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestBitCaskDiskMap.java
@@ -75,7 +75,8 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness {
@ValueSource(booleans = {false, true})
public void testSimpleInsert(boolean isCompressionEnabled) throws IOException, URISyntaxException {
BitCaskDiskMap records = new BitCaskDiskMap<>(basePath, isCompressionEnabled);
- List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0, 100);
List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records);
Map<String, IndexedRecord> originalRecords = iRecords.stream()
@@ -100,7 +101,8 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness {
@ValueSource(booleans = {false, true})
public void testSimpleInsertWithoutHoodieMetadata(boolean isCompressionEnabled) throws IOException, URISyntaxException {
BitCaskDiskMap records = new BitCaskDiskMap<>(basePath, isCompressionEnabled);
- List<HoodieRecord> hoodieRecords = SchemaTestUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 1000);
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<HoodieRecord> hoodieRecords = testUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 1000);
Set<String> recordKeys = new HashSet<>();
// insert generated records into the map
hoodieRecords.forEach(r -> {
@@ -126,7 +128,8 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness {
Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
BitCaskDiskMap records = new BitCaskDiskMap<>(basePath, isCompressionEnabled);
- List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0, 100);
// perform some inserts
List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records);
@@ -137,7 +140,7 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness {
// generate updates from inserts
List<IndexedRecord> updatedRecords = SchemaTestUtil.updateHoodieTestRecords(recordKeys,
- SchemaTestUtil.generateHoodieTestRecords(0, 100), HoodieActiveTimeline.createNewInstantTime());
+ testUtil.generateHoodieTestRecords(0, 100), HoodieActiveTimeline.createNewInstantTime());
String newCommitTime =
((GenericRecord) updatedRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
@@ -169,7 +172,8 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness {
Schema schema = SchemaTestUtil.getSimpleSchema();
// Test sizeEstimator without hoodie metadata fields
- List<HoodieRecord> hoodieRecords = SchemaTestUtil.generateHoodieTestRecords(0, 1, schema);
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<HoodieRecord> hoodieRecords = testUtil.generateHoodieTestRecords(0, 1, schema);
long payloadSize =
SpillableMapUtils.computePayloadSize(hoodieRecords.remove(0), new HoodieRecordSizeEstimator(schema));
@@ -177,7 +181,7 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness {
// Test sizeEstimator with hoodie metadata fields
schema = HoodieAvroUtils.addMetadataFields(schema);
- hoodieRecords = SchemaTestUtil.generateHoodieTestRecords(0, 1, schema);
+ hoodieRecords = testUtil.generateHoodieTestRecords(0, 1, schema);
payloadSize = SpillableMapUtils.computePayloadSize(hoodieRecords.remove(0), new HoodieRecordSizeEstimator(schema));
assertTrue(payloadSize > 0);
@@ -185,7 +189,7 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness {
// Test sizeEstimator without hoodie metadata fields and without schema object in the payload
schema = SchemaTestUtil.getSimpleSchema();
- List<IndexedRecord> indexedRecords = SchemaTestUtil.generateHoodieTestRecords(0, 1);
+ List<IndexedRecord> indexedRecords = testUtil.generateHoodieTestRecords(0, 1);
hoodieRecords =
indexedRecords.stream().map(r -> new HoodieAvroRecord<>(new HoodieKey(UUID.randomUUID().toString(), "0000/00/00"),
new AvroBinaryTestPayload(Option.of((GenericRecord) r)))).collect(Collectors.toList());
@@ -194,7 +198,7 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness {
// Test sizeEstimator with hoodie metadata fields and without schema object in the payload
final Schema simpleSchemaWithMetadata = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
- indexedRecords = SchemaTestUtil.generateHoodieTestRecords(0, 1);
+ indexedRecords = testUtil.generateHoodieTestRecords(0, 1);
hoodieRecords = indexedRecords.stream()
.map(r -> new HoodieAvroRecord<>(new HoodieKey(UUID.randomUUID().toString(), "0000/00/00"),
new AvroBinaryTestPayload(
@@ -208,7 +212,8 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness {
@ValueSource(booleans = {false, true})
public void testPutAll(boolean isCompressionEnabled) throws IOException, URISyntaxException {
BitCaskDiskMap<String, HoodieRecord> records = new BitCaskDiskMap<>(basePath, isCompressionEnabled);
- List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0, 100);
Map<String, HoodieRecord> recordMap = new HashMap<>();
iRecords.forEach(r -> {
String key = ((GenericRecord) r).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
@@ -235,7 +240,8 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness {
public void testSizeEstimatorPerformance() throws IOException, URISyntaxException {
// Test sizeEstimatorPerformance with simpleSchema
Schema schema = SchemaTestUtil.getSimpleSchema();
- List<HoodieRecord> hoodieRecords = SchemaTestUtil.generateHoodieTestRecords(0, 1, schema);
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<HoodieRecord> hoodieRecords = testUtil.generateHoodieTestRecords(0, 1, schema);
HoodieRecordSizeEstimator sizeEstimator = new HoodieRecordSizeEstimator<>(schema);
HoodieRecord record = hoodieRecords.remove(0);
long startTime = System.currentTimeMillis();
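The same SchemaTestUtil change repeats in every file below (TestExternalSpillableMap, TestRocksDbBasedMap, TestRocksDbDiskMap, TestRepairsProcedure): the record generators are now instance methods, so each test creates a SchemaTestUtil once and reuses it, while schema helpers such as getSimpleSchema and updateHoodieTestRecords remain static. A minimal Scala sketch of the migrated call site (assuming the org.apache.hudi.common.testutils package; the Java call sites change identically):

  import org.apache.avro.generic.IndexedRecord
  import org.apache.hudi.common.testutils.SchemaTestUtil

  object SchemaTestUtilMigrationSketch extends App {
    // Before this commit the generator was static:
    //   SchemaTestUtil.generateHoodieTestRecords(0, 100)
    // After it, instantiate once and call the generator on the instance:
    val testUtil = new SchemaTestUtil
    val iRecords: java.util.List[IndexedRecord] = testUtil.generateHoodieTestRecords(0, 100)
    assert(iRecords.size() == 100)
  }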
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java
index e33baf1493a..4cd34dbdab1 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestExternalSpillableMap.java
@@ -82,7 +82,8 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(),
new HoodieRecordSizeEstimator(schema), diskMapType, isCompressionEnabled); // 16B
- List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0, 100);
List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records);
assert (recordKeys.size() == 100);
@@ -115,7 +116,8 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(),
new HoodieRecordSizeEstimator(schema), diskMapType, isCompressionEnabled); // 16B
- List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0, 100);
List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records);
assert (recordKeys.size() == 100);
Iterator<HoodieRecord<? extends HoodieRecordPayload>> itr = records.iterator();
@@ -124,7 +126,7 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
assert recordKeys.contains(rec.getRecordKey());
}
List<IndexedRecord> updatedRecords = SchemaTestUtil.updateHoodieTestRecords(recordKeys,
- SchemaTestUtil.generateHoodieTestRecords(0, 100), HoodieActiveTimeline.createNewInstantTime());
+ testUtil.generateHoodieTestRecords(0, 100), HoodieActiveTimeline.createNewInstantTime());
// update records already inserted
SpillableMapTestUtils.upsertRecords(updatedRecords, records);
@@ -154,7 +156,8 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(),
new HoodieRecordSizeEstimator(schema), diskMapType, isCompressionEnabled); // 16B
- List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0, 100);
// insert a bunch of records so that values spill to disk too
List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records);
IndexedRecord inMemoryRecord = iRecords.get(0);
@@ -210,7 +213,8 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
failureOutputPath, new DefaultSizeEstimator(),
new HoodieRecordSizeEstimator(schema), diskMapType, isCompressionEnabled); // 16B
- List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0, 100);
List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records);
assert (recordKeys.size() == 100);
Iterator<HoodieRecord<? extends HoodieRecordPayload>> itr = records.iterator();
@@ -236,7 +240,8 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
List<String> recordKeys = new ArrayList<>();
// Ensure we spill to disk
while (records.getDiskBasedMapNumEntries() < 1) {
- List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0, 100);
recordKeys.addAll(SpillableMapTestUtils.upsertRecords(iRecords, records));
}
@@ -288,10 +293,12 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
new ExternalSpillableMap<>(16L, basePath, new DefaultSizeEstimator(),
new HoodieRecordSizeEstimator(schema), diskMapType, isCompressionEnabled); // 16B
+ SchemaTestUtil testUtil = new SchemaTestUtil();
List<String> recordKeys = new ArrayList<>();
+
// Ensure we spill to disk
while (records.getDiskBasedMapNumEntries() < 1) {
- List<HoodieRecord> hoodieRecords = SchemaTestUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 100);
+ List<HoodieRecord> hoodieRecords = testUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 100);
hoodieRecords.stream().forEach(r -> {
records.put(r.getRecordKey(), r);
recordKeys.add(r.getRecordKey());
@@ -354,16 +361,17 @@ public class TestExternalSpillableMap extends HoodieCommonTestHarness {
new HoodieRecordSizeEstimator(schema), diskMapType, isCompressionEnabled);
List<String> recordKeys = new ArrayList<>();
+ SchemaTestUtil testUtil = new SchemaTestUtil();
// Put a single record. Payload size estimation happens as part of this initial put.
- HoodieRecord seedRecord = SchemaTestUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 1).get(0);
+ HoodieRecord seedRecord = testUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 1).get(0);
records.put(seedRecord.getRecordKey(), seedRecord);
// Remove the key immediately to make the map empty again.
records.remove(seedRecord.getRecordKey());
// Verify payload size re-estimation does not throw exception
- List<HoodieRecord> hoodieRecords = SchemaTestUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 250);
+ List<HoodieRecord> hoodieRecords = testUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 250);
hoodieRecords.stream().forEach(hoodieRecord -> {
assertDoesNotThrow(() -> {
records.put(hoodieRecord.getRecordKey(), hoodieRecord);
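For context on what the assertions in this hunk protect: ExternalSpillableMap sizes its in-memory portion by estimating payload size on the first put, and after the map has been drained it must be able to re-estimate on the next put rather than throw. A toy illustration of that idea (deliberately simplified; not Hudi's ExternalSpillableMap implementation):

  import scala.collection.mutable

  class ToySpillableMap[V](maxInMemoryBytes: Long, sizeOf: V => Long) {
    private val inMemory = mutable.HashMap.empty[String, V]
    private val spilled = mutable.HashMap.empty[String, V] // stand-in for the disk-backed map
    private var entrySizeEstimate: Option[Long] = None

    def put(key: String, value: V): Unit = {
      // Estimate (or re-estimate) entry size whenever the map is empty, mirroring the
      // behavior the test pins down: an emptied-then-refilled map must not throw.
      if (inMemory.isEmpty && spilled.isEmpty) entrySizeEstimate = Some(sizeOf(value))
      val fitsInMemory = entrySizeEstimate.exists(s => (inMemory.size + 1) * s <= maxInMemoryBytes)
      if (fitsInMemory) inMemory.put(key, value) else spilled.put(key, value)
    }

    def remove(key: String): Option[V] = inMemory.remove(key).orElse(spilled.remove(key))
    def spilledEntries: Int = spilled.size
  }

  object ToySpillableMapDemo extends App {
    val m = new ToySpillableMap[String](16L, _ => 8L)
    m.put("a", "rec-a"); m.put("b", "rec-b"); m.put("c", "rec-c") // third entry spills
    assert(m.spilledEntries == 1)
  }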
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDbBasedMap.java b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDbBasedMap.java
index 5b71b5ec242..96a106ec901 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDbBasedMap.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDbBasedMap.java
@@ -50,7 +50,8 @@ public class TestRocksDbBasedMap extends HoodieCommonTestHarness {
@Test
public void testSimple() throws IOException, URISyntaxException {
RocksDbDiskMap records = new RocksDbDiskMap(basePath);
- List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0, 100);
((GenericRecord) iRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, records);
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDbDiskMap.java b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDbDiskMap.java
index 31daaab2136..e4dd59fd0a1 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDbDiskMap.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDbDiskMap.java
@@ -104,7 +104,8 @@ public class TestRocksDbDiskMap extends HoodieCommonTestHarness {
@Test
public void testSimpleInsertWithoutHoodieMetadata() throws IOException, URISyntaxException {
RocksDbDiskMap rocksDBBasedMap = new RocksDbDiskMap<>(basePath);
- List<HoodieRecord> hoodieRecords = SchemaTestUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 1000);
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<HoodieRecord> hoodieRecords = testUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 1000);
Set<String> recordKeys = new HashSet<>();
// insert generated records into the map
hoodieRecords.forEach(r -> {
@@ -128,14 +129,15 @@ public class TestRocksDbDiskMap extends HoodieCommonTestHarness {
Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
RocksDbDiskMap rocksDBBasedMap = new RocksDbDiskMap<>(basePath);
- List<IndexedRecord> insertedRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<IndexedRecord> insertedRecords = testUtil.generateHoodieTestRecords(0, 100);
List<String> recordKeys = SpillableMapTestUtils.upsertRecords(insertedRecords, rocksDBBasedMap);
String oldCommitTime =
((GenericRecord) insertedRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
// generate updates from inserts for the first 50 keys (a subset of the keys)
List<IndexedRecord> updatedRecords = SchemaTestUtil.updateHoodieTestRecords(recordKeys.subList(0, 50),
- SchemaTestUtil.generateHoodieTestRecords(0, 50), HoodieActiveTimeline.createNewInstantTime());
+ testUtil.generateHoodieTestRecords(0, 50), HoodieActiveTimeline.createNewInstantTime());
String newCommitTime =
((GenericRecord) updatedRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
@@ -161,7 +163,8 @@ public class TestRocksDbDiskMap extends HoodieCommonTestHarness {
@Test
public void testPutAll() throws IOException, URISyntaxException {
RocksDbDiskMap<String, HoodieRecord> rocksDBBasedMap = new RocksDbDiskMap<>(basePath);
- List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0, 100);
Map<String, HoodieRecord> recordMap = new HashMap<>();
iRecords.forEach(r -> {
String key = ((GenericRecord) r).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
@@ -193,7 +196,8 @@ public class TestRocksDbDiskMap extends HoodieCommonTestHarness {
}
private List<String> setupMapWithRecords(RocksDbDiskMap rocksDBBasedMap, int numRecords) throws IOException, URISyntaxException {
- List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, numRecords);
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<IndexedRecord> iRecords = testUtil.generateHoodieTestRecords(0, numRecords);
List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, rocksDBBasedMap);
// Ensure the number of records is correct
assertEquals(rocksDBBasedMap.size(), recordKeys.size());
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
index d81745156a6..20ef9ded7c4 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
@@ -50,6 +50,7 @@ import org.apache.spark.sql.types.StructType
import java.io.Closeable
import scala.annotation.tailrec
import scala.collection.JavaConverters._
+import scala.collection.mutable
import scala.util.Try
/**
@@ -61,7 +62,7 @@ class LogFileIterator(split: HoodieMergeOnReadFileSplit,
requiredSchema: HoodieTableSchema,
tableState: HoodieTableState,
config: Configuration)
- extends CachingIterator[InternalRow] with Closeable with AvroDeserializerSupport {
+ extends CachingIterator[InternalRow] with AvroDeserializerSupport {
protected val maxCompactionMemoryInBytes: Long = getMaxCompactionMemoryInBytes(new JobConf(config))
@@ -83,27 +84,27 @@ class LogFileIterator(split: HoodieMergeOnReadFileSplit,
protected val requiredSchemaSafeAvroProjection: SafeAvroProjection = SafeAvroProjection.create(logFileReaderAvroSchema, avroSchema)
protected val requiredSchemaUnsafeRowProjection: UnsafeProjection = HoodieCatalystExpressionUtils.generateUnsafeProjection(logFileReaderStructType, structTypeSchema)
- // TODO: now logScanner with internalSchema support column project, we may no need projectAvroUnsafe
- private var logScanner = {
+ private val logRecords = {
val internalSchema = tableSchema.internalSchema.getOrElse(InternalSchema.getEmptyInternalSchema)
+
scanLog(split.logFiles, getPartitionPath(split), logFileReaderAvroSchema, tableState,
maxCompactionMemoryInBytes, config, internalSchema)
}
- private val logRecords = logScanner.getRecords.asScala
-
def logRecordsPairIterator(): Iterator[(String, HoodieRecord[_])] = {
logRecords.iterator
}
// NOTE: This has to stay lazy to make sure it's initialized only at the point where it's
// going to be used, since we modify `logRecords` before that and therefore can't do it any earlier
- protected lazy val logRecordsIterator: Iterator[Option[HoodieRecord[_]]] = logRecords.iterator.map {
+ protected lazy val logRecordsIterator: Iterator[Option[HoodieRecord[_]]] =
+ logRecords.iterator.map {
case (_, record: HoodieSparkRecord) => Option(record)
case (_, _: HoodieEmptyRecord[_]) => Option.empty
case (_, record) =>
toScalaOption(record.toIndexedRecord(logFileReaderAvroSchema, payloadProps))
- }
+
+ }
protected def removeLogRecord(key: String): Option[HoodieRecord[_]] = logRecords.remove(key)
@@ -126,15 +127,6 @@ class LogFileIterator(split: HoodieMergeOnReadFileSplit,
}
}
}
-
- override def close(): Unit =
- if (logScanner != null) {
- try {
- logScanner.close()
- } finally {
- logScanner = null
- }
- }
}
/**
@@ -261,7 +253,7 @@ object LogFileIterator {
tableState: HoodieTableState,
maxCompactionMemoryInBytes: Long,
hadoopConf: Configuration,
- internalSchema: InternalSchema = InternalSchema.getEmptyInternalSchema): HoodieMergedLogRecordScanner = {
+ internalSchema: InternalSchema = InternalSchema.getEmptyInternalSchema): mutable.Map[String, HoodieRecord[_]] = {
val tablePath = tableState.tablePath
val fs = FSUtils.getFs(tablePath, hadoopConf)
@@ -282,8 +274,16 @@ object LogFileIterator {
// NOTE: In case of the Metadata Table, the partition path equates to the partition name (since there's just one level
// of indirection among MT partitions)
val relativePartitionPath = getRelativePartitionPath(new Path(tablePath), partitionPath)
- metadataTable.getLogRecordScanner(logFiles.asJava, relativePartitionPath, toJavaOption(Some(forceFullScan)))
- .getLeft
+
+ val logRecordReader =
+ metadataTable.getLogRecordScanner(logFiles.asJava, relativePartitionPath, toJavaOption(Some(forceFullScan)))
+ .getLeft
+
+ val recordList = closing(logRecordReader) {
+ logRecordReader.getRecords
+ }
+
+ mutable.HashMap(recordList.asScala.map(r => (r.getRecordKey, r)): _*)
} else {
val logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(fs)
@@ -316,10 +316,21 @@ object LogFileIterator {
getRelativePartitionPath(new Path(tableState.tablePath), logFiles.head.getPath.getParent))
}
- val recordMerger = HoodieRecordUtils.createRecordMerger(tableState.tablePath, EngineType.SPARK, tableState.recordMergerImpls.asJava, tableState.recordMergerStrategy)
- logRecordScannerBuilder.withRecordMerger(recordMerger)
+ logRecordScannerBuilder.withRecordMerger(
+ HoodieRecordUtils.createRecordMerger(tableState.tablePath, EngineType.SPARK, tableState.recordMergerImpls.asJava, tableState.recordMergerStrategy))
+
+ val scanner = logRecordScannerBuilder.build()
+
+ closing(scanner) {
+ // NOTE: We have to copy the record map (an immutable view is exposed by default)
+ mutable.HashMap(scanner.getRecords.asScala.toSeq: _*)
+ }
+ }
+ }
- logRecordScannerBuilder.build()
+ def closing[T](c: Closeable)(f: => T): T = {
+ try { f } finally {
+ c.close()
}
}
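Net effect of the Iterators.scala changes: scanLog no longer leaks a live HoodieMergedLogRecordScanner to LogFileIterator (which is why the iterator's close() override could be dropped above). In both the metadata-table and regular branches the records are copied out eagerly, the scanner is closed via the new closing loan-pattern helper, and only a mutable map keyed by record key escapes. A self-contained sketch of that contract (FakeScanner is a hypothetical stand-in for the real scanner):

  import java.io.Closeable
  import scala.collection.mutable

  object ScanLogContractSketch extends App {
    // Same loan-pattern helper added above: evaluate f, then close c even if f throws.
    def closing[T](c: Closeable)(f: => T): T =
      try { f } finally { c.close() }

    // Hypothetical stand-in for the merged log record scanner, just to show the contract.
    class FakeScanner extends Closeable {
      var closed = false
      def records: Seq[(String, String)] = Seq("key-1" -> "log-rec-1", "key-2" -> "log-rec-2")
      override def close(): Unit = closed = true
    }

    val scanner = new FakeScanner
    // What the new scanLog does: materialize the records into a mutable map and
    // close the scanner before handing the map to the caller.
    val logRecords: mutable.Map[String, String] =
      closing(scanner) { mutable.HashMap(scanner.records: _*) }
    assert(scanner.closed)

    // Downstream, LogFileIterator pulls matched keys out (removeLogRecord) and then
    // iterates whatever remains as log-only records (logRecordsIterator).
    val merged = logRecords.remove("key-1")
    assert(merged.contains("log-rec-1") && logRecords.keySet == Set("key-2"))
  }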
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
index 52570596e17..eaf977e82d1 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
@@ -488,8 +488,9 @@ class TestRepairsProcedure extends HoodieSparkProcedureTestBase {
val schema: Schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema)
val testTable: HoodieSparkWriteableTestTable = HoodieSparkWriteableTestTable.of(metaClient, schema)
- val hoodieRecords1 = SchemaTestUtil.generateHoodieTestRecords(0, 100, schema)
- val hoodieRecords2 = SchemaTestUtil.generateHoodieTestRecords(100, 100, schema)
+ val testUtil = new SchemaTestUtil
+ val hoodieRecords1 = testUtil.generateHoodieTestRecords(0, 100, schema)
+ val hoodieRecords2 = testUtil.generateHoodieTestRecords(100, 100, schema)
testTable.addCommit("20160401010101")
.withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "1", hoodieRecords1)
testTable.withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "2", hoodieRecords2)