You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2022/04/06 16:11:15 UTC
[hudi] branch master updated: [HUDI-3760] Adding capability to fetch Metadata Records by prefix (#5208)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9e87d164b3 [HUDI-3760] Adding capability to fetch Metadata Records by prefix (#5208)
9e87d164b3 is described below
commit 9e87d164b3f39d795ef70a1f22ba57353d6e198f
Author: Alexey Kudinkin <al...@infinilake.com>
AuthorDate: Wed Apr 6 09:11:08 2022 -0700
[HUDI-3760] Adding capability to fetch Metadata Records by prefix (#5208)
- Adding capability to fetch Metadata Records by key prefix, so that Data Skipping can fetch only the Column Stats Index records pertaining to the columns being queried, instead of reading out the whole Index.
- Fixed usages of HFileScanner in HFileReader. A few code paths use the cached scanner if available; other code paths use their own HFileScanner w/ positional read.
Brief change log
- Rebasing ColumnStatsIndexSupport to rely on HoodieBackedTableMetadata in lieu of reading through Spark DS
- Adding methods enabling key-prefix lookups to HoodieFileReader, HoodieHFileReader
- Wiring key-prefix lookup through LogRecordScanner implementations
- Cleaning up HoodieHFileReader impl
Co-authored-by: sivabalan <n....@gmail.com>
Co-authored-by: Sagar Sumit <sa...@gmail.com>
---
.../hudi/cli/commands/ArchivedCommitsCommand.java | 4 +-
.../apache/hudi/cli/commands/ExportCommand.java | 2 +-
.../hudi/cli/commands/HoodieLogFileCommand.java | 4 +-
.../apache/hudi/client/HoodieTimelineArchiver.java | 2 +-
.../apache/hudi/io/storage/HoodieHFileWriter.java | 2 +-
.../io/storage/TestHoodieHFileReaderWriter.java | 118 +++-
.../io/storage/TestHoodieReaderWriterBase.java | 3 +-
.../org/apache/hudi/HoodieConversionUtils.scala | 8 +
.../functional/TestHoodieBackedMetadata.java | 167 ++++-
.../functional/TestHoodieBackedTableMetadata.java | 12 +-
.../hudi/testutils/HoodieClientTestUtils.java | 4 +-
.../org/apache/hudi/BaseHoodieTableFileIndex.java | 2 +-
.../hudi/common/config/HoodieMetadataConfig.java | 4 +-
.../table/log/AbstractHoodieLogRecordReader.java | 66 +-
.../common/table/log/HoodieLogFormatReader.java | 12 +-
.../table/log/HoodieMergedLogRecordScanner.java | 21 +-
.../table/log/block/HoodieAvroDataBlock.java | 2 +-
.../common/table/log/block/HoodieDataBlock.java | 28 +-
.../table/log/block/HoodieHFileDataBlock.java | 23 +-
.../table/timeline/HoodieArchivedTimeline.java | 2 +-
.../apache/hudi/common/util/CollectionUtils.java | 13 +
.../apache/hudi/io/storage/HoodieFileReader.java | 42 +-
.../apache/hudi/io/storage/HoodieHFileReader.java | 712 ++++++++++++---------
.../apache/hudi/io/storage/HoodieOrcReader.java | 11 +-
.../hudi/io/storage/HoodieParquetReader.java | 19 +-
.../apache/hudi/metadata/BaseTableMetadata.java | 2 +-
.../metadata/FileSystemBackedTableMetadata.java | 7 +
.../hudi/metadata/HoodieBackedTableMetadata.java | 329 +++++++---
.../HoodieMetadataMergedLogRecordReader.java | 97 ++-
.../hudi/metadata/HoodieMetadataPayload.java | 4 +-
.../apache/hudi/metadata/HoodieTableMetadata.java | 13 +
.../hudi/metadata/HoodieTableMetadataUtil.java | 8 +-
.../main/java/org/apache/hudi/util/LazyRef.java | 64 ++
.../common/functional/TestHoodieLogFormat.java | 2 +-
.../org/apache/hudi/ColumnStatsIndexSupport.scala | 113 +++-
.../scala/org/apache/hudi/HoodieBaseRelation.scala | 10 +-
.../scala/org/apache/hudi/HoodieFileIndex.scala | 11 +-
.../org/apache/hudi/HoodieMergeOnReadRDD.scala | 12 +-
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 9 +-
.../scala/org/apache/hudi/HoodieWriterUtils.scala | 6 -
.../apache/hudi/SparkHoodieTableFileIndex.scala | 2 +-
.../sql/hudi/streaming/HoodieStreamSource.scala | 1 +
.../org/apache/hudi/TestHoodieFileIndex.scala | 6 +-
.../hudi/functional/TestColumnStatsIndex.scala | 18 +-
.../hudi/functional/TestLayoutOptimization.scala | 10 +-
.../TestMetadataTableWithSparkDataSource.scala | 16 +-
46 files changed, 1356 insertions(+), 667 deletions(-)
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
index 1747a59f4f..fcb273f0a7 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
@@ -81,7 +81,7 @@ public class ArchivedCommitsCommand implements CommandMarker {
// read the avro blocks
while (reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
- blk.getRecordItr().forEachRemaining(readRecords::add);
+ blk.getRecordIterator().forEachRemaining(readRecords::add);
}
List<Comparable[]> readCommits = readRecords.stream().map(r -> (GenericRecord) r)
.filter(r -> r.get("actionType").toString().equals(HoodieTimeline.COMMIT_ACTION)
@@ -155,7 +155,7 @@ public class ArchivedCommitsCommand implements CommandMarker {
// read the avro blocks
while (reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
- try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordItr()) {
+ try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordIterator()) {
recordItr.forEachRemaining(readRecords::add);
}
}
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java
index 1d8d6dcd6a..fa6e15b7af 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java
@@ -124,7 +124,7 @@ public class ExportCommand implements CommandMarker {
// read the avro blocks
while (reader.hasNext() && copyCount < limit) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
- try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordItr()) {
+ try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordIterator()) {
while (recordItr.hasNext()) {
IndexedRecord ir = recordItr.next();
// Archived instants are saved as arvo encoded HoodieArchivedMetaEntry records. We need to get the
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index 4a56858f39..8d99c410d6 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -122,7 +122,7 @@ public class HoodieLogFileCommand implements CommandMarker {
instantTime = "dummy_instant_time_" + dummyInstantTimeCount;
}
if (n instanceof HoodieDataBlock) {
- try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) n).getRecordItr()) {
+ try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) n).getRecordIterator()) {
recordItr.forEachRemaining(r -> recordCount.incrementAndGet());
}
}
@@ -236,7 +236,7 @@ public class HoodieLogFileCommand implements CommandMarker {
HoodieLogBlock n = reader.next();
if (n instanceof HoodieDataBlock) {
HoodieDataBlock blk = (HoodieDataBlock) n;
- try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordItr()) {
+ try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordIterator()) {
recordItr.forEachRemaining(record -> {
if (allRecords.size() < limit) {
allRecords.add(record);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
index b0d473be04..ca76e4e3bf 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -339,7 +339,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
// Read the avro blocks
while (reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
- blk.getRecordItr().forEachRemaining(records::add);
+ blk.getRecordIterator().forEachRemaining(records::add);
if (records.size() >= this.config.getCommitArchivalBatchSize()) {
writeToFile(wrapperSchema, records);
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
index be79f50334..1642eb2c42 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
@@ -107,7 +107,7 @@ public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedR
.withFileContext(context)
.create();
- writer.appendFileInfo(HoodieHFileReader.KEY_SCHEMA.getBytes(), schema.toString().getBytes());
+ writer.appendFileInfo(HoodieHFileReader.SCHEMA_KEY.getBytes(), schema.toString().getBytes());
}
@Override
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
index e1f97949ef..2db8eb0204 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
@@ -18,17 +18,6 @@
package org.apache.hudi.io.storage;
-import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
-import org.apache.hudi.common.engine.TaskContextSupplier;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieAvroRecord;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.util.FileIOUtils;
-import org.apache.hudi.config.HoodieIndexConfig;
-import org.apache.hudi.config.HoodieWriteConfig;
-
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
@@ -39,7 +28,17 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -51,21 +50,25 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.TreeMap;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
import static org.apache.hudi.common.testutils.FileSystemTestUtils.RANDOM;
import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
+import static org.apache.hudi.common.util.CollectionUtils.toStream;
import static org.apache.hudi.io.storage.HoodieHFileConfig.HFILE_COMPARATOR;
-import static org.apache.hudi.io.storage.HoodieHFileReader.KEY_SCHEMA;
+import static org.apache.hudi.io.storage.HoodieHFileReader.SCHEMA_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -124,7 +127,7 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
FileSystem fs = getFilePath().getFileSystem(conf);
HFile.Reader hfileReader = HoodieHFileUtils.createHFileReader(fs, getFilePath(), new CacheConfig(conf), conf);
assertEquals(getSchemaFromResource(TestHoodieHFileReaderWriter.class, schemaPath),
- new Schema.Parser().parse(new String(hfileReader.getHFileInfo().get(KEY_SCHEMA.getBytes()))));
+ new Schema.Parser().parse(new String(hfileReader.getHFileInfo().get(SCHEMA_KEY.getBytes()))));
}
private static Stream<Arguments> populateMetaFieldsAndTestAvroWithMeta() {
@@ -142,7 +145,7 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchemaWithMetaFields.avsc");
HoodieFileWriter<GenericRecord> writer = createWriter(avroSchema, populateMetaFields);
List<String> keys = new ArrayList<>();
- Map<String, GenericRecord> recordMap = new HashMap<>();
+ Map<String, GenericRecord> recordMap = new TreeMap<>();
for (int i = 0; i < 100; i++) {
GenericRecord record = new GenericData.Record(avroSchema);
String key = String.format("%s%04d", "key", i);
@@ -163,24 +166,30 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
Configuration conf = new Configuration();
HoodieHFileReader hoodieHFileReader = (HoodieHFileReader) createReader(conf);
- List<Pair<String, IndexedRecord>> records = hoodieHFileReader.readAllRecords();
- records.forEach(entry -> assertEquals(entry.getSecond(), recordMap.get(entry.getFirst())));
+ List<IndexedRecord> records = HoodieHFileReader.readAllRecords(hoodieHFileReader);
+ assertEquals(new ArrayList<>(recordMap.values()), records);
+
hoodieHFileReader.close();
for (int i = 0; i < 2; i++) {
int randomRowstoFetch = 5 + RANDOM.nextInt(10);
Set<String> rowsToFetch = getRandomKeys(randomRowstoFetch, keys);
+
List<String> rowsList = new ArrayList<>(rowsToFetch);
Collections.sort(rowsList);
- hoodieHFileReader = (HoodieHFileReader) createReader(conf);
- List<Pair<String, GenericRecord>> result = hoodieHFileReader.readRecords(rowsList);
- assertEquals(result.size(), randomRowstoFetch);
+
+ List<GenericRecord> expectedRecords = rowsList.stream().map(recordMap::get).collect(Collectors.toList());
+
+ hoodieHFileReader = (HoodieHFileReader<GenericRecord>) createReader(conf);
+ List<GenericRecord> result = HoodieHFileReader.readRecords(hoodieHFileReader, rowsList);
+
+ assertEquals(expectedRecords, result);
+
result.forEach(entry -> {
- assertEquals(entry.getSecond(), recordMap.get(entry.getFirst()));
if (populateMetaFields && testAvroWithMeta) {
- assertNotNull(entry.getSecond().get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
+ assertNotNull(entry.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
} else {
- assertNull(entry.getSecond().get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
+ assertNull(entry.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
}
});
hoodieHFileReader.close();
@@ -202,7 +211,7 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
fs.open(getFilePath()), (int) fs.getFileStatus(getFilePath()).getLen());
// Reading byte array in HFile format, without actual file path
HoodieHFileReader<GenericRecord> hfileReader =
- new HoodieHFileReader<>(fs, new Path(DUMMY_BASE_PATH), content);
+ new HoodieHFileReader<>(fs, new Path(DUMMY_BASE_PATH), content, Option.empty());
Schema avroSchema = getSchemaFromResource(TestHoodieReaderWriterBase.class, "/exampleSchema.avsc");
assertEquals(NUM_RECORDS, hfileReader.getTotalRecords());
verifySimpleRecords(hfileReader.getRecordIterator(avroSchema));
@@ -217,7 +226,7 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
IntStream.concat(IntStream.range(40, NUM_RECORDS * 2), IntStream.range(10, 20))
.mapToObj(i -> "key" + String.format("%02d", i)).collect(Collectors.toList());
Schema avroSchema = getSchemaFromResource(TestHoodieReaderWriterBase.class, "/exampleSchema.avsc");
- Iterator<GenericRecord> iterator = hfileReader.getRecordIterator(keys, avroSchema);
+ Iterator<GenericRecord> iterator = hfileReader.getRecordsByKeysIterator(keys, avroSchema);
List<Integer> expectedIds =
IntStream.concat(IntStream.range(40, NUM_RECORDS), IntStream.range(10, 20))
@@ -233,6 +242,59 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
}
}
+ @Test
+ public void testReaderGetRecordIteratorByKeyPrefixes() throws Exception {
+ writeFileWithSimpleSchema();
+ HoodieHFileReader<GenericRecord> hfileReader =
+ (HoodieHFileReader<GenericRecord>) createReader(new Configuration());
+
+ Schema avroSchema = getSchemaFromResource(TestHoodieReaderWriterBase.class, "/exampleSchema.avsc");
+
+ List<String> keyPrefixes = Collections.singletonList("key");
+ Iterator<GenericRecord> iterator =
+ hfileReader.getRecordsByKeyPrefixIterator(keyPrefixes, avroSchema);
+
+ List<GenericRecord> recordsByPrefix = toStream(iterator).collect(Collectors.toList());
+
+ List<GenericRecord> allRecords = toStream(hfileReader.getRecordIterator()).collect(Collectors.toList());
+
+ assertEquals(allRecords, recordsByPrefix);
+
+ // filter for "key1" : entries from key10 to key19 should be matched
+ List<GenericRecord> expectedKey1s = allRecords.stream().filter(entry -> (entry.get("_row_key").toString()).contains("key1")).collect(Collectors.toList());
+ iterator =
+ hfileReader.getRecordsByKeyPrefixIterator(Collections.singletonList("key1"), avroSchema);
+ recordsByPrefix =
+ StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
+ .collect(Collectors.toList());
+ assertEquals(expectedKey1s, recordsByPrefix);
+
+ // exact match
+ List<GenericRecord> expectedKey25 = allRecords.stream().filter(entry -> (entry.get("_row_key").toString()).contains("key25")).collect(Collectors.toList());
+ iterator =
+ hfileReader.getRecordsByKeyPrefixIterator(Collections.singletonList("key25"), avroSchema);
+ recordsByPrefix =
+ StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
+ .collect(Collectors.toList());
+ assertEquals(expectedKey25, recordsByPrefix);
+
+ // no match. key prefix is beyond entries in file.
+ iterator =
+ hfileReader.getRecordsByKeyPrefixIterator(Collections.singletonList("key99"), avroSchema);
+ recordsByPrefix =
+ StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
+ .collect(Collectors.toList());
+ assertEquals(Collections.emptyList(), recordsByPrefix);
+
+ // no match. but keyPrefix is in between the entries found in file.
+ iterator =
+ hfileReader.getRecordsByKeyPrefixIterator(Collections.singletonList("key1234"), avroSchema);
+ recordsByPrefix =
+ StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
+ .collect(Collectors.toList());
+ assertEquals(Collections.emptyList(), recordsByPrefix);
+ }
+
@ParameterizedTest
@ValueSource(strings = {
"/hudi_0_9_hbase_1_2_3", "/hudi_0_10_hbase_1_2_3", "/hudi_0_11_hbase_2_4_9"})
@@ -253,7 +315,7 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
HoodieHFileUtils.createHFileReader(fs, new Path(DUMMY_BASE_PATH), content),
hfilePrefix, true, HFILE_COMPARATOR.getClass(), NUM_RECORDS_FIXTURE);
HoodieHFileReader<GenericRecord> hfileReader =
- new HoodieHFileReader<>(fs, new Path(DUMMY_BASE_PATH), content);
+ new HoodieHFileReader<>(fs, new Path(DUMMY_BASE_PATH), content, Option.empty());
Schema avroSchema = getSchemaFromResource(TestHoodieReaderWriterBase.class, "/exampleSchema.avsc");
assertEquals(NUM_RECORDS_FIXTURE, hfileReader.getTotalRecords());
verifySimpleRecords(hfileReader.getRecordIterator(avroSchema));
@@ -261,7 +323,7 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
content = readHFileFromResources(complexHFile);
verifyHFileReader(HoodieHFileUtils.createHFileReader(fs, new Path(DUMMY_BASE_PATH), content),
hfilePrefix, true, HFILE_COMPARATOR.getClass(), NUM_RECORDS_FIXTURE);
- hfileReader = new HoodieHFileReader<>(fs, new Path(DUMMY_BASE_PATH), content);
+ hfileReader = new HoodieHFileReader<>(fs, new Path(DUMMY_BASE_PATH), content, Option.empty());
avroSchema = getSchemaFromResource(TestHoodieReaderWriterBase.class, "/exampleSchemaWithUDT.avsc");
assertEquals(NUM_RECORDS_FIXTURE, hfileReader.getTotalRecords());
verifySimpleRecords(hfileReader.getRecordIterator(avroSchema));
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterBase.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterBase.java
index 19f9b93851..4617eb93a6 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterBase.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterBase.java
@@ -38,6 +38,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -217,7 +218,7 @@ public abstract class TestHoodieReaderWriterBase {
private void verifyFilterRowKeys(HoodieFileReader<GenericRecord> hoodieReader) {
Set<String> candidateRowKeys = IntStream.range(40, NUM_RECORDS * 2)
- .mapToObj(i -> "key" + String.format("%02d", i)).collect(Collectors.toSet());
+ .mapToObj(i -> "key" + String.format("%02d", i)).collect(Collectors.toCollection(TreeSet::new));
List<String> expectedKeys = IntStream.range(40, NUM_RECORDS)
.mapToObj(i -> "key" + String.format("%02d", i)).sorted().collect(Collectors.toList());
assertEquals(expectedKeys, hoodieReader.filterRowKeys(candidateRowKeys)
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala
index eaaf82182a..547c6aed62 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala
@@ -18,6 +18,8 @@
package org.apache.hudi
+import org.apache.hudi.common.config.TypedProperties
+
object HoodieConversionUtils {
def toJavaOption[T](opt: Option[T]): org.apache.hudi.common.util.Option[T] =
@@ -26,4 +28,10 @@ object HoodieConversionUtils {
def toScalaOption[T](opt: org.apache.hudi.common.util.Option[T]): Option[T] =
if (opt.isPresent) Some(opt.get) else None
+ def toProperties(params: Map[String, String]): TypedProperties = {
+ val props = new TypedProperties()
+ params.foreach(kv => props.setProperty(kv._1, kv._2))
+ props
+ }
+
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 35ae6da1cb..aebf046fe5 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -20,6 +20,7 @@ package org.apache.hudi.client.functional;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
@@ -32,6 +33,7 @@ import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -44,6 +46,7 @@ import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -69,6 +72,8 @@ import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.common.util.hash.ColumnIndexID;
+import org.apache.hudi.common.util.hash.PartitionIndexID;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
@@ -84,6 +89,7 @@ import org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader;
import org.apache.hudi.metadata.HoodieMetadataMetrics;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.table.HoodieSparkTable;
@@ -100,7 +106,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.Time;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -736,12 +741,12 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get();
HoodieHFileReader hoodieHFileReader = new HoodieHFileReader(context.getHadoopConf().get(), new Path(baseFile.getPath()),
new CacheConfig(context.getHadoopConf().get()));
- List<Pair<String, IndexedRecord>> records = hoodieHFileReader.readAllRecords();
+ List<IndexedRecord> records = HoodieHFileReader.readAllRecords(hoodieHFileReader);
records.forEach(entry -> {
if (populateMetaFields) {
- assertNotNull(((GenericRecord) entry.getSecond()).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
+ assertNotNull(((GenericRecord) entry).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
} else {
- assertNull(((GenericRecord) entry.getSecond()).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
+ assertNull(((GenericRecord) entry).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
}
});
}
@@ -977,12 +982,11 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
}
Schema writerSchema = new AvroSchemaConverter().convert(writerSchemaMsg);
-
try (HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(fs, new HoodieLogFile(fsStatus[0].getPath()), writerSchema)) {
while (logFileReader.hasNext()) {
HoodieLogBlock logBlock = logFileReader.next();
if (logBlock instanceof HoodieDataBlock) {
- try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) logBlock).getRecordItr()) {
+ try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) logBlock).getRecordIterator()) {
recordItr.forEachRemaining(indexRecord -> {
final GenericRecord record = (GenericRecord) indexRecord;
if (enableMetaFields) {
@@ -1068,15 +1072,15 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
HoodieHFileReader hoodieHFileReader = new HoodieHFileReader(context.getHadoopConf().get(),
new Path(baseFile.getPath()),
new CacheConfig(context.getHadoopConf().get()));
- List<Pair<String, IndexedRecord>> records = hoodieHFileReader.readAllRecords();
+ List<IndexedRecord> records = HoodieHFileReader.readAllRecords(hoodieHFileReader);
records.forEach(entry -> {
if (enableMetaFields) {
- assertNotNull(((GenericRecord) entry.getSecond()).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
+ assertNotNull(((GenericRecord) entry).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
} else {
- assertNull(((GenericRecord) entry.getSecond()).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
+ assertNull(((GenericRecord) entry).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
}
- final String keyInPayload = (String) ((GenericRecord) entry.getSecond())
+ final String keyInPayload = (String) ((GenericRecord) entry)
.get(HoodieMetadataPayload.KEY_FIELD_NAME);
assertFalse(keyInPayload.isEmpty());
});
@@ -1383,6 +1387,139 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
testTableOperationsImpl(engineContext, writeConfig);
}
+ @Test
+ public void testColStatsPrefixLookup() throws IOException {
+ this.tableType = COPY_ON_WRITE;
+ initPath();
+ initSparkContexts("TestHoodieMetadata");
+ initFileSystem();
+ fs.mkdirs(new Path(basePath));
+ initTimelineService();
+ initMetaClient(tableType);
+ initTestDataGenerator();
+ metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath);
+
+ HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+ // disable small file handling so that every insert goes to a new file group.
+ HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
+ .withRollbackUsingMarkers(false)
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(0)
+ .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1)
+ .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
+ .withAutoClean(false).retainCommits(1).retainFileVersions(1).build())
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+ .enable(true)
+ .withMetadataIndexColumnStats(true)
+ .enableFullScan(false)
+ .build())
+ .build();
+
+ try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
+
+ String firstCommit = "0000001";
+ List<HoodieRecord> records = dataGen.generateInserts(firstCommit, 20);
+
+ AtomicInteger counter = new AtomicInteger();
+ List<HoodieRecord> processedRecords = records.stream().map(entry ->
+ new HoodieAvroRecord(new HoodieKey("key1_" + counter.getAndIncrement(), entry.getPartitionPath()), (HoodieRecordPayload) entry.getData()))
+ .collect(Collectors.toList());
+
+ client.startCommitWithTime(firstCommit);
+ List<WriteStatus> writeStatuses = client.insert(jsc.parallelize(processedRecords, 1), firstCommit).collect();
+ assertNoWriteErrors(writeStatuses);
+
+ // Write 2 (inserts)
+ String secondCommit = "0000002";
+ client.startCommitWithTime(secondCommit);
+ records = dataGen.generateInserts(secondCommit, 20);
+ AtomicInteger counter1 = new AtomicInteger();
+ processedRecords = records.stream().map(entry ->
+ new HoodieAvroRecord(new HoodieKey("key2_" + counter1.getAndIncrement(), entry.getPartitionPath()), (HoodieRecordPayload) entry.getData()))
+ .collect(Collectors.toList());
+ writeStatuses = client.insert(jsc.parallelize(processedRecords, 1), secondCommit).collect();
+ assertNoWriteErrors(writeStatuses);
+
+ Map<String, Map<String, List<String>>> commitToPartitionsToFiles = new HashMap<>();
+ // populate commit -> partition -> file info to assist in validation and prefix lookups
+ metaClient.getActiveTimeline().getInstants().forEach(entry -> {
+ try {
+ HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+ .fromBytes(metaClient.getActiveTimeline().getInstantDetails(entry).get(), HoodieCommitMetadata.class);
+ String commitTime = entry.getTimestamp();
+ if (!commitToPartitionsToFiles.containsKey(commitTime)) {
+ commitToPartitionsToFiles.put(commitTime, new HashMap<>());
+ }
+ commitMetadata.getPartitionToWriteStats().entrySet()
+ .stream()
+ .forEach(partitionWriteStat -> {
+ String partitionStatName = partitionWriteStat.getKey();
+ List<HoodieWriteStat> writeStats = partitionWriteStat.getValue();
+ String partition = HoodieTableMetadataUtil.getPartition(partitionStatName);
+ if (!commitToPartitionsToFiles.get(commitTime).containsKey(partition)) {
+ commitToPartitionsToFiles.get(commitTime).put(partition, new ArrayList<>());
+ }
+ writeStats.forEach(writeStat -> commitToPartitionsToFiles.get(commitTime).get(partition).add(writeStat.getPath()));
+ });
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ });
+
+ HoodieTableMetadata tableMetadata = metadata(client);
+ // prefix search for column (_hoodie_record_key)
+ ColumnIndexID columnIndexID = new ColumnIndexID(HoodieRecord.RECORD_KEY_METADATA_FIELD);
+ List<HoodieRecord<HoodieMetadataPayload>> result = tableMetadata.getRecordsByKeyPrefixes(Collections.singletonList(columnIndexID.asBase64EncodedString()),
+ MetadataPartitionType.COLUMN_STATS.getPartitionPath()).collectAsList();
+
+ // there are 3 partitions in total and 2 commits. total entries should be 6.
+ assertEquals(result.size(), 6);
+ result.forEach(entry -> {
+ //LOG.warn("Prefix search entries just for record key col : " + entry.getRecordKey().toString() + " :: " + entry.getData().getColumnStatMetadata().get().toString());
+ });
+
+ // prefix search for col(_hoodie_record_key) and first partition. only 2 files should be matched
+ PartitionIndexID partitionIndexID = new PartitionIndexID(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
+ result = tableMetadata.getRecordsByKeyPrefixes(Collections.singletonList(columnIndexID.asBase64EncodedString().concat(partitionIndexID.asBase64EncodedString())),
+ MetadataPartitionType.COLUMN_STATS.getPartitionPath()).collectAsList();
+ // 1 partition and 2 commits. total entries should be 2.
+ assertEquals(result.size(), 2);
+ result.forEach(entry -> {
+ // LOG.warn("Prefix search entries for record key col and first partition : " + entry.getRecordKey().toString() + " :: " + entry.getData().getColumnStatMetadata().get().toString());
+ HoodieMetadataColumnStats metadataColumnStats = entry.getData().getColumnStatMetadata().get();
+ String fileName = metadataColumnStats.getFileName();
+ if (fileName.contains(firstCommit)) {
+ assertTrue(commitToPartitionsToFiles.get(firstCommit).get(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
+ .contains(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH + "/" + fileName));
+ } else {
+ assertTrue(commitToPartitionsToFiles.get(secondCommit).get(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
+ .contains(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH + "/" + fileName));
+ }
+ });
+
+ // prefix search for column {commit time} and first partition
+ columnIndexID = new ColumnIndexID(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
+ result = tableMetadata.getRecordsByKeyPrefixes(Collections.singletonList(columnIndexID.asBase64EncodedString().concat(partitionIndexID.asBase64EncodedString())),
+ MetadataPartitionType.COLUMN_STATS.getPartitionPath()).collectAsList();
+
+ // 1 partition and 2 commits. total entries should be 2.
+ assertEquals(result.size(), 2);
+ result.forEach(entry -> {
+ // LOG.warn("Prefix search entries for commit time col and first partition : " + entry.getRecordKey().toString() + " :: " + entry.getData().getColumnStatMetadata().get().toString());
+ HoodieMetadataColumnStats metadataColumnStats = entry.getData().getColumnStatMetadata().get();
+ // for commit time column, min max should be the same since we disable small files, every commit will create a new file
+ assertEquals(metadataColumnStats.getMinValue(), metadataColumnStats.getMaxValue());
+ String fileName = metadataColumnStats.getFileName();
+ if (fileName.contains(firstCommit)) {
+ assertTrue(commitToPartitionsToFiles.get(firstCommit).get(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
+ .contains(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH + "/" + fileName));
+ } else {
+ assertTrue(commitToPartitionsToFiles.get(secondCommit).get(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
+ .contains(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH + "/" + fileName));
+ }
+ });
+ }
+ }
+
/**
* Test all major table operations with the given table, config and context.
*
@@ -1476,8 +1613,8 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
Properties properties = new Properties();
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
- properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"1000");
- properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,"20");
+ properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000");
+ properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "20");
HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
@@ -1540,7 +1677,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
Properties properties = new Properties();
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
- properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000");
+ properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
@@ -1871,7 +2008,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
Properties properties = new Properties();
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3");
- properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000");
+ properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
HoodieWriteConfig writeConfig = getWriteConfigBuilder(false, true, false)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
@@ -2364,7 +2501,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
while (logFileReader.hasNext()) {
HoodieLogBlock logBlock = logFileReader.next();
if (logBlock instanceof HoodieDataBlock) {
- try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) logBlock).getRecordItr()) {
+ try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) logBlock).getRecordIterator()) {
recordItr.forEachRemaining(indexRecord -> {
final GenericRecord record = (GenericRecord) indexRecord;
final GenericRecord colStatsRecord = (GenericRecord) record.get(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
index 323724a4fe..9a8fc55a20 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
@@ -21,9 +21,9 @@ package org.apache.hudi.client.functional;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.util.Pair;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.common.config.HoodieMetadataConfig;
@@ -51,8 +51,6 @@ import org.apache.hudi.metadata.HoodieTableMetadataKeyGenerator;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
-
-import org.apache.hadoop.fs.FileStatus;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroSchemaConverter;
@@ -292,7 +290,7 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase {
while (logFileReader.hasNext()) {
HoodieLogBlock logBlock = logFileReader.next();
if (logBlock instanceof HoodieDataBlock) {
- try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) logBlock).getRecordItr()) {
+ try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) logBlock).getRecordIterator()) {
recordItr.forEachRemaining(indexRecord -> {
final GenericRecord record = (GenericRecord) indexRecord;
assertNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
@@ -361,10 +359,10 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase {
HoodieHFileReader hoodieHFileReader = new HoodieHFileReader(context.getHadoopConf().get(),
new Path(baseFile.getPath()),
new CacheConfig(context.getHadoopConf().get()));
- List<Pair<String, IndexedRecord>> records = hoodieHFileReader.readAllRecords();
+ List<IndexedRecord> records = HoodieHFileReader.readAllRecords(hoodieHFileReader);
records.forEach(entry -> {
- assertNull(((GenericRecord) entry.getSecond()).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
- final String keyInPayload = (String) ((GenericRecord) entry.getSecond())
+ assertNull(((GenericRecord) entry).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
+ final String keyInPayload = (String) ((GenericRecord) entry)
.get(HoodieMetadataPayload.KEY_FIELD_NAME);
assertFalse(keyInPayload.isEmpty());
});
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
index c1f05f9c99..75d2d14221 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
@@ -67,7 +67,7 @@ import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.apache.hudi.io.storage.HoodieHFileReader.KEY_SCHEMA;
+import static org.apache.hudi.io.storage.HoodieHFileReader.SCHEMA_KEY;
/**
* Utility methods to aid testing inside the HoodieClient module.
@@ -247,7 +247,7 @@ public class HoodieClientTestUtils {
HFile.Reader reader =
HoodieHFileUtils.createHFileReader(fs, new Path(path), cacheConfig, fs.getConf());
if (schema == null) {
- schema = new Schema.Parser().parse(new String(reader.getHFileInfo().get(KEY_SCHEMA.getBytes())));
+ schema = new Schema.Parser().parse(new String(reader.getHFileInfo().get(SCHEMA_KEY.getBytes())));
}
HFileScanner scanner = reader.getScanner(false, false);
if (!scanner.seekTo()) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index 8c88bfb001..ec70653b9c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -69,7 +69,7 @@ public abstract class BaseHoodieTableFileIndex {
private final String[] partitionColumns;
private final FileSystemViewStorageConfig fileSystemStorageConfig;
- private final HoodieMetadataConfig metadataConfig;
+ protected final HoodieMetadataConfig metadataConfig;
private final HoodieTableQueryType queryType;
private final Option<String> specifiedQueryInstant;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index 9ff3fd57d2..c5753cc3da 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -289,8 +289,8 @@ public final class HoodieMetadataConfig extends HoodieConfig {
return getString(DIR_FILTER_REGEX);
}
- public boolean enableFullScan() {
- return getBoolean(ENABLE_FULL_SCAN_LOG_FILES);
+ public boolean allowFullScan() {
+ return getBooleanOrDefault(ENABLE_FULL_SCAN_LOG_FILES);
}
public boolean populateMetaFields() {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index bf5ab9fd0f..9e56083b26 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -57,7 +57,6 @@ import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
-import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
@@ -129,7 +128,7 @@ public abstract class AbstractHoodieLogRecordReader {
// Store the last instant log blocks (needed to implement rollback)
private Deque<HoodieLogBlock> currentInstantLogBlocks = new ArrayDeque<>();
// Enables full scan of log records
- protected final boolean enableFullScan;
+ protected final boolean forceFullScan;
private int totalScannedLogFiles;
// Progress
private float progress = 0.0f;
@@ -150,7 +149,7 @@ public abstract class AbstractHoodieLogRecordReader {
protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<String> logFilePaths,
Schema readerSchema, String latestInstantTime, boolean readBlocksLazily,
boolean reverseReader, int bufferSize, Option<InstantRange> instantRange,
- boolean withOperationField, boolean enableFullScan,
+ boolean withOperationField, boolean forceFullScan,
Option<String> partitionName, InternalSchema internalSchema) {
this.readerSchema = readerSchema;
this.latestInstantTime = latestInstantTime;
@@ -167,7 +166,7 @@ public abstract class AbstractHoodieLogRecordReader {
this.bufferSize = bufferSize;
this.instantRange = instantRange;
this.withOperationField = withOperationField;
- this.enableFullScan = enableFullScan;
+ this.forceFullScan = forceFullScan;
this.internalSchema = internalSchema == null ? InternalSchema.getEmptyInternalSchema() : internalSchema;
this.path = basePath;
@@ -189,10 +188,14 @@ public abstract class AbstractHoodieLogRecordReader {
}
public synchronized void scan() {
- scan(Option.empty());
+ scanInternal(Option.empty());
}
- public synchronized void scan(Option<List<String>> keys) {
+ public synchronized void scan(List<String> keys) {
+ scanInternal(Option.of(new KeySpec(keys, true)));
+ }
+
+ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
currentInstantLogBlocks = new ArrayDeque<>();
progress = 0.0f;
totalLogFiles = new AtomicLong(0);
@@ -205,15 +208,16 @@ public abstract class AbstractHoodieLogRecordReader {
HoodieTimeline completedInstantsTimeline = commitsTimeline.filterCompletedInstants();
HoodieTimeline inflightInstantsTimeline = commitsTimeline.filterInflights();
try {
-
// Get the key field based on populate meta fields config
// and the table type
final String keyField = getKeyField();
// Iterate over the paths
+ boolean enableRecordLookups = !forceFullScan;
logFormatReaderWrapper = new HoodieLogFormatReader(fs,
logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()),
- readerSchema, readBlocksLazily, reverseReader, bufferSize, !enableFullScan, keyField, internalSchema);
+ readerSchema, readBlocksLazily, reverseReader, bufferSize, enableRecordLookups, keyField, internalSchema);
+
Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
while (logFormatReaderWrapper.hasNext()) {
HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
@@ -250,7 +254,7 @@ public abstract class AbstractHoodieLogRecordReader {
if (isNewInstantBlock(logBlock) && !readBlocksLazily) {
// If this is an avro data block belonging to a different commit/instant,
// then merge the last blocks and records into the main result
- processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keys);
+ processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
}
// store the current block
currentInstantLogBlocks.push(logBlock);
@@ -260,7 +264,7 @@ public abstract class AbstractHoodieLogRecordReader {
if (isNewInstantBlock(logBlock) && !readBlocksLazily) {
// If this is a delete data block belonging to a different commit/instant,
// then merge the last blocks and records into the main result
- processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keys);
+ processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
}
// store deletes so can be rolled back
currentInstantLogBlocks.push(logBlock);
@@ -335,7 +339,7 @@ public abstract class AbstractHoodieLogRecordReader {
// merge the last read block when all the blocks are done reading
if (!currentInstantLogBlocks.isEmpty()) {
LOG.info("Merging the final data blocks");
- processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keys);
+ processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
}
// Done
progress = 1.0f;
@@ -370,11 +374,11 @@ public abstract class AbstractHoodieLogRecordReader {
* Iterate over the GenericRecord in the block, read the hoodie key and partition path and call subclass processors to
* handle it.
*/
- private void processDataBlock(HoodieDataBlock dataBlock, Option<List<String>> keys) throws Exception {
- try (ClosableIterator<IndexedRecord> recordItr = dataBlock.getRecordItr(keys.orElse(Collections.emptyList()))) {
+ private void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) throws Exception {
+ try (ClosableIterator<IndexedRecord> recordIterator = getRecordsIterator(dataBlock, keySpecOpt)) {
Option<Schema> schemaOption = getMergedSchema(dataBlock);
- while (recordItr.hasNext()) {
- IndexedRecord currentRecord = recordItr.next();
+ while (recordIterator.hasNext()) {
+ IndexedRecord currentRecord = recordIterator.next();
IndexedRecord record = schemaOption.isPresent() ? HoodieAvroUtils.rewriteRecordWithNewSchema(currentRecord, schemaOption.get()) : currentRecord;
processNextRecord(createHoodieRecord(record, this.hoodieTableMetaClient.getTableConfig(), this.payloadClassFQN,
this.preCombineField, this.withOperationField, this.simpleKeyGenFields, this.partitionName));
@@ -449,23 +453,20 @@ public abstract class AbstractHoodieLogRecordReader {
* Process the set of log blocks belonging to the last instant which is read fully.
*/
private void processQueuedBlocksForInstant(Deque<HoodieLogBlock> logBlocks, int numLogFilesSeen,
- Option<List<String>> keys) throws Exception {
+ Option<KeySpec> keySpecOpt) throws Exception {
while (!logBlocks.isEmpty()) {
LOG.info("Number of remaining logblocks to merge " + logBlocks.size());
// poll the element at the bottom of the stack since that's the order it was inserted
HoodieLogBlock lastBlock = logBlocks.pollLast();
switch (lastBlock.getBlockType()) {
case AVRO_DATA_BLOCK:
- processDataBlock((HoodieAvroDataBlock) lastBlock, keys);
+ processDataBlock((HoodieAvroDataBlock) lastBlock, keySpecOpt);
break;
case HFILE_DATA_BLOCK:
- if (!keys.isPresent()) {
- keys = Option.of(Collections.emptyList());
- }
- processDataBlock((HoodieHFileDataBlock) lastBlock, keys);
+ processDataBlock((HoodieHFileDataBlock) lastBlock, keySpecOpt);
break;
case PARQUET_DATA_BLOCK:
- processDataBlock((HoodieParquetDataBlock) lastBlock, keys);
+ processDataBlock((HoodieParquetDataBlock) lastBlock, keySpecOpt);
break;
case DELETE_BLOCK:
Arrays.stream(((HoodieDeleteBlock) lastBlock).getRecordsToDelete()).forEach(this::processNextDeletedRecord);
@@ -481,6 +482,15 @@ public abstract class AbstractHoodieLogRecordReader {
progress = numLogFilesSeen - 1 / logFilePaths.size();
}
+ private ClosableIterator<IndexedRecord> getRecordsIterator(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) throws IOException {
+ if (keySpecOpt.isPresent()) {
+ KeySpec keySpec = keySpecOpt.get();
+ return dataBlock.getRecordIterator(keySpec.keys, keySpec.fullKey);
+ }
+
+ return dataBlock.getRecordIterator();
+ }
+
/**
* Return progress of scanning as a float between 0.0 to 1.0.
*/
@@ -504,7 +514,7 @@ public abstract class AbstractHoodieLogRecordReader {
return payloadClassFQN;
}
- protected Option<String> getPartitionName() {
+ public Option<String> getPartitionName() {
return partitionName;
}
@@ -520,6 +530,16 @@ public abstract class AbstractHoodieLogRecordReader {
return withOperationField;
}
+ protected static class KeySpec {
+ private final List<String> keys;
+ private final boolean fullKey;
+
+ public KeySpec(List<String> keys, boolean fullKey) {
+ this.keys = keys;
+ this.fullKey = fullKey;
+ }
+ }
+
/**
* Builder used to build {@code AbstractHoodieLogRecordScanner}.
*/
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
index 0276c97a00..c48107e392 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
@@ -53,13 +53,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
private static final Logger LOG = LogManager.getLogger(HoodieLogFormatReader.class);
HoodieLogFormatReader(FileSystem fs, List<HoodieLogFile> logFiles, Schema readerSchema, boolean readBlocksLazily,
- boolean reverseLogReader, int bufferSize, boolean enableInlineReading,
- String recordKeyField) throws IOException {
- this(fs, logFiles, readerSchema, readBlocksLazily, reverseLogReader, bufferSize, enableInlineReading, recordKeyField, InternalSchema.getEmptyInternalSchema());
- }
-
- HoodieLogFormatReader(FileSystem fs, List<HoodieLogFile> logFiles, Schema readerSchema, boolean readBlocksLazily,
- boolean reverseLogReader, int bufferSize, boolean enableInlineReading,
+ boolean reverseLogReader, int bufferSize, boolean enableRecordLookups,
String recordKeyField, InternalSchema internalSchema) throws IOException {
this.logFiles = logFiles;
this.fs = fs;
@@ -69,12 +63,12 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
this.bufferSize = bufferSize;
this.prevReadersInOpenState = new ArrayList<>();
this.recordKeyField = recordKeyField;
- this.enableInlineReading = enableInlineReading;
+ this.enableInlineReading = enableRecordLookups;
this.internalSchema = internalSchema == null ? InternalSchema.getEmptyInternalSchema() : internalSchema;
if (logFiles.size() > 0) {
HoodieLogFile nextLogFile = logFiles.remove(0);
this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false,
- enableInlineReading, recordKeyField, internalSchema);
+ enableRecordLookups, recordKeyField, internalSchema);
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index cc96f2d692..ed18736443 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -45,6 +45,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
/**
* Scans through all the blocks in a list of HoodieLogFile and builds up a compacted/merged list of records which will
* be used as a lookup table when merging the base columnar file with the redo log file.
@@ -76,14 +78,14 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
protected HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
String latestInstantTime, Long maxMemorySizeInBytes, boolean readBlocksLazily,
boolean reverseReader, int bufferSize, String spillableMapBasePath,
- Option<InstantRange> instantRange, boolean autoScan,
+ Option<InstantRange> instantRange,
ExternalSpillableMap.DiskMapType diskMapType,
boolean isBitCaskDiskMapCompressionEnabled,
- boolean withOperationField, boolean enableFullScan,
+ boolean withOperationField, boolean forceFullScan,
Option<String> partitionName, InternalSchema internalSchema) {
super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize,
instantRange, withOperationField,
- enableFullScan, partitionName, internalSchema);
+ forceFullScan, partitionName, internalSchema);
try {
// Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
@@ -93,7 +95,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
throw new HoodieIOException("IOException when creating ExternalSpillableMap at " + spillableMapBasePath, e);
}
- if (autoScan) {
+ if (forceFullScan) {
performScan();
}
}
@@ -115,10 +117,12 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
@Override
public Iterator<HoodieRecord<? extends HoodieRecordPayload>> iterator() {
+ checkState(forceFullScan, "Record reader has to be in full-scan mode to use this API");
return records.iterator();
}
public Map<String, HoodieRecord<? extends HoodieRecordPayload>> getRecords() {
+ checkState(forceFullScan, "Record reader has to be in full-scan mode to use this API");
return records;
}
@@ -211,8 +215,6 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
// incremental filtering
protected Option<InstantRange> instantRange = Option.empty();
protected String partitionName;
- // auto scan default true
- private boolean autoScan = true;
// operation field default false
private boolean withOperationField = false;
@@ -290,11 +292,6 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
return this;
}
- public Builder withAutoScan(boolean autoScan) {
- this.autoScan = autoScan;
- return this;
- }
-
public Builder withInternalSchema(InternalSchema internalSchema) {
this.internalSchema = internalSchema == null ? InternalSchema.getEmptyInternalSchema() : internalSchema;
return this;
@@ -315,7 +312,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
public HoodieMergedLogRecordScanner build() {
return new HoodieMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema,
latestInstantTime, maxMemorySizeInBytes, readBlocksLazily, reverseReader,
- bufferSize, spillableMapBasePath, instantRange, autoScan,
+ bufferSize, spillableMapBasePath, instantRange,
diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField, true,
Option.ofNullable(partitionName), internalSchema);
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
index a04a32bf42..491c6700c9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
@@ -314,7 +314,7 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
output.write(schemaContent);
List<IndexedRecord> records = new ArrayList<>();
- try (ClosableIterator<IndexedRecord> recordItr = getRecordItr()) {
+ try (ClosableIterator<IndexedRecord> recordItr = getRecordIterator()) {
recordItr.forEachRemaining(records::add);
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
index dd2c03b42c..c83b3bc82d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
@@ -138,7 +138,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
/**
* Returns all the records iterator contained w/in this block.
*/
- public final ClosableIterator<IndexedRecord> getRecordItr() {
+ public final ClosableIterator<IndexedRecord> getRecordIterator() {
if (records.isPresent()) {
return list2Iterator(records.get());
}
@@ -162,21 +162,21 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
* @return List of IndexedRecords for the keys of interest.
* @throws IOException in case of failures encountered when reading/parsing records
*/
- public final ClosableIterator<IndexedRecord> getRecordItr(List<String> keys) throws IOException {
+ public final ClosableIterator<IndexedRecord> getRecordIterator(List<String> keys, boolean fullKey) throws IOException {
boolean fullScan = keys.isEmpty();
if (enablePointLookups && !fullScan) {
- return lookupRecords(keys);
+ return lookupRecords(keys, fullKey);
}
// Otherwise, we fetch all the records and filter out all the records, but the
// ones requested
- ClosableIterator<IndexedRecord> allRecords = getRecordItr();
+ ClosableIterator<IndexedRecord> allRecords = getRecordIterator();
if (fullScan) {
return allRecords;
}
HashSet<String> keySet = new HashSet<>(keys);
- return FilteringIterator.getInstance(allRecords, keySet, this::getRecordKey);
+ return FilteringIterator.getInstance(allRecords, keySet, fullKey, this::getRecordKey);
}
protected ClosableIterator<IndexedRecord> readRecordsFromBlockPayload() throws IOException {
@@ -193,7 +193,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
}
}
- protected ClosableIterator<IndexedRecord> lookupRecords(List<String> keys) throws IOException {
+ protected ClosableIterator<IndexedRecord> lookupRecords(List<String> keys, boolean fullKey) throws IOException {
throw new UnsupportedOperationException(
String.format("Point lookups are not supported by this Data block type (%s)", getBlockType())
);
@@ -252,21 +252,25 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
private final ClosableIterator<T> nested; // nested iterator
private final Set<String> keys; // the filtering keys
+ private final boolean fullKey;
+
private final Function<T, Option<String>> keyExtract; // function to extract the key
private T next;
- private FilteringIterator(ClosableIterator<T> nested, Set<String> keys, Function<T, Option<String>> keyExtract) {
+ private FilteringIterator(ClosableIterator<T> nested, Set<String> keys, boolean fullKey, Function<T, Option<String>> keyExtract) {
this.nested = nested;
this.keys = keys;
+ this.fullKey = fullKey;
this.keyExtract = keyExtract;
}
public static <T extends IndexedRecord> FilteringIterator<T> getInstance(
ClosableIterator<T> nested,
Set<String> keys,
+ boolean fullKey,
Function<T, Option<String>> keyExtract) {
- return new FilteringIterator<>(nested, keys, keyExtract);
+ return new FilteringIterator<>(nested, keys, fullKey, keyExtract);
}
@Override
@@ -278,7 +282,13 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
public boolean hasNext() {
while (this.nested.hasNext()) {
this.next = this.nested.next();
- if (keys.contains(keyExtract.apply(this.next).orElse(null))) {
+ String key = keyExtract.apply(this.next)
+ .orElseGet(() -> {
+ throw new IllegalStateException(String.format("Record without a key (%s)", this.next));
+ });
+
+ if (fullKey && keys.contains(key)
+ || !fullKey && keys.stream().anyMatch(key::startsWith)) {
return true;
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index 26c9db5a15..72cb3a0ef3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -18,6 +18,7 @@
package org.apache.hudi.common.table.log.block;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.inline.InLineFSUtils;
@@ -47,6 +48,7 @@ import org.apache.log4j.Logger;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -149,6 +151,8 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
}
});
+ writer.appendFileInfo(HoodieHFileReader.SCHEMA_KEY.getBytes(), getSchema().toString().getBytes());
+
writer.close();
ostream.flush();
ostream.close();
@@ -163,11 +167,9 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
// Get schema from the header
Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
+ FileSystem fs = FSUtils.getFs(pathForReader.toString(), new Configuration());
// Read the content
- HoodieHFileReader<IndexedRecord> reader = new HoodieHFileReader<>(
- FSUtils.getFs(pathForReader.toString(), new Configuration()), pathForReader, content);
- // Sets up the writer schema
- reader.withSchema(writerSchema);
+ HoodieHFileReader<IndexedRecord> reader = new HoodieHFileReader<>(fs, pathForReader, content, Option.of(writerSchema));
Iterator<IndexedRecord> recordIterator = reader.getRecordIterator(readerSchema);
return new ClosableIterator<IndexedRecord>() {
@Override
@@ -189,7 +191,7 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
// TODO abstract this w/in HoodieDataBlock
@Override
- protected ClosableIterator<IndexedRecord> lookupRecords(List<String> keys) throws IOException {
+ protected ClosableIterator<IndexedRecord> lookupRecords(List<String> keys, boolean fullKey) throws IOException {
HoodieLogBlockContentLocation blockContentLoc = getBlockContentLocation().get();
// NOTE: It's important to extend Hadoop configuration here to make sure configuration
@@ -204,13 +206,18 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
blockContentLoc.getContentPositionInLogFile(),
blockContentLoc.getBlockSize());
- // HFile read will be efficient if keys are sorted, since on storage, records are sorted by key. This will avoid unnecessary seeks.
- Collections.sort(keys);
+ // HFile read will be efficient if keys are sorted, since on storage records are sorted by key.
+ // This will avoid unnecessary seeks.
+ List<String> sortedKeys = new ArrayList<>(keys);
+ Collections.sort(sortedKeys);
final HoodieHFileReader<IndexedRecord> reader =
new HoodieHFileReader<>(inlineConf, inlinePath, new CacheConfig(inlineConf), inlinePath.getFileSystem(inlineConf));
+
// Get writer's schema from the header
- final ClosableIterator<IndexedRecord> recordIterator = reader.getRecordIterator(keys, readerSchema);
+ final ClosableIterator<IndexedRecord> recordIterator =
+ fullKey ? reader.getRecordsByKeysIterator(sortedKeys, readerSchema) : reader.getRecordsByKeyPrefixIterator(sortedKeys, readerSchema);
+
return new ClosableIterator<IndexedRecord>() {
@Override
public boolean hasNext() {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
index 21c7c4db21..a9b25844ec 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
@@ -257,7 +257,7 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
HoodieAvroDataBlock avroBlock = (HoodieAvroDataBlock) block;
// TODO If we can store additional metadata in datablock, we can skip parsing records
// (such as startTime, endTime of records in the block)
- try (ClosableIterator<IndexedRecord> itr = avroBlock.getRecordItr()) {
+ try (ClosableIterator<IndexedRecord> itr = avroBlock.getRecordIterator()) {
StreamSupport.stream(Spliterators.spliteratorUnknownSize(itr, Spliterator.IMMUTABLE), true)
// Filter blocks in desired time window
.filter(r -> commitsFilter.apply((GenericRecord) r))
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java
index 5673921721..9040a04d5e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java
@@ -32,9 +32,12 @@ import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
+import java.util.Spliterator;
+import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
public class CollectionUtils {
@@ -48,6 +51,16 @@ public class CollectionUtils {
return !isNullOrEmpty(c);
}
+ /**
+ * Collects provided {@link Iterator} to a {@link Stream}
+ */
+ public static <T> Stream<T> toStream(Iterator<T> iterator) {
+ return StreamSupport.stream(
+ Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED),
+ false
+ );
+ }
+
/**
* Combines provided arrays into one
*/
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
index cb330b8143..6490425c42 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
@@ -18,32 +18,28 @@
package org.apache.hudi.io.storage;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.Option;
-public interface HoodieFileReader<R extends IndexedRecord> extends AutoCloseable {
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.Set;
- public String[] readMinMaxRecordKeys();
+public interface HoodieFileReader<R extends IndexedRecord> extends AutoCloseable {
- public BloomFilter readBloomFilter();
+ String[] readMinMaxRecordKeys();
- public Set<String> filterRowKeys(Set<String> candidateRowKeys);
+ BloomFilter readBloomFilter();
- default Map<String, R> getRecordsByKeys(List<String> rowKeys) throws IOException {
- throw new UnsupportedOperationException();
- }
+ Set<String> filterRowKeys(Set<String> candidateRowKeys);
- public Iterator<R> getRecordIterator(Schema readerSchema) throws IOException;
+ ClosableIterator<R> getRecordIterator(Schema readerSchema) throws IOException;
- default Iterator<R> getRecordIterator() throws IOException {
+ default ClosableIterator<R> getRecordIterator() throws IOException {
return getRecordIterator(getSchema());
}
@@ -55,6 +51,22 @@ public interface HoodieFileReader<R extends IndexedRecord> extends AutoCloseable
return getRecordByKey(key, getSchema());
}
+ default ClosableIterator<R> getRecordsByKeysIterator(List<String> keys, Schema schema) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ default ClosableIterator<R> getRecordsByKeysIterator(List<String> keys) throws IOException {
+ return getRecordsByKeysIterator(keys, getSchema());
+ }
+
+ default ClosableIterator<R> getRecordsByKeyPrefixIterator(List<String> keyPrefixes, Schema schema) throws IOException {
+ throw new UnsupportedOperationException(); // optional capability: unsupported unless a reader overrides it (mirrors getRecordsByKeysIterator)
+ }
+
+ default ClosableIterator<R> getRecordsByKeyPrefixIterator(List<String> keyPrefixes) throws IOException {
+ return getRecordsByKeyPrefixIterator(keyPrefixes, getSchema());
+ }
+
Schema getSchema();
void close();
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java
index 90440345f7..412b7e4a54 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java
@@ -18,21 +18,10 @@
package org.apache.hudi.io.storage;
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.common.bloom.BloomFilter;
-import org.apache.hudi.common.bloom.BloomFilterFactory;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.util.ClosableIterator;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
-
import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PositionedReadable;
@@ -44,97 +33,117 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.nio.ByteBuff;
-import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.ClosableIterator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.util.LazyRef;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
+import java.util.Collections;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
+import static org.apache.hudi.common.util.CollectionUtils.toStream;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
+/**
+ * NOTE: PLEASE READ DOCS & COMMENTS CAREFULLY BEFORE MAKING CHANGES
+ * <p>
+ * {@link HoodieFileReader} implementation allowing to read from {@link HFile}.
+ */
public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileReader<R> {
- public static final String KEY_FIELD_NAME = "key";
- public static final String KEY_SCHEMA = "schema";
+
+ // TODO HoodieHFileReader right now tightly coupled to MT, we should break that coupling
+ public static final String SCHEMA_KEY = "schema";
public static final String KEY_BLOOM_FILTER_META_BLOCK = "bloomFilter";
public static final String KEY_BLOOM_FILTER_TYPE_CODE = "bloomFilterTypeCode";
+
+ public static final String KEY_FIELD_NAME = "key";
public static final String KEY_MIN_RECORD = "minRecordKey";
public static final String KEY_MAX_RECORD = "maxRecordKey";
private static final Logger LOG = LogManager.getLogger(HoodieHFileReader.class);
- private Path path;
- private Configuration conf;
- private HFile.Reader reader;
- private FSDataInputStream fsDataInputStream;
- private Schema schema;
- // Scanner used to read individual keys. This is cached to prevent the overhead of opening the scanner for each
- // key retrieval.
- private HFileScanner keyScanner;
-
- public HoodieHFileReader(Configuration configuration, Path path, CacheConfig cacheConfig) throws IOException {
- this.conf = configuration;
- this.path = path;
- this.reader = HoodieHFileUtils.createHFileReader(FSUtils.getFs(path.toString(), configuration), path, cacheConfig, conf);
+ private final Path path;
+
+ private final LazyRef<Schema> schema;
+
+ // NOTE: Reader is ONLY THREAD-SAFE for {@code Scanner} operating in Positional Read ("pread")
+ // mode (ie created w/ "pread = true")
+ private final HFile.Reader reader;
+ // NOTE: Scanner caches read blocks, therefore it's important to re-use scanner
+ // wherever possible
+ private final HFileScanner sharedScanner;
+
+ private final Object sharedScannerLock = new Object();
+
+ public HoodieHFileReader(Configuration hadoopConf, Path path, CacheConfig cacheConfig) throws IOException {
+ this(path,
+ HoodieHFileUtils.createHFileReader(FSUtils.getFs(path.toString(), hadoopConf), path, cacheConfig, hadoopConf),
+ Option.empty());
}
- public HoodieHFileReader(Configuration configuration, Path path, CacheConfig cacheConfig, FileSystem fs) throws IOException {
- this.conf = configuration;
- this.path = path;
- this.fsDataInputStream = fs.open(path);
- this.reader = HoodieHFileUtils.createHFileReader(fs, path, cacheConfig, configuration);
+ public HoodieHFileReader(Configuration hadoopConf, Path path, CacheConfig cacheConfig, FileSystem fs) throws IOException {
+ this(path, HoodieHFileUtils.createHFileReader(fs, path, cacheConfig, hadoopConf), Option.empty());
}
- public HoodieHFileReader(FileSystem fs, Path dummyPath, byte[] content) throws IOException {
- this.reader = HoodieHFileUtils.createHFileReader(fs, dummyPath, content);
+ public HoodieHFileReader(FileSystem fs, Path dummyPath, byte[] content, Option<Schema> schemaOpt) throws IOException {
+ this(null, HoodieHFileUtils.createHFileReader(fs, dummyPath, content), schemaOpt);
+ }
+
+ public HoodieHFileReader(Path path, HFile.Reader reader, Option<Schema> schemaOpt) throws IOException {
+ this.path = path;
+ this.reader = reader;
+ // For shared scanner, which is primarily used for point-lookups, we're caching blocks
+ // by default, to minimize amount of traffic to the underlying storage
+ this.sharedScanner = getHFileScanner(reader, true);
+ this.schema = schemaOpt.map(LazyRef::eager)
+ .orElseGet(() -> LazyRef.lazy(() -> fetchSchema(reader)));
}
@Override
public String[] readMinMaxRecordKeys() {
+ // NOTE: This access to reader is thread-safe
HFileInfo fileInfo = reader.getHFileInfo();
- return new String[] {new String(fileInfo.get(KEY_MIN_RECORD.getBytes())),
+ return new String[]{new String(fileInfo.get(KEY_MIN_RECORD.getBytes())),
new String(fileInfo.get(KEY_MAX_RECORD.getBytes()))};
}
- @Override
- public Schema getSchema() {
- if (schema == null) {
- HFileInfo fileInfo = reader.getHFileInfo();
- schema = new Schema.Parser().parse(new String(fileInfo.get(KEY_SCHEMA.getBytes())));
- }
-
- return schema;
- }
-
- /**
- * Sets up the writer schema explicitly.
- */
- public void withSchema(Schema schema) {
- this.schema = schema;
- }
-
@Override
public BloomFilter readBloomFilter() {
- HFileInfo fileInfo;
try {
- fileInfo = reader.getHFileInfo();
- ByteBuff serializedFilter = reader.getMetaBlock(KEY_BLOOM_FILTER_META_BLOCK, false).getBufferWithoutHeader();
- byte[] filterBytes = new byte[serializedFilter.remaining()];
- serializedFilter.get(filterBytes); // read the bytes that were written
- return BloomFilterFactory.fromString(new String(filterBytes),
+ // NOTE: This access to reader is thread-safe
+ HFileInfo fileInfo = reader.getHFileInfo();
+ ByteBuff buf = reader.getMetaBlock(KEY_BLOOM_FILTER_META_BLOCK, false).getBufferWithoutHeader();
+ // We have to copy bytes here, since we can't reuse buffer's underlying
+ // array as is, since it contains additional metadata (header)
+ byte[] bytes = new byte[buf.remaining()];
+ buf.get(bytes);
+ return BloomFilterFactory.fromString(new String(bytes),
new String(fileInfo.get(KEY_BLOOM_FILTER_TYPE_CODE.getBytes())));
} catch (IOException e) {
throw new HoodieException("Could not read bloom filter from " + path, e);
}
}
+ @Override
+ public Schema getSchema() {
+ return schema.get();
+ }
+
/**
* Filter keys by availability.
* <p>
@@ -145,289 +154,420 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileRea
*/
@Override
public Set<String> filterRowKeys(Set<String> candidateRowKeys) {
- return candidateRowKeys.stream().filter(k -> {
- try {
- return isKeyAvailable(k);
- } catch (IOException e) {
- LOG.error("Failed to check key availability: " + k);
- return false;
- }
- }).collect(Collectors.toSet());
- }
+ checkState(candidateRowKeys instanceof TreeSet,
+ String.format("HFile reader expects a TreeSet as iterating over ordered keys is more performant, got (%s)", candidateRowKeys.getClass().getSimpleName()));
- @Override
- public Map<String, R> getRecordsByKeys(List<String> rowKeys) throws IOException {
- return filterRecordsImpl(new TreeSet<>(rowKeys));
+ synchronized (sharedScannerLock) {
+ return candidateRowKeys.stream().filter(k -> {
+ try {
+ return isKeyAvailable(k, sharedScanner);
+ } catch (IOException e) {
+ LOG.error("Failed to check key availability: " + k);
+ return false;
+ }
+ }).collect(Collectors.toSet());
+ }
}
- /**
- * Filter records by sorted keys.
- * <p>
- * TODO: Implement single seek and sequential scan till the last candidate key
- * instead of repeated seeks.
- *
- * @param sortedCandidateRowKeys - Sorted set of keys to fetch records for
- * @return Map of keys to fetched records
- * @throws IOException When the deserialization of records fail
- */
- private synchronized Map<String, R> filterRecordsImpl(TreeSet<String> sortedCandidateRowKeys) throws IOException {
- HashMap<String, R> filteredRecords = new HashMap<>();
- for (String key : sortedCandidateRowKeys) {
- Option<R> record = getRecordByKey(key);
- if (record.isPresent()) {
- filteredRecords.put(key, record.get());
- }
+ @SuppressWarnings("unchecked")
+ @Override
+ public Option<R> getRecordByKey(String key, Schema readerSchema) throws IOException {
+ synchronized (sharedScannerLock) {
+ return (Option<R>) fetchRecordByKeyInternal(sharedScanner, key, getSchema(), readerSchema);
}
- return filteredRecords;
}
- /**
- * Reads all the records with given schema.
- *
- * <p>NOTE: This should only be used for testing,
- * the records are materialized eagerly into a list and returned,
- * use {@code getRecordIterator} where possible.
- */
- private List<Pair<String, R>> readAllRecords(Schema writerSchema, Schema readerSchema) {
- final Option<Schema.Field> keyFieldSchema = Option.ofNullable(readerSchema.getField(KEY_FIELD_NAME));
- List<Pair<String, R>> recordList = new LinkedList<>();
- try {
- final HFileScanner scanner = reader.getScanner(false, false);
- if (scanner.seekTo()) {
- do {
- Cell c = scanner.getCell();
- final Pair<String, R> keyAndRecordPair = getRecordFromCell(c, writerSchema, readerSchema, keyFieldSchema);
- recordList.add(keyAndRecordPair);
- } while (scanner.next());
- }
+ @SuppressWarnings("unchecked")
+ @Override
+ public ClosableIterator<R> getRecordIterator(Schema readerSchema) throws IOException {
+ // TODO eval whether seeking scanner would be faster than pread
+ HFileScanner scanner = getHFileScanner(reader, false);
+ return (ClosableIterator<R>) new RecordIterator(scanner, getSchema(), readerSchema);
+ }
- return recordList;
- } catch (IOException e) {
- throw new HoodieException("Error reading hfile " + path + " as a dataframe", e);
- }
+ @SuppressWarnings("unchecked")
+ @Override
+ public ClosableIterator<R> getRecordsByKeysIterator(List<String> keys, Schema readerSchema) throws IOException {
+ // We're caching blocks for this scanner to minimize amount of traffic
+ // to the underlying storage as we fetched (potentially) sparsely distributed
+ // keys
+ HFileScanner scanner = getHFileScanner(reader, true);
+ return (ClosableIterator<R>) new RecordByKeyIterator(scanner, keys, getSchema(), readerSchema);
}
- /**
- * Reads all the records with current schema.
- *
- * <p>NOTE: This should only be used for testing,
- * the records are materialized eagerly into a list and returned,
- * use {@code getRecordIterator} where possible.
- */
- public List<Pair<String, R>> readAllRecords() {
- Schema schema = getSchema();
- return readAllRecords(schema, schema);
+ @SuppressWarnings("unchecked")
+ @Override
+ public ClosableIterator<R> getRecordsByKeyPrefixIterator(List<String> keyPrefixes, Schema readerSchema) throws IOException {
+ // We're caching blocks for this scanner to minimize amount of traffic
+ // to the underlying storage as we fetched (potentially) sparsely distributed
+ // keys
+ HFileScanner scanner = getHFileScanner(reader, true);
+ return (ClosableIterator<R>) new RecordByKeyPrefixIterator(scanner, keyPrefixes, getSchema(), readerSchema);
}
- /**
- * Reads all the records with current schema and filtering keys.
- *
- * <p>NOTE: This should only be used for testing,
- * the records are materialized eagerly into a list and returned,
- * use {@code getRecordIterator} where possible.
- */
- public List<Pair<String, R>> readRecords(List<String> keys) throws IOException {
- return readRecords(keys, getSchema());
+ @Override
+ public long getTotalRecords() {
+ // NOTE: This access to reader is thread-safe
+ return reader.getEntries();
}
- /**
- * Reads all the records with given schema and filtering keys.
- *
- * <p>NOTE: This should only be used for testing,
- * the records are materialized eagerly into a list and returned,
- * use {@code getRecordIterator} where possible.
- */
- public List<Pair<String, R>> readRecords(List<String> keys, Schema schema) throws IOException {
- this.schema = schema;
- List<Pair<String, R>> records = new ArrayList<>();
- for (String key: keys) {
- Option<R> value = getRecordByKey(key, schema);
- if (value.isPresent()) {
- records.add(new Pair(key, value.get()));
+ @Override
+ public void close() {
+ try {
+ synchronized (this) {
+ reader.close();
}
+ } catch (IOException e) {
+ throw new HoodieIOException("Error closing the hfile reader", e);
}
- return records;
}
- public ClosableIterator<R> getRecordIterator(List<String> keys, Schema schema) throws IOException {
- this.schema = schema;
- Iterator<String> iterator = keys.iterator();
- return new ClosableIterator<R>() {
- private R next;
- @Override
- public void close() {
+ private boolean isKeyAvailable(String key, HFileScanner keyScanner) throws IOException {
+ final KeyValue kv = new KeyValue(key.getBytes(), null, null, null);
+ return keyScanner.seekTo(kv) == 0;
+ }
+
+ private static Iterator<GenericRecord> getRecordByKeyPrefixIteratorInternal(HFileScanner scanner,
+ String keyPrefix,
+ Schema writerSchema,
+ Schema readerSchema) throws IOException {
+ KeyValue kv = new KeyValue(keyPrefix.getBytes(), null, null, null);
+
+ // NOTE: HFile persists both keys/values as bytes, therefore lexicographical sorted is
+ // essentially employed
+ //
+ // For the HFile containing list of cells c[0], c[1], ..., c[N], `seekTo(cell)` would return
+ // following:
+ // a) -1, if cell < c[0], no position;
+ // b) 0, such that c[i] = cell and scanner is left in position i;
+ // c) and 1, such that c[i] < cell, and scanner is left in position i.
+ //
+ // Consider entries w/ the following keys in HFile: [key01, key02, key03, key04,..., key20];
+ // In case looked up key-prefix is
+ // - "key", `seekTo()` will return -1 and place the cursor just before "key01",
+ // `getCell()` will return "key01" entry
+ // - "key03", `seekTo()` will return 0 (exact match) and place the cursor just before "key03",
+ // `getCell()` will return "key03" entry
+ // - "key1", `seekTo()` will return 1 (first not lower than) and place the cursor just before
+ // "key10" (i.e. on "key09");
+ //
+ int val = scanner.seekTo(kv);
+ if (val == 1) {
+ // Try moving to next entry, matching the prefix key; if we're at the EOF,
+ // `next()` will return false
+ if (!scanner.next()) {
+ return Collections.emptyIterator();
}
+ }
+
+ class KeyPrefixIterator implements Iterator<GenericRecord> {
+ private GenericRecord next = null;
+ private boolean eof = false;
@Override
public boolean hasNext() {
- try {
- while (iterator.hasNext()) {
- Option<R> value = getRecordByKey(iterator.next(), schema);
- if (value.isPresent()) {
- next = value.get();
- return true;
- }
- }
+ if (next != null) {
+ return true;
+ } else if (eof) {
+ return false;
+ }
+
+ Cell c = Objects.requireNonNull(scanner.getCell());
+ byte[] keyBytes = copyKeyFromCell(c);
+ String key = new String(keyBytes);
+ // Check whether we're still reading records corresponding to the key-prefix
+ if (!key.startsWith(keyPrefix)) {
return false;
+ }
+
+ // Extract the byte value before releasing the lock since we cannot hold on to the returned cell afterwards
+ byte[] valueBytes = copyValueFromCell(c);
+ try {
+ next = deserialize(keyBytes, valueBytes, writerSchema, readerSchema);
+ // In case scanner is not able to advance, it means we reached EOF
+ eof = !scanner.next();
} catch (IOException e) {
- throw new HoodieIOException("unable to read next record from hfile ", e);
+ throw new HoodieIOException("Failed to deserialize payload", e);
}
+
+ return true;
}
@Override
- public R next() {
+ public GenericRecord next() {
+ GenericRecord next = this.next;
+ this.next = null;
return next;
}
- };
+ }
+
+ return new KeyPrefixIterator();
}
- @Override
- public Iterator getRecordIterator(Schema readerSchema) throws IOException {
- final HFileScanner scanner = reader.getScanner(false, false);
- final Option<Schema.Field> keyFieldSchema = Option.ofNullable(readerSchema.getField(KEY_FIELD_NAME));
- ValidationUtils.checkState(keyFieldSchema != null,
- "Missing key field '" + KEY_FIELD_NAME + "' in the schema!");
- return new Iterator<R>() {
- private R next = null;
- private boolean eof = false;
+ private static Option<GenericRecord> fetchRecordByKeyInternal(HFileScanner scanner, String key, Schema writerSchema, Schema readerSchema) throws IOException {
+ KeyValue kv = new KeyValue(key.getBytes(), null, null, null);
+ if (scanner.seekTo(kv) != 0) {
+ return Option.empty();
+ }
- @Override
- public boolean hasNext() {
- try {
- // To handle when hasNext() is called multiple times for idempotency and/or the first time
- if (this.next == null && !this.eof) {
- if (!scanner.isSeeked() && scanner.seekTo()) {
- final Pair<String, R> keyAndRecordPair = getRecordFromCell(scanner.getCell(), getSchema(), readerSchema, keyFieldSchema);
- this.next = keyAndRecordPair.getSecond();
- }
- }
- return this.next != null;
- } catch (IOException io) {
- throw new HoodieIOException("unable to read next record from hfile ", io);
- }
- }
+ Cell c = scanner.getCell();
+ byte[] valueBytes = copyValueFromCell(c);
+ GenericRecord record = deserialize(key.getBytes(), valueBytes, writerSchema, readerSchema);
- @Override
- public R next() {
- try {
- // To handle case when next() is called before hasNext()
- if (this.next == null) {
- if (!hasNext()) {
- throw new HoodieIOException("No more records left to read from hfile");
- }
- }
- R retVal = this.next;
- if (scanner.next()) {
- final Pair<String, R> keyAndRecordPair = getRecordFromCell(scanner.getCell(), getSchema(), readerSchema, keyFieldSchema);
- this.next = keyAndRecordPair.getSecond();
- } else {
- this.next = null;
- this.eof = true;
- }
- return retVal;
- } catch (IOException io) {
- throw new HoodieIOException("unable to read next record from parquet file ", io);
- }
- }
- };
+ return Option.of(record);
}
- private boolean isKeyAvailable(String key) throws IOException {
- final KeyValue kv = new KeyValue(key.getBytes(), null, null, null);
- synchronized (this) {
- if (keyScanner == null) {
- keyScanner = reader.getScanner(false, false);
- }
- if (keyScanner.seekTo(kv) == 0) {
- return true;
- }
+ private static GenericRecord getRecordFromCell(Cell cell, Schema writerSchema, Schema readerSchema) throws IOException {
+ final byte[] keyBytes = copyKeyFromCell(cell);
+ final byte[] valueBytes = copyValueFromCell(cell);
+ return deserialize(keyBytes, valueBytes, writerSchema, readerSchema);
+ }
+
+ private static GenericRecord deserializeUnchecked(final byte[] keyBytes,
+ final byte[] valueBytes,
+ Schema writerSchema,
+ Schema readerSchema) {
+ try {
+ return deserialize(keyBytes, valueBytes, writerSchema, readerSchema);
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to deserialize payload", e);
}
- return false;
}
- @Override
- public Option getRecordByKey(String key, Schema readerSchema) throws IOException {
- byte[] value = null;
- final Option<Schema.Field> keyFieldSchema = Option.ofNullable(readerSchema.getField(KEY_FIELD_NAME));
- ValidationUtils.checkState(keyFieldSchema != null);
- KeyValue kv = new KeyValue(key.getBytes(), null, null, null);
+ private static GenericRecord deserialize(final byte[] keyBytes,
+ final byte[] valueBytes,
+ Schema writerSchema,
+ Schema readerSchema) throws IOException {
+ GenericRecord record = HoodieAvroUtils.bytesToAvro(valueBytes, writerSchema, readerSchema);
- synchronized (this) {
- if (keyScanner == null) {
- keyScanner = reader.getScanner(false, false);
+ getKeySchema(readerSchema).ifPresent(keyFieldSchema -> {
+ final Object keyObject = record.get(keyFieldSchema.pos());
+ if (keyObject != null && keyObject.toString().isEmpty()) {
+ record.put(keyFieldSchema.pos(), new String(keyBytes));
}
+ });
- if (keyScanner.seekTo(kv) == 0) {
- Cell c = keyScanner.getCell();
- // Extract the byte value before releasing the lock since we cannot hold on to the returned cell afterwards
- value = Arrays.copyOfRange(c.getValueArray(), c.getValueOffset(), c.getValueOffset() + c.getValueLength());
- }
- }
+ return record;
+ }
- if (value != null) {
- R record = deserialize(key.getBytes(), value, getSchema(), readerSchema, keyFieldSchema);
- return Option.of(record);
- }
+ private static Schema fetchSchema(HFile.Reader reader) {
+ HFileInfo fileInfo = reader.getHFileInfo();
+ return new Schema.Parser().parse(new String(fileInfo.get(SCHEMA_KEY.getBytes())));
+ }
- return Option.empty();
+ private static byte[] copyKeyFromCell(Cell cell) {
+ return Arrays.copyOfRange(cell.getRowArray(), cell.getRowOffset(), cell.getRowOffset() + cell.getRowLength());
}
- private Pair<String, R> getRecordFromCell(Cell cell, Schema writerSchema, Schema readerSchema, Option<Schema.Field> keyFieldSchema) throws IOException {
- final byte[] keyBytes = Arrays.copyOfRange(cell.getRowArray(), cell.getRowOffset(), cell.getRowOffset() + cell.getRowLength());
- final byte[] valueBytes = Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), cell.getValueOffset() + cell.getValueLength());
- R record = deserialize(keyBytes, valueBytes, writerSchema, readerSchema, keyFieldSchema);
- return new Pair<>(new String(keyBytes), record);
+ private static byte[] copyValueFromCell(Cell c) {
+ return Arrays.copyOfRange(c.getValueArray(), c.getValueOffset(), c.getValueOffset() + c.getValueLength());
}
/**
- * Deserialize the record byte array contents to record object.
- *
- * @param keyBytes - Record key as byte array
- * @param valueBytes - Record content as byte array
- * @param writerSchema - Writer schema
- * @param readerSchema - Reader schema
- * @param keyFieldSchema - Key field id in the schema
- * @return Deserialized record object
+ * NOTE: THIS SHOULD ONLY BE USED FOR TESTING, RECORDS ARE MATERIALIZED EAGERLY
+ * <p>
+ * Reads all the records with given schema
*/
- private R deserialize(final byte[] keyBytes, final byte[] valueBytes, Schema writerSchema, Schema readerSchema,
- Option<Schema.Field> keyFieldSchema) throws IOException {
- R record = (R) HoodieAvroUtils.bytesToAvro(valueBytes, writerSchema, readerSchema);
- materializeRecordIfNeeded(keyBytes, record, keyFieldSchema);
- return record;
+ public static <R extends IndexedRecord> List<R> readAllRecords(HoodieHFileReader<R> reader) throws IOException {
+ Schema schema = reader.getSchema();
+ return toStream(reader.getRecordIterator(schema))
+ .collect(Collectors.toList());
}
/**
- * Materialize the record for any missing fields, if needed.
- *
- * @param keyBytes - Key byte array
- * @param record - Record object to materialize
- * @param keyFieldSchema - Key field id in the schema
+ * NOTE: THIS SHOULD ONLY BE USED FOR TESTING, RECORDS ARE MATERIALIZED EAGERLY
+ * <p>
+ * Reads all the records with given schema and filtering keys.
*/
- private void materializeRecordIfNeeded(final byte[] keyBytes, R record, Option<Schema.Field> keyFieldSchema) {
- if (keyFieldSchema.isPresent()) {
- final Object keyObject = record.get(keyFieldSchema.get().pos());
- if (keyObject != null && keyObject.toString().isEmpty()) {
- record.put(keyFieldSchema.get().pos(), new String(keyBytes));
+ public static <R extends IndexedRecord> List<R> readRecords(HoodieHFileReader<R> reader,
+ List<String> keys) throws IOException {
+ return readRecords(reader, keys, reader.getSchema());
+ }
+
+ /**
+ * NOTE: THIS SHOULD ONLY BE USED FOR TESTING, RECORDS ARE MATERIALIZED EAGERLY
+ * <p>
+ * Reads all the records with given schema and filtering keys.
+ */
+ public static <R extends IndexedRecord> List<R> readRecords(HoodieHFileReader<R> reader,
+ List<String> keys,
+ Schema schema) throws IOException {
+ Collections.sort(keys);
+ return toStream(reader.getRecordsByKeysIterator(keys, schema))
+ .collect(Collectors.toList());
+ }
+
+ private static HFileScanner getHFileScanner(HFile.Reader reader, boolean cacheBlocks) {
+ // NOTE: Only scanners created in Positional Read ("pread") mode could share the same reader,
+ // since scanners in default mode will be seeking w/in the underlying stream
+ return reader.getScanner(cacheBlocks, true);
+ }
+
+ private static Option<Schema.Field> getKeySchema(Schema schema) {
+ return Option.ofNullable(schema.getField(KEY_FIELD_NAME));
+ }
+
+ private static class RecordByKeyPrefixIterator implements ClosableIterator<GenericRecord> {
+ private final Iterator<String> keyPrefixesIterator;
+ private Iterator<GenericRecord> recordsIterator;
+
+ private final HFileScanner scanner;
+
+ private final Schema writerSchema;
+ private final Schema readerSchema;
+
+ private GenericRecord next = null;
+
+ RecordByKeyPrefixIterator(HFileScanner scanner, List<String> keyPrefixes, Schema writerSchema, Schema readerSchema) throws IOException {
+ this.keyPrefixesIterator = keyPrefixes.iterator();
+
+ this.scanner = scanner;
+ this.scanner.seekTo(); // position at the beginning of the file
+
+ this.writerSchema = writerSchema;
+ this.readerSchema = readerSchema;
+ }
+
+ @Override
+ public boolean hasNext() {
+ try {
+ while (true) {
+ // NOTE: This is required for idempotency
+ if (next != null) {
+ return true;
+ } else if (recordsIterator != null && recordsIterator.hasNext()) {
+ next = recordsIterator.next();
+ return true;
+ } else if (keyPrefixesIterator.hasNext()) {
+ String currentKeyPrefix = keyPrefixesIterator.next();
+ recordsIterator =
+ getRecordByKeyPrefixIteratorInternal(scanner, currentKeyPrefix, writerSchema, readerSchema);
+ } else {
+ return false;
+ }
+ }
+ } catch (IOException e) {
+ throw new HoodieIOException("Unable to read next record from HFile", e);
}
}
+
+ @Override
+ public GenericRecord next() {
+ GenericRecord next = this.next;
+ this.next = null;
+ return next;
+ }
+
+ @Override
+ public void close() {
+ scanner.close();
+ }
}
- @Override
- public long getTotalRecords() {
- return reader.getEntries();
+ private static class RecordByKeyIterator implements ClosableIterator<GenericRecord> {
+ private final Iterator<String> keyIterator;
+
+ private final HFileScanner scanner;
+
+ private final Schema readerSchema;
+ private final Schema writerSchema;
+
+ private GenericRecord next = null;
+
+ RecordByKeyIterator(HFileScanner scanner, List<String> keys, Schema writerSchema, Schema readerSchema) throws IOException {
+ this.keyIterator = keys.iterator();
+
+ this.scanner = scanner;
+ this.scanner.seekTo(); // position at the beginning of the file
+
+ this.writerSchema = writerSchema;
+ this.readerSchema = readerSchema;
+ }
+
+ @Override
+ public boolean hasNext() {
+ try {
+ // NOTE: This is required for idempotency
+ if (next != null) {
+ return true;
+ }
+
+ while (keyIterator.hasNext()) {
+ Option<GenericRecord> value = fetchRecordByKeyInternal(scanner, keyIterator.next(), writerSchema, readerSchema);
+ if (value.isPresent()) {
+ next = value.get();
+ return true;
+ }
+ }
+ return false;
+ } catch (IOException e) {
+ throw new HoodieIOException("unable to read next record from hfile ", e);
+ }
+ }
+
+ @Override
+ public GenericRecord next() {
+ GenericRecord next = this.next;
+ this.next = null;
+ return next;
+ }
+
+ @Override
+ public void close() {
+ scanner.close();
+ }
}
- @Override
- public synchronized void close() {
- try {
- reader.close();
- reader = null;
- if (fsDataInputStream != null) {
- fsDataInputStream.close();
+ private static class RecordIterator implements ClosableIterator<GenericRecord> {
+ private final HFileScanner scanner;
+
+ private final Schema writerSchema;
+ private final Schema readerSchema;
+
+ private GenericRecord next = null;
+
+ RecordIterator(HFileScanner scanner, Schema writerSchema, Schema readerSchema) {
+ this.scanner = scanner;
+ this.writerSchema = writerSchema;
+ this.readerSchema = readerSchema;
+ }
+
+ @Override
+ public boolean hasNext() {
+ try {
+ // NOTE: This is required for idempotency
+ if (next != null) {
+ return true;
+ }
+
+ boolean hasRecords;
+ if (!scanner.isSeeked()) {
+ hasRecords = scanner.seekTo();
+ } else {
+ hasRecords = scanner.next();
+ }
+
+ if (!hasRecords) {
+ return false;
+ }
+
+ this.next = getRecordFromCell(scanner.getCell(), writerSchema, readerSchema);
+ return true;
+ } catch (IOException io) {
+ throw new HoodieIOException("unable to read next record from hfile ", io);
}
- keyScanner = null;
- } catch (IOException e) {
- throw new HoodieIOException("Error closing the hfile reader", e);
+ }
+
+ @Override
+ public GenericRecord next() {
+ GenericRecord next = this.next;
+ this.next = null;
+ return next;
+ }
+
+ @Override
+ public void close() {
+ scanner.close();
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcReader.java
index 319f8d7da1..5431bf3782 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcReader.java
@@ -18,9 +18,6 @@
package org.apache.hudi.io.storage;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
@@ -29,6 +26,7 @@ import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.util.AvroOrcUtils;
import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.OrcReaderIterator;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.orc.OrcFile;
@@ -37,6 +35,9 @@ import org.apache.orc.Reader.Options;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
+import java.io.IOException;
+import java.util.Set;
+
public class HoodieOrcReader<R extends IndexedRecord> implements HoodieFileReader {
private Path path;
private Configuration conf;
@@ -64,12 +65,12 @@ public class HoodieOrcReader<R extends IndexedRecord> implements HoodieFileReade
}
@Override
- public Iterator<R> getRecordIterator(Schema schema) throws IOException {
+ public ClosableIterator<R> getRecordIterator(Schema schema) throws IOException {
try {
Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(schema);
RecordReader recordReader = reader.rows(new Options(conf).schema(orcSchema));
- return new OrcReaderIterator(recordReader, schema, orcSchema);
+ return new OrcReaderIterator<>(recordReader, schema, orcSchema);
} catch (IOException io) {
throw new HoodieIOException("Unable to create an ORC reader.", io);
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java
index dc368a2e08..804e4354c7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java
@@ -18,12 +18,6 @@
package org.apache.hudi.io.storage;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
@@ -31,12 +25,17 @@ import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.ParquetReaderIterator;
-
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
public class HoodieParquetReader<R extends IndexedRecord> implements HoodieFileReader<R> {
private final Path path;
@@ -66,10 +65,10 @@ public class HoodieParquetReader<R extends IndexedRecord> implements HoodieFileR
}
@Override
- public Iterator<R> getRecordIterator(Schema schema) throws IOException {
+ public ClosableIterator<R> getRecordIterator(Schema schema) throws IOException {
AvroReadSupport.setAvroReadSchema(conf, schema);
ParquetReader<R> reader = AvroParquetReader.<R>builder(path).withConf(conf).build();
- ParquetReaderIterator parquetReaderIterator = new ParquetReaderIterator<>(reader);
+ ParquetReaderIterator<R> parquetReaderIterator = new ParquetReaderIterator<>(reader);
readerIterators.add(parquetReaderIterator);
return parquetReaderIterator;
}
@@ -81,7 +80,7 @@ public class HoodieParquetReader<R extends IndexedRecord> implements HoodieFileR
@Override
public void close() {
- readerIterators.forEach(entry -> entry.close());
+ readerIterators.forEach(ParquetReaderIterator::close);
}
@Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index 73b5dcb89f..2036500ac6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -378,7 +378,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
protected abstract Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String key, String partitionName);
- protected abstract List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> key, String partitionName);
+ public abstract List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> key, String partitionName);
protected HoodieEngineContext getEngineContext() {
return engineContext != null ? engineContext : new HoodieLocalEngineContext(hadoopConf.get());
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index b4a76a7282..b77bb12c49 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -21,9 +21,11 @@ package org.apache.hudi.metadata;
import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
@@ -159,4 +161,9 @@ public class FileSystemBackedTableMetadata implements HoodieTableMetadata {
throws HoodieMetadataException {
throw new HoodieMetadataException("Unsupported operation: getColumnsStats!");
}
+
+ @Override
+ public HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes, String partitionName) {
+ throw new HoodieMetadataException("Unsupported operation: getRecordsByKeyPrefixes!");
+ }
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index a4bc8c5524..cf941bb70c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -18,6 +18,9 @@
package org.apache.hudi.metadata;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
@@ -25,23 +28,24 @@ import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SpillableMapUtils;
-import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
@@ -49,10 +53,6 @@ import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -60,15 +60,22 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
+import static org.apache.hudi.common.util.CollectionUtils.toStream;
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_FILES;
/**
* Table metadata provided by an internal DFS backed Hudi metadata table.
@@ -77,6 +84,8 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
private static final Logger LOG = LogManager.getLogger(HoodieBackedTableMetadata.class);
+ private static final Schema METADATA_RECORD_SCHEMA = HoodieMetadataRecord.getClassSchema();
+
private String metadataBasePath;
// Metadata table's timeline and metaclient
private HoodieTableMetaClient metadataMetaClient;
@@ -133,28 +142,79 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
}
@Override
- protected List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> keys,
- String partitionName) {
+ public HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes,
+ String partitionName) {
+ // NOTE: Since we partition records to a particular file-group by full key, we will have
+ // to scan all file-groups for all key-prefixes as each of these might contain some
+ // records matching the key-prefix
+ List<FileSlice> partitionFileSlices =
+ HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, partitionName);
+
+ return engineContext.parallelize(partitionFileSlices)
+ .flatMap(
+ (SerializableFunction<FileSlice, Iterator<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>>>) fileSlice -> {
+ // NOTE: Since this will be executed by executors, we can't access previously cached
+ // readers, and therefore have to always open new ones
+ Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers =
+ openReaders(partitionName, fileSlice);
+ try {
+ List<Long> timings = new ArrayList<>();
+
+ HoodieFileReader baseFileReader = readers.getKey();
+ HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight();
+
+ if (baseFileReader == null && logRecordScanner == null) {
+ // TODO: what should we do if neither of the readers exists? Should we throw an exception and let the caller do the fallback?
+ return Collections.emptyIterator();
+ }
+
+ boolean fullKeys = false;
+
+ Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords =
+ readLogRecords(logRecordScanner, keyPrefixes, fullKeys, timings);
+
+ List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> mergedRecords =
+ readFromBaseAndMergeWithLogRecords(baseFileReader, keyPrefixes, fullKeys, logRecords, timings, partitionName);
+
+ LOG.debug(String.format("Metadata read for %s keys took [baseFileRead, logMerge] %s ms",
+ keyPrefixes.size(), timings));
+
+ return mergedRecords.iterator();
+ } catch (IOException ioe) {
+ throw new HoodieIOException("Error merging records from metadata table for " + keyPrefixes.size() + " key : ", ioe);
+ } finally {
+ closeReader(readers);
+ }
+ }
+ )
+ .map(keyRecordPair -> keyRecordPair.getValue().orElse(null))
+ .filter(Objects::nonNull);
+ }
+
+ @Override
+ public List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> keys,
+ String partitionName) {
Map<Pair<String, FileSlice>, List<String>> partitionFileSliceToKeysMap = getPartitionFileSliceToKeysMapping(partitionName, keys);
List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = new ArrayList<>();
AtomicInteger fileSlicesKeysCount = new AtomicInteger();
partitionFileSliceToKeysMap.forEach((partitionFileSlicePair, fileSliceKeys) -> {
- Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers = openReadersIfNeeded(partitionName,
- partitionFileSlicePair.getRight());
+ Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers =
+ getOrCreateReaders(partitionName, partitionFileSlicePair.getRight());
try {
List<Long> timings = new ArrayList<>();
HoodieFileReader baseFileReader = readers.getKey();
HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight();
-
if (baseFileReader == null && logRecordScanner == null) {
return;
}
- // local map to assist in merging with base file records
- Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = readLogRecords(logRecordScanner,
- fileSliceKeys, timings);
- result.addAll(readFromBaseAndMergeWithLogRecords(baseFileReader, fileSliceKeys, logRecords,
+ boolean fullKeys = true;
+ Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords =
+ readLogRecords(logRecordScanner, fileSliceKeys, fullKeys, timings);
+
+ result.addAll(readFromBaseAndMergeWithLogRecords(baseFileReader, fileSliceKeys, fullKeys, logRecords,
timings, partitionName));
+
LOG.debug(String.format("Metadata read for %s keys took [baseFileRead, logMerge] %s ms",
fileSliceKeys.size(), timings));
fileSlicesKeysCount.addAndGet(fileSliceKeys.size());
@@ -171,81 +231,127 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
}
private Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> readLogRecords(HoodieMetadataMergedLogRecordReader logRecordScanner,
- List<String> keys, List<Long> timings) {
+ List<String> keys,
+ boolean fullKey,
+ List<Long> timings) {
HoodieTimer timer = new HoodieTimer().startTimer();
- Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = new HashMap<>();
- // Retrieve records from log file
timer.startTimer();
- if (logRecordScanner != null) {
- if (metadataConfig.enableFullScan()) {
- // path which does full scan of log files
- for (String key : keys) {
- logRecords.put(key, logRecordScanner.getRecordByKey(key).get(0).getValue());
- }
- } else {
- // this path will do seeks pertaining to the keys passed in
- List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> logRecordsList = logRecordScanner.getRecordsByKeys(keys);
- for (Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : logRecordsList) {
- logRecords.put(entry.getKey(), entry.getValue());
- }
+
+ if (logRecordScanner == null) {
+ timings.add(timer.endTimer());
+ return Collections.emptyMap();
+ }
+
+ String partitionName = logRecordScanner.getPartitionName().get();
+
+ Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = new HashMap<>();
+ if (isFullScanAllowedForPartition(partitionName)) {
+ checkArgument(fullKey, "If full-scan is required, only full keys could be used!");
+ // Path which does full scan of log files
+ for (String key : keys) {
+ logRecords.put(key, logRecordScanner.getRecordByKey(key).get(0).getValue());
}
} else {
- for (String key : keys) {
- logRecords.put(key, Option.empty());
+ // This path will do seeks pertaining to the keys passed in
+ List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> logRecordsList =
+ fullKey ? logRecordScanner.getRecordsByKeys(keys)
+ : logRecordScanner.getRecordsByKeyPrefixes(keys)
+ .stream()
+ .map(record -> Pair.of(record.getRecordKey(), Option.of(record)))
+ .collect(Collectors.toList());
+
+ for (Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : logRecordsList) {
+ logRecords.put(entry.getKey(), entry.getValue());
}
}
+
timings.add(timer.endTimer());
return logRecords;
}
private List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> readFromBaseAndMergeWithLogRecords(HoodieFileReader baseFileReader,
- List<String> keys, Map<String,
- Option<HoodieRecord<HoodieMetadataPayload>>> logRecords, List<Long> timings, String partitionName) throws IOException {
- List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = new ArrayList<>();
- // merge with base records
+ List<String> keys,
+ boolean fullKeys,
+ Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords,
+ List<Long> timings,
+ String partitionName) throws IOException {
HoodieTimer timer = new HoodieTimer().startTimer();
timer.startTimer();
- HoodieRecord<HoodieMetadataPayload> hoodieRecord = null;
- // Retrieve record from base file
- if (baseFileReader != null) {
- HoodieTimer readTimer = new HoodieTimer();
- Map<String, GenericRecord> baseFileRecords = baseFileReader.getRecordsByKeys(keys);
- for (String key : keys) {
- readTimer.startTimer();
- if (baseFileRecords.containsKey(key)) {
- hoodieRecord = getRecord(Option.of(baseFileRecords.get(key)), partitionName);
- metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, readTimer.endTimer()));
- // merge base file record w/ log record if present
- if (logRecords.containsKey(key) && logRecords.get(key).isPresent()) {
- HoodieRecordPayload mergedPayload = logRecords.get(key).get().getData().preCombine(hoodieRecord.getData());
- result.add(Pair.of(key, Option.of(new HoodieAvroRecord(hoodieRecord.getKey(), mergedPayload))));
- } else {
- // only base record
- result.add(Pair.of(key, Option.of(hoodieRecord)));
- }
- } else {
- // only log record
- result.add(Pair.of(key, logRecords.get(key)));
- }
- }
- timings.add(timer.endTimer());
- } else {
- // no base file at all
+
+ if (baseFileReader == null) {
+ // No base file at all
timings.add(timer.endTimer());
- for (Map.Entry<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : logRecords.entrySet()) {
- result.add(Pair.of(entry.getKey(), entry.getValue()));
+ if (fullKeys) {
+ // In case full-keys (not key-prefixes) were provided, it's expected that the list of
+ // records will contain an (optional) entry for each corresponding key
+ return keys.stream()
+ .map(key -> Pair.of(key, logRecords.getOrDefault(key, Option.empty())))
+ .collect(Collectors.toList());
+ } else {
+ return logRecords.entrySet().stream()
+ .map(entry -> Pair.of(entry.getKey(), entry.getValue()))
+ .collect(Collectors.toList());
}
}
- return result;
+
+ List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = new ArrayList<>();
+
+ HoodieTimer readTimer = new HoodieTimer();
+ readTimer.startTimer();
+
+ Map<String, HoodieRecord<HoodieMetadataPayload>> records =
+ fetchBaseFileRecordsByKeys(baseFileReader, keys, fullKeys, partitionName);
+
+ metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, readTimer.endTimer()));
+
+ // Iterate over all provided log-records, merging them into existing records
+ for (Option<HoodieRecord<HoodieMetadataPayload>> logRecordOpt : logRecords.values()) {
+ if (logRecordOpt.isPresent()) {
+ HoodieRecord<HoodieMetadataPayload> logRecord = logRecordOpt.get();
+ records.merge(
+ logRecord.getRecordKey(),
+ logRecord,
+ (oldRecord, newRecord) ->
+ new HoodieAvroRecord<>(oldRecord.getKey(), newRecord.getData().preCombine(oldRecord.getData()))
+ );
+ }
+ }
+
+ timings.add(timer.endTimer());
+
+ if (fullKeys) {
+ // In case full-keys (not key-prefixes) were provided, it's expected that the list of
+ // records will contain an (optional) entry for each corresponding key
+ return keys.stream()
+ .map(key -> Pair.of(key, Option.ofNullable(records.get(key))))
+ .collect(Collectors.toList());
+ } else {
+ return records.values().stream()
+ .map(record -> Pair.of(record.getRecordKey(), Option.of(record)))
+ .collect(Collectors.toList());
+ }
+ }
+
+ private Map<String, HoodieRecord<HoodieMetadataPayload>> fetchBaseFileRecordsByKeys(HoodieFileReader<GenericRecord> baseFileReader,
+ List<String> keys,
+ boolean fullKeys,
+ String partitionName) throws IOException {
+ ClosableIterator<GenericRecord> records = fullKeys ? baseFileReader.getRecordsByKeysIterator(keys)
+ : baseFileReader.getRecordsByKeyPrefixIterator(keys);
+
+ return toStream(records)
+ .map(record -> Pair.of(
+ (String) record.get(HoodieMetadataPayload.KEY_FIELD_NAME),
+ composeRecord(record, partitionName)))
+ .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
}
- private HoodieRecord<HoodieMetadataPayload> getRecord(Option<GenericRecord> baseRecord, String partitionName) {
- ValidationUtils.checkState(baseRecord.isPresent());
+ private HoodieRecord<HoodieMetadataPayload> composeRecord(GenericRecord avroRecord, String partitionName) {
if (metadataTableConfig.populateMetaFields()) {
- return SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(),
+ return SpillableMapUtils.convertToHoodieRecordPayload(avroRecord,
metadataTableConfig.getPayloadClass(), metadataTableConfig.getPreCombineField(), false);
}
- return SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(),
+ return SpillableMapUtils.convertToHoodieRecordPayload(avroRecord,
metadataTableConfig.getPayloadClass(), metadataTableConfig.getPreCombineField(),
Pair.of(metadataTableConfig.getRecordKeyFieldProp(), metadataTableConfig.getPartitionFieldProp()),
false, Option.of(partitionName));
@@ -279,34 +385,35 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
* Create a file reader and the record scanner for a given partition and file slice
* if readers are not already available.
*
- * @param partitionName - Partition name
- * @param slice - The file slice to open readers for
+ * @param partitionName - Partition name
+ * @param slice - The file slice to open readers for
* @return File reader and the record scanner pair for the requested file slice
*/
- private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> openReadersIfNeeded(String partitionName, FileSlice slice) {
- return partitionReaders.computeIfAbsent(Pair.of(partitionName, slice.getFileId()), k -> {
- try {
- HoodieTimer timer = new HoodieTimer().startTimer();
-
- // Open base file reader
- Pair<HoodieFileReader, Long> baseFileReaderOpenTimePair = getBaseFileReader(slice, timer);
- HoodieFileReader baseFileReader = baseFileReaderOpenTimePair.getKey();
- final long baseFileOpenMs = baseFileReaderOpenTimePair.getValue();
-
- // Open the log record scanner using the log files from the latest file slice
- List<HoodieLogFile> logFiles = slice.getLogFiles().collect(Collectors.toList());
- Pair<HoodieMetadataMergedLogRecordReader, Long> logRecordScannerOpenTimePair =
- getLogRecordScanner(logFiles, partitionName);
- HoodieMetadataMergedLogRecordReader logRecordScanner = logRecordScannerOpenTimePair.getKey();
- final long logScannerOpenMs = logRecordScannerOpenTimePair.getValue();
-
- metrics.ifPresent(metrics -> metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR,
- +baseFileOpenMs + logScannerOpenMs));
- return Pair.of(baseFileReader, logRecordScanner);
- } catch (IOException e) {
- throw new HoodieIOException("Error opening readers for metadata table partition " + partitionName, e);
- }
- });
+ private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> getOrCreateReaders(String partitionName, FileSlice slice) {
+ return partitionReaders.computeIfAbsent(Pair.of(partitionName, slice.getFileId()), k -> openReaders(partitionName, slice));
+ }
+
+ private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> openReaders(String partitionName, FileSlice slice) {
+ try {
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ // Open base file reader
+ Pair<HoodieFileReader, Long> baseFileReaderOpenTimePair = getBaseFileReader(slice, timer);
+ HoodieFileReader baseFileReader = baseFileReaderOpenTimePair.getKey();
+ final long baseFileOpenMs = baseFileReaderOpenTimePair.getValue();
+
+ // Open the log record scanner using the log files from the latest file slice
+ List<HoodieLogFile> logFiles = slice.getLogFiles().collect(Collectors.toList());
+ Pair<HoodieMetadataMergedLogRecordReader, Long> logRecordScannerOpenTimePair =
+ getLogRecordScanner(logFiles, partitionName);
+ HoodieMetadataMergedLogRecordReader logRecordScanner = logRecordScannerOpenTimePair.getKey();
+ final long logScannerOpenMs = logRecordScannerOpenTimePair.getValue();
+
+ metrics.ifPresent(metrics -> metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR,
+ +baseFileOpenMs + logScannerOpenMs));
+ return Pair.of(baseFileReader, logRecordScanner);
+ } catch (IOException e) {
+ throw new HoodieIOException("Error opening readers for metadata table partition " + partitionName, e);
+ }
}
private Pair<HoodieFileReader, Long> getBaseFileReader(FileSlice slice, HoodieTimer timer) throws IOException {
@@ -349,7 +456,14 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
return validInstantTimestamps;
}
- public Pair<HoodieMetadataMergedLogRecordReader, Long> getLogRecordScanner(List<HoodieLogFile> logFiles, String partitionName) {
+ public Pair<HoodieMetadataMergedLogRecordReader, Long> getLogRecordScanner(List<HoodieLogFile> logFiles,
+ String partitionName) {
+ return getLogRecordScanner(logFiles, partitionName, Option.empty());
+ }
+
+ public Pair<HoodieMetadataMergedLogRecordReader, Long> getLogRecordScanner(List<HoodieLogFile> logFiles,
+ String partitionName,
+ Option<Boolean> allowFullScanOverride) {
HoodieTimer timer = new HoodieTimer().startTimer();
List<String> sortedLogFilePaths = logFiles.stream()
.sorted(HoodieLogFile.getLogFileComparator())
@@ -363,6 +477,8 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
Option<HoodieInstant> latestMetadataInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
String latestMetadataInstantTime = latestMetadataInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
+ boolean allowFullScan = allowFullScanOverride.orElseGet(() -> isFullScanAllowedForPartition(partitionName));
+
// Load the schema
Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder().fromProperties(metadataConfig.getProps()).build();
@@ -378,7 +494,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
.withDiskMapType(commonConfig.getSpillableDiskMapType())
.withBitCaskDiskMapCompressionEnabled(commonConfig.isBitCaskDiskMapCompressionEnabled())
.withLogBlockTimestamps(validInstantTimestamps)
- .enableFullScan(metadataConfig.enableFullScan())
+ .allowFullScan(allowFullScan)
.withPartition(partitionName)
.build();
@@ -388,6 +504,21 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
return Pair.of(logRecordScanner, logScannerOpenMs);
}
+ // NOTE: We're allowing eager full-scan of the log-files only for the "files" partition.
+ // Other partitions (like "column_stats", "bloom_filters") will have to be fetched
+ // through point-lookups
+ private boolean isFullScanAllowedForPartition(String partitionName) {
+ switch (partitionName) {
+ case PARTITION_NAME_FILES:
+ return metadataConfig.allowFullScan();
+
+ case PARTITION_NAME_COLUMN_STATS:
+ case PARTITION_NAME_BLOOM_FILTERS:
+ default:
+ return false;
+ }
+ }
+
/**
* Returns a list of commits which were rolled back as part of a Rollback or Restore operation.
*
@@ -433,6 +564,10 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
private synchronized void close(Pair<String, String> partitionFileSlicePair) {
Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers =
partitionReaders.remove(partitionFileSlicePair);
+ closeReader(readers);
+ }
+
+ private void closeReader(Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers) {
if (readers != null) {
try {
if (readers.getKey() != null) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java
index a024c9c3dc..d8c631a22a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java
@@ -18,11 +18,13 @@
package org.apache.hudi.metadata;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.log.InstantRange;
@@ -31,19 +33,16 @@ import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.internal.schema.InternalSchema;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
/**
* A {@code HoodieMergedLogRecordScanner} implementation which only merged records matching providing keys. This is
@@ -53,38 +52,16 @@ public class HoodieMetadataMergedLogRecordReader extends HoodieMergedLogRecordSc
private static final Logger LOG = LogManager.getLogger(HoodieMetadataMergedLogRecordReader.class);
- // Set of all record keys that are to be read in memory
- private Set<String> mergeKeyFilter;
-
private HoodieMetadataMergedLogRecordReader(FileSystem fs, String basePath, String partitionName,
List<String> logFilePaths,
Schema readerSchema, String latestInstantTime,
Long maxMemorySizeInBytes, int bufferSize,
- String spillableMapBasePath, Set<String> mergeKeyFilter,
+ String spillableMapBasePath,
ExternalSpillableMap.DiskMapType diskMapType,
boolean isBitCaskDiskMapCompressionEnabled,
- Option<InstantRange> instantRange, boolean enableFullScan) {
+ Option<InstantRange> instantRange, boolean allowFullScan) {
super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, maxMemorySizeInBytes, false, false, bufferSize,
- spillableMapBasePath, instantRange, false, diskMapType, isBitCaskDiskMapCompressionEnabled, false,
- enableFullScan, Option.of(partitionName), InternalSchema.getEmptyInternalSchema());
- this.mergeKeyFilter = mergeKeyFilter;
- if (enableFullScan) {
- performScan();
- }
- }
-
- @Override
- protected void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> hoodieRecord) throws IOException {
- if (mergeKeyFilter.isEmpty() || mergeKeyFilter.contains(hoodieRecord.getRecordKey())) {
- super.processNextRecord(hoodieRecord);
- }
- }
-
- @Override
- protected void processNextDeletedRecord(DeleteRecord deleteRecord) {
- if (mergeKeyFilter.isEmpty() || mergeKeyFilter.contains(deleteRecord.getRecordKey())) {
- super.processNextDeletedRecord(deleteRecord);
- }
+ spillableMapBasePath, instantRange, diskMapType, isBitCaskDiskMapCompressionEnabled, false, allowFullScan, Option.of(partitionName), InternalSchema.getEmptyInternalSchema());
}
@Override
@@ -118,24 +95,37 @@ public class HoodieMetadataMergedLogRecordReader extends HoodieMergedLogRecordSc
* @return {@code HoodieRecord} if key was found else {@code Option.empty()}
*/
public synchronized List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordByKey(String key) {
+ checkState(forceFullScan, "Record reader has to be in full-scan mode to use this API");
return Collections.singletonList(Pair.of(key, Option.ofNullable((HoodieRecord) records.get(key))));
}
+ @SuppressWarnings("unchecked")
+ public List<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes) {
+ // Following operations have to be atomic, otherwise concurrent
+ // readers would race with each other and could crash when
+ // processing log block records as part of scan.
+ synchronized (this) {
+ records.clear();
+ scanInternal(Option.of(new KeySpec(keyPrefixes, false)));
+ return records.values().stream()
+ .filter(Objects::nonNull)
+ .map(record -> (HoodieRecord<HoodieMetadataPayload>) record)
+ .collect(Collectors.toList());
+ }
+ }
+
+ @SuppressWarnings("unchecked")
public synchronized List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> keys) {
// Following operations have to be atomic, otherwise concurrent
// readers would race with each other and could crash when
// processing log block records as part of scan.
- records.clear();
- scan(Option.of(keys));
- List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> metadataRecords = new ArrayList<>();
- keys.forEach(entry -> {
- if (records.containsKey(entry)) {
- metadataRecords.add(Pair.of(entry, Option.ofNullable((HoodieRecord) records.get(entry))));
- } else {
- metadataRecords.add(Pair.of(entry, Option.empty()));
- }
- });
- return metadataRecords;
+ synchronized (this) {
+ records.clear();
+ scan(keys);
+ return keys.stream()
+ .map(key -> Pair.of(key, Option.ofNullable((HoodieRecord<HoodieMetadataPayload>) records.get(key))))
+ .collect(Collectors.toList());
+ }
}
@Override
@@ -147,9 +137,7 @@ public class HoodieMetadataMergedLogRecordReader extends HoodieMergedLogRecordSc
* Builder used to build {@code HoodieMetadataMergedLogRecordScanner}.
*/
public static class Builder extends HoodieMergedLogRecordScanner.Builder {
- private Set<String> mergeKeyFilter = Collections.emptySet();
- private boolean enableFullScan = HoodieMetadataConfig.ENABLE_FULL_SCAN_LOG_FILES.defaultValue();
- private boolean enableInlineReading;
+ private boolean allowFullScan = HoodieMetadataConfig.ENABLE_FULL_SCAN_LOG_FILES.defaultValue();
@Override
public Builder withFileSystem(FileSystem fs) {
@@ -227,26 +215,21 @@ public class HoodieMetadataMergedLogRecordReader extends HoodieMergedLogRecordSc
return this;
}
- public Builder withMergeKeyFilter(Set<String> mergeKeyFilter) {
- this.mergeKeyFilter = mergeKeyFilter;
- return this;
- }
-
public Builder withLogBlockTimestamps(Set<String> validLogBlockTimestamps) {
withInstantRange(Option.of(new ExplicitMatchRange(validLogBlockTimestamps)));
return this;
}
- public Builder enableFullScan(boolean enableFullScan) {
- this.enableFullScan = enableFullScan;
+ public Builder allowFullScan(boolean enableFullScan) {
+ this.allowFullScan = enableFullScan;
return this;
}
@Override
public HoodieMetadataMergedLogRecordReader build() {
return new HoodieMetadataMergedLogRecordReader(fs, basePath, partitionName, logFilePaths, readerSchema,
- latestInstantTime, maxMemorySizeInBytes, bufferSize, spillableMapBasePath, mergeKeyFilter,
- diskMapType, isBitCaskDiskMapCompressionEnabled, instantRange, enableFullScan);
+ latestInstantTime, maxMemorySizeInBytes, bufferSize, spillableMapBasePath,
+ diskMapType, isBitCaskDiskMapCompressionEnabled, instantRange, allowFullScan);
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index 01618c5f37..1b33de795b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -75,8 +75,8 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.TypeUtils.unsafeCast;
-import static org.apache.hudi.common.util.DateTimeUtils.microsToInstant;
import static org.apache.hudi.common.util.DateTimeUtils.instantToMicros;
+import static org.apache.hudi.common.util.DateTimeUtils.microsToInstant;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
import static org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_LIST;
@@ -391,7 +391,7 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
}
@Override
- public Option<IndexedRecord> getInsertValue(Schema schema, Properties properties) throws IOException {
+ public Option<IndexedRecord> getInsertValue(Schema schemaIgnored, Properties propertiesIgnored) throws IOException {
if (key == null) {
return Option.empty();
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
index e206072866..a059b57845 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
@@ -24,7 +24,9 @@ import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
@@ -159,6 +161,17 @@ public interface HoodieTableMetadata extends Serializable, AutoCloseable {
Map<Pair<String, String>, HoodieMetadataColumnStats> getColumnStats(final List<Pair<String, String>> partitionNameFileNameList, final String columnName)
throws HoodieMetadataException;
+ /**
+ * Fetch records by key prefixes. Each key prefix passed is expected to match the prefix as stored in the Metadata table partitions. For example, in the case of the column stats partition,
+ * the actual keys in the metadata partition are encoded values of column name, partition name and file name, so the key prefixes passed to this method are expected to be encoded already.
+ *
+ * @param keyPrefixes list of key prefixes for which matching records are looked up.
+ * @param partitionName name of the metadata table partition in which the records are looked up.
+ * @return {@link HoodieData} of {@link HoodieRecord}s with records matching the passed in key prefixes.
+ */
+ HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes,
+ String partitionName);
+
/**
* Get the instant time to which the metadata is synced w.r.t data timeline.
*/
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index b6d9224b34..21337ceaeb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -105,9 +105,9 @@ public class HoodieTableMetadataUtil {
private static final Logger LOG = LogManager.getLogger(HoodieTableMetadataUtil.class);
- protected static final String PARTITION_NAME_FILES = "files";
- protected static final String PARTITION_NAME_COLUMN_STATS = "column_stats";
- protected static final String PARTITION_NAME_BLOOM_FILTERS = "bloom_filters";
+ public static final String PARTITION_NAME_FILES = "files";
+ public static final String PARTITION_NAME_COLUMN_STATS = "column_stats";
+ public static final String PARTITION_NAME_BLOOM_FILTERS = "bloom_filters";
/**
* Collects {@link HoodieColumnRangeMetadata} for the provided collection of records, pretending
@@ -815,7 +815,7 @@ public class HoodieTableMetadataUtil {
* @param path
* @return
*/
- static String getPartition(@Nonnull String path) {
+ public static String getPartition(@Nonnull String path) {
return EMPTY_PARTITION_NAME.equals(path) ? NON_PARTITIONED_NAME : path;
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/util/LazyRef.java b/hudi-common/src/main/java/org/apache/hudi/util/LazyRef.java
new file mode 100644
index 0000000000..e4c4f881fc
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/util/LazyRef.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import java.util.function.Supplier;
+
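+// Holds a value that is either provided eagerly or produced by the initializer on the first get() call and reused afterwards. TODO: add full java-doc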
+public class LazyRef<T> {
+
+ private volatile boolean initialized;
+
+ private Supplier<T> initializer;
+ private T ref;
+
+ private LazyRef(Supplier<T> initializer) {
+ this.initializer = initializer;
+ this.ref = null;
+ this.initialized = false;
+ }
+
+ private LazyRef(T ref) {
+ this.initializer = null;
+ this.ref = ref;
+ this.initialized = true;
+ }
+
+ public T get() {
+ if (!initialized) {
+ synchronized (this) {
+ if (!initialized) {
+ this.ref = initializer.get();
+ this.initializer = null;
+ initialized = true;
+ }
+ }
+ }
+
+ return ref;
+ }
+
+ public static <T> LazyRef<T> lazy(Supplier<T> initializer) {
+ return new LazyRef<>(initializer);
+ }
+
+ public static <T> LazyRef<T> eager(T ref) {
+ return new LazyRef<>(ref);
+ }
+}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 71917f9f56..4fa53bb41f 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -2038,7 +2038,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
* Utility to convert the given iterator to a List.
*/
private static List<IndexedRecord> getRecords(HoodieDataBlock dataBlock) {
- ClosableIterator<IndexedRecord> itr = dataBlock.getRecordItr();
+ ClosableIterator<IndexedRecord> itr = dataBlock.getRecordIterator();
List<IndexedRecord> elements = new ArrayList<>();
itr.forEachRemaining(elements::add);
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
index ea4417033a..d176a3755f 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -17,22 +17,39 @@
package org.apache.hudi
-import org.apache.hudi.ColumnStatsIndexSupport.{composeIndexSchema, deserialize, tryUnpackNonNullVal}
-import org.apache.hudi.metadata.{HoodieMetadataPayload, MetadataPartitionType}
+import org.apache.avro.Schema.Parser
+import org.apache.avro.generic.GenericRecord
+import org.apache.hudi.ColumnStatsIndexSupport.{composeIndexSchema, deserialize, metadataRecordSchemaString, metadataRecordStructType, tryUnpackNonNullVal}
+import org.apache.hudi.HoodieConversionUtils.toScalaOption
+import org.apache.hudi.avro.model.HoodieMetadataRecord
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig
+import org.apache.hudi.common.util.hash.ColumnIndexID
+import org.apache.hudi.data.HoodieJavaRDD
+import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata, HoodieTableMetadataUtil, MetadataPartitionType}
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.{DataFrame, HoodieUnsafeRDDUtils, Row, SparkSession}
+import scala.collection.JavaConverters._
import scala.collection.immutable.TreeSet
/**
* Mixin trait abstracting away heavy-lifting of interactions with Metadata Table's Column Stats Index,
* providing convenient interfaces to read it, transpose, etc
*/
-trait ColumnStatsIndexSupport {
+trait ColumnStatsIndexSupport extends SparkAdapterSupport {
- def readColumnStatsIndex(spark: SparkSession, metadataTablePath: String): DataFrame = {
+ def readColumnStatsIndex(spark: SparkSession,
+ tableBasePath: String,
+ metadataConfig: HoodieMetadataConfig,
+ targetColumns: Seq[String] = Seq.empty): DataFrame = {
val targetColStatsIndexColumns = Seq(
HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME,
HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE,
@@ -43,11 +60,17 @@ trait ColumnStatsIndexSupport {
(targetColStatsIndexColumns :+ HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME).map(colName =>
s"${HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS}.${colName}")
- // Read Metadata Table's Column Stats Index into Spark's [[DataFrame]]
- val metadataTableDF = spark.read.format("org.apache.hudi")
- .load(s"$metadataTablePath/${MetadataPartitionType.COLUMN_STATS.getPartitionPath}")
+ val metadataTableDF: DataFrame = {
+ // NOTE: If specific columns have been provided, we can considerably trim down the amount of data fetched
+ // by only fetching Column Stats Index records pertaining to the requested columns.
+ // Otherwise we fall back to reading the whole Column Stats Index
+ if (targetColumns.nonEmpty) {
+ readColumnStatsIndexForColumnsInternal(spark, targetColumns, metadataConfig, tableBasePath)
+ } else {
+ readFullColumnStatsIndexInternal(spark, tableBasePath)
+ }
+ }
- // TODO filter on (column, partition) prefix
val colStatsDF = metadataTableDF.where(col(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).isNotNull)
.select(requiredMetadataIndexColumns.map(col): _*)
@@ -105,34 +128,40 @@ trait ColumnStatsIndexSupport {
// of the transposed table
val sortedColumns = TreeSet(targetColumns: _*)
+ val colNameOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME)
+ val minValueOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE)
+ val maxValueOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE)
+ val fileNameOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
+ val nullCountOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT)
+
val transposedRDD = colStatsDF.rdd
- .filter(row => sortedColumns.contains(row.getString(colStatsSchemaOrdinalsMap("columnName"))))
+ .filter(row => sortedColumns.contains(row.getString(colNameOrdinal)))
.map { row =>
- val (minValue, _) = tryUnpackNonNullVal(row.getAs[Row](colStatsSchemaOrdinalsMap("minValue")))
- val (maxValue, _) = tryUnpackNonNullVal(row.getAs[Row](colStatsSchemaOrdinalsMap("maxValue")))
+ val (minValue, _) = tryUnpackNonNullVal(row.getAs[Row](minValueOrdinal))
+ val (maxValue, _) = tryUnpackNonNullVal(row.getAs[Row](maxValueOrdinal))
- val colName = row.getString(colStatsSchemaOrdinalsMap("columnName"))
+ val colName = row.getString(colNameOrdinal)
val colType = tableSchemaFieldMap(colName).dataType
val rowValsSeq = row.toSeq.toArray
- rowValsSeq(colStatsSchemaOrdinalsMap("minValue")) = deserialize(minValue, colType)
- rowValsSeq(colStatsSchemaOrdinalsMap("maxValue")) = deserialize(maxValue, colType)
+ rowValsSeq(minValueOrdinal) = deserialize(minValue, colType)
+ rowValsSeq(maxValueOrdinal) = deserialize(maxValue, colType)
Row(rowValsSeq:_*)
}
- .groupBy(r => r.getString(colStatsSchemaOrdinalsMap("fileName")))
+ .groupBy(r => r.getString(fileNameOrdinal))
.foldByKey(Seq[Row]()) {
case (_, columnRows) =>
// Rows seq is always non-empty (otherwise it won't be grouped into)
- val fileName = columnRows.head.get(colStatsSchemaOrdinalsMap("fileName"))
+ val fileName = columnRows.head.get(fileNameOrdinal)
val coalescedRowValuesSeq = columnRows.toSeq
// NOTE: It's crucial to maintain appropriate ordering of the columns
// matching table layout
- .sortBy(_.getString(colStatsSchemaOrdinalsMap("columnName")))
+ .sortBy(_.getString(colNameOrdinal))
.foldLeft(Seq[Any](fileName)) {
case (acc, columnRow) =>
- acc ++ Seq("minValue", "maxValue", "nullCount").map(ord => columnRow.get(colStatsSchemaOrdinalsMap(ord)))
+ acc ++ Seq(minValueOrdinal, maxValueOrdinal, nullCountOrdinal).map(ord => columnRow.get(ord))
}
Seq(Row(coalescedRowValuesSeq:_*))
@@ -147,6 +176,49 @@ trait ColumnStatsIndexSupport {
spark.createDataFrame(transposedRDD, indexSchema)
}
+
+ private def readFullColumnStatsIndexInternal(spark: SparkSession, tableBasePath: String) = {
+ val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(tableBasePath)
+ // Read Metadata Table's Column Stats Index into Spark's [[DataFrame]]
+ spark.read.format("org.apache.hudi")
+ .load(s"$metadataTablePath/${MetadataPartitionType.COLUMN_STATS.getPartitionPath}")
+ }
+
+ private def readColumnStatsIndexForColumnsInternal(spark: SparkSession, targetColumns: Seq[String], metadataConfig: HoodieMetadataConfig, tableBasePath: String) = {
+ val ctx = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
+
+ // Read Metadata Table's Column Stats Index into Spark's [[DataFrame]] by
+ // - Fetching the records from CSI by key-prefixes (encoded column names)
+ // - Deserializing fetched records into [[InternalRow]]s
+ // - Composing [[DataFrame]]
+ val metadataTableDF = {
+ val metadataTable = HoodieTableMetadata.create(ctx, metadataConfig, tableBasePath, FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue)
+
+ // TODO: encoding should be done internally within HoodieBackedTableMetadata
+ val encodedTargetColumnNames = targetColumns.map(colName => new ColumnIndexID(colName).asBase64EncodedString())
+
+ val recordsRDD: RDD[HoodieRecord[HoodieMetadataPayload]] =
+ HoodieJavaRDD.getJavaRDD(
+ metadataTable.getRecordsByKeyPrefixes(encodedTargetColumnNames.asJava, HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)
+ )
+
+ val catalystRowsRDD: RDD[InternalRow] = recordsRDD.mapPartitions { it =>
+ val metadataRecordSchema = new Parser().parse(metadataRecordSchemaString)
+ val converter = AvroConversionUtils.createAvroToInternalRowConverter(metadataRecordSchema, metadataRecordStructType)
+
+ it.map { record =>
+ // Schema and props are ignored when generating the metadata record from the payload;
+ // instead, the file-system, bloom-filter, or column-stats metadata carried in the payload is used directly
+ toScalaOption(record.getData.getInsertValue(null, null))
+ .flatMap(avroRecord => converter(avroRecord.asInstanceOf[GenericRecord]))
+ .orNull
+ }
+ }
+
+ HoodieUnsafeRDDUtils.createDataFrame(spark, catalystRowsRDD, metadataRecordStructType)
+ }
+ metadataTableDF
+ }
}
object ColumnStatsIndexSupport {
@@ -156,6 +228,9 @@ object ColumnStatsIndexSupport {
private val COLUMN_STATS_INDEX_MAX_VALUE_STAT_NAME = "maxValue"
private val COLUMN_STATS_INDEX_NUM_NULLS_STAT_NAME = "num_nulls"
+ private val metadataRecordSchemaString: String = HoodieMetadataRecord.SCHEMA$.toString
+ private val metadataRecordStructType: StructType = AvroConversionUtils.convertAvroSchemaToStructType(HoodieMetadataRecord.SCHEMA$)
+
/**
* @VisibleForTesting
*/
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index f79ba96d89..c33c6dce6d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.mapred.JobConf
import org.apache.hudi.HoodieBaseRelation.getPartitionPath
import org.apache.hudi.HoodieConversionUtils.toScalaOption
-import org.apache.hudi.common.config.SerializableConfiguration
+import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
@@ -68,7 +68,8 @@ case class HoodieTableState(tablePath: String,
recordKeyField: String,
preCombineFieldOpt: Option[String],
usesVirtualKeys: Boolean,
- recordPayloadClassName: String)
+ recordPayloadClassName: String,
+ metadataConfig: HoodieMetadataConfig)
/**
* Hoodie BaseRelation which extends [[PrunedFilteredScan]].
@@ -135,7 +136,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
val internalSchemaFromMeta = try {
schemaUtil.getTableInternalSchemaFromCommitMetadata.orElse(InternalSchema.getEmptyInternalSchema)
} catch {
- case _ => InternalSchema.getEmptyInternalSchema
+ case _: Exception => InternalSchema.getEmptyInternalSchema
}
(avroSchema, internalSchemaFromMeta)
}
@@ -339,7 +340,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
recordKeyField = recordKeyField,
preCombineFieldOpt = preCombineFieldOpt,
usesVirtualKeys = !tableConfig.populateMetaFields(),
- recordPayloadClassName = tableConfig.getPayloadClass
+ recordPayloadClassName = tableConfig.getPayloadClass,
+ metadataConfig = fileIndex.metadataConfig
)
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 82cd1f4019..2cea67d275 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -26,7 +26,7 @@ import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
-import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata}
+import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata, HoodieTableMetadataUtil, MetadataPartitionType}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{And, Expression, Literal}
@@ -195,15 +195,14 @@ case class HoodieFileIndex(spark: SparkSession,
* @return list of pruned (data-skipped) candidate base-files' names
*/
private def lookupCandidateFilesInMetadataTable(queryFilters: Seq[Expression]): Try[Option[Set[String]]] = Try {
- val fs = metaClient.getFs
- val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(basePath)
-
- if (!isDataSkippingEnabled || !fs.exists(new Path(metadataTablePath)) || queryFilters.isEmpty) {
+ if (!isDataSkippingEnabled || queryFilters.isEmpty || !HoodieTableMetadataUtil.getCompletedMetadataPartitions(metaClient.getTableConfig)
+ .contains(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)) {
Option.empty
} else {
- val colStatsDF: DataFrame = readColumnStatsIndex(spark, metadataTablePath)
val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema)
+ val colStatsDF: DataFrame = readColumnStatsIndex(spark, basePath, metadataConfig, queryReferencedColumns)
+
// Persist DF to avoid re-computing column statistics unraveling
withPersistence(colStatsDF) {
val transposedColStatsDF: DataFrame = transposeColumnStatsIndex(spark, colStatsDF, queryReferencedColumns, schema)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
index d40cf49434..c0c47cff42 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
@@ -23,7 +23,7 @@ import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedReco
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.JobConf
-import org.apache.hudi.HoodieConversionUtils.toScalaOption
+import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption}
import org.apache.hudi.HoodieMergeOnReadRDD.{AvroDeserializerSupport, collectFieldOrdinals, getPartitionPath, projectAvro, projectAvroUnsafe, projectRowUnsafe, resolveAvroSchemaNullability}
import org.apache.hudi.MergeOnReadSnapshotRelation.getFilePath
import org.apache.hudi.common.config.HoodieMetadataConfig
@@ -324,17 +324,23 @@ private object HoodieMergeOnReadRDD {
val fs = FSUtils.getFs(tablePath, hadoopConf)
if (HoodieTableMetadata.isMetadataTable(tablePath)) {
- val metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).build()
+ val metadataConfig = tableState.metadataConfig
val dataTableBasePath = getDataTableBasePathFromMetadataTable(tablePath)
val metadataTable = new HoodieBackedTableMetadata(
new HoodieLocalEngineContext(hadoopConf), metadataConfig,
dataTableBasePath,
hadoopConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
+ // We have to force full-scan for the MT log record reader, to make sure
+ // we can iterate over all of the partitions, since by default some of the partitions (Column Stats,
+ // Bloom Filter) are in "point-lookup" mode
+ val forceFullScan = true
+
// NOTE: In case of Metadata Table partition path equates to partition name (since there's just one level
// of indirection among MT partitions)
val relativePartitionPath = getRelativePartitionPath(new Path(tablePath), partitionPath)
- metadataTable.getLogRecordScanner(logFiles.asJava, relativePartitionPath).getLeft
+ metadataTable.getLogRecordScanner(logFiles.asJava, relativePartitionPath, toJavaOption(Some(forceFullScan)))
+ .getLeft
} else {
val logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(fs)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index f1948d80c8..278454dbc3 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.HoodieConversionUtils.toProperties
import org.apache.hudi.HoodieWriterUtils._
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
@@ -360,7 +361,7 @@ object HoodieSparkSqlWriter {
None
}
} catch {
- case _ => None
+ case _: Exception => None
}
}
@@ -568,12 +569,6 @@ object HoodieSparkSqlWriter {
(syncHiveSuccess, common.util.Option.ofNullable(instantTime))
}
- def toProperties(params: Map[String, String]): TypedProperties = {
- val props = new TypedProperties()
- params.foreach(kv => props.setProperty(kv._1, kv._2))
- props
- }
-
private def handleSaveModes(spark: SparkSession, mode: SaveMode, tablePath: Path, tableConfig: HoodieTableConfig, tableName: String,
operation: WriteOperationType, fs: FileSystem): Unit = {
if (mode == SaveMode.Append && tableExists) {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 81199dbca9..d9fe010fe7 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -91,12 +91,6 @@ object HoodieWriterUtils {
Map() ++ hoodieConfig.getProps.asScala ++ globalProps ++ DataSourceOptionsHelper.translateConfigurations(parameters)
}
- def toProperties(params: Map[String, String]): TypedProperties = {
- val props = new TypedProperties()
- params.foreach(kv => props.setProperty(kv._1, kv._2))
- props
- }
-
/**
* Get the partition columns to stored to hoodie.properties.
* @param parameters
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index a06ffffe50..1305323bd1 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -308,7 +308,7 @@ object SparkHoodieTableFileIndex {
}
private def deduceQueryType(configProperties: TypedProperties): HoodieTableQueryType = {
- configProperties.asScala(QUERY_TYPE.key) match {
+ configProperties.asScala.getOrElse(QUERY_TYPE.key, QUERY_TYPE.defaultValue) match {
case QUERY_TYPE_SNAPSHOT_OPT_VAL => HoodieTableQueryType.SNAPSHOT
case QUERY_TYPE_INCREMENTAL_OPT_VAL => HoodieTableQueryType.INCREMENTAL
case QUERY_TYPE_READ_OPTIMIZED_OPT_VAL => HoodieTableQueryType.READ_OPTIMIZED
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
index 4e46233c35..5a2b30fae1 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
@@ -154,6 +154,7 @@ class HoodieStreamSource(
} else {
// Consume the data between (startCommitTime, endCommitTime]
val incParams = parameters ++ Map(
+ DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL,
DataSourceReadOptions.BEGIN_INSTANTTIME.key -> startCommitTime(startOffset),
DataSourceReadOptions.END_INSTANTTIME.key -> endOffset.commitTime
)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
index a11c2f73f9..3090500941 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
@@ -372,7 +372,11 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
val props = Map[String, String](
"path" -> basePath,
QUERY_TYPE.key -> QUERY_TYPE_SNAPSHOT_OPT_VAL,
- DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true"
+ DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true",
+ // NOTE: Metadata Table has to be enabled on the read path as well
+ HoodieMetadataConfig.ENABLE.key -> "true",
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS_FOR_ALL_COLUMNS.key -> "true"
)
val fileIndex = HoodieFileIndex(spark, metaClient, Option.empty, props, NoopCache)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
index 5d10a1d1f4..8f827f13d2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
@@ -22,11 +22,11 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}
import org.apache.hudi.ColumnStatsIndexSupport.composeIndexSchema
import org.apache.hudi.DataSourceWriteOptions.{PRECOMBINE_FIELD, RECORDKEY_FIELD}
+import org.apache.hudi.HoodieConversionUtils.toProperties
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.util.ParquetUtils
import org.apache.hudi.config.{HoodieStorageConfig, HoodieWriteConfig}
-import org.apache.hudi.metadata.HoodieTableMetadata
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions}
import org.apache.spark.sql._
@@ -34,6 +34,8 @@ import org.apache.spark.sql.functions.typedLit
import org.apache.spark.sql.types._
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue}
import org.junit.jupiter.api._
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import java.math.BigInteger
import java.sql.{Date, Timestamp}
@@ -69,8 +71,9 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
cleanupSparkContexts()
}
- @Test
- def testMetadataColumnStatsIndex(): Unit = {
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testMetadataColumnStatsIndex(forceFullLogScan: Boolean): Unit = {
val opts = Map(
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4",
@@ -80,6 +83,7 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
HoodieMetadataConfig.ENABLE.key -> "true",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS_FOR_ALL_COLUMNS.key -> "true",
+ HoodieMetadataConfig.ENABLE_FULL_SCAN_LOG_FILES.key -> forceFullLogScan.toString,
HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
)
@@ -104,9 +108,11 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
metaClient = HoodieTableMetaClient.reload(metaClient)
- val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(basePath)
+ val metadataConfig = HoodieMetadataConfig.newBuilder()
+ .fromProperties(toProperties(opts))
+ .build()
- val colStatsDF = readColumnStatsIndex(spark, metadataTablePath)
+ val colStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, sourceTableSchema.fieldNames)
val transposedColStatsDF = transposeColumnStatsIndex(spark, colStatsDF, sourceTableSchema.fieldNames, sourceTableSchema)
val expectedColStatsSchema = composeIndexSchema(sourceTableSchema.fieldNames, sourceTableSchema)
@@ -146,7 +152,7 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
metaClient = HoodieTableMetaClient.reload(metaClient)
- val updatedColStatsDF = readColumnStatsIndex(spark, metadataTablePath)
+ val updatedColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, sourceTableSchema.fieldNames)
val transposedUpdatedColStatsDF = transposeColumnStatsIndex(spark, updatedColStatsDF, sourceTableSchema.fieldNames, sourceTableSchema)
val expectedColStatsIndexUpdatedDF =
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLayoutOptimization.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLayoutOptimization.scala
index 188ba6745c..17715627fe 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLayoutOptimization.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLayoutOptimization.scala
@@ -51,17 +51,20 @@ class TestLayoutOptimization extends HoodieClientTestBase {
.add("c7", BinaryType)
.add("c8", ByteType)
+ val metadataOpts = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true",
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
+ )
+
val commonOpts = Map(
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4",
"hoodie.bulkinsert.shuffle.parallelism" -> "4",
- HoodieMetadataConfig.ENABLE.key -> "true",
- HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "partition",
DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> "timestamp",
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
- )
+ ) ++ metadataOpts
@BeforeEach
override def setUp() {
@@ -134,6 +137,7 @@ class TestLayoutOptimization extends HoodieClientTestBase {
val readDfSkip =
spark.read
.option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "true")
+ .options(metadataOpts)
.format("hudi")
.load(basePath)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
index ead6358f23..d648471351 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
@@ -49,19 +49,21 @@ class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarn
def testReadability(): Unit = {
val dataGen = new HoodieTestDataGenerator()
- val opts: Map[String, String] = commonOpts ++ Map(
+ val metadataOpts: Map[String, String] = Map(
HoodieMetadataConfig.ENABLE.key -> "true",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
- HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS_FOR_ALL_COLUMNS.key -> "true",
- HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "1"
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS_FOR_ALL_COLUMNS.key -> "true"
)
+ val combinedOpts: Map[String, String] = commonOpts ++ metadataOpts ++
+ Map(HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "1")
+
// Insert records
val newRecords = dataGen.generateInserts("001", 100)
val newRecordsDF = parseRecords(recordsToStrings(newRecords).asScala)
newRecordsDF.write.format(hudi)
- .options(opts)
+ .options(combinedOpts)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Append)
.save(basePath)
@@ -71,13 +73,13 @@ class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarn
val updatedRecordsDF = parseRecords(recordsToStrings(updatedRecords).asScala)
updatedRecordsDF.write.format(hudi)
- .options(opts)
+ .options(combinedOpts)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Append)
.save(basePath)
// Files partition of MT
- val filesPartitionDF = spark.read.format(hudi).load(s"$basePath/.hoodie/metadata/files")
+ val filesPartitionDF = spark.read.options(metadataOpts).format(hudi).load(s"$basePath/.hoodie/metadata/files")
// Smoke test
filesPartitionDF.show()
@@ -95,7 +97,7 @@ class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarn
assertEquals(expectedKeys, keys)
// Column Stats Index partition of MT
- val colStatsDF = spark.read.format(hudi).load(s"$basePath/.hoodie/metadata/column_stats")
+ val colStatsDF = spark.read.options(metadataOpts).format(hudi).load(s"$basePath/.hoodie/metadata/column_stats")
// Smoke test
colStatsDF.show()