Posted to commits@hudi.apache.org by xu...@apache.org on 2022/10/07 10:37:35 UTC
[hudi] branch master updated: [HUDI-4992] Fixing invalid min/max record key stats in Parquet metadata (#6883)
This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a51181726c [HUDI-4992] Fixing invalid min/max record key stats in Parquet metadata (#6883)
a51181726c is described below
commit a51181726ce6efb57459285a66868e9d3687bd60
Author: Alexey Kudinkin <al...@infinilake.com>
AuthorDate: Fri Oct 7 03:37:26 2022 -0700
[HUDI-4992] Fixing invalid min/max record key stats in Parquet metadata (#6883)
---
.../apache/hudi/io/storage/HoodieOrcWriter.java | 10 +-
.../hudi/avro/TestHoodieAvroParquetWriter.java | 118 +++++++++++++++++++++
.../hudi/io/storage/TestHoodieOrcReaderWriter.java | 7 +-
.../row/HoodieRowDataParquetWriteSupport.java | 55 ++++------
.../storage/row/HoodieRowParquetWriteSupport.java | 61 +++++------
.../row/TestHoodieInternalRowParquetWriter.java | 95 ++++++++++-------
.../apache/hudi/avro/HoodieAvroWriteSupport.java | 60 +++++------
.../hudi/avro/HoodieBloomFilterWriteSupport.java | 96 +++++++++++++++++
.../org/apache/hudi/common/util/BaseFileUtils.java | 13 +--
.../hudi/avro/TestHoodieAvroWriteSupport.java | 67 ------------
10 files changed, 361 insertions(+), 221 deletions(-)
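For context on the "invalid min/max record key stats" in the subject: in the Spark row write support (HoodieRowParquetWriteSupport), whose inline min/max tracking is removed below, the comparisons were inverted, so the hoodie_min_record_key footer could end up holding the largest key and hoodie_max_record_key the smallest. The following is a minimal, self-contained sketch (plain Java, hypothetical key values, illustration only, not part of the diff) contrasting the removed update rule with the one in the new HoodieBloomFilterWriteSupport base class.

import java.util.Arrays;
import java.util.List;

public class MinMaxComparisonSketch {
  public static void main(String[] args) {
    List<String> keys = Arrays.asList("key_002", "key_003", "key_001");

    // Removed rule (HoodieRowParquetWriteSupport.add): the comparisons are inverted,
    // so "min" ends up tracking the largest key and "max" the smallest one.
    String min = null;
    String max = null;
    for (String key : keys) {
      if (min == null || min.compareTo(key) < 0) {
        min = key;
      }
      if (max == null || max.compareTo(key) > 0) {
        max = key;
      }
    }
    System.out.println("old rule: min=" + min + " max=" + max); // min=key_003 max=key_001

    // New rule (HoodieBloomFilterWriteSupport.addKey): min is replaced only by a strictly
    // smaller key, max only by a strictly larger one.
    min = null;
    max = null;
    for (String key : keys) {
      if (min == null || min.compareTo(key) > 0) {
        min = key;
      }
      if (max == null || max.compareTo(key) < 0) {
        max = key;
      }
    }
    System.out.println("new rule: min=" + min + " max=" + max); // min=key_001 max=key_003
  }
}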
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java
index a532ac66c9..4bcab2cec8 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java
@@ -23,6 +23,7 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
import org.apache.hudi.common.engine.TaskContextSupplier;
@@ -44,9 +45,6 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
-import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE;
-import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
-import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
public class HoodieOrcWriter<T extends HoodieRecordPayload, R extends IndexedRecord>
implements HoodieFileWriter<R>, Closeable {
@@ -155,11 +153,11 @@ public class HoodieOrcWriter<T extends HoodieRecordPayload, R extends IndexedRec
final BloomFilter bloomFilter = orcConfig.getBloomFilter();
writer.addUserMetadata(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, ByteBuffer.wrap(bloomFilter.serializeToString().getBytes()));
if (minRecordKey != null && maxRecordKey != null) {
- writer.addUserMetadata(HOODIE_MIN_RECORD_KEY_FOOTER, ByteBuffer.wrap(minRecordKey.getBytes()));
- writer.addUserMetadata(HOODIE_MAX_RECORD_KEY_FOOTER, ByteBuffer.wrap(maxRecordKey.getBytes()));
+ writer.addUserMetadata(HoodieBloomFilterWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER, ByteBuffer.wrap(minRecordKey.getBytes()));
+ writer.addUserMetadata(HoodieBloomFilterWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER, ByteBuffer.wrap(maxRecordKey.getBytes()));
}
if (bloomFilter.getBloomFilterTypeCode().name().contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) {
- writer.addUserMetadata(HOODIE_BLOOM_FILTER_TYPE_CODE, ByteBuffer.wrap(bloomFilter.getBloomFilterTypeCode().name().getBytes()));
+ writer.addUserMetadata(HoodieBloomFilterWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE, ByteBuffer.wrap(bloomFilter.getBloomFilterTypeCode().name().getBytes()));
}
}
writer.addUserMetadata(HoodieOrcConfig.AVRO_SCHEMA_METADATA_KEY, ByteBuffer.wrap(avroSchema.toString().getBytes()));
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java
new file mode 100644
index 0000000000..df879dc816
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.DummyTaskContextSupplier;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ParquetUtils;
+import org.apache.hudi.io.storage.HoodieAvroParquetWriter;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestHoodieAvroParquetWriter {
+
+ @TempDir java.nio.file.Path tmpDir;
+
+ @Test
+ public void testProperWriting() throws IOException {
+ Configuration hadoopConf = new Configuration();
+
+ HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEED);
+ List<GenericRecord> records = dataGen.generateGenericRecords(10);
+
+ Schema schema = records.get(0).getSchema();
+
+ BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.0001, 10000,
+ BloomFilterTypeCode.DYNAMIC_V0.name());
+ HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema),
+ schema, Option.of(filter));
+
+ HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig =
+ new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, ParquetWriter.DEFAULT_BLOCK_SIZE,
+ ParquetWriter.DEFAULT_PAGE_SIZE, 1024 * 1024 * 1024, hadoopConf, 0.1);
+
+ Path filePath = new Path(tmpDir.resolve("test.parquet").toAbsolutePath().toString());
+
+ try (HoodieAvroParquetWriter<GenericRecord> writer =
+ new HoodieAvroParquetWriter<>(filePath, parquetConfig, "001", new DummyTaskContextSupplier(), true)) {
+ for (GenericRecord record : records) {
+ writer.writeAvro((String) record.get("_row_key"), record);
+ }
+ }
+
+ ParquetUtils utils = new ParquetUtils();
+
+ // Step 1: Make sure records are written appropriately
+ List<GenericRecord> readRecords = utils.readAvroRecords(hadoopConf, filePath);
+
+ assertEquals(toJson(records), toJson(readRecords));
+
+ // Step 2: Assert Parquet metadata was written appropriately
+ List<String> recordKeys = records.stream().map(r -> (String) r.get("_row_key")).collect(Collectors.toList());
+
+ String minKey = recordKeys.stream().min(Comparator.naturalOrder()).get();
+ String maxKey = recordKeys.stream().max(Comparator.naturalOrder()).get();
+
+ FileMetaData parquetMetadata = ParquetUtils.readMetadata(hadoopConf, filePath).getFileMetaData();
+
+ Map<String, String> extraMetadata = parquetMetadata.getKeyValueMetaData();
+
+ assertEquals(extraMetadata.get(HoodieBloomFilterWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER), minKey);
+ assertEquals(extraMetadata.get(HoodieBloomFilterWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER), maxKey);
+ assertEquals(extraMetadata.get(HoodieBloomFilterWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE), BloomFilterTypeCode.DYNAMIC_V0.name());
+
+ // Step 3: Make sure Bloom Filter contains all the record keys
+ BloomFilter bloomFilter = utils.readBloomFilterFromMetadata(hadoopConf, filePath);
+ recordKeys.forEach(recordKey -> {
+ assertTrue(bloomFilter.mightContain(recordKey));
+ });
+ }
+
+ private static List<String> toJson(List<GenericRecord> records) {
+ return records.stream().map(r -> {
+ try {
+ return new String(HoodieAvroUtils.avroToJson(r, true));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }).collect(Collectors.toList());
+ }
+}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java
index 817fc25a5d..373fc31a56 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java
@@ -18,6 +18,7 @@
package org.apache.hudi.io.storage;
+import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
@@ -37,8 +38,6 @@ import java.io.IOException;
import java.util.function.Supplier;
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
-import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
-import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
import static org.apache.hudi.io.storage.HoodieOrcConfig.AVRO_SCHEMA_METADATA_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -78,8 +77,8 @@ public class TestHoodieOrcReaderWriter extends TestHoodieReaderWriterBase {
protected void verifyMetadata(Configuration conf) throws IOException {
Reader orcReader = OrcFile.createReader(getFilePath(), OrcFile.readerOptions(conf));
assertEquals(4, orcReader.getMetadataKeys().size());
- assertTrue(orcReader.getMetadataKeys().contains(HOODIE_MIN_RECORD_KEY_FOOTER));
- assertTrue(orcReader.getMetadataKeys().contains(HOODIE_MAX_RECORD_KEY_FOOTER));
+ assertTrue(orcReader.getMetadataKeys().contains(HoodieBloomFilterWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER));
+ assertTrue(orcReader.getMetadataKeys().contains(HoodieBloomFilterWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER));
assertTrue(orcReader.getMetadataKeys().contains(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY));
assertTrue(orcReader.getMetadataKeys().contains(AVRO_SCHEMA_METADATA_KEY));
assertEquals(CompressionKind.ZLIB.name(), orcReader.getCompressionKind().toString());
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java
index 035cb2eab9..b939498c3e 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java
@@ -18,20 +18,18 @@
package org.apache.hudi.io.storage.row;
+import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
import org.apache.hudi.common.bloom.BloomFilter;
-import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.util.Option;
import org.apache.parquet.hadoop.api.WriteSupport;
-import java.util.HashMap;
-
-import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
-import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE;
-import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
-import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Map;
/**
* Hoodie Write Support for directly writing {@link RowData} to Parquet.
@@ -39,14 +37,13 @@ import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_
public class HoodieRowDataParquetWriteSupport extends RowDataParquetWriteSupport {
private final Configuration hadoopConf;
- private final BloomFilter bloomFilter;
- private String minRecordKey;
- private String maxRecordKey;
+ private final Option<HoodieBloomFilterWriteSupport<String>> bloomFilterWriteSupportOpt;
public HoodieRowDataParquetWriteSupport(Configuration conf, RowType rowType, BloomFilter bloomFilter) {
super(rowType);
this.hadoopConf = new Configuration(conf);
- this.bloomFilter = bloomFilter;
+ this.bloomFilterWriteSupportOpt = Option.ofNullable(bloomFilter)
+ .map(HoodieBloomFilterRowDataWriteSupport::new);
}
public Configuration getHadoopConf() {
@@ -55,32 +52,26 @@ public class HoodieRowDataParquetWriteSupport extends RowDataParquetWriteSupport
@Override
public WriteSupport.FinalizedWriteContext finalizeWrite() {
- HashMap<String, String> extraMetaData = new HashMap<>();
- if (bloomFilter != null) {
- extraMetaData.put(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, bloomFilter.serializeToString());
- if (minRecordKey != null && maxRecordKey != null) {
- extraMetaData.put(HOODIE_MIN_RECORD_KEY_FOOTER, minRecordKey);
- extraMetaData.put(HOODIE_MAX_RECORD_KEY_FOOTER, maxRecordKey);
- }
- if (bloomFilter.getBloomFilterTypeCode().name().contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) {
- extraMetaData.put(HOODIE_BLOOM_FILTER_TYPE_CODE, bloomFilter.getBloomFilterTypeCode().name());
- }
- }
- return new WriteSupport.FinalizedWriteContext(extraMetaData);
+ Map<String, String> extraMetadata =
+ bloomFilterWriteSupportOpt.map(HoodieBloomFilterWriteSupport::finalizeMetadata)
+ .orElse(Collections.emptyMap());
+
+ return new WriteSupport.FinalizedWriteContext(extraMetadata);
}
public void add(String recordKey) {
- this.bloomFilter.add(recordKey);
- if (minRecordKey != null) {
- minRecordKey = minRecordKey.compareTo(recordKey) <= 0 ? minRecordKey : recordKey;
- } else {
- minRecordKey = recordKey;
+ this.bloomFilterWriteSupportOpt.ifPresent(bloomFilterWriteSupport ->
+ bloomFilterWriteSupport.addKey(recordKey));
+ }
+
+ private static class HoodieBloomFilterRowDataWriteSupport extends HoodieBloomFilterWriteSupport<String> {
+ public HoodieBloomFilterRowDataWriteSupport(BloomFilter bloomFilter) {
+ super(bloomFilter);
}
- if (maxRecordKey != null) {
- maxRecordKey = maxRecordKey.compareTo(recordKey) >= 0 ? maxRecordKey : recordKey;
- } else {
- maxRecordKey = recordKey;
+ @Override
+ protected byte[] getUTF8Bytes(String key) {
+ return key.getBytes(StandardCharsets.UTF_8);
}
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
index 28964ecc3f..bb4dd9c619 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
@@ -19,8 +19,8 @@
package org.apache.hudi.io.storage.row;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
import org.apache.hudi.common.bloom.BloomFilter;
-import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.parquet.hadoop.api.WriteSupport;
@@ -28,12 +28,8 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
-import java.util.HashMap;
-
-import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
-import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE;
-import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
-import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
+import java.util.Collections;
+import java.util.Map;
/**
* Hoodie Write Support for directly writing Row to Parquet.
@@ -41,19 +37,17 @@ import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_
public class HoodieRowParquetWriteSupport extends ParquetWriteSupport {
private final Configuration hadoopConf;
- private final BloomFilter bloomFilter;
-
- private UTF8String minRecordKey;
- private UTF8String maxRecordKey;
+ private final Option<HoodieBloomFilterWriteSupport<UTF8String>> bloomFilterWriteSupportOpt;
public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, Option<BloomFilter> bloomFilterOpt, HoodieWriteConfig writeConfig) {
Configuration hadoopConf = new Configuration(conf);
hadoopConf.set("spark.sql.parquet.writeLegacyFormat", writeConfig.parquetWriteLegacyFormatEnabled());
hadoopConf.set("spark.sql.parquet.outputTimestampType", writeConfig.parquetOutputTimestampType());
hadoopConf.set("spark.sql.parquet.fieldId.write.enabled", writeConfig.parquetFieldIdWriteEnabled());
- this.hadoopConf = hadoopConf;
setSchema(structType, hadoopConf);
- this.bloomFilter = bloomFilterOpt.orElse(null);
+
+ this.hadoopConf = hadoopConf;
+ this.bloomFilterWriteSupportOpt = bloomFilterOpt.map(HoodieBloomFilterRowWriteSupport::new);
}
public Configuration getHadoopConf() {
@@ -62,32 +56,35 @@ public class HoodieRowParquetWriteSupport extends ParquetWriteSupport {
@Override
public WriteSupport.FinalizedWriteContext finalizeWrite() {
- HashMap<String, String> extraMetaData = new HashMap<>();
- if (bloomFilter != null) {
- extraMetaData.put(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, bloomFilter.serializeToString());
- if (minRecordKey != null && maxRecordKey != null) {
- extraMetaData.put(HOODIE_MIN_RECORD_KEY_FOOTER, minRecordKey.toString());
- extraMetaData.put(HOODIE_MAX_RECORD_KEY_FOOTER, maxRecordKey.toString());
- }
- if (bloomFilter.getBloomFilterTypeCode().name().contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) {
- extraMetaData.put(HOODIE_BLOOM_FILTER_TYPE_CODE, bloomFilter.getBloomFilterTypeCode().name());
- }
- }
- return new WriteSupport.FinalizedWriteContext(extraMetaData);
+ Map<String, String> extraMetadata =
+ bloomFilterWriteSupportOpt.map(HoodieBloomFilterWriteSupport::finalizeMetadata)
+ .orElse(Collections.emptyMap());
+
+ return new WriteSupport.FinalizedWriteContext(extraMetadata);
}
public void add(UTF8String recordKey) {
- this.bloomFilter.add(recordKey.getBytes());
+ this.bloomFilterWriteSupportOpt.ifPresent(bloomFilterWriteSupport ->
+ bloomFilterWriteSupport.addKey(recordKey));
+ }
- if (minRecordKey == null || minRecordKey.compareTo(recordKey) < 0) {
+ private static class HoodieBloomFilterRowWriteSupport extends HoodieBloomFilterWriteSupport<UTF8String> {
+ public HoodieBloomFilterRowWriteSupport(BloomFilter bloomFilter) {
+ super(bloomFilter);
+ }
+
+ @Override
+ protected byte[] getUTF8Bytes(UTF8String key) {
+ return key.getBytes();
+ }
+
+ @Override
+ protected UTF8String dereference(UTF8String key) {
// NOTE: [[clone]] is performed here (rather than [[copy]]) to only copy underlying buffer in
// cases when [[UTF8String]] is pointing into a buffer storing the whole containing record,
// and simply do a pass over when it holds an (immutable) buffer holding just the string
- minRecordKey = recordKey.clone();
- }
-
- if (maxRecordKey == null || maxRecordKey.compareTo(recordKey) > 0) {
- maxRecordKey = recordKey.clone();
+ return key.clone();
}
}
+
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java
index 481cda00d6..dce0e2fad5 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java
@@ -18,40 +18,44 @@
package org.apache.hudi.io.storage.row;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hudi.testutils.SparkDatasetTestUtils;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import java.util.Comparator;
import java.util.List;
-import java.util.Random;
-import java.util.UUID;
+import java.util.Map;
+import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Unit tests {@link HoodieInternalRowParquetWriter}.
*/
public class TestHoodieInternalRowParquetWriter extends HoodieClientTestHarness {
- private static final Random RANDOM = new Random();
-
@BeforeEach
public void setUp() throws Exception {
initSparkContexts("TestHoodieInternalRowParquetWriter");
@@ -68,44 +72,55 @@ public class TestHoodieInternalRowParquetWriter extends HoodieClientTestHarness
@ParameterizedTest
@ValueSource(booleans = {true, false})
- public void endToEndTest(boolean parquetWriteLegacyFormatEnabled) throws Exception {
+ public void testProperWriting(boolean parquetWriteLegacyFormatEnabled) throws Exception {
+ // Generate inputs
+ Dataset<Row> inputRows = SparkDatasetTestUtils.getRandomRows(sqlContext, 100,
+ HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, false);
+ StructType schema = inputRows.schema();
+
+ List<InternalRow> rows = SparkDatasetTestUtils.toInternalRows(inputRows, SparkDatasetTestUtils.ENCODER);
+
HoodieWriteConfig.Builder writeConfigBuilder =
SparkDatasetTestUtils.getConfigBuilder(basePath, timelineServicePort);
- for (int i = 0; i < 5; i++) {
- // init write support and parquet config
- HoodieRowParquetWriteSupport writeSupport = getWriteSupport(writeConfigBuilder, hadoopConf, parquetWriteLegacyFormatEnabled);
- HoodieWriteConfig cfg = writeConfigBuilder.build();
- HoodieParquetConfig<HoodieRowParquetWriteSupport> parquetConfig = new HoodieParquetConfig<>(writeSupport,
- CompressionCodecName.SNAPPY, cfg.getParquetBlockSize(), cfg.getParquetPageSize(), cfg.getParquetMaxFileSize(),
- writeSupport.getHadoopConf(), cfg.getParquetCompressionRatio(), cfg.parquetDictionaryEnabled());
-
- // prepare path
- String fileId = UUID.randomUUID().toString();
- Path filePath = new Path(basePath + "/" + fileId);
- String partitionPath = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
- metaClient.getFs().mkdirs(new Path(basePath));
-
- // init writer
- HoodieInternalRowParquetWriter writer = new HoodieInternalRowParquetWriter(filePath, parquetConfig);
-
- // generate input
- int size = 10 + RANDOM.nextInt(100);
- // Generate inputs
- Dataset<Row> inputRows = SparkDatasetTestUtils.getRandomRows(sqlContext, size, partitionPath, false);
- List<InternalRow> internalRows = SparkDatasetTestUtils.toInternalRows(inputRows, SparkDatasetTestUtils.ENCODER);
-
- // issue writes
- for (InternalRow internalRow : internalRows) {
- writer.write(internalRow);
- }
- // close the writer
- writer.close();
+ HoodieRowParquetWriteSupport writeSupport = getWriteSupport(writeConfigBuilder, hadoopConf, parquetWriteLegacyFormatEnabled);
+ HoodieWriteConfig cfg = writeConfigBuilder.build();
+ HoodieParquetConfig<HoodieRowParquetWriteSupport> parquetConfig = new HoodieParquetConfig<>(writeSupport,
+ CompressionCodecName.SNAPPY, cfg.getParquetBlockSize(), cfg.getParquetPageSize(), cfg.getParquetMaxFileSize(),
+ writeSupport.getHadoopConf(), cfg.getParquetCompressionRatio(), cfg.parquetDictionaryEnabled());
- // verify rows
- Dataset<Row> result = sqlContext.read().parquet(basePath);
- assertEquals(0, inputRows.except(result).count());
+ Path filePath = new Path(basePath + "/internal_row_writer.parquet");
+
+ try (HoodieInternalRowParquetWriter writer = new HoodieInternalRowParquetWriter(filePath, parquetConfig)) {
+ for (InternalRow row : rows) {
+ writer.writeRow(row.getUTF8String(schema.fieldIndex("record_key")), row);
+ }
}
+
+ // Step 1: Verify rows written correctly
+ Dataset<Row> result = sqlContext.read().parquet(basePath);
+ assertEquals(0, inputRows.except(result).count());
+
+ // Step 2: Assert Parquet metadata was written appropriately
+ List<String> recordKeys =
+ rows.stream().map(r -> r.getString(schema.fieldIndex("record_key"))).collect(Collectors.toList());
+
+ String minKey = recordKeys.stream().min(Comparator.naturalOrder()).get();
+ String maxKey = recordKeys.stream().max(Comparator.naturalOrder()).get();
+
+ FileMetaData parquetMetadata = ParquetUtils.readMetadata(hadoopConf, filePath).getFileMetaData();
+
+ Map<String, String> extraMetadata = parquetMetadata.getKeyValueMetaData();
+
+ assertEquals(extraMetadata.get(HoodieBloomFilterWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER), minKey);
+ assertEquals(extraMetadata.get(HoodieBloomFilterWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER), maxKey);
+ assertEquals(extraMetadata.get(HoodieBloomFilterWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE), BloomFilterTypeCode.DYNAMIC_V0.name());
+
+ // Step 3: Make sure Bloom Filter contains all the record keys
+ BloomFilter bloomFilter = new ParquetUtils().readBloomFilterFromMetadata(hadoopConf, filePath);
+ recordKeys.forEach(recordKey -> {
+ assertTrue(bloomFilter.mightContain(recordKey));
+ });
}
private HoodieRowParquetWriteSupport getWriteSupport(HoodieWriteConfig.Builder writeConfigBuilder, Configuration hadoopConf, boolean parquetWriteLegacyFormatEnabled) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
index c3920211ae..e87364fb90 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
@@ -20,12 +20,14 @@ package org.apache.hudi.avro;
import org.apache.avro.Schema;
import org.apache.hudi.common.bloom.BloomFilter;
-import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.parquet.avro.AvroWriteSupport;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.schema.MessageType;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -34,55 +36,45 @@ import java.util.Map;
*/
public class HoodieAvroWriteSupport extends AvroWriteSupport {
- private Option<BloomFilter> bloomFilterOpt;
- private String minRecordKey;
- private String maxRecordKey;
- private Map<String, String> footerMetadata = new HashMap<>();
+ private final Option<HoodieBloomFilterWriteSupport<String>> bloomFilterWriteSupportOpt;
+ private final Map<String, String> footerMetadata = new HashMap<>();
public static final String OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY = "com.uber.hoodie.bloomfilter";
public static final String HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY = "org.apache.hudi.bloomfilter";
- public static final String HOODIE_MIN_RECORD_KEY_FOOTER = "hoodie_min_record_key";
- public static final String HOODIE_MAX_RECORD_KEY_FOOTER = "hoodie_max_record_key";
- public static final String HOODIE_BLOOM_FILTER_TYPE_CODE = "hoodie_bloom_filter_type_code";
public HoodieAvroWriteSupport(MessageType schema, Schema avroSchema, Option<BloomFilter> bloomFilterOpt) {
super(schema, avroSchema, ConvertingGenericData.INSTANCE);
- this.bloomFilterOpt = bloomFilterOpt;
+ this.bloomFilterWriteSupportOpt = bloomFilterOpt.map(HoodieBloomFilterAvroWriteSupport::new);
}
@Override
public WriteSupport.FinalizedWriteContext finalizeWrite() {
- if (bloomFilterOpt.isPresent()) {
- footerMetadata.put(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, bloomFilterOpt.get().serializeToString());
- if (minRecordKey != null && maxRecordKey != null) {
- footerMetadata.put(HOODIE_MIN_RECORD_KEY_FOOTER, minRecordKey);
- footerMetadata.put(HOODIE_MAX_RECORD_KEY_FOOTER, maxRecordKey);
- }
- if (bloomFilterOpt.get().getBloomFilterTypeCode().name().contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) {
- footerMetadata.put(HOODIE_BLOOM_FILTER_TYPE_CODE, bloomFilterOpt.get().getBloomFilterTypeCode().name());
- }
- }
- return new WriteSupport.FinalizedWriteContext(footerMetadata);
+ Map<String, String> extraMetadata =
+ CollectionUtils.combine(footerMetadata,
+ bloomFilterWriteSupportOpt.map(HoodieBloomFilterWriteSupport::finalizeMetadata)
+ .orElse(Collections.emptyMap())
+ );
+
+ return new WriteSupport.FinalizedWriteContext(extraMetadata);
}
public void add(String recordKey) {
- if (bloomFilterOpt.isPresent()) {
- this.bloomFilterOpt.get().add(recordKey);
- if (minRecordKey != null) {
- minRecordKey = minRecordKey.compareTo(recordKey) <= 0 ? minRecordKey : recordKey;
- } else {
- minRecordKey = recordKey;
- }
-
- if (maxRecordKey != null) {
- maxRecordKey = maxRecordKey.compareTo(recordKey) >= 0 ? maxRecordKey : recordKey;
- } else {
- maxRecordKey = recordKey;
- }
- }
+ this.bloomFilterWriteSupportOpt.ifPresent(bloomFilterWriteSupport ->
+ bloomFilterWriteSupport.addKey(recordKey));
}
public void addFooterMetadata(String key, String value) {
footerMetadata.put(key, value);
}
+
+ private static class HoodieBloomFilterAvroWriteSupport extends HoodieBloomFilterWriteSupport<String> {
+ public HoodieBloomFilterAvroWriteSupport(BloomFilter bloomFilter) {
+ super(bloomFilter);
+ }
+
+ @Override
+ protected byte[] getUTF8Bytes(String key) {
+ return key.getBytes(StandardCharsets.UTF_8);
+ }
+ }
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieBloomFilterWriteSupport.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieBloomFilterWriteSupport.java
new file mode 100644
index 0000000000..1a689791ba
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieBloomFilterWriteSupport.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
+
+/**
+ * This is a write-support utility base class that handles
+ *
+ * <ul>
+ * <li>Adding record keys to the Bloom Filter</li>
+ * <li>Keeping track of min/max record key (w/in single file)</li>
+ * </ul>
+ *
+ * @param <T> record-key type being ingested by this class
+ */
+public abstract class HoodieBloomFilterWriteSupport<T extends Comparable<T>> {
+
+ public static final String HOODIE_MIN_RECORD_KEY_FOOTER = "hoodie_min_record_key";
+ public static final String HOODIE_MAX_RECORD_KEY_FOOTER = "hoodie_max_record_key";
+ public static final String HOODIE_BLOOM_FILTER_TYPE_CODE = "hoodie_bloom_filter_type_code";
+
+ private final BloomFilter bloomFilter;
+
+ private T minRecordKey;
+ private T maxRecordKey;
+
+ public HoodieBloomFilterWriteSupport(BloomFilter bloomFilter) {
+ this.bloomFilter = bloomFilter;
+ }
+
+ public void addKey(T recordKey) {
+ bloomFilter.add(getUTF8Bytes(recordKey));
+
+ if (minRecordKey == null || minRecordKey.compareTo(recordKey) > 0) {
+ minRecordKey = dereference(recordKey);
+ }
+
+ if (maxRecordKey == null || maxRecordKey.compareTo(recordKey) < 0) {
+ maxRecordKey = dereference(recordKey);
+ }
+ }
+
+ public Map<String, String> finalizeMetadata() {
+ HashMap<String, String> extraMetadata = new HashMap<>();
+
+ extraMetadata.put(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, bloomFilter.serializeToString());
+ if (bloomFilter.getBloomFilterTypeCode().name().contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) {
+ extraMetadata.put(HOODIE_BLOOM_FILTER_TYPE_CODE, bloomFilter.getBloomFilterTypeCode().name());
+ }
+
+ if (minRecordKey != null && maxRecordKey != null) {
+ extraMetadata.put(HOODIE_MIN_RECORD_KEY_FOOTER, minRecordKey.toString());
+ extraMetadata.put(HOODIE_MAX_RECORD_KEY_FOOTER, maxRecordKey.toString());
+ }
+
+ return extraMetadata;
+ }
+
+ /**
+ * Since Bloom Filter ingests record-keys represented as UTF8 encoded byte string,
+ * this method have to be implemented for converting the original record key into one
+ */
+ protected abstract byte[] getUTF8Bytes(T key);
+
+ /**
+ * This method allows dereferencing the key object (through cloning, for example) when it might be
+ * pointing at a shared mutable buffer, to make sure that we're not keeping references
+ * to mutable objects
+ */
+ protected T dereference(T key) {
+ return key;
+ }
+}
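The base class above is meant to be consumed through thin per-writer subclasses (HoodieBloomFilterAvroWriteSupport, HoodieBloomFilterRowDataWriteSupport and HoodieBloomFilterRowWriteSupport elsewhere in this commit). A minimal usage sketch, assuming hudi-common is on the classpath; the subclass name and key values are illustrative only.

import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.bloom.BloomFilterTypeCode;

import java.nio.charset.StandardCharsets;
import java.util.Map;

public class BloomFilterWriteSupportSketch {

  // Mirrors HoodieBloomFilterAvroWriteSupport: keys are plain Strings, encoded as UTF-8
  // before being added to the Bloom Filter.
  private static class StringKeyWriteSupport extends HoodieBloomFilterWriteSupport<String> {
    StringKeyWriteSupport(BloomFilter bloomFilter) {
      super(bloomFilter);
    }

    @Override
    protected byte[] getUTF8Bytes(String key) {
      return key.getBytes(StandardCharsets.UTF_8);
    }
  }

  public static void main(String[] args) {
    BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.0001, 10000,
        BloomFilterTypeCode.DYNAMIC_V0.name());

    StringKeyWriteSupport writeSupport = new StringKeyWriteSupport(filter);

    // addKey() adds the key to the Bloom Filter and tracks the running min/max record key
    writeSupport.addKey("key_003");
    writeSupport.addKey("key_001");
    writeSupport.addKey("key_002");

    // finalizeMetadata() yields the footer entries the Parquet/ORC writers persist: the
    // serialized Bloom Filter, its type code (for dynamic filters), and the min/max record keys
    Map<String, String> footer = writeSupport.finalizeMetadata();
    System.out.println(footer.get(HoodieBloomFilterWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER)); // key_001
    System.out.println(footer.get(HoodieBloomFilterWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER)); // key_003
  }
}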
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
index d6391d178e..badb5e37a7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
@@ -19,6 +19,7 @@
package org.apache.hudi.common.util;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
@@ -83,7 +84,7 @@ public abstract class BaseFileUtils {
readFooter(configuration, false, filePath,
HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY,
HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY,
- HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE);
+ HoodieBloomFilterWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE);
String footerVal = footerVals.get(HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY);
if (null == footerVal) {
// We use old style key "com.uber.hoodie.bloomfilter"
@@ -91,9 +92,9 @@ public abstract class BaseFileUtils {
}
BloomFilter toReturn = null;
if (footerVal != null) {
- if (footerVals.containsKey(HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE)) {
+ if (footerVals.containsKey(HoodieBloomFilterWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE)) {
toReturn = BloomFilterFactory.fromString(footerVal,
- footerVals.get(HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE));
+ footerVals.get(HoodieBloomFilterWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE));
} else {
toReturn = BloomFilterFactory.fromString(footerVal, BloomFilterTypeCode.SIMPLE.name());
}
@@ -109,14 +110,14 @@ public abstract class BaseFileUtils {
*/
public String[] readMinMaxRecordKeys(Configuration configuration, Path filePath) {
Map<String, String> minMaxKeys = readFooter(configuration, true, filePath,
- HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER, HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER);
+ HoodieBloomFilterWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER, HoodieBloomFilterWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER);
if (minMaxKeys.size() != 2) {
throw new HoodieException(
String.format("Could not read min/max record key out of footer correctly from %s. read) : %s",
filePath, minMaxKeys));
}
- return new String[] {minMaxKeys.get(HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER),
- minMaxKeys.get(HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER)};
+ return new String[] {minMaxKeys.get(HoodieBloomFilterWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER),
+ minMaxKeys.get(HoodieBloomFilterWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER)};
}
/**
diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroWriteSupport.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroWriteSupport.java
deleted file mode 100644
index 16a77c145c..0000000000
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroWriteSupport.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.avro;
-
-import org.apache.hudi.common.bloom.BloomFilter;
-import org.apache.hudi.common.bloom.BloomFilterFactory;
-import org.apache.hudi.common.bloom.BloomFilterTypeCode;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.util.Option;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.fs.Path;
-import org.apache.parquet.avro.AvroSchemaConverter;
-import org.apache.parquet.hadoop.ParquetWriter;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-public class TestHoodieAvroWriteSupport {
-
- @Test
- public void testAddKey(@TempDir java.nio.file.Path tempDir) throws IOException {
- List<String> rowKeys = new ArrayList<>();
- for (int i = 0; i < 1000; i++) {
- rowKeys.add(UUID.randomUUID().toString());
- }
- String filePath = tempDir.resolve("test.parquet").toAbsolutePath().toString();
- Schema schema = HoodieAvroUtils.getRecordKeySchema();
- BloomFilter filter = BloomFilterFactory.createBloomFilter(
- 1000, 0.0001, 10000,
- BloomFilterTypeCode.SIMPLE.name());
- HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
- new AvroSchemaConverter().convert(schema), schema, Option.of(filter));
- ParquetWriter writer = new ParquetWriter(new Path(filePath), writeSupport, CompressionCodecName.GZIP,
- 120 * 1024 * 1024, ParquetWriter.DEFAULT_PAGE_SIZE);
- for (String rowKey : rowKeys) {
- GenericRecord rec = new GenericData.Record(schema);
- rec.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, rowKey);
- writer.write(rec);
- writeSupport.add(rowKey);
- }
- writer.close();
- }
-}
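Downstream, these footers are read back through BaseFileUtils (see the hunks above). A small read-back sketch; it assumes ParquetUtils inherits readMinMaxRecordKeys and readBloomFilterFromMetadata from BaseFileUtils (as the tests in this commit suggest), and the file path is hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.util.ParquetUtils;

public class FooterReadBackSketch {
  public static void main(String[] args) {
    Configuration hadoopConf = new Configuration();
    Path filePath = new Path("/tmp/test.parquet"); // hypothetical base file written with a Bloom Filter

    ParquetUtils utils = new ParquetUtils();

    // Reads the hoodie_min_record_key / hoodie_max_record_key footers written by the write supports
    String[] minMax = utils.readMinMaxRecordKeys(hadoopConf, filePath);
    System.out.println("min=" + minMax[0] + ", max=" + minMax[1]);

    // Reads the serialized Bloom Filter (honoring hoodie_bloom_filter_type_code when present)
    BloomFilter bloomFilter = utils.readBloomFilterFromMetadata(hadoopConf, filePath);
    System.out.println("mightContain(min) = " + bloomFilter.mightContain(minMax[0]));
  }
}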