You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by xu...@apache.org on 2022/10/07 10:37:35 UTC

[hudi] branch master updated: [HUDI-4992] Fixing invalid min/max record key stats in Parquet metadata (#6883)

This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a51181726c [HUDI-4992] Fixing invalid min/max record key stats in Parquet metadata (#6883)
a51181726c is described below

commit a51181726ce6efb57459285a66868e9d3687bd60
Author: Alexey Kudinkin <al...@infinilake.com>
AuthorDate: Fri Oct 7 03:37:26 2022 -0700

    [HUDI-4992] Fixing invalid min/max record key stats in Parquet metadata (#6883)
---
 .../apache/hudi/io/storage/HoodieOrcWriter.java    |  10 +-
 .../hudi/avro/TestHoodieAvroParquetWriter.java     | 118 +++++++++++++++++++++
 .../hudi/io/storage/TestHoodieOrcReaderWriter.java |   7 +-
 .../row/HoodieRowDataParquetWriteSupport.java      |  55 ++++------
 .../storage/row/HoodieRowParquetWriteSupport.java  |  61 +++++------
 .../row/TestHoodieInternalRowParquetWriter.java    |  95 ++++++++++-------
 .../apache/hudi/avro/HoodieAvroWriteSupport.java   |  60 +++++------
 .../hudi/avro/HoodieBloomFilterWriteSupport.java   |  96 +++++++++++++++++
 .../org/apache/hudi/common/util/BaseFileUtils.java |  13 +--
 .../hudi/avro/TestHoodieAvroWriteSupport.java      |  67 ------------
 10 files changed, 361 insertions(+), 221 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java
index a532ac66c9..4bcab2cec8 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java
@@ -23,6 +23,7 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
 import org.apache.hudi.common.engine.TaskContextSupplier;
@@ -44,9 +45,6 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
-import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE;
-import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
-import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
 
 public class HoodieOrcWriter<T extends HoodieRecordPayload, R extends IndexedRecord>
     implements HoodieFileWriter<R>, Closeable {
@@ -155,11 +153,11 @@ public class HoodieOrcWriter<T extends HoodieRecordPayload, R extends IndexedRec
       final BloomFilter bloomFilter = orcConfig.getBloomFilter();
       writer.addUserMetadata(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, ByteBuffer.wrap(bloomFilter.serializeToString().getBytes()));
       if (minRecordKey != null && maxRecordKey != null) {
-        writer.addUserMetadata(HOODIE_MIN_RECORD_KEY_FOOTER, ByteBuffer.wrap(minRecordKey.getBytes()));
-        writer.addUserMetadata(HOODIE_MAX_RECORD_KEY_FOOTER, ByteBuffer.wrap(maxRecordKey.getBytes()));
+        writer.addUserMetadata(HoodieBloomFilterWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER, ByteBuffer.wrap(minRecordKey.getBytes()));
+        writer.addUserMetadata(HoodieBloomFilterWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER, ByteBuffer.wrap(maxRecordKey.getBytes()));
       }
       if (bloomFilter.getBloomFilterTypeCode().name().contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) {
-        writer.addUserMetadata(HOODIE_BLOOM_FILTER_TYPE_CODE, ByteBuffer.wrap(bloomFilter.getBloomFilterTypeCode().name().getBytes()));
+        writer.addUserMetadata(HoodieBloomFilterWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE, ByteBuffer.wrap(bloomFilter.getBloomFilterTypeCode().name().getBytes()));
       }
     }
     writer.addUserMetadata(HoodieOrcConfig.AVRO_SCHEMA_METADATA_KEY, ByteBuffer.wrap(avroSchema.toString().getBytes()));
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java
new file mode 100644
index 0000000000..df879dc816
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.DummyTaskContextSupplier;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ParquetUtils;
+import org.apache.hudi.io.storage.HoodieAvroParquetWriter;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestHoodieAvroParquetWriter {
+
+  @TempDir java.nio.file.Path tmpDir;
+
+  @Test
+  public void testProperWriting() throws IOException {
+    Configuration hadoopConf = new Configuration();
+
+    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEED);
+    List<GenericRecord> records = dataGen.generateGenericRecords(10);
+
+    Schema schema = records.get(0).getSchema();
+
+    BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.0001, 10000,
+        BloomFilterTypeCode.DYNAMIC_V0.name());
+    HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema),
+        schema, Option.of(filter));
+
+    HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig =
+        new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, ParquetWriter.DEFAULT_BLOCK_SIZE,
+            ParquetWriter.DEFAULT_PAGE_SIZE, 1024 * 1024 * 1024, hadoopConf, 0.1);
+
+    Path filePath = new Path(tmpDir.resolve("test.parquet").toAbsolutePath().toString());
+
+    try (HoodieAvroParquetWriter<GenericRecord> writer =
+        new HoodieAvroParquetWriter<>(filePath, parquetConfig, "001", new DummyTaskContextSupplier(), true)) {
+      for (GenericRecord record : records) {
+        writer.writeAvro((String) record.get("_row_key"), record);
+      }
+    }
+
+    ParquetUtils utils = new ParquetUtils();
+
+    // Step 1: Make sure records are written appropriately
+    List<GenericRecord> readRecords = utils.readAvroRecords(hadoopConf, filePath);
+
+    assertEquals(toJson(records), toJson(readRecords));
+
+    // Step 2: Assert Parquet metadata was written appropriately
+    List<String> recordKeys = records.stream().map(r -> (String) r.get("_row_key")).collect(Collectors.toList());
+
+    String minKey = recordKeys.stream().min(Comparator.naturalOrder()).get();
+    String maxKey = recordKeys.stream().max(Comparator.naturalOrder()).get();
+
+    FileMetaData parquetMetadata = ParquetUtils.readMetadata(hadoopConf, filePath).getFileMetaData();
+
+    Map<String, String> extraMetadata = parquetMetadata.getKeyValueMetaData();
+
+    assertEquals(extraMetadata.get(HoodieBloomFilterWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER), minKey);
+    assertEquals(extraMetadata.get(HoodieBloomFilterWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER), maxKey);
+    assertEquals(extraMetadata.get(HoodieBloomFilterWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE), BloomFilterTypeCode.DYNAMIC_V0.name());
+
+    // Step 3: Make sure Bloom Filter contains all the record keys
+    BloomFilter bloomFilter = utils.readBloomFilterFromMetadata(hadoopConf, filePath);
+    recordKeys.forEach(recordKey -> {
+      assertTrue(bloomFilter.mightContain(recordKey));
+    });
+  }
+
+  private static List<String> toJson(List<GenericRecord> records) {
+    return records.stream().map(r -> {
+      try {
+        return new String(HoodieAvroUtils.avroToJson(r, true));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }).collect(Collectors.toList());
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java
index 817fc25a5d..373fc31a56 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.io.storage;
 
+import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.bloom.BloomFilterTypeCode;
@@ -37,8 +38,6 @@ import java.io.IOException;
 import java.util.function.Supplier;
 
 import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
-import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
-import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
 import static org.apache.hudi.io.storage.HoodieOrcConfig.AVRO_SCHEMA_METADATA_KEY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -78,8 +77,8 @@ public class TestHoodieOrcReaderWriter extends TestHoodieReaderWriterBase {
   protected void verifyMetadata(Configuration conf) throws IOException {
     Reader orcReader = OrcFile.createReader(getFilePath(), OrcFile.readerOptions(conf));
     assertEquals(4, orcReader.getMetadataKeys().size());
-    assertTrue(orcReader.getMetadataKeys().contains(HOODIE_MIN_RECORD_KEY_FOOTER));
-    assertTrue(orcReader.getMetadataKeys().contains(HOODIE_MAX_RECORD_KEY_FOOTER));
+    assertTrue(orcReader.getMetadataKeys().contains(HoodieBloomFilterWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER));
+    assertTrue(orcReader.getMetadataKeys().contains(HoodieBloomFilterWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER));
     assertTrue(orcReader.getMetadataKeys().contains(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY));
     assertTrue(orcReader.getMetadataKeys().contains(AVRO_SCHEMA_METADATA_KEY));
     assertEquals(CompressionKind.ZLIB.name(), orcReader.getCompressionKind().toString());
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java
index 035cb2eab9..b939498c3e 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java
@@ -18,20 +18,18 @@
 
 package org.apache.hudi.io.storage.row;
 
+import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
 import org.apache.hudi.common.bloom.BloomFilter;
-import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
 
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.util.Option;
 import org.apache.parquet.hadoop.api.WriteSupport;
 
-import java.util.HashMap;
-
-import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
-import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE;
-import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
-import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Map;
 
 /**
  * Hoodie Write Support for directly writing {@link RowData} to Parquet.
@@ -39,14 +37,13 @@ import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_
 public class HoodieRowDataParquetWriteSupport extends RowDataParquetWriteSupport {
 
   private final Configuration hadoopConf;
-  private final BloomFilter bloomFilter;
-  private String minRecordKey;
-  private String maxRecordKey;
+  private final Option<HoodieBloomFilterWriteSupport<String>> bloomFilterWriteSupportOpt;
 
   public HoodieRowDataParquetWriteSupport(Configuration conf, RowType rowType, BloomFilter bloomFilter) {
     super(rowType);
     this.hadoopConf = new Configuration(conf);
-    this.bloomFilter = bloomFilter;
+    this.bloomFilterWriteSupportOpt = Option.ofNullable(bloomFilter)
+        .map(HoodieBloomFilterRowDataWriteSupport::new);
   }
 
   public Configuration getHadoopConf() {
@@ -55,32 +52,26 @@ public class HoodieRowDataParquetWriteSupport extends RowDataParquetWriteSupport
 
   @Override
   public WriteSupport.FinalizedWriteContext finalizeWrite() {
-    HashMap<String, String> extraMetaData = new HashMap<>();
-    if (bloomFilter != null) {
-      extraMetaData.put(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, bloomFilter.serializeToString());
-      if (minRecordKey != null && maxRecordKey != null) {
-        extraMetaData.put(HOODIE_MIN_RECORD_KEY_FOOTER, minRecordKey);
-        extraMetaData.put(HOODIE_MAX_RECORD_KEY_FOOTER, maxRecordKey);
-      }
-      if (bloomFilter.getBloomFilterTypeCode().name().contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) {
-        extraMetaData.put(HOODIE_BLOOM_FILTER_TYPE_CODE, bloomFilter.getBloomFilterTypeCode().name());
-      }
-    }
-    return new WriteSupport.FinalizedWriteContext(extraMetaData);
+    Map<String, String> extraMetadata =
+        bloomFilterWriteSupportOpt.map(HoodieBloomFilterWriteSupport::finalizeMetadata)
+            .orElse(Collections.emptyMap());
+
+    return new WriteSupport.FinalizedWriteContext(extraMetadata);
   }
 
   public void add(String recordKey) {
-    this.bloomFilter.add(recordKey);
-    if (minRecordKey != null) {
-      minRecordKey = minRecordKey.compareTo(recordKey) <= 0 ? minRecordKey : recordKey;
-    } else {
-      minRecordKey = recordKey;
+    this.bloomFilterWriteSupportOpt.ifPresent(bloomFilterWriteSupport ->
+        bloomFilterWriteSupport.addKey(recordKey));
+  }
+
+  private static class HoodieBloomFilterRowDataWriteSupport extends HoodieBloomFilterWriteSupport<String> {
+    public HoodieBloomFilterRowDataWriteSupport(BloomFilter bloomFilter) {
+      super(bloomFilter);
     }
 
-    if (maxRecordKey != null) {
-      maxRecordKey = maxRecordKey.compareTo(recordKey) >= 0 ? maxRecordKey : recordKey;
-    } else {
-      maxRecordKey = recordKey;
+    @Override
+    protected byte[] getUTF8Bytes(String key) {
+      return key.getBytes(StandardCharsets.UTF_8);
     }
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
index 28964ecc3f..bb4dd9c619 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
@@ -19,8 +19,8 @@
 package org.apache.hudi.io.storage.row;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
 import org.apache.hudi.common.bloom.BloomFilter;
-import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.parquet.hadoop.api.WriteSupport;
@@ -28,12 +28,8 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
 
-import java.util.HashMap;
-
-import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
-import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE;
-import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
-import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
+import java.util.Collections;
+import java.util.Map;
 
 /**
  * Hoodie Write Support for directly writing Row to Parquet.
@@ -41,19 +37,17 @@ import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_
 public class HoodieRowParquetWriteSupport extends ParquetWriteSupport {
 
   private final Configuration hadoopConf;
-  private final BloomFilter bloomFilter;
-
-  private UTF8String minRecordKey;
-  private UTF8String maxRecordKey;
+  private final Option<HoodieBloomFilterWriteSupport<UTF8String>> bloomFilterWriteSupportOpt;
 
   public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, Option<BloomFilter> bloomFilterOpt, HoodieWriteConfig writeConfig) {
     Configuration hadoopConf = new Configuration(conf);
     hadoopConf.set("spark.sql.parquet.writeLegacyFormat", writeConfig.parquetWriteLegacyFormatEnabled());
     hadoopConf.set("spark.sql.parquet.outputTimestampType", writeConfig.parquetOutputTimestampType());
     hadoopConf.set("spark.sql.parquet.fieldId.write.enabled", writeConfig.parquetFieldIdWriteEnabled());
-    this.hadoopConf = hadoopConf;
     setSchema(structType, hadoopConf);
-    this.bloomFilter = bloomFilterOpt.orElse(null);
+
+    this.hadoopConf = hadoopConf;
+    this.bloomFilterWriteSupportOpt = bloomFilterOpt.map(HoodieBloomFilterRowWriteSupport::new);
   }
 
   public Configuration getHadoopConf() {
@@ -62,32 +56,35 @@ public class HoodieRowParquetWriteSupport extends ParquetWriteSupport {
 
   @Override
   public WriteSupport.FinalizedWriteContext finalizeWrite() {
-    HashMap<String, String> extraMetaData = new HashMap<>();
-    if (bloomFilter != null) {
-      extraMetaData.put(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, bloomFilter.serializeToString());
-      if (minRecordKey != null && maxRecordKey != null) {
-        extraMetaData.put(HOODIE_MIN_RECORD_KEY_FOOTER, minRecordKey.toString());
-        extraMetaData.put(HOODIE_MAX_RECORD_KEY_FOOTER, maxRecordKey.toString());
-      }
-      if (bloomFilter.getBloomFilterTypeCode().name().contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) {
-        extraMetaData.put(HOODIE_BLOOM_FILTER_TYPE_CODE, bloomFilter.getBloomFilterTypeCode().name());
-      }
-    }
-    return new WriteSupport.FinalizedWriteContext(extraMetaData);
+    Map<String, String> extraMetadata =
+        bloomFilterWriteSupportOpt.map(HoodieBloomFilterWriteSupport::finalizeMetadata)
+            .orElse(Collections.emptyMap());
+
+    return new WriteSupport.FinalizedWriteContext(extraMetadata);
   }
 
   public void add(UTF8String recordKey) {
-    this.bloomFilter.add(recordKey.getBytes());
+    this.bloomFilterWriteSupportOpt.ifPresent(bloomFilterWriteSupport ->
+        bloomFilterWriteSupport.addKey(recordKey));
+  }
 
-    if (minRecordKey == null || minRecordKey.compareTo(recordKey) < 0) {
+  private static class HoodieBloomFilterRowWriteSupport extends HoodieBloomFilterWriteSupport<UTF8String> {
+    public HoodieBloomFilterRowWriteSupport(BloomFilter bloomFilter) {
+      super(bloomFilter);
+    }
+
+    @Override
+    protected byte[] getUTF8Bytes(UTF8String key) {
+      return key.getBytes();
+    }
+
+    @Override
+    protected UTF8String dereference(UTF8String key) {
       // NOTE: [[clone]] is performed here (rather than [[copy]]) to only copy underlying buffer in
       //       cases when [[UTF8String]] is pointing into a buffer storing the whole containing record,
       //       and simply do a pass over when it holds a (immutable) buffer holding just the string
-      minRecordKey =  recordKey.clone();
-    }
-
-    if (maxRecordKey == null || maxRecordKey.compareTo(recordKey) > 0) {
-      maxRecordKey = recordKey.clone();
+      return key.clone();
     }
   }
+
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java
index 481cda00d6..dce0e2fad5 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java
@@ -18,40 +18,44 @@
 
 package org.apache.hudi.io.storage.row;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ParquetUtils;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
 import org.apache.hudi.testutils.SparkDatasetTestUtils;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.util.Comparator;
 import java.util.List;
-import java.util.Random;
-import java.util.UUID;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * Unit tests {@link HoodieInternalRowParquetWriter}.
  */
 public class TestHoodieInternalRowParquetWriter extends HoodieClientTestHarness {
 
-  private static final Random RANDOM = new Random();
-
   @BeforeEach
   public void setUp() throws Exception {
     initSparkContexts("TestHoodieInternalRowParquetWriter");
@@ -68,44 +72,55 @@ public class TestHoodieInternalRowParquetWriter extends HoodieClientTestHarness
 
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
-  public void endToEndTest(boolean parquetWriteLegacyFormatEnabled) throws Exception {
+  public void testProperWriting(boolean parquetWriteLegacyFormatEnabled) throws Exception {
+    // Generate inputs
+    Dataset<Row> inputRows = SparkDatasetTestUtils.getRandomRows(sqlContext, 100,
+        HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, false);
+    StructType schema = inputRows.schema();
+
+    List<InternalRow> rows = SparkDatasetTestUtils.toInternalRows(inputRows, SparkDatasetTestUtils.ENCODER);
+
     HoodieWriteConfig.Builder writeConfigBuilder =
         SparkDatasetTestUtils.getConfigBuilder(basePath, timelineServicePort);
-    for (int i = 0; i < 5; i++) {
-      // init write support and parquet config
-      HoodieRowParquetWriteSupport writeSupport = getWriteSupport(writeConfigBuilder, hadoopConf, parquetWriteLegacyFormatEnabled);
-      HoodieWriteConfig cfg = writeConfigBuilder.build();
-      HoodieParquetConfig<HoodieRowParquetWriteSupport> parquetConfig = new HoodieParquetConfig<>(writeSupport,
-          CompressionCodecName.SNAPPY, cfg.getParquetBlockSize(), cfg.getParquetPageSize(), cfg.getParquetMaxFileSize(),
-          writeSupport.getHadoopConf(), cfg.getParquetCompressionRatio(), cfg.parquetDictionaryEnabled());
-
-      // prepare path
-      String fileId = UUID.randomUUID().toString();
-      Path filePath = new Path(basePath + "/" + fileId);
-      String partitionPath = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
-      metaClient.getFs().mkdirs(new Path(basePath));
-
-      // init writer
-      HoodieInternalRowParquetWriter writer = new HoodieInternalRowParquetWriter(filePath, parquetConfig);
-
-      // generate input
-      int size = 10 + RANDOM.nextInt(100);
-      // Generate inputs
-      Dataset<Row> inputRows = SparkDatasetTestUtils.getRandomRows(sqlContext, size, partitionPath, false);
-      List<InternalRow> internalRows = SparkDatasetTestUtils.toInternalRows(inputRows, SparkDatasetTestUtils.ENCODER);
-
-      // issue writes
-      for (InternalRow internalRow : internalRows) {
-        writer.write(internalRow);
-      }
 
-      // close the writer
-      writer.close();
+    HoodieRowParquetWriteSupport writeSupport = getWriteSupport(writeConfigBuilder, hadoopConf, parquetWriteLegacyFormatEnabled);
+    HoodieWriteConfig cfg = writeConfigBuilder.build();
+    HoodieParquetConfig<HoodieRowParquetWriteSupport> parquetConfig = new HoodieParquetConfig<>(writeSupport,
+        CompressionCodecName.SNAPPY, cfg.getParquetBlockSize(), cfg.getParquetPageSize(), cfg.getParquetMaxFileSize(),
+        writeSupport.getHadoopConf(), cfg.getParquetCompressionRatio(), cfg.parquetDictionaryEnabled());
 
-      // verify rows
-      Dataset<Row> result = sqlContext.read().parquet(basePath);
-      assertEquals(0, inputRows.except(result).count());
+    Path filePath = new Path(basePath + "/internal_row_writer.parquet");
+
+    try (HoodieInternalRowParquetWriter writer = new HoodieInternalRowParquetWriter(filePath, parquetConfig)) {
+      for (InternalRow row : rows) {
+        writer.writeRow(row.getUTF8String(schema.fieldIndex("record_key")), row);
+      }
     }
+
+    // Step 1: Verify rows written correctly
+    Dataset<Row> result = sqlContext.read().parquet(basePath);
+    assertEquals(0, inputRows.except(result).count());
+
+    // Step 2: Assert Parquet metadata was written appropriately
+    List<String> recordKeys =
+        rows.stream().map(r -> r.getString(schema.fieldIndex("record_key"))).collect(Collectors.toList());
+
+    String minKey = recordKeys.stream().min(Comparator.naturalOrder()).get();
+    String maxKey = recordKeys.stream().max(Comparator.naturalOrder()).get();
+
+    FileMetaData parquetMetadata = ParquetUtils.readMetadata(hadoopConf, filePath).getFileMetaData();
+
+    Map<String, String> extraMetadata = parquetMetadata.getKeyValueMetaData();
+
+    assertEquals(extraMetadata.get(HoodieBloomFilterWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER), minKey);
+    assertEquals(extraMetadata.get(HoodieBloomFilterWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER), maxKey);
+    assertEquals(extraMetadata.get(HoodieBloomFilterWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE), BloomFilterTypeCode.DYNAMIC_V0.name());
+
+    // Step 3: Make sure Bloom Filter contains all the record keys
+    BloomFilter bloomFilter = new ParquetUtils().readBloomFilterFromMetadata(hadoopConf, filePath);
+    recordKeys.forEach(recordKey -> {
+      assertTrue(bloomFilter.mightContain(recordKey));
+    });
   }
 
   private HoodieRowParquetWriteSupport getWriteSupport(HoodieWriteConfig.Builder writeConfigBuilder, Configuration hadoopConf, boolean parquetWriteLegacyFormatEnabled) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
index c3920211ae..e87364fb90 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
@@ -20,12 +20,14 @@ package org.apache.hudi.avro;
 
 import org.apache.avro.Schema;
 import org.apache.hudi.common.bloom.BloomFilter;
-import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.parquet.avro.AvroWriteSupport;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.schema.MessageType;
 
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -34,55 +36,45 @@ import java.util.Map;
  */
 public class HoodieAvroWriteSupport extends AvroWriteSupport {
 
-  private Option<BloomFilter> bloomFilterOpt;
-  private String minRecordKey;
-  private String maxRecordKey;
-  private Map<String, String> footerMetadata = new HashMap<>();
+  private final Option<HoodieBloomFilterWriteSupport<String>> bloomFilterWriteSupportOpt;
+  private final Map<String, String> footerMetadata = new HashMap<>();
 
   public static final String OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY = "com.uber.hoodie.bloomfilter";
   public static final String HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY = "org.apache.hudi.bloomfilter";
-  public static final String HOODIE_MIN_RECORD_KEY_FOOTER = "hoodie_min_record_key";
-  public static final String HOODIE_MAX_RECORD_KEY_FOOTER = "hoodie_max_record_key";
-  public static final String HOODIE_BLOOM_FILTER_TYPE_CODE = "hoodie_bloom_filter_type_code";
 
   public HoodieAvroWriteSupport(MessageType schema, Schema avroSchema, Option<BloomFilter> bloomFilterOpt) {
     super(schema, avroSchema, ConvertingGenericData.INSTANCE);
-    this.bloomFilterOpt = bloomFilterOpt;
+    this.bloomFilterWriteSupportOpt = bloomFilterOpt.map(HoodieBloomFilterAvroWriteSupport::new);
   }
 
   @Override
   public WriteSupport.FinalizedWriteContext finalizeWrite() {
-    if (bloomFilterOpt.isPresent()) {
-      footerMetadata.put(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, bloomFilterOpt.get().serializeToString());
-      if (minRecordKey != null && maxRecordKey != null) {
-        footerMetadata.put(HOODIE_MIN_RECORD_KEY_FOOTER, minRecordKey);
-        footerMetadata.put(HOODIE_MAX_RECORD_KEY_FOOTER, maxRecordKey);
-      }
-      if (bloomFilterOpt.get().getBloomFilterTypeCode().name().contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) {
-        footerMetadata.put(HOODIE_BLOOM_FILTER_TYPE_CODE, bloomFilterOpt.get().getBloomFilterTypeCode().name());
-      }
-    }
-    return new WriteSupport.FinalizedWriteContext(footerMetadata);
+    Map<String, String> extraMetadata =
+        CollectionUtils.combine(footerMetadata,
+            bloomFilterWriteSupportOpt.map(HoodieBloomFilterWriteSupport::finalizeMetadata)
+                .orElse(Collections.emptyMap())
+        );
+
+    return new WriteSupport.FinalizedWriteContext(extraMetadata);
   }
 
   public void add(String recordKey) {
-    if (bloomFilterOpt.isPresent()) {
-      this.bloomFilterOpt.get().add(recordKey);
-      if (minRecordKey != null) {
-        minRecordKey = minRecordKey.compareTo(recordKey) <= 0 ? minRecordKey : recordKey;
-      } else {
-        minRecordKey = recordKey;
-      }
-
-      if (maxRecordKey != null) {
-        maxRecordKey = maxRecordKey.compareTo(recordKey) >= 0 ? maxRecordKey : recordKey;
-      } else {
-        maxRecordKey = recordKey;
-      }
-    }
+    this.bloomFilterWriteSupportOpt.ifPresent(bloomFilterWriteSupport ->
+        bloomFilterWriteSupport.addKey(recordKey));
   }
 
   public void addFooterMetadata(String key, String value) {
     footerMetadata.put(key, value);
   }
+
+  private static class HoodieBloomFilterAvroWriteSupport extends HoodieBloomFilterWriteSupport<String> {
+    public HoodieBloomFilterAvroWriteSupport(BloomFilter bloomFilter) {
+      super(bloomFilter);
+    }
+
+    @Override
+    protected byte[] getUTF8Bytes(String key) {
+      return key.getBytes(StandardCharsets.UTF_8);
+    }
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieBloomFilterWriteSupport.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieBloomFilterWriteSupport.java
new file mode 100644
index 0000000000..1a689791ba
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieBloomFilterWriteSupport.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
+
+/**
+ * This is a write-support utility base class that takes care of
+ *
+ * <ul>
+ *   <li>Adding record keys to the Bloom Filter</li>
+ *   <li>Keeping track of min/max record key (within a single file)</li>
+ * </ul>
+ *
+ * @param <T> record-key type being ingested by this class
+ */
+public abstract class HoodieBloomFilterWriteSupport<T extends Comparable<T>> {
+
+  public static final String HOODIE_MIN_RECORD_KEY_FOOTER = "hoodie_min_record_key";
+  public static final String HOODIE_MAX_RECORD_KEY_FOOTER = "hoodie_max_record_key";
+  public static final String HOODIE_BLOOM_FILTER_TYPE_CODE = "hoodie_bloom_filter_type_code";
+
+  private final BloomFilter bloomFilter;
+
+  private T minRecordKey;
+  private T maxRecordKey;
+
+  public HoodieBloomFilterWriteSupport(BloomFilter bloomFilter) {
+    this.bloomFilter = bloomFilter;
+  }
+
+  public void addKey(T recordKey) {
+    bloomFilter.add(getUTF8Bytes(recordKey));
+
+    if (minRecordKey == null || minRecordKey.compareTo(recordKey) > 0) {
+      minRecordKey = dereference(recordKey);
+    }
+
+    if (maxRecordKey == null || maxRecordKey.compareTo(recordKey) < 0) {
+      maxRecordKey = dereference(recordKey);
+    }
+  }
+
+  public Map<String, String> finalizeMetadata() {
+    HashMap<String, String> extraMetadata = new HashMap<>();
+
+    extraMetadata.put(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, bloomFilter.serializeToString());
+    if (bloomFilter.getBloomFilterTypeCode().name().contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) {
+      extraMetadata.put(HOODIE_BLOOM_FILTER_TYPE_CODE, bloomFilter.getBloomFilterTypeCode().name());
+    }
+
+    if (minRecordKey != null && maxRecordKey != null) {
+      extraMetadata.put(HOODIE_MIN_RECORD_KEY_FOOTER, minRecordKey.toString());
+      extraMetadata.put(HOODIE_MAX_RECORD_KEY_FOOTER, maxRecordKey.toString());
+    }
+
+    return extraMetadata;
+  }
+
+  /**
+   * Since the Bloom Filter ingests record keys represented as UTF-8 encoded byte strings,
+   * this method has to be implemented to convert the original record key into one
+   */
+  protected abstract byte[] getUTF8Bytes(T key);
+
+  /**
+   * This method allows dereferencing the key object (through cloning, for example) that might be
+   * pointing at a shared mutable buffer, to make sure that we're not keeping references
+   * to mutable objects
+   */
+  protected T dereference(T key) {
+    return key;
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
index d6391d178e..badb5e37a7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.common.util;
 
 import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.bloom.BloomFilterTypeCode;
@@ -83,7 +84,7 @@ public abstract class BaseFileUtils {
         readFooter(configuration, false, filePath,
             HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY,
             HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY,
-            HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE);
+            HoodieBloomFilterWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE);
     String footerVal = footerVals.get(HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY);
     if (null == footerVal) {
       // We use old style key "com.uber.hoodie.bloomfilter"
@@ -91,9 +92,9 @@ public abstract class BaseFileUtils {
     }
     BloomFilter toReturn = null;
     if (footerVal != null) {
-      if (footerVals.containsKey(HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE)) {
+      if (footerVals.containsKey(HoodieBloomFilterWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE)) {
         toReturn = BloomFilterFactory.fromString(footerVal,
-            footerVals.get(HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE));
+            footerVals.get(HoodieBloomFilterWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE));
       } else {
         toReturn = BloomFilterFactory.fromString(footerVal, BloomFilterTypeCode.SIMPLE.name());
       }
@@ -109,14 +110,14 @@ public abstract class BaseFileUtils {
    */
   public String[] readMinMaxRecordKeys(Configuration configuration, Path filePath) {
     Map<String, String> minMaxKeys = readFooter(configuration, true, filePath,
-        HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER, HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER);
+        HoodieBloomFilterWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER, HoodieBloomFilterWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER);
     if (minMaxKeys.size() != 2) {
       throw new HoodieException(
           String.format("Could not read min/max record key out of footer correctly from %s. read) : %s",
               filePath, minMaxKeys));
     }
-    return new String[] {minMaxKeys.get(HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER),
-        minMaxKeys.get(HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER)};
+    return new String[] {minMaxKeys.get(HoodieBloomFilterWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER),
+        minMaxKeys.get(HoodieBloomFilterWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER)};
   }
 
   /**
diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroWriteSupport.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroWriteSupport.java
deleted file mode 100644
index 16a77c145c..0000000000
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroWriteSupport.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.avro;
-
-import org.apache.hudi.common.bloom.BloomFilter;
-import org.apache.hudi.common.bloom.BloomFilterFactory;
-import org.apache.hudi.common.bloom.BloomFilterTypeCode;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.util.Option;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.fs.Path;
-import org.apache.parquet.avro.AvroSchemaConverter;
-import org.apache.parquet.hadoop.ParquetWriter;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-public class TestHoodieAvroWriteSupport {
-
-  @Test
-  public void testAddKey(@TempDir java.nio.file.Path tempDir) throws IOException {
-    List<String> rowKeys = new ArrayList<>();
-    for (int i = 0; i < 1000; i++) {
-      rowKeys.add(UUID.randomUUID().toString());
-    }
-    String filePath = tempDir.resolve("test.parquet").toAbsolutePath().toString();
-    Schema schema = HoodieAvroUtils.getRecordKeySchema();
-    BloomFilter filter = BloomFilterFactory.createBloomFilter(
-        1000, 0.0001, 10000,
-        BloomFilterTypeCode.SIMPLE.name());
-    HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
-        new AvroSchemaConverter().convert(schema), schema, Option.of(filter));
-    ParquetWriter writer = new ParquetWriter(new Path(filePath), writeSupport, CompressionCodecName.GZIP,
-        120 * 1024 * 1024, ParquetWriter.DEFAULT_PAGE_SIZE);
-    for (String rowKey : rowKeys) {
-      GenericRecord rec = new GenericData.Record(schema);
-      rec.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, rowKey);
-      writer.write(rec);
-      writeSupport.add(rowKey);
-    }
-    writer.close();
-  }
-}