You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@parquet.apache.org by "wgtmac (via GitHub)" <gi...@apache.org> on 2023/04/24 01:53:23 UTC

[GitHub] [parquet-mr] wgtmac commented on a diff in pull request #1042: PARQUET-2254 Support building dynamic bloom filter that adapts to the data

wgtmac commented on code in PR #1042:
URL: https://github.com/apache/parquet-mr/pull/1042#discussion_r1174492807


##########
parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java:
##########
@@ -503,6 +523,30 @@ public Builder withBloomFilterEnabled(boolean enabled) {
       return this;
     }
 
+    /**
+     * Whether to use dynamic bloom filter to automatically adjust the bloom filter size according to
+     * `parquet.bloom.filter.max.bytes`.
+     * If NDV (number of distinct values) for a specified column is set, it will be ignored
+     *
+     * @param enabled whether to use dynamic bloom filter
+     */
+    public Builder withDynamicBloomFilterEnabled(boolean enabled) {
+      this.dynamicBloomFilterEnabled.withDefaultValue(enabled);
+      return this;
+    }
+
+    /**
+     * When `DynamicBloomFilter` is enabled, set how many bloomFilters to split as candidates.

Review Comment:
   ```suggestion
        * When `DynamicBloomFilter` is enabled, set how many bloom filter candidates to use.
   ```



##########
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java:
##########
@@ -19,6 +19,11 @@
 package org.apache.parquet.hadoop;
 
 import static java.util.Arrays.asList;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;

Review Comment:
   Why is this file changed?



##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.LOWER_BOUND_BYTES;
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.UPPER_BOUND_BYTES;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * `DynamicBlockBloomFilter` contains multiple `BlockSplitBloomFilter` as candidates and inserts values in
+ * the candidates at the same time.
+ * The purpose of this is to finally generate a bloom filter with the optimal bit size according to the number
+ * of real data distinct values. Use the largest bloom filter as an approximate deduplication counter, and then
+ * remove incapable bloom filter candidate during data insertion.
+ */
+public class DynamicBlockBloomFilter implements BloomFilter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamicBlockBloomFilter.class);
+
+  // multiple candidates, inserting data at the same time. If the distinct values are greater than the
+  // expected NDV of candidates, it will be removed. Finally we will choose the smallest candidate to write out.
+  private final List<BloomFilterCandidate> candidates = new ArrayList<>();
+
+  // the largest among candidates and used as an approximate deduplication counter
+  private BloomFilterCandidate maxCandidate;
+
+  // the accumulator of the number of distinct values that have been inserted so far
+  private int distinctValueCounter = 0;
+
+  // indicates that the bloom filter candidate has been written out and new data should be no longer allowed to be inserted
+  private boolean finalized = false;
+
+  private int maximumBytes = UPPER_BOUND_BYTES;
+  private int minimumBytes = LOWER_BOUND_BYTES;
+  // the hash strategy used in this bloom filter.
+  private final HashStrategy hashStrategy;
+  // the column to build bloom filter
+  private ColumnDescriptor column;
+
+  public DynamicBlockBloomFilter(int numBytes, int candidatesNum, double fpp, ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, UPPER_BOUND_BYTES, HashStrategy.XXH64, fpp, candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int maximumBytes, int candidatesNum, double fpp, ColumnDescriptor column) {

Review Comment:
   ```suggestion
     public DynamicBlockBloomFilter(int numBytes, int maximumBytes, int numCandidates, double fpp, ColumnDescriptor column) {
   ```



##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.LOWER_BOUND_BYTES;
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.UPPER_BOUND_BYTES;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * `DynamicBlockBloomFilter` contains multiple `BlockSplitBloomFilter` as candidates and inserts values in
+ * the candidates at the same time.
+ * The purpose of this is to finally generate a bloom filter with the optimal bit size according to the number
+ * of real data distinct values. Use the largest bloom filter as an approximate deduplication counter, and then
+ * remove incapable bloom filter candidate during data insertion.
+ */
+public class DynamicBlockBloomFilter implements BloomFilter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamicBlockBloomFilter.class);
+
+  // multiple candidates, inserting data at the same time. If the distinct values are greater than the
+  // expected NDV of candidates, it will be removed. Finally we will choose the smallest candidate to write out.
+  private final List<BloomFilterCandidate> candidates = new ArrayList<>();
+
+  // the largest among candidates and used as an approximate deduplication counter
+  private BloomFilterCandidate maxCandidate;
+
+  // the accumulator of the number of distinct values that have been inserted so far
+  private int distinctValueCounter = 0;
+
+  // indicates that the bloom filter candidate has been written out and new data should be no longer allowed to be inserted
+  private boolean finalized = false;
+
+  private int maximumBytes = UPPER_BOUND_BYTES;
+  private int minimumBytes = LOWER_BOUND_BYTES;
+  // the hash strategy used in this bloom filter.
+  private final HashStrategy hashStrategy;
+  // the column to build bloom filter
+  private ColumnDescriptor column;
+
+  public DynamicBlockBloomFilter(int numBytes, int candidatesNum, double fpp, ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, UPPER_BOUND_BYTES, HashStrategy.XXH64, fpp, candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int maximumBytes, int candidatesNum, double fpp, ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, maximumBytes, HashStrategy.XXH64, fpp, candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int minimumBytes, int maximumBytes, HashStrategy hashStrategy,
+    double fpp, int candidatesNum, ColumnDescriptor column) {
+    if (minimumBytes > maximumBytes) {
+      throw new IllegalArgumentException("the minimum bytes should be less or equal than maximum bytes");
+    }
+
+    if (minimumBytes > LOWER_BOUND_BYTES && minimumBytes < UPPER_BOUND_BYTES) {
+      this.minimumBytes = minimumBytes;
+    }
+
+    if (maximumBytes > LOWER_BOUND_BYTES && maximumBytes < UPPER_BOUND_BYTES) {
+      this.maximumBytes = maximumBytes;
+    }
+    this.column = column;
+    switch (hashStrategy) {
+      case XXH64:
+        this.hashStrategy = hashStrategy;
+        break;
+      default:
+        throw new RuntimeException("Unsupported hash strategy");
+    }
+    initCandidates(numBytes, candidatesNum, fpp);
+  }
+
+  /**
+   * Given the maximum acceptable bytes size of bloom filter, generate candidates according
+   * to the bytes size. The bytes size of the candidate needs to be a
+   * power of 2. Therefore, set the candidate size according to `maxBytes` of `1/2`, `1/4`, `1/8`, etc.
+   *
+   * @param maxBytes      the maximum acceptable bit size
+   * @param candidatesNum the number of candidates
+   * @param fpp           the false positive probability
+   */
+  private void initCandidates(int maxBytes, int candidatesNum, double fpp) {
+    int candidateByteSize = calculateTwoPowerSize(maxBytes);
+    for (int i = 1; i <= candidatesNum; i++) {
+      int candidateExpectedNDV = expectedNDV(candidateByteSize, fpp);
+      // `candidateByteSize` is too small, just drop it
+      if (candidateExpectedNDV <= 0) {
+        break;
+      }
+      BloomFilterCandidate candidate =
+        new BloomFilterCandidate(candidateExpectedNDV, candidateByteSize, minimumBytes, maximumBytes, hashStrategy);
+      candidates.add(candidate);
+      candidateByteSize = calculateTwoPowerSize(candidateByteSize / 2);
+    }
+    Optional<BloomFilterCandidate> maxBloomFilter = candidates.stream().max(BloomFilterCandidate::compareTo);
+    if (maxBloomFilter.isPresent()) {
+      maxCandidate = maxBloomFilter.get();
+    } else {
+      throw new IllegalArgumentException("`maximumBytes` is too small to create one valid bloom filter");
+    }
+  }
+
+  /**
+   * According to the size of bytes, calculate the expected number of distinct values.
+   * The expected number result may be slightly smaller than what `numBytes` can support.
+   *
+   * @param numBytes the bytes size
+   * @param fpp      the false positive probability
+   * @return the expected number of distinct values
+   */
+  private int expectedNDV(int numBytes, double fpp) {
+    int expectedNDV = 0;
+    int optimalBytes = 0;
+    int step = 500;

Review Comment:
   Should we put the magic number as a static variable and define it on top of the class? And why 500 instead of a power of two?



##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.LOWER_BOUND_BYTES;
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.UPPER_BOUND_BYTES;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * `DynamicBlockBloomFilter` contains multiple `BlockSplitBloomFilter` as candidates and inserts values in
+ * the candidates at the same time.
+ * The purpose of this is to finally generate a bloom filter with the optimal bit size according to the number
+ * of real data distinct values. Use the largest bloom filter as an approximate deduplication counter, and then
+ * remove incapable bloom filter candidate during data insertion.
+ */
+public class DynamicBlockBloomFilter implements BloomFilter {

Review Comment:
   It could be either `DynamicBloomFilter` or `DynamicBlockSplitBloomFilter`, but not in between. Please also fix the comment above.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java:
##########
@@ -152,6 +153,8 @@ public static enum JobSummaryLevel {
   public static final String BLOOM_FILTER_EXPECTED_NDV = "parquet.bloom.filter.expected.ndv";
   public static final String BLOOM_FILTER_MAX_BYTES = "parquet.bloom.filter.max.bytes";
   public static final String BLOOM_FILTER_FPP = "parquet.bloom.filter.fpp";
+  public static final String DYNAMIC_BLOOM_FILTER_ENABLED = "parquet.bloom.filter.dynamic.enabled";

Review Comment:
   ```suggestion
     public static final String DYNAMIC_BLOOM_FILTER_ENABLED = "parquet.dynamic.bloom.filter.enabled";
   ```



##########
parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java:
##########
@@ -97,7 +97,13 @@ abstract class ColumnWriterBase implements ColumnWriter {
       int optimalNumOfBits = BlockSplitBloomFilter.optimalNumOfBits(ndv.getAsLong(), fpp.getAsDouble());
       this.bloomFilter = new BlockSplitBloomFilter(optimalNumOfBits / 8, maxBloomFilterSize);
     } else {
-      this.bloomFilter = new BlockSplitBloomFilter(maxBloomFilterSize, maxBloomFilterSize);
+      boolean useDynamicBloomFilter = props.getDynamicBloomFilterEnabled(path);
+      if(useDynamicBloomFilter) {

Review Comment:
   ```suggestion
         if (props.getDynamicBloomFilterEnabled(path)) {
   ```



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java:
##########
@@ -619,6 +619,28 @@ public SELF withBloomFilterFPP(String columnPath, double fpp) {
       return self();
     }
 
+    /**
+     * When NDV (number of distinct values) for a specified column is not set, whether to use
+     * `DynamicBloomFilter` to automatically adjust the BloomFilter size according to `parquet.bloom.filter.max.bytes`
+     *
+     * @param enabled whether to write bloom filter for the column
+     */
+    public SELF withDynamicBloomFilterEnabled(boolean enabled) {
+      encodingPropsBuilder.withDynamicBloomFilterEnabled(enabled);
+      return self();
+    }
+
+    /**
+     * When `DynamicBloomFilter` is enabled, set how many bloomFilters to split as candidates.

Review Comment:
   ```suggestion
        * When `DynamicBloomFilter` is enabled, set how many bloom filter candidates to use.
   ```



##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.LOWER_BOUND_BYTES;
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.UPPER_BOUND_BYTES;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * `DynamicBlockBloomFilter` contains multiple `BlockSplitBloomFilter` as candidates and inserts values in
+ * the candidates at the same time.
+ * The purpose of this is to finally generate a bloom filter with the optimal bit size according to the number
+ * of real data distinct values. Use the largest bloom filter as an approximate deduplication counter, and then
+ * remove incapable bloom filter candidate during data insertion.
+ */
+public class DynamicBlockBloomFilter implements BloomFilter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamicBlockBloomFilter.class);
+
+  // multiple candidates, inserting data at the same time. If the distinct values are greater than the
+  // expected NDV of candidates, it will be removed. Finally we will choose the smallest candidate to write out.
+  private final List<BloomFilterCandidate> candidates = new ArrayList<>();
+
+  // the largest among candidates and used as an approximate deduplication counter
+  private BloomFilterCandidate maxCandidate;

Review Comment:
   ```suggestion
     private BloomFilterCandidate largestCandidate;
   ```



##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.LOWER_BOUND_BYTES;
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.UPPER_BOUND_BYTES;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * `DynamicBlockBloomFilter` contains multiple `BlockSplitBloomFilter` as candidates and inserts values in
+ * the candidates at the same time.
+ * The purpose of this is to finally generate a bloom filter with the optimal bit size according to the number
+ * of real data distinct values. Use the largest bloom filter as an approximate deduplication counter, and then
+ * remove incapable bloom filter candidate during data insertion.
+ */
+public class DynamicBlockBloomFilter implements BloomFilter {

Review Comment:
   ```suggestion
   public class DynamicBlockSplitBloomFilter implements BloomFilter {
   ```



##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.LOWER_BOUND_BYTES;
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.UPPER_BOUND_BYTES;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * `DynamicBlockBloomFilter` contains multiple `BlockSplitBloomFilter` as candidates and inserts values in
+ * the candidates at the same time.
+ * The purpose of this is to finally generate a bloom filter with the optimal bit size according to the number
+ * of real data distinct values. Use the largest bloom filter as an approximate deduplication counter, and then
+ * remove incapable bloom filter candidate during data insertion.
+ */
+public class DynamicBlockBloomFilter implements BloomFilter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamicBlockBloomFilter.class);
+
+  // multiple candidates, inserting data at the same time. If the distinct values are greater than the
+  // expected NDV of candidates, it will be removed. Finally we will choose the smallest candidate to write out.
+  private final List<BloomFilterCandidate> candidates = new ArrayList<>();
+
+  // the largest among candidates and used as an approximate deduplication counter
+  private BloomFilterCandidate maxCandidate;
+
+  // the accumulator of the number of distinct values that have been inserted so far
+  private int distinctValueCounter = 0;
+
+  // indicates that the bloom filter candidate has been written out and new data should be no longer allowed to be inserted
+  private boolean finalized = false;
+
+  private int maximumBytes = UPPER_BOUND_BYTES;
+  private int minimumBytes = LOWER_BOUND_BYTES;
+  // the hash strategy used in this bloom filter.
+  private final HashStrategy hashStrategy;
+  // the column to build bloom filter
+  private ColumnDescriptor column;
+
+  public DynamicBlockBloomFilter(int numBytes, int candidatesNum, double fpp, ColumnDescriptor column) {

Review Comment:
   ```suggestion
     public DynamicBlockBloomFilter(int numBytes, int numCandidates, double fpp, ColumnDescriptor column) {
   ```



##########
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDynamicBlockBloomFiltering.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+
+public class TestDynamicBlockBloomFiltering extends TestBloomFiltering {
+
+  @BeforeClass
+  public static void createFiles() throws IOException {
+    createFiles(true);
+  }
+
+  public TestDynamicBlockBloomFiltering(Path file, boolean isEncrypted) {
+    super(file, isEncrypted);
+  }
+
+  @Test
+  public void testSimpleFiltering() throws IOException {
+    super.testSimpleFiltering();
+  }
+
+  @Test
+  public void testNestedFiltering() throws IOException {
+    super.testNestedFiltering();
+  }
+
+  @Test
+  public void checkBloomFilterSize() throws IOException {

Review Comment:
   Should we add a case where a bloom filter will not be created eventually?



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java:
##########
@@ -152,6 +153,8 @@ public static enum JobSummaryLevel {
   public static final String BLOOM_FILTER_EXPECTED_NDV = "parquet.bloom.filter.expected.ndv";
   public static final String BLOOM_FILTER_MAX_BYTES = "parquet.bloom.filter.max.bytes";
   public static final String BLOOM_FILTER_FPP = "parquet.bloom.filter.fpp";
+  public static final String DYNAMIC_BLOOM_FILTER_ENABLED = "parquet.bloom.filter.dynamic.enabled";
+  public static final String BLOOM_FILTER_CANDIDATE_SIZE = "parquet.bloom.filter.candidate.size";

Review Comment:
   ```suggestion
     public static final String NUM_DYNAMIC_BLOOM_FILTER_CANDIDATES = "parquet.num.dynamic.bloom.filter.candidates";
   ```



##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.LOWER_BOUND_BYTES;
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.UPPER_BOUND_BYTES;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * `DynamicBlockBloomFilter` contains multiple `BlockSplitBloomFilter` as candidates and inserts values in
+ * the candidates at the same time.
+ * The purpose of this is to finally generate a bloom filter with the optimal bit size according to the number
+ * of real data distinct values. Use the largest bloom filter as an approximate deduplication counter, and then
+ * remove incapable bloom filter candidate during data insertion.
+ */
+public class DynamicBlockBloomFilter implements BloomFilter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamicBlockBloomFilter.class);
+
+  // multiple candidates, inserting data at the same time. If the distinct values are greater than the
+  // expected NDV of candidates, it will be removed. Finally we will choose the smallest candidate to write out.
+  private final List<BloomFilterCandidate> candidates = new ArrayList<>();
+
+  // the largest among candidates and used as an approximate deduplication counter
+  private BloomFilterCandidate maxCandidate;
+
+  // the accumulator of the number of distinct values that have been inserted so far
+  private int distinctValueCounter = 0;
+
+  // indicates that the bloom filter candidate has been written out and new data should be no longer allowed to be inserted
+  private boolean finalized = false;
+
+  private int maximumBytes = UPPER_BOUND_BYTES;
+  private int minimumBytes = LOWER_BOUND_BYTES;
+  // the hash strategy used in this bloom filter.
+  private final HashStrategy hashStrategy;
+  // the column to build bloom filter
+  private ColumnDescriptor column;
+
+  public DynamicBlockBloomFilter(int numBytes, int candidatesNum, double fpp, ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, UPPER_BOUND_BYTES, HashStrategy.XXH64, fpp, candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int maximumBytes, int candidatesNum, double fpp, ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, maximumBytes, HashStrategy.XXH64, fpp, candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int minimumBytes, int maximumBytes, HashStrategy hashStrategy,
+    double fpp, int candidatesNum, ColumnDescriptor column) {
+    if (minimumBytes > maximumBytes) {
+      throw new IllegalArgumentException("the minimum bytes should be less or equal than maximum bytes");
+    }
+
+    if (minimumBytes > LOWER_BOUND_BYTES && minimumBytes < UPPER_BOUND_BYTES) {
+      this.minimumBytes = minimumBytes;
+    }
+
+    if (maximumBytes > LOWER_BOUND_BYTES && maximumBytes < UPPER_BOUND_BYTES) {
+      this.maximumBytes = maximumBytes;
+    }
+    this.column = column;
+    switch (hashStrategy) {
+      case XXH64:
+        this.hashStrategy = hashStrategy;
+        break;
+      default:
+        throw new RuntimeException("Unsupported hash strategy");
+    }
+    initCandidates(numBytes, candidatesNum, fpp);
+  }
+
+  /**
+   * Given the maximum acceptable bytes size of bloom filter, generate candidates according
+   * to the bytes size. The bytes size of the candidate needs to be a
+   * power of 2. Therefore, set the candidate size according to `maxBytes` of `1/2`, `1/4`, `1/8`, etc.
+   *
+   * @param maxBytes      the maximum acceptable bit size
+   * @param candidatesNum the number of candidates
+   * @param fpp           the false positive probability
+   */
+  private void initCandidates(int maxBytes, int candidatesNum, double fpp) {
+    int candidateByteSize = calculateTwoPowerSize(maxBytes);
+    for (int i = 1; i <= candidatesNum; i++) {
+      int candidateExpectedNDV = expectedNDV(candidateByteSize, fpp);
+      // `candidateByteSize` is too small, just drop it
+      if (candidateExpectedNDV <= 0) {
+        break;
+      }
+      BloomFilterCandidate candidate =
+        new BloomFilterCandidate(candidateExpectedNDV, candidateByteSize, minimumBytes, maximumBytes, hashStrategy);
+      candidates.add(candidate);
+      candidateByteSize = calculateTwoPowerSize(candidateByteSize / 2);
+    }
+    Optional<BloomFilterCandidate> maxBloomFilter = candidates.stream().max(BloomFilterCandidate::compareTo);
+    if (maxBloomFilter.isPresent()) {
+      maxCandidate = maxBloomFilter.get();
+    } else {
+      throw new IllegalArgumentException("`maximumBytes` is too small to create one valid bloom filter");
+    }
+  }
+
+  /**
+   * According to the size of bytes, calculate the expected number of distinct values.
+   * The expected number result may be slightly smaller than what `numBytes` can support.
+   *
+   * @param numBytes the bytes size
+   * @param fpp      the false positive probability
+   * @return the expected number of distinct values
+   */
+  private int expectedNDV(int numBytes, double fpp) {
+    int expectedNDV = 0;
+    int optimalBytes = 0;
+    int step = 500;
+    while (optimalBytes < numBytes) {
+      expectedNDV += step;
+      optimalBytes = BlockSplitBloomFilter.optimalNumOfBits(expectedNDV, fpp) / 8;
+    }
+    // make sure it is slightly smaller than what `numBytes` can support
+    expectedNDV -= step;
+    // numBytes is too small
+    if (expectedNDV <= 0) {
+      expectedNDV = 0;
+    }
+    return expectedNDV;
+  }
+
+  /**
+   * BloomFilter bitsets size should be power of 2, see [[BlockSplitBloomFilter#initBitset]]
+   *
+   * @param numBytes the bytes size
+   * @return the largest power of 2 less or equal to numBytes
+   */
+  private int calculateTwoPowerSize(int numBytes) {

Review Comment:
   The name is not precise as it may be clamped by the min/max values. 



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java:
##########
@@ -152,6 +153,8 @@ public static enum JobSummaryLevel {
   public static final String BLOOM_FILTER_EXPECTED_NDV = "parquet.bloom.filter.expected.ndv";
   public static final String BLOOM_FILTER_MAX_BYTES = "parquet.bloom.filter.max.bytes";
   public static final String BLOOM_FILTER_FPP = "parquet.bloom.filter.fpp";
+  public static final String DYNAMIC_BLOOM_FILTER_ENABLED = "parquet.bloom.filter.dynamic.enabled";

Review Comment:
   Please document them in the README.md as well. It would be good to say that it may fail to create a bloom filter in the end.



##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.LOWER_BOUND_BYTES;
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.UPPER_BOUND_BYTES;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * `DynamicBlockBloomFilter` contains multiple `BlockSplitBloomFilter` as candidates and inserts values in
+ * the candidates at the same time.
+ * The purpose of this is to finally generate a bloom filter with the optimal bit size according to the number
+ * of real data distinct values. Use the largest bloom filter as an approximate deduplication counter, and then
+ * remove incapable bloom filter candidate during data insertion.
+ */
+public class DynamicBlockBloomFilter implements BloomFilter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamicBlockBloomFilter.class);
+
+  // multiple candidates, inserting data at the same time. If the distinct values are greater than the
+  // expected NDV of candidates, it will be removed. Finally we will choose the smallest candidate to write out.
+  private final List<BloomFilterCandidate> candidates = new ArrayList<>();
+
+  // the largest among candidates and used as an approximate deduplication counter
+  private BloomFilterCandidate maxCandidate;
+
+  // the accumulator of the number of distinct values that have been inserted so far
+  private int distinctValueCounter = 0;

Review Comment:
   Is `int` enough? 



##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.LOWER_BOUND_BYTES;
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.UPPER_BOUND_BYTES;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * `DynamicBlockBloomFilter` contains multiple `BlockSplitBloomFilter` as candidates and inserts values in
+ * the candidates at the same time.
+ * The purpose of this is to finally generate a bloom filter with the optimal bit size according to the number
+ * of real data distinct values. Use the largest bloom filter as an approximate deduplication counter, and then
+ * remove incapable bloom filter candidate during data insertion.
+ */
+public class DynamicBlockBloomFilter implements BloomFilter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamicBlockBloomFilter.class);
+
+  // multiple candidates, inserting data at the same time. If the distinct values are greater than the
+  // expected NDV of candidates, it will be removed. Finally we will choose the smallest candidate to write out.
+  private final List<BloomFilterCandidate> candidates = new ArrayList<>();
+
+  // the largest among candidates and used as an approximate deduplication counter
+  private BloomFilterCandidate maxCandidate;
+
+  // the accumulator of the number of distinct values that have been inserted so far
+  private int distinctValueCounter = 0;
+
+  // indicates that the bloom filter candidate has been written out and new data should be no longer allowed to be inserted
+  private boolean finalized = false;
+
+  private int maximumBytes = UPPER_BOUND_BYTES;
+  private int minimumBytes = LOWER_BOUND_BYTES;
+  // the hash strategy used in this bloom filter.
+  private final HashStrategy hashStrategy;
+  // the column to build bloom filter
+  private ColumnDescriptor column;
+
+  public DynamicBlockBloomFilter(int numBytes, int candidatesNum, double fpp, ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, UPPER_BOUND_BYTES, HashStrategy.XXH64, fpp, candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int maximumBytes, int candidatesNum, double fpp, ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, maximumBytes, HashStrategy.XXH64, fpp, candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int minimumBytes, int maximumBytes, HashStrategy hashStrategy,
+    double fpp, int candidatesNum, ColumnDescriptor column) {
+    if (minimumBytes > maximumBytes) {
+      throw new IllegalArgumentException("the minimum bytes should be less or equal than maximum bytes");
+    }
+
+    if (minimumBytes > LOWER_BOUND_BYTES && minimumBytes < UPPER_BOUND_BYTES) {
+      this.minimumBytes = minimumBytes;
+    }
+
+    if (maximumBytes > LOWER_BOUND_BYTES && maximumBytes < UPPER_BOUND_BYTES) {
+      this.maximumBytes = maximumBytes;
+    }
+    this.column = column;
+    switch (hashStrategy) {
+      case XXH64:
+        this.hashStrategy = hashStrategy;
+        break;
+      default:
+        throw new RuntimeException("Unsupported hash strategy");
+    }
+    initCandidates(numBytes, candidatesNum, fpp);
+  }
+
+  /**
+   * Given the maximum acceptable bytes size of bloom filter, generate candidates according
+   * to the bytes size. The bytes size of the candidate needs to be a
+   * power of 2. Therefore, set the candidate size according to `maxBytes` of `1/2`, `1/4`, `1/8`, etc.
+   *
+   * @param maxBytes      the maximum acceptable bit size
+   * @param candidatesNum the number of candidates
+   * @param fpp           the false positive probability
+   */
+  private void initCandidates(int maxBytes, int candidatesNum, double fpp) {
+    int candidateByteSize = calculateTwoPowerSize(maxBytes);
+    for (int i = 1; i <= candidatesNum; i++) {
+      int candidateExpectedNDV = expectedNDV(candidateByteSize, fpp);
+      // `candidateByteSize` is too small, just drop it
+      if (candidateExpectedNDV <= 0) {
+        break;
+      }
+      BloomFilterCandidate candidate =
+        new BloomFilterCandidate(candidateExpectedNDV, candidateByteSize, minimumBytes, maximumBytes, hashStrategy);
+      candidates.add(candidate);
+      candidateByteSize = calculateTwoPowerSize(candidateByteSize / 2);
+    }
+    Optional<BloomFilterCandidate> maxBloomFilter = candidates.stream().max(BloomFilterCandidate::compareTo);
+    if (maxBloomFilter.isPresent()) {
+      maxCandidate = maxBloomFilter.get();
+    } else {
+      throw new IllegalArgumentException("`maximumBytes` is too small to create one valid bloom filter");

Review Comment:
   Should we simply skip this candidate and add a warn log here? It may not be a fatal error.



##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.LOWER_BOUND_BYTES;
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.UPPER_BOUND_BYTES;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * `DynamicBlockBloomFilter` contains multiple `BlockSplitBloomFilter` as candidates and inserts values in
+ * the candidates at the same time.
+ * The purpose of this is to finally generate a bloom filter with the optimal bit size according to the number
+ * of real data distinct values. Use the largest bloom filter as an approximate deduplication counter, and then
+ * remove incapable bloom filter candidate during data insertion.
+ */
+public class DynamicBlockBloomFilter implements BloomFilter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamicBlockBloomFilter.class);
+
+  // multiple candidates, inserting data at the same time. If the distinct values are greater than the
+  // expected NDV of candidates, it will be removed. Finally we will choose the smallest candidate to write out.
+  private final List<BloomFilterCandidate> candidates = new ArrayList<>();
+
+  // the largest among candidates and used as an approximate deduplication counter
+  private BloomFilterCandidate maxCandidate;
+
+  // the accumulator of the number of distinct values that have been inserted so far
+  private int distinctValueCounter = 0;
+
+  // indicates that the bloom filter candidate has been written out and new data should be no longer allowed to be inserted
+  private boolean finalized = false;
+
+  private int maximumBytes = UPPER_BOUND_BYTES;
+  private int minimumBytes = LOWER_BOUND_BYTES;
+  // the hash strategy used in this bloom filter.
+  private final HashStrategy hashStrategy;
+  // the column to build bloom filter
+  private ColumnDescriptor column;
+
+  public DynamicBlockBloomFilter(int numBytes, int candidatesNum, double fpp, ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, UPPER_BOUND_BYTES, HashStrategy.XXH64, fpp, candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int maximumBytes, int candidatesNum, double fpp, ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, maximumBytes, HashStrategy.XXH64, fpp, candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int minimumBytes, int maximumBytes, HashStrategy hashStrategy,
+    double fpp, int candidatesNum, ColumnDescriptor column) {
+    if (minimumBytes > maximumBytes) {
+      throw new IllegalArgumentException("the minimum bytes should be less or equal than maximum bytes");
+    }
+
+    if (minimumBytes > LOWER_BOUND_BYTES && minimumBytes < UPPER_BOUND_BYTES) {
+      this.minimumBytes = minimumBytes;
+    }
+
+    if (maximumBytes > LOWER_BOUND_BYTES && maximumBytes < UPPER_BOUND_BYTES) {
+      this.maximumBytes = maximumBytes;
+    }
+    this.column = column;
+    switch (hashStrategy) {
+      case XXH64:
+        this.hashStrategy = hashStrategy;
+        break;
+      default:
+        throw new RuntimeException("Unsupported hash strategy");
+    }
+    initCandidates(numBytes, candidatesNum, fpp);
+  }
+
+  /**
+   * Given the maximum acceptable bytes size of bloom filter, generate candidates according
+   * to the bytes size. The bytes size of the candidate needs to be a
+   * power of 2. Therefore, set the candidate size according to `maxBytes` of `1/2`, `1/4`, `1/8`, etc.
+   *
+   * @param maxBytes      the maximum acceptable bit size
+   * @param candidatesNum the number of candidates
+   * @param fpp           the false positive probability
+   */
+  private void initCandidates(int maxBytes, int candidatesNum, double fpp) {

Review Comment:
   ```suggestion
     private void initCandidates(int maxBytes, int numCandidates, double fpp) {
   ```



##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.LOWER_BOUND_BYTES;
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.UPPER_BOUND_BYTES;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * `DynamicBlockBloomFilter` contains multiple `BlockSplitBloomFilter` as candidates and inserts values in
+ * the candidates at the same time.
+ * The purpose of this is to finally generate a bloom filter with the optimal bit size according to the number
+ * of real data distinct values. Use the largest bloom filter as an approximate deduplication counter, and then
+ * remove incapable bloom filter candidate during data insertion.
+ */
+public class DynamicBlockBloomFilter implements BloomFilter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamicBlockBloomFilter.class);
+
+  // multiple candidates, inserting data at the same time. If the distinct values are greater than the
+  // expected NDV of candidates, it will be removed. Finally we will choose the smallest candidate to write out.
+  private final List<BloomFilterCandidate> candidates = new ArrayList<>();
+
+  // the largest among candidates and used as an approximate deduplication counter
+  private BloomFilterCandidate maxCandidate;
+
+  // the accumulator of the number of distinct values that have been inserted so far
+  private int distinctValueCounter = 0;
+
+  // indicates that the bloom filter candidate has been written out and new data should be no longer allowed to be inserted
+  private boolean finalized = false;
+
+  private int maximumBytes = UPPER_BOUND_BYTES;
+  private int minimumBytes = LOWER_BOUND_BYTES;
+  // the hash strategy used in this bloom filter.
+  private final HashStrategy hashStrategy;
+  // the column to build bloom filter
+  private ColumnDescriptor column;
+
+  public DynamicBlockBloomFilter(int numBytes, int candidatesNum, double fpp, ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, UPPER_BOUND_BYTES, HashStrategy.XXH64, fpp, candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int maximumBytes, int candidatesNum, double fpp, ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, maximumBytes, HashStrategy.XXH64, fpp, candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int minimumBytes, int maximumBytes, HashStrategy hashStrategy,
+    double fpp, int candidatesNum, ColumnDescriptor column) {
+    if (minimumBytes > maximumBytes) {
+      throw new IllegalArgumentException("the minimum bytes should be less or equal than maximum bytes");
+    }
+
+    if (minimumBytes > LOWER_BOUND_BYTES && minimumBytes < UPPER_BOUND_BYTES) {
+      this.minimumBytes = minimumBytes;
+    }
+
+    if (maximumBytes > LOWER_BOUND_BYTES && maximumBytes < UPPER_BOUND_BYTES) {
+      this.maximumBytes = maximumBytes;
+    }
+    this.column = column;
+    switch (hashStrategy) {
+      case XXH64:
+        this.hashStrategy = hashStrategy;
+        break;
+      default:
+        throw new RuntimeException("Unsupported hash strategy");
+    }
+    initCandidates(numBytes, candidatesNum, fpp);
+  }
+
+  /**
+   * Given the maximum acceptable bytes size of bloom filter, generate candidates according
+   * to the bytes size. The bytes size of the candidate needs to be a
+   * power of 2. Therefore, set the candidate size according to `maxBytes` of `1/2`, `1/4`, `1/8`, etc.
+   *
+   * @param maxBytes      the maximum acceptable bit size
+   * @param candidatesNum the number of candidates
+   * @param fpp           the false positive probability
+   */
+  private void initCandidates(int maxBytes, int candidatesNum, double fpp) {
+    int candidateByteSize = calculateTwoPowerSize(maxBytes);
+    for (int i = 1; i <= candidatesNum; i++) {
+      int candidateExpectedNDV = expectedNDV(candidateByteSize, fpp);
+      // `candidateByteSize` is too small, just drop it
+      if (candidateExpectedNDV <= 0) {
+        break;
+      }
+      BloomFilterCandidate candidate =
+        new BloomFilterCandidate(candidateExpectedNDV, candidateByteSize, minimumBytes, maximumBytes, hashStrategy);
+      candidates.add(candidate);
+      candidateByteSize = calculateTwoPowerSize(candidateByteSize / 2);
+    }
+    Optional<BloomFilterCandidate> maxBloomFilter = candidates.stream().max(BloomFilterCandidate::compareTo);
+    if (maxBloomFilter.isPresent()) {
+      maxCandidate = maxBloomFilter.get();
+    } else {
+      throw new IllegalArgumentException("`maximumBytes` is too small to create one valid bloom filter");
+    }
+  }
+
+  /**
+   * According to the size of bytes, calculate the expected number of distinct values.
+   * The expected number result may be slightly smaller than what `numBytes` can support.
+   *
+   * @param numBytes the bytes size
+   * @param fpp      the false positive probability
+   * @return the expected number of distinct values
+   */
+  private int expectedNDV(int numBytes, double fpp) {
+    int expectedNDV = 0;
+    int optimalBytes = 0;
+    int step = 500;
+    while (optimalBytes < numBytes) {
+      expectedNDV += step;
+      optimalBytes = BlockSplitBloomFilter.optimalNumOfBits(expectedNDV, fpp) / 8;
+    }
+    // make sure it is slightly smaller than what `numBytes` can support
+    expectedNDV -= step;
+    // numBytes is too small
+    if (expectedNDV <= 0) {
+      expectedNDV = 0;
+    }
+    return expectedNDV;
+  }
+
+  /**
+   * BloomFilter bitsets size should be power of 2, see [[BlockSplitBloomFilter#initBitset]]
+   *
+   * @param numBytes the bytes size
+   * @return the largest power of 2 less or equal to numBytes
+   */
+  private int calculateTwoPowerSize(int numBytes) {
+    if (numBytes < minimumBytes) {
+      numBytes = minimumBytes;
+    }
+    // if `numBytes` is not power of 2, get the next largest power of two less than `numBytes`
+    if ((numBytes & (numBytes - 1)) != 0) {
+      numBytes = Integer.highestOneBit(numBytes);
+    }
+    numBytes = Math.min(numBytes, maximumBytes);
+    numBytes = Math.max(numBytes, minimumBytes);
+    return numBytes;
+  }
+
+  /**
+   * Used at the end of the insertion, select the candidate of the smallest size.
+   * At least one of the largest candidates will be kept when inserting data.
+   *
+   * @return the smallest and optimal candidate
+   */
+  protected BloomFilterCandidate optimalCandidate() {
+    return candidates.stream().min(BloomFilterCandidate::compareTo).get();

Review Comment:
   Is `candidates.get(0)` enough? Should we throw if candidates is empty?



##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.LOWER_BOUND_BYTES;
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.UPPER_BOUND_BYTES;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * `DynamicBlockBloomFilter` contains multiple `BlockSplitBloomFilter` as candidates and inserts values in
+ * the candidates at the same time.
+ * The purpose of this is to finally generate a bloom filter with the optimal bit size according to the number
+ * of real data distinct values. Use the largest bloom filter as an approximate deduplication counter, and then
+ * remove incapable bloom filter candidate during data insertion.
+ */
+public class DynamicBlockBloomFilter implements BloomFilter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamicBlockBloomFilter.class);
+
+  // multiple candidates, inserting data at the same time. If the distinct values are greater than the
+  // expected NDV of candidates, it will be removed. Finally we will choose the smallest candidate to write out.
+  private final List<BloomFilterCandidate> candidates = new ArrayList<>();
+
+  // the largest among candidates and used as an approximate deduplication counter
+  private BloomFilterCandidate maxCandidate;
+
+  // the accumulator of the number of distinct values that have been inserted so far
+  private int distinctValueCounter = 0;
+
+  // indicates that the bloom filter candidate has been written out and new data should be no longer allowed to be inserted
+  private boolean finalized = false;
+
+  private int maximumBytes = UPPER_BOUND_BYTES;
+  private int minimumBytes = LOWER_BOUND_BYTES;
+  // the hash strategy used in this bloom filter.
+  private final HashStrategy hashStrategy;
+  // the column to build bloom filter
+  private ColumnDescriptor column;
+
+  public DynamicBlockBloomFilter(int numBytes, int candidatesNum, double fpp, ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, UPPER_BOUND_BYTES, HashStrategy.XXH64, fpp, candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int maximumBytes, int candidatesNum, double fpp, ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, maximumBytes, HashStrategy.XXH64, fpp, candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int minimumBytes, int maximumBytes, HashStrategy hashStrategy,
+    double fpp, int candidatesNum, ColumnDescriptor column) {
+    if (minimumBytes > maximumBytes) {
+      throw new IllegalArgumentException("the minimum bytes should be less or equal than maximum bytes");
+    }
+
+    if (minimumBytes > LOWER_BOUND_BYTES && minimumBytes < UPPER_BOUND_BYTES) {
+      this.minimumBytes = minimumBytes;
+    }
+
+    if (maximumBytes > LOWER_BOUND_BYTES && maximumBytes < UPPER_BOUND_BYTES) {
+      this.maximumBytes = maximumBytes;
+    }
+    this.column = column;
+    switch (hashStrategy) {
+      case XXH64:
+        this.hashStrategy = hashStrategy;
+        break;
+      default:
+        throw new RuntimeException("Unsupported hash strategy");
+    }
+    initCandidates(numBytes, candidatesNum, fpp);
+  }
+
+  /**
+   * Given the maximum acceptable bytes size of bloom filter, generate candidates according
+   * to the bytes size. The bytes size of the candidate needs to be a
+   * power of 2. Therefore, set the candidate size according to `maxBytes` of `1/2`, `1/4`, `1/8`, etc.
+   *
+   * @param maxBytes      the maximum acceptable bit size
+   * @param candidatesNum the number of candidates
+   * @param fpp           the false positive probability
+   */
+  private void initCandidates(int maxBytes, int candidatesNum, double fpp) {
+    int candidateByteSize = calculateTwoPowerSize(maxBytes);
+    for (int i = 1; i <= candidatesNum; i++) {
+      int candidateExpectedNDV = expectedNDV(candidateByteSize, fpp);
+      // `candidateByteSize` is too small, just drop it
+      if (candidateExpectedNDV <= 0) {
+        break;
+      }
+      BloomFilterCandidate candidate =
+        new BloomFilterCandidate(candidateExpectedNDV, candidateByteSize, minimumBytes, maximumBytes, hashStrategy);
+      candidates.add(candidate);
+      candidateByteSize = calculateTwoPowerSize(candidateByteSize / 2);

Review Comment:
   If `candidateByteSize` is power of two already in the line 113, can be simply apply `candidateByteSize /= 2` here?



##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.LOWER_BOUND_BYTES;
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.UPPER_BOUND_BYTES;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * `DynamicBlockBloomFilter` contains multiple `BlockSplitBloomFilter` as candidates and inserts values in
+ * the candidates at the same time.
+ * The purpose of this is to finally generate a bloom filter with the optimal bit size according to the number
+ * of real data distinct values. Use the largest bloom filter as an approximate deduplication counter, and then
+ * remove incapable bloom filter candidate during data insertion.
+ */
+public class DynamicBlockBloomFilter implements BloomFilter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamicBlockBloomFilter.class);
+
+  // multiple candidates, inserting data at the same time. If the distinct values are greater than the
+  // expected NDV of candidates, it will be removed. Finally we will choose the smallest candidate to write out.
+  private final List<BloomFilterCandidate> candidates = new ArrayList<>();
+
+  // the largest among candidates and used as an approximate deduplication counter
+  private BloomFilterCandidate maxCandidate;
+
+  // the accumulator of the number of distinct values that have been inserted so far
+  private int distinctValueCounter = 0;
+
+  // indicates that the bloom filter candidate has been written out and new data should be no longer allowed to be inserted
+  private boolean finalized = false;
+
+  private int maximumBytes = UPPER_BOUND_BYTES;
+  private int minimumBytes = LOWER_BOUND_BYTES;
+  // the hash strategy used in this bloom filter.
+  private final HashStrategy hashStrategy;
+  // the column to build bloom filter
+  private ColumnDescriptor column;
+
+  public DynamicBlockBloomFilter(int numBytes, int candidatesNum, double fpp, ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, UPPER_BOUND_BYTES, HashStrategy.XXH64, fpp, candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int maximumBytes, int candidatesNum, double fpp, ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, maximumBytes, HashStrategy.XXH64, fpp, candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int minimumBytes, int maximumBytes, HashStrategy hashStrategy,
+    double fpp, int candidatesNum, ColumnDescriptor column) {
+    if (minimumBytes > maximumBytes) {
+      throw new IllegalArgumentException("the minimum bytes should be less or equal than maximum bytes");
+    }
+
+    if (minimumBytes > LOWER_BOUND_BYTES && minimumBytes < UPPER_BOUND_BYTES) {
+      this.minimumBytes = minimumBytes;
+    }
+
+    if (maximumBytes > LOWER_BOUND_BYTES && maximumBytes < UPPER_BOUND_BYTES) {
+      this.maximumBytes = maximumBytes;
+    }
+    this.column = column;
+    switch (hashStrategy) {
+      case XXH64:
+        this.hashStrategy = hashStrategy;
+        break;
+      default:
+        throw new RuntimeException("Unsupported hash strategy");
+    }
+    initCandidates(numBytes, candidatesNum, fpp);
+  }
+
+  /**
+   * Given the maximum acceptable bytes size of bloom filter, generate candidates according
+   * to the bytes size. The bytes size of the candidate needs to be a
+   * power of 2. Therefore, set the candidate size according to `maxBytes` of `1/2`, `1/4`, `1/8`, etc.
+   *
+   * @param maxBytes      the maximum acceptable bit size
+   * @param candidatesNum the number of candidates
+   * @param fpp           the false positive probability
+   */
+  private void initCandidates(int maxBytes, int candidatesNum, double fpp) {
+    int candidateByteSize = calculateTwoPowerSize(maxBytes);
+    for (int i = 1; i <= candidatesNum; i++) {
+      int candidateExpectedNDV = expectedNDV(candidateByteSize, fpp);
+      // `candidateByteSize` is too small, just drop it
+      if (candidateExpectedNDV <= 0) {
+        break;
+      }
+      BloomFilterCandidate candidate =
+        new BloomFilterCandidate(candidateExpectedNDV, candidateByteSize, minimumBytes, maximumBytes, hashStrategy);
+      candidates.add(candidate);
+      candidateByteSize = calculateTwoPowerSize(candidateByteSize / 2);
+    }
+    Optional<BloomFilterCandidate> maxBloomFilter = candidates.stream().max(BloomFilterCandidate::compareTo);
+    if (maxBloomFilter.isPresent()) {
+      maxCandidate = maxBloomFilter.get();
+    } else {
+      throw new IllegalArgumentException("`maximumBytes` is too small to create one valid bloom filter");
+    }
+  }
+
+  /**
+   * According to the size of bytes, calculate the expected number of distinct values.
+   * The expected number result may be slightly smaller than what `numBytes` can support.
+   *
+   * @param numBytes the bytes size
+   * @param fpp      the false positive probability
+   * @return the expected number of distinct values
+   */
+  private int expectedNDV(int numBytes, double fpp) {
+    int expectedNDV = 0;
+    int optimalBytes = 0;
+    int step = 500;
+    while (optimalBytes < numBytes) {
+      expectedNDV += step;
+      optimalBytes = BlockSplitBloomFilter.optimalNumOfBits(expectedNDV, fpp) / 8;
+    }
+    // make sure it is slightly smaller than what `numBytes` can support
+    expectedNDV -= step;
+    // numBytes is too small
+    if (expectedNDV <= 0) {
+      expectedNDV = 0;
+    }
+    return expectedNDV;
+  }
+
+  /**
+   * BloomFilter bitsets size should be power of 2, see [[BlockSplitBloomFilter#initBitset]]
+   *
+   * @param numBytes the bytes size
+   * @return the largest power of 2 less or equal to numBytes
+   */
+  private int calculateTwoPowerSize(int numBytes) {
+    if (numBytes < minimumBytes) {
+      numBytes = minimumBytes;
+    }
+    // if `numBytes` is not power of 2, get the next largest power of two less than `numBytes`
+    if ((numBytes & (numBytes - 1)) != 0) {
+      numBytes = Integer.highestOneBit(numBytes);
+    }
+    numBytes = Math.min(numBytes, maximumBytes);
+    numBytes = Math.max(numBytes, minimumBytes);
+    return numBytes;
+  }
+
+  /**
+   * Used at the end of the insertion, select the candidate of the smallest size.
+   * At least one of the largest candidates will be kept when inserting data.
+   *
+   * @return the smallest and optimal candidate
+   */
+  protected BloomFilterCandidate optimalCandidate() {
+    return candidates.stream().min(BloomFilterCandidate::compareTo).get();
+  }
+
+  protected List<BloomFilterCandidate> getCandidates() {
+    return candidates;
+  }
+
+  @Override
+  public void writeTo(OutputStream out) throws IOException {
+    finalized = true;
+    BloomFilterCandidate optimalBloomFilter = optimalCandidate();
+    optimalBloomFilter.bloomFilter.writeTo(out);
+    String columnName = column != null && column.getPath() != null ? Arrays.toString(column.getPath()) : "unknown";
+    LOG.info("The number of distinct values in {} is approximately {}, the optimal bloom filter NDV is {}, byte size is {}.",
+      columnName, distinctValueCounter, optimalBloomFilter.getExpectedNDV(),
+      optimalBloomFilter.bloomFilter.getBitsetSize());
+  }
+
+  /**
+   * Insert an element to the multiple bloom filter candidates and remove the bad candidate
+   * if the number of distinct values exceeds its expected size.
+   *
+   * @param hash the hash result of element.
+   */
+  @Override
+  public void insertHash(long hash) {
+    Preconditions.checkArgument(!finalized,

Review Comment:
   Should we check `finalized` and `candidates.isEmpty()` ?



##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.LOWER_BOUND_BYTES;
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.UPPER_BOUND_BYTES;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * `DynamicBlockBloomFilter` contains multiple `BlockSplitBloomFilter` as candidates and inserts values in
+ * the candidates at the same time.
+ * The purpose of this is to finally generate a bloom filter with the optimal bit size according to the number
+ * of real data distinct values. Use the largest bloom filter as an approximate deduplication counter, and then
+ * remove incapable bloom filter candidate during data insertion.
+ */
+public class DynamicBlockBloomFilter implements BloomFilter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamicBlockBloomFilter.class);
+
+  // multiple candidates, inserting data at the same time. If the distinct values are greater than the
+  // expected NDV of candidates, it will be removed. Finally we will choose the smallest candidate to write out.
+  private final List<BloomFilterCandidate> candidates = new ArrayList<>();
+
+  // the largest among candidates and used as an approximate deduplication counter
+  private BloomFilterCandidate maxCandidate;
+
+  // the accumulator of the number of distinct values that have been inserted so far
+  private int distinctValueCounter = 0;
+
+  // indicates that the bloom filter candidate has been written out and new data should be no longer allowed to be inserted
+  private boolean finalized = false;
+
+  private int maximumBytes = UPPER_BOUND_BYTES;
+  private int minimumBytes = LOWER_BOUND_BYTES;
+  // the hash strategy used in this bloom filter.
+  private final HashStrategy hashStrategy;
+  // the column to build bloom filter
+  private ColumnDescriptor column;
+
+  public DynamicBlockBloomFilter(int numBytes, int candidatesNum, double fpp, ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, UPPER_BOUND_BYTES, HashStrategy.XXH64, fpp, candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int maximumBytes, int candidatesNum, double fpp, ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, maximumBytes, HashStrategy.XXH64, fpp, candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int minimumBytes, int maximumBytes, HashStrategy hashStrategy,
+    double fpp, int candidatesNum, ColumnDescriptor column) {
+    if (minimumBytes > maximumBytes) {
+      throw new IllegalArgumentException("the minimum bytes should be less or equal than maximum bytes");
+    }
+
+    if (minimumBytes > LOWER_BOUND_BYTES && minimumBytes < UPPER_BOUND_BYTES) {
+      this.minimumBytes = minimumBytes;
+    }
+
+    if (maximumBytes > LOWER_BOUND_BYTES && maximumBytes < UPPER_BOUND_BYTES) {
+      this.maximumBytes = maximumBytes;
+    }
+    this.column = column;
+    switch (hashStrategy) {
+      case XXH64:
+        this.hashStrategy = hashStrategy;
+        break;
+      default:
+        throw new RuntimeException("Unsupported hash strategy");
+    }
+    initCandidates(numBytes, candidatesNum, fpp);
+  }
+
+  /**
+   * Given the maximum acceptable bytes size of bloom filter, generate candidates according
+   * to the bytes size. The bytes size of the candidate needs to be a
+   * power of 2. Therefore, set the candidate size according to `maxBytes` of `1/2`, `1/4`, `1/8`, etc.
+   *
+   * @param maxBytes      the maximum acceptable bit size
+   * @param candidatesNum the number of candidates
+   * @param fpp           the false positive probability
+   */
+  private void initCandidates(int maxBytes, int candidatesNum, double fpp) {
+    int candidateByteSize = calculateTwoPowerSize(maxBytes);
+    for (int i = 1; i <= candidatesNum; i++) {
+      int candidateExpectedNDV = expectedNDV(candidateByteSize, fpp);
+      // `candidateByteSize` is too small, just drop it
+      if (candidateExpectedNDV <= 0) {
+        break;
+      }
+      BloomFilterCandidate candidate =
+        new BloomFilterCandidate(candidateExpectedNDV, candidateByteSize, minimumBytes, maximumBytes, hashStrategy);
+      candidates.add(candidate);
+      candidateByteSize = calculateTwoPowerSize(candidateByteSize / 2);
+    }
+    Optional<BloomFilterCandidate> maxBloomFilter = candidates.stream().max(BloomFilterCandidate::compareTo);
+    if (maxBloomFilter.isPresent()) {
+      maxCandidate = maxBloomFilter.get();
+    } else {
+      throw new IllegalArgumentException("`maximumBytes` is too small to create one valid bloom filter");
+    }
+  }
+
+  /**
+   * According to the size of bytes, calculate the expected number of distinct values.
+   * The expected number result may be slightly smaller than what `numBytes` can support.
+   *
+   * @param numBytes the bytes size
+   * @param fpp      the false positive probability
+   * @return the expected number of distinct values
+   */
+  private int expectedNDV(int numBytes, double fpp) {
+    int expectedNDV = 0;
+    int optimalBytes = 0;
+    int step = 500;
+    while (optimalBytes < numBytes) {
+      expectedNDV += step;
+      optimalBytes = BlockSplitBloomFilter.optimalNumOfBits(expectedNDV, fpp) / 8;
+    }
+    // make sure it is slightly smaller than what `numBytes` can support
+    expectedNDV -= step;
+    // numBytes is too small
+    if (expectedNDV <= 0) {
+      expectedNDV = 0;
+    }
+    return expectedNDV;
+  }
+
+  /**
+   * BloomFilter bitsets size should be power of 2, see [[BlockSplitBloomFilter#initBitset]]
+   *
+   * @param numBytes the bytes size
+   * @return the largest power of 2 less or equal to numBytes
+   */
+  private int calculateTwoPowerSize(int numBytes) {
+    if (numBytes < minimumBytes) {
+      numBytes = minimumBytes;
+    }
+    // if `numBytes` is not power of 2, get the next largest power of two less than `numBytes`
+    if ((numBytes & (numBytes - 1)) != 0) {
+      numBytes = Integer.highestOneBit(numBytes);
+    }
+    numBytes = Math.min(numBytes, maximumBytes);
+    numBytes = Math.max(numBytes, minimumBytes);
+    return numBytes;
+  }
+
+  /**
+   * Used at the end of the insertion, select the candidate of the smallest size.
+   * At least one of the largest candidates will be kept when inserting data.
+   *
+   * @return the smallest and optimal candidate
+   */
+  protected BloomFilterCandidate optimalCandidate() {
+    return candidates.stream().min(BloomFilterCandidate::compareTo).get();
+  }
+
+  protected List<BloomFilterCandidate> getCandidates() {
+    return candidates;
+  }
+
+  @Override
+  public void writeTo(OutputStream out) throws IOException {
+    finalized = true;

Review Comment:
   Check `finalized` first in case of re-entry.



##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.LOWER_BOUND_BYTES;
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.UPPER_BOUND_BYTES;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * `DynamicBlockBloomFilter` contains multiple `BlockSplitBloomFilter` as candidates and inserts values in
+ * the candidates at the same time.
+ * The purpose of this is to finally generate a bloom filter with the optimal bit size according to the number
+ * of real data distinct values. Use the largest bloom filter as an approximate deduplication counter, and then
+ * remove incapable bloom filter candidate during data insertion.
+ */
+public class DynamicBlockBloomFilter implements BloomFilter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamicBlockBloomFilter.class);
+
+  // multiple candidates, inserting data at the same time. If the distinct values are greater than the
+  // expected NDV of candidates, it will be removed. Finally we will choose the smallest candidate to write out.
+  private final List<BloomFilterCandidate> candidates = new ArrayList<>();
+
+  // the largest among candidates and used as an approximate deduplication counter
+  private BloomFilterCandidate maxCandidate;
+
+  // the accumulator of the number of distinct values that have been inserted so far
+  private int distinctValueCounter = 0;
+
+  // indicates that the bloom filter candidate has been written out and new data should be no longer allowed to be inserted
+  private boolean finalized = false;
+
+  private int maximumBytes = UPPER_BOUND_BYTES;
+  private int minimumBytes = LOWER_BOUND_BYTES;
+  // the hash strategy used in this bloom filter.
+  private final HashStrategy hashStrategy;
+  // the column to build bloom filter
+  private ColumnDescriptor column;
+
+  public DynamicBlockBloomFilter(int numBytes, int candidatesNum, double fpp, ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, UPPER_BOUND_BYTES, HashStrategy.XXH64, fpp, candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int maximumBytes, int candidatesNum, double fpp, ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, maximumBytes, HashStrategy.XXH64, fpp, candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int minimumBytes, int maximumBytes, HashStrategy hashStrategy,
+    double fpp, int candidatesNum, ColumnDescriptor column) {
+    if (minimumBytes > maximumBytes) {
+      throw new IllegalArgumentException("the minimum bytes should be less or equal than maximum bytes");
+    }
+
+    if (minimumBytes > LOWER_BOUND_BYTES && minimumBytes < UPPER_BOUND_BYTES) {
+      this.minimumBytes = minimumBytes;
+    }
+
+    if (maximumBytes > LOWER_BOUND_BYTES && maximumBytes < UPPER_BOUND_BYTES) {
+      this.maximumBytes = maximumBytes;
+    }
+    this.column = column;
+    switch (hashStrategy) {
+      case XXH64:
+        this.hashStrategy = hashStrategy;
+        break;
+      default:
+        throw new RuntimeException("Unsupported hash strategy");
+    }
+    initCandidates(numBytes, candidatesNum, fpp);
+  }
+
+  /**
+   * Given the maximum acceptable bytes size of bloom filter, generate candidates according
+   * to the bytes size. The bytes size of the candidate needs to be a
+   * power of 2. Therefore, set the candidate size according to `maxBytes` of `1/2`, `1/4`, `1/8`, etc.
+   *
+   * @param maxBytes      the maximum acceptable bit size
+   * @param candidatesNum the number of candidates
+   * @param fpp           the false positive probability
+   */
+  private void initCandidates(int maxBytes, int candidatesNum, double fpp) {
+    int candidateByteSize = calculateTwoPowerSize(maxBytes);
+    for (int i = 1; i <= candidatesNum; i++) {
+      int candidateExpectedNDV = expectedNDV(candidateByteSize, fpp);
+      // `candidateByteSize` is too small, just drop it
+      if (candidateExpectedNDV <= 0) {
+        break;
+      }
+      BloomFilterCandidate candidate =
+        new BloomFilterCandidate(candidateExpectedNDV, candidateByteSize, minimumBytes, maximumBytes, hashStrategy);
+      candidates.add(candidate);
+      candidateByteSize = calculateTwoPowerSize(candidateByteSize / 2);
+    }
+    Optional<BloomFilterCandidate> maxBloomFilter = candidates.stream().max(BloomFilterCandidate::compareTo);
+    if (maxBloomFilter.isPresent()) {
+      maxCandidate = maxBloomFilter.get();
+    } else {
+      throw new IllegalArgumentException("`maximumBytes` is too small to create one valid bloom filter");
+    }
+  }
+
+  /**
+   * According to the size of bytes, calculate the expected number of distinct values.
+   * The expected number result may be slightly smaller than what `numBytes` can support.
+   *
+   * @param numBytes the bytes size
+   * @param fpp      the false positive probability
+   * @return the expected number of distinct values
+   */
+  private int expectedNDV(int numBytes, double fpp) {
+    int expectedNDV = 0;
+    int optimalBytes = 0;
+    int step = 500;
+    while (optimalBytes < numBytes) {
+      expectedNDV += step;
+      optimalBytes = BlockSplitBloomFilter.optimalNumOfBits(expectedNDV, fpp) / 8;
+    }
+    // make sure it is slightly smaller than what `numBytes` can support
+    expectedNDV -= step;
+    // numBytes is too small
+    if (expectedNDV <= 0) {
+      expectedNDV = 0;
+    }
+    return expectedNDV;
+  }
+
+  /**
+   * BloomFilter bitsets size should be power of 2, see [[BlockSplitBloomFilter#initBitset]]
+   *
+   * @param numBytes the bytes size
+   * @return the largest power of 2 less or equal to numBytes
+   */
+  private int calculateTwoPowerSize(int numBytes) {
+    if (numBytes < minimumBytes) {
+      numBytes = minimumBytes;
+    }
+    // if `numBytes` is not power of 2, get the next largest power of two less than `numBytes`
+    if ((numBytes & (numBytes - 1)) != 0) {
+      numBytes = Integer.highestOneBit(numBytes);
+    }
+    numBytes = Math.min(numBytes, maximumBytes);
+    numBytes = Math.max(numBytes, minimumBytes);
+    return numBytes;
+  }
+
+  /**
+   * Used at the end of the insertion, select the candidate of the smallest size.
+   * At least one of the largest candidates will be kept when inserting data.
+   *
+   * @return the smallest and optimal candidate
+   */
+  protected BloomFilterCandidate optimalCandidate() {
+    return candidates.stream().min(BloomFilterCandidate::compareTo).get();
+  }
+
+  protected List<BloomFilterCandidate> getCandidates() {
+    return candidates;
+  }
+
+  @Override
+  public void writeTo(OutputStream out) throws IOException {
+    finalized = true;
+    BloomFilterCandidate optimalBloomFilter = optimalCandidate();
+    optimalBloomFilter.bloomFilter.writeTo(out);
+    String columnName = column != null && column.getPath() != null ? Arrays.toString(column.getPath()) : "unknown";
+    LOG.info("The number of distinct values in {} is approximately {}, the optimal bloom filter NDV is {}, byte size is {}.",
+      columnName, distinctValueCounter, optimalBloomFilter.getExpectedNDV(),
+      optimalBloomFilter.bloomFilter.getBitsetSize());
+  }
+
+  /**
+   * Insert an element to the multiple bloom filter candidates and remove the bad candidate
+   * if the number of distinct values exceeds its expected size.
+   *
+   * @param hash the hash result of element.
+   */
+  @Override
+  public void insertHash(long hash) {
+    Preconditions.checkArgument(!finalized,
+      "Dynamic bloom filter insertion has been mark as finalized, no more data is allowed!");
+    if (!maxCandidate.bloomFilter.findHash(hash)) {
+      distinctValueCounter++;
+    }
+    // distinct values exceed the expected size, remove the bad bloom filter (leave at least the max bloom filter candidate)
+    candidates.removeIf(candidate -> candidate.getExpectedNDV() < distinctValueCounter && candidate != maxCandidate);
+    candidates.forEach(candidate -> candidate.getBloomFilter().insertHash(hash));
+  }
+
+  @Override
+  public int getBitsetSize() {
+    return optimalCandidate().getBloomFilter().getBitsetSize();
+  }
+
+  @Override
+  public boolean findHash(long hash) {
+    return maxCandidate.bloomFilter.findHash(hash);

Review Comment:
   Wrap `maxCandidate` in a function and throw if none of candidates are alive. Same for functions below.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java:
##########
@@ -152,6 +153,8 @@ public static enum JobSummaryLevel {
   public static final String BLOOM_FILTER_EXPECTED_NDV = "parquet.bloom.filter.expected.ndv";
   public static final String BLOOM_FILTER_MAX_BYTES = "parquet.bloom.filter.max.bytes";
   public static final String BLOOM_FILTER_FPP = "parquet.bloom.filter.fpp";
+  public static final String DYNAMIC_BLOOM_FILTER_ENABLED = "parquet.bloom.filter.dynamic.enabled";

Review Comment:
   BTW, I am not sure if `dynamic bloom filter` is a generally accepted name to summarize this feature.



##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.LOWER_BOUND_BYTES;
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.UPPER_BOUND_BYTES;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * `DynamicBlockBloomFilter` contains multiple `BlockSplitBloomFilter` as candidates and inserts values in
+ * the candidates at the same time.
+ * The purpose of this is to finally generate a bloom filter with the optimal bit size according to the number
+ * of real data distinct values. Use the largest bloom filter as an approximate deduplication counter, and then
+ * remove incapable bloom filter candidate during data insertion.
+ */
+public class DynamicBlockBloomFilter implements BloomFilter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamicBlockBloomFilter.class);
+
+  // multiple candidates, inserting data at the same time. If the distinct values are greater than the
+  // expected NDV of candidates, it will be removed. Finally we will choose the smallest candidate to write out.
+  private final List<BloomFilterCandidate> candidates = new ArrayList<>();
+
+  // the largest among candidates and used as an approximate deduplication counter
+  private BloomFilterCandidate maxCandidate;
+
+  // the accumulator of the number of distinct values that have been inserted so far
+  private int distinctValueCounter = 0;
+
+  // indicates that the bloom filter candidate has been written out and new data should be no longer allowed to be inserted
+  private boolean finalized = false;
+
+  private int maximumBytes = UPPER_BOUND_BYTES;
+  private int minimumBytes = LOWER_BOUND_BYTES;
+  // the hash strategy used in this bloom filter.
+  private final HashStrategy hashStrategy;
+  // the column to build bloom filter
+  private ColumnDescriptor column;
+
+  public DynamicBlockBloomFilter(int numBytes, int candidatesNum, double fpp, ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, UPPER_BOUND_BYTES, HashStrategy.XXH64, fpp, candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int maximumBytes, int candidatesNum, double fpp, ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, maximumBytes, HashStrategy.XXH64, fpp, candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int minimumBytes, int maximumBytes, HashStrategy hashStrategy,
+    double fpp, int candidatesNum, ColumnDescriptor column) {
+    if (minimumBytes > maximumBytes) {
+      throw new IllegalArgumentException("the minimum bytes should be less or equal than maximum bytes");
+    }
+
+    if (minimumBytes > LOWER_BOUND_BYTES && minimumBytes < UPPER_BOUND_BYTES) {
+      this.minimumBytes = minimumBytes;
+    }
+
+    if (maximumBytes > LOWER_BOUND_BYTES && maximumBytes < UPPER_BOUND_BYTES) {
+      this.maximumBytes = maximumBytes;
+    }
+    this.column = column;
+    switch (hashStrategy) {
+      case XXH64:
+        this.hashStrategy = hashStrategy;
+        break;
+      default:
+        throw new RuntimeException("Unsupported hash strategy");
+    }
+    initCandidates(numBytes, candidatesNum, fpp);
+  }
+
+  /**
+   * Given the maximum acceptable bytes size of bloom filter, generate candidates according
+   * to the bytes size. The bytes size of the candidate needs to be a
+   * power of 2. Therefore, set the candidate size according to `maxBytes` of `1/2`, `1/4`, `1/8`, etc.
+   *
+   * @param maxBytes      the maximum acceptable bit size
+   * @param candidatesNum the number of candidates
+   * @param fpp           the false positive probability
+   */
+  private void initCandidates(int maxBytes, int candidatesNum, double fpp) {
+    int candidateByteSize = calculateTwoPowerSize(maxBytes);
+    for (int i = 1; i <= candidatesNum; i++) {
+      int candidateExpectedNDV = expectedNDV(candidateByteSize, fpp);
+      // `candidateByteSize` is too small, just drop it
+      if (candidateExpectedNDV <= 0) {
+        break;
+      }
+      BloomFilterCandidate candidate =
+        new BloomFilterCandidate(candidateExpectedNDV, candidateByteSize, minimumBytes, maximumBytes, hashStrategy);
+      candidates.add(candidate);
+      candidateByteSize = calculateTwoPowerSize(candidateByteSize / 2);
+    }
+    Optional<BloomFilterCandidate> maxBloomFilter = candidates.stream().max(BloomFilterCandidate::compareTo);
+    if (maxBloomFilter.isPresent()) {
+      maxCandidate = maxBloomFilter.get();
+    } else {
+      throw new IllegalArgumentException("`maximumBytes` is too small to create one valid bloom filter");
+    }
+  }
+
+  /**
+   * According to the size of bytes, calculate the expected number of distinct values.
+   * The expected number result may be slightly smaller than what `numBytes` can support.
+   *
+   * @param numBytes the bytes size
+   * @param fpp      the false positive probability
+   * @return the expected number of distinct values
+   */
+  private int expectedNDV(int numBytes, double fpp) {
+    int expectedNDV = 0;
+    int optimalBytes = 0;
+    int step = 500;
+    while (optimalBytes < numBytes) {
+      expectedNDV += step;
+      optimalBytes = BlockSplitBloomFilter.optimalNumOfBits(expectedNDV, fpp) / 8;
+    }
+    // make sure it is slightly smaller than what `numBytes` can support
+    expectedNDV -= step;
+    // numBytes is too small
+    if (expectedNDV <= 0) {
+      expectedNDV = 0;
+    }
+    return expectedNDV;
+  }
+
+  /**
+   * BloomFilter bitsets size should be power of 2, see [[BlockSplitBloomFilter#initBitset]]
+   *
+   * @param numBytes the bytes size
+   * @return the largest power of 2 less or equal to numBytes
+   */
+  private int calculateTwoPowerSize(int numBytes) {

Review Comment:
   We can simply rename it as `calculateBoundedPowerOfTwo`?



##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.LOWER_BOUND_BYTES;
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.UPPER_BOUND_BYTES;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * `DynamicBlockBloomFilter` contains multiple `BlockSplitBloomFilter` as candidates and inserts values in
+ * the candidates at the same time.
+ * The purpose of this is to finally generate a bloom filter with the optimal bit size according to the number
+ * of real data distinct values. Use the largest bloom filter as an approximate deduplication counter, and then
+ * remove incapable bloom filter candidate during data insertion.
+ */
+public class DynamicBlockBloomFilter implements BloomFilter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamicBlockBloomFilter.class);
+
+  // multiple candidates, inserting data at the same time. If the distinct values are greater than the
+  // expected NDV of candidates, it will be removed. Finally we will choose the smallest candidate to write out.
+  private final List<BloomFilterCandidate> candidates = new ArrayList<>();
+
+  // the largest among candidates and used as an approximate deduplication counter
+  private BloomFilterCandidate maxCandidate;
+
+  // the accumulator of the number of distinct values that have been inserted so far
+  private int distinctValueCounter = 0;
+
+  // indicates that the bloom filter candidate has been written out and new data should be no longer allowed to be inserted
+  private boolean finalized = false;
+
+  private int maximumBytes = UPPER_BOUND_BYTES;
+  private int minimumBytes = LOWER_BOUND_BYTES;
+  // the hash strategy used in this bloom filter.
+  private final HashStrategy hashStrategy;
+  // the column to build bloom filter
+  private ColumnDescriptor column;
+
+  public DynamicBlockBloomFilter(int numBytes, int candidatesNum, double fpp, ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, UPPER_BOUND_BYTES, HashStrategy.XXH64, fpp, candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int maximumBytes, int candidatesNum, double fpp, ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, maximumBytes, HashStrategy.XXH64, fpp, candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int minimumBytes, int maximumBytes, HashStrategy hashStrategy,
+    double fpp, int candidatesNum, ColumnDescriptor column) {
+    if (minimumBytes > maximumBytes) {
+      throw new IllegalArgumentException("the minimum bytes should be less or equal than maximum bytes");
+    }
+
+    if (minimumBytes > LOWER_BOUND_BYTES && minimumBytes < UPPER_BOUND_BYTES) {
+      this.minimumBytes = minimumBytes;
+    }
+
+    if (maximumBytes > LOWER_BOUND_BYTES && maximumBytes < UPPER_BOUND_BYTES) {
+      this.maximumBytes = maximumBytes;
+    }
+    this.column = column;
+    switch (hashStrategy) {
+      case XXH64:
+        this.hashStrategy = hashStrategy;
+        break;
+      default:
+        throw new RuntimeException("Unsupported hash strategy");
+    }
+    initCandidates(numBytes, candidatesNum, fpp);
+  }
+
+  /**
+   * Given the maximum acceptable bytes size of bloom filter, generate candidates according
+   * to the bytes size. The bytes size of the candidate needs to be a
+   * power of 2. Therefore, set the candidate size according to `maxBytes` of `1/2`, `1/4`, `1/8`, etc.
+   *
+   * @param maxBytes      the maximum acceptable bit size
+   * @param candidatesNum the number of candidates
+   * @param fpp           the false positive probability
+   */
+  private void initCandidates(int maxBytes, int candidatesNum, double fpp) {
+    int candidateByteSize = calculateTwoPowerSize(maxBytes);
+    for (int i = 1; i <= candidatesNum; i++) {
+      int candidateExpectedNDV = expectedNDV(candidateByteSize, fpp);
+      // `candidateByteSize` is too small, just drop it
+      if (candidateExpectedNDV <= 0) {
+        break;
+      }
+      BloomFilterCandidate candidate =
+        new BloomFilterCandidate(candidateExpectedNDV, candidateByteSize, minimumBytes, maximumBytes, hashStrategy);
+      candidates.add(candidate);
+      candidateByteSize = calculateTwoPowerSize(candidateByteSize / 2);
+    }
+    Optional<BloomFilterCandidate> maxBloomFilter = candidates.stream().max(BloomFilterCandidate::compareTo);
+    if (maxBloomFilter.isPresent()) {
+      maxCandidate = maxBloomFilter.get();
+    } else {
+      throw new IllegalArgumentException("`maximumBytes` is too small to create one valid bloom filter");
+    }
+  }
+
+  /**
+   * According to the size of bytes, calculate the expected number of distinct values.
+   * The expected number result may be slightly smaller than what `numBytes` can support.
+   *
+   * @param numBytes the bytes size
+   * @param fpp      the false positive probability
+   * @return the expected number of distinct values
+   */
+  private int expectedNDV(int numBytes, double fpp) {
+    int expectedNDV = 0;
+    int optimalBytes = 0;
+    int step = 500;
+    while (optimalBytes < numBytes) {
+      expectedNDV += step;
+      optimalBytes = BlockSplitBloomFilter.optimalNumOfBits(expectedNDV, fpp) / 8;
+    }
+    // make sure it is slightly smaller than what `numBytes` can support
+    expectedNDV -= step;
+    // numBytes is too small
+    if (expectedNDV <= 0) {
+      expectedNDV = 0;
+    }
+    return expectedNDV;
+  }
+
+  /**
+   * BloomFilter bitsets size should be power of 2, see [[BlockSplitBloomFilter#initBitset]]
+   *
+   * @param numBytes the bytes size
+   * @return the largest power of 2 less or equal to numBytes
+   */
+  private int calculateTwoPowerSize(int numBytes) {
+    if (numBytes < minimumBytes) {
+      numBytes = minimumBytes;
+    }
+    // if `numBytes` is not power of 2, get the next largest power of two less than `numBytes`
+    if ((numBytes & (numBytes - 1)) != 0) {
+      numBytes = Integer.highestOneBit(numBytes);
+    }
+    numBytes = Math.min(numBytes, maximumBytes);
+    numBytes = Math.max(numBytes, minimumBytes);
+    return numBytes;
+  }
+
+  /**
+   * Used at the end of the insertion, select the candidate of the smallest size.
+   * At least one of the largest candidates will be kept when inserting data.
+   *
+   * @return the smallest and optimal candidate
+   */
+  protected BloomFilterCandidate optimalCandidate() {
+    return candidates.stream().min(BloomFilterCandidate::compareTo).get();
+  }
+
+  protected List<BloomFilterCandidate> getCandidates() {
+    return candidates;
+  }
+
+  @Override
+  public void writeTo(OutputStream out) throws IOException {
+    finalized = true;
+    BloomFilterCandidate optimalBloomFilter = optimalCandidate();
+    optimalBloomFilter.bloomFilter.writeTo(out);
+    String columnName = column != null && column.getPath() != null ? Arrays.toString(column.getPath()) : "unknown";
+    LOG.info("The number of distinct values in {} is approximately {}, the optimal bloom filter NDV is {}, byte size is {}.",
+      columnName, distinctValueCounter, optimalBloomFilter.getExpectedNDV(),
+      optimalBloomFilter.bloomFilter.getBitsetSize());
+  }
+
+  /**
+   * Insert an element to the multiple bloom filter candidates and remove the bad candidate
+   * if the number of distinct values exceeds its expected size.
+   *
+   * @param hash the hash result of element.
+   */
+  @Override
+  public void insertHash(long hash) {
+    Preconditions.checkArgument(!finalized,
+      "Dynamic bloom filter insertion has been mark as finalized, no more data is allowed!");
+    if (!maxCandidate.bloomFilter.findHash(hash)) {
+      distinctValueCounter++;
+    }
+    // distinct values exceed the expected size, remove the bad bloom filter (leave at least the max bloom filter candidate)
+    candidates.removeIf(candidate -> candidate.getExpectedNDV() < distinctValueCounter && candidate != maxCandidate);
+    candidates.forEach(candidate -> candidate.getBloomFilter().insertHash(hash));
+  }
+
+  @Override
+  public int getBitsetSize() {
+    return optimalCandidate().getBloomFilter().getBitsetSize();

Review Comment:
   What if optimal does not exist? Will it throw in the `optimalCandidate()`?



##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.LOWER_BOUND_BYTES;
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.UPPER_BOUND_BYTES;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * `DynamicBlockBloomFilter` contains multiple `BlockSplitBloomFilter` as candidates and inserts values in
+ * the candidates at the same time.
+ * The purpose of this is to finally generate a bloom filter with the optimal bit size according to the number
+ * of real data distinct values. Use the largest bloom filter as an approximate deduplication counter, and then
+ * remove incapable bloom filter candidate during data insertion.
+ */
+public class DynamicBlockBloomFilter implements BloomFilter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamicBlockBloomFilter.class);
+
+  // multiple candidates, inserting data at the same time. If the distinct values are greater than the
+  // expected NDV of candidates, it will be removed. Finally we will choose the smallest candidate to write out.
+  private final List<BloomFilterCandidate> candidates = new ArrayList<>();
+
+  // the largest among candidates and used as an approximate deduplication counter
+  private BloomFilterCandidate maxCandidate;
+
+  // the accumulator of the number of distinct values that have been inserted so far
+  private int distinctValueCounter = 0;
+
+  // indicates that the bloom filter candidate has been written out and new data should be no longer allowed to be inserted
+  private boolean finalized = false;
+
+  private int maximumBytes = UPPER_BOUND_BYTES;
+  private int minimumBytes = LOWER_BOUND_BYTES;
+  // the hash strategy used in this bloom filter.
+  private final HashStrategy hashStrategy;
+  // the column to build bloom filter
+  private ColumnDescriptor column;
+
+  public DynamicBlockBloomFilter(int numBytes, int candidatesNum, double fpp, ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, UPPER_BOUND_BYTES, HashStrategy.XXH64, fpp, candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int maximumBytes, int candidatesNum, double fpp, ColumnDescriptor column) {

Review Comment:
   I know these constructors are following the same style of `BlockSplitBloomFilter`, but I really doubt if this one is necessary at all. Specifically, how do we know `numBytes` in advance?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@parquet.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org