Posted to commits@hudi.apache.org by vi...@apache.org on 2020/06/09 03:29:38 UTC

[hudi] branch master updated: HUDI-494 fix incorrect record size estimation

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 22cd824  HUDI-494 fix incorrect record size estimation
22cd824 is described below

commit 22cd824d993bf43d88121ea89bad3a1f23a28518
Author: garyli1019 <ya...@gmail.com>
AuthorDate: Thu May 14 20:20:44 2020 -0700

    HUDI-494 fix incorrect record size estimation
---
 .../apache/hudi/config/HoodieCompactionConfig.java |  13 +++
 .../org/apache/hudi/config/HoodieWriteConfig.java  |   4 +
 .../apache/hudi/table/HoodieCopyOnWriteTable.java  |  31 ------
 .../table/action/commit/UpsertPartitioner.java     |   9 +-
 .../TestHoodieClientOnCopyOnWriteStorage.java      |   8 +-
 .../apache/hudi/table/TestHoodieRecordSizing.java  | 116 ---------------------
 .../table/action/commit/TestUpsertPartitioner.java |  91 ++++++++++++++++
 .../hudi/testutils/HoodieTestDataGenerator.java    |   8 +-
 8 files changed, 125 insertions(+), 155 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index 5e295ac..f89fc06 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -54,6 +54,12 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
   public static final String PARQUET_SMALL_FILE_LIMIT_BYTES = "hoodie.parquet.small.file.limit";
   // By default, treat any file <= 100MB as a small file.
   public static final String DEFAULT_PARQUET_SMALL_FILE_LIMIT_BYTES = String.valueOf(104857600);
+  // Hudi estimates the record size from the previous commit as totalBytesWritten / totalRecordsWritten.
+  // If the previous commit wrote too little data to yield an accurate estimate, Hudi searches commits in reverse
+  // order until it finds one whose totalBytesWritten exceeds (PARQUET_SMALL_FILE_LIMIT_BYTES * RECORD_SIZE_ESTIMATION_THRESHOLD).
+  public static final String RECORD_SIZE_ESTIMATION_THRESHOLD_PROP = "hoodie.record.size.estimation.threshold";
+  public static final String DEFAULT_RECORD_SIZE_ESTIMATION_THRESHOLD = "1.0";
+
   /**
    * Configs related to specific table types.
    */
@@ -173,6 +179,11 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    public Builder compactionRecordSizeEstimateThreshold(double threshold) {
+      props.setProperty(RECORD_SIZE_ESTIMATION_THRESHOLD_PROP, String.valueOf(threshold));
+      return this;
+    }
+
     public Builder insertSplitSize(int insertSplitSize) {
       props.setProperty(COPY_ON_WRITE_TABLE_INSERT_SPLIT_SIZE, String.valueOf(insertSplitSize));
       return this;
@@ -254,6 +265,8 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
           DEFAULT_MIN_COMMITS_TO_KEEP);
       setDefaultOnCondition(props, !props.containsKey(PARQUET_SMALL_FILE_LIMIT_BYTES), PARQUET_SMALL_FILE_LIMIT_BYTES,
           DEFAULT_PARQUET_SMALL_FILE_LIMIT_BYTES);
+      setDefaultOnCondition(props, !props.containsKey(RECORD_SIZE_ESTIMATION_THRESHOLD_PROP), RECORD_SIZE_ESTIMATION_THRESHOLD_PROP,
+          DEFAULT_RECORD_SIZE_ESTIMATION_THRESHOLD);
       setDefaultOnCondition(props, !props.containsKey(COPY_ON_WRITE_TABLE_INSERT_SPLIT_SIZE),
           COPY_ON_WRITE_TABLE_INSERT_SPLIT_SIZE, DEFAULT_COPY_ON_WRITE_TABLE_INSERT_SPLIT_SIZE);
       setDefaultOnCondition(props, !props.containsKey(COPY_ON_WRITE_TABLE_AUTO_SPLIT_INSERTS),
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index d6527fa..d899257 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -272,6 +272,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return Integer.parseInt(props.getProperty(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT_BYTES));
   }
 
+  public double getRecordSizeEstimationThreshold() {
+    return Double.parseDouble(props.getProperty(HoodieCompactionConfig.RECORD_SIZE_ESTIMATION_THRESHOLD_PROP));
+  }
+
   public int getCopyOnWriteInsertSplitSize() {
     return Integer.parseInt(props.getProperty(HoodieCompactionConfig.COPY_ON_WRITE_TABLE_INSERT_SPLIT_SIZE));
   }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
index ed29180..974d847 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
@@ -29,14 +29,12 @@ import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.utils.ParquetReaderIterator;
 import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
 import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
@@ -299,33 +297,4 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
       return sb.toString();
     }
   }
-
-  /**
-   * Obtains the average record size based on records written during previous commits. Used for estimating how many
-   * records pack into one file.
-   */
-  protected static long averageBytesPerRecord(HoodieTimeline commitTimeline, int defaultRecordSizeEstimate) {
-    long avgSize = defaultRecordSizeEstimate;
-    try {
-      if (!commitTimeline.empty()) {
-        // Go over the reverse ordered commits to get a more recent estimate of average record size.
-        Iterator<HoodieInstant> instants = commitTimeline.getReverseOrderedInstants().iterator();
-        while (instants.hasNext()) {
-          HoodieInstant instant = instants.next();
-          HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
-              .fromBytes(commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
-          long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
-          long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
-          if (totalBytesWritten > 0 && totalRecordsWritten > 0) {
-            avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten);
-            break;
-          }
-        }
-      }
-    } catch (Throwable t) {
-      // make this fail safe.
-      LOG.error("Error trying to compute average bytes/record ", t);
-    }
-    return avgSize;
-  }
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index 745388c..a598710 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -131,7 +131,7 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
     Set<String> partitionPaths = profile.getPartitionPaths();
     long averageRecordSize =
         averageBytesPerRecord(table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
-            config.getCopyOnWriteRecordSizeEstimate());
+            config);
     LOG.info("AvgRecordSize => " + averageRecordSize);
 
     Map<String, List<SmallFile>> partitionSmallFilesMap =
@@ -289,8 +289,9 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
    * Obtains the average record size based on records written during previous commits. Used for estimating how many
    * records pack into one file.
    */
-  protected static long averageBytesPerRecord(HoodieTimeline commitTimeline, int defaultRecordSizeEstimate) {
-    long avgSize = defaultRecordSizeEstimate;
+  protected static long averageBytesPerRecord(HoodieTimeline commitTimeline, HoodieWriteConfig hoodieWriteConfig) {
+    long avgSize = hoodieWriteConfig.getCopyOnWriteRecordSizeEstimate();
+    long fileSizeThreshold = (long) (hoodieWriteConfig.getRecordSizeEstimationThreshold() * hoodieWriteConfig.getParquetSmallFileLimit());
     try {
       if (!commitTimeline.empty()) {
         // Go over the reverse ordered commits to get a more recent estimate of average record size.
@@ -301,7 +302,7 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
               .fromBytes(commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
           long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
           long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
-          if (totalBytesWritten > 0 && totalRecordsWritten > 0) {
+          if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) {
             avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten);
             break;
           }
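
The behavioral change is the totalBytesWritten > fileSizeThreshold guard above: a commit that
wrote only a handful of records no longer drives the estimate. A self-contained sketch of the
same reverse-ordered scan, with plain arrays standing in for the timeline (a hedged
illustration, not the Hudi API):

    // Illustrative sketch of the scan performed by averageBytesPerRecord.
    public class AvgRecordSizeSketch {
      // Each commit is {totalBytesWritten, totalRecordsWritten}, newest first.
      static long estimate(long[][] commitsNewestFirst, long fileSizeThreshold, long defaultEstimate) {
        for (long[] commit : commitsNewestFirst) {
          long totalBytesWritten = commit[0];
          long totalRecordsWritten = commit[1];
          // Skip commits too small to give a trustworthy average.
          if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) {
            return (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten);
          }
        }
        // Fall back to the configured default (getCopyOnWriteRecordSizeEstimate).
        return defaultEstimate;
      }

      public static void main(String[] args) {
        // Mirrors the fixture in TestUpsertPartitioner below: the 500-byte commit is
        // skipped, so the estimate comes from 7500 / 1500 = 5 bytes per record.
        long[][] commits = {{500, 100}, {7500, 1500}, {10000, 2000}};
        System.out.println(estimate(commits, 1000, 1024)); // prints 5
      }
    }
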
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index 8c2efed..8107cdf 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -1041,10 +1041,12 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     HoodieWriteConfig.Builder builder = getConfigBuilder(useNullSchema ? NULL_SCHEMA : TRIP_EXAMPLE_SCHEMA);
     return builder
         .withCompactionConfig(
-            HoodieCompactionConfig.newBuilder().compactionSmallFileSize(HoodieTestDataGenerator.SIZE_PER_RECORD * 15)
-                .insertSplitSize(insertSplitSize).build()) // tolerate upto 15 records
+            HoodieCompactionConfig.newBuilder()
+                .compactionSmallFileSize(dataGen.getEstimatedFileSizeInBytes(150))
+                .insertSplitSize(insertSplitSize).build())
         .withStorageConfig(
-            HoodieStorageConfig.newBuilder().limitFileSize(HoodieTestDataGenerator.SIZE_PER_RECORD * 20).build())
+            HoodieStorageConfig.newBuilder()
+                .limitFileSize(dataGen.getEstimatedFileSizeInBytes(200)).build())
         .build();
   }
 }
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieRecordSizing.java b/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieRecordSizing.java
deleted file mode 100644
index 17b0050..0000000
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieRecordSizing.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table;
-
-import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieWriteStat;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.Option;
-
-import org.junit.jupiter.api.Test;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-
-import static org.apache.hudi.common.testutils.HoodieTestUtils.generateFakeHoodieWriteStat;
-import static org.apache.hudi.table.HoodieCopyOnWriteTable.averageBytesPerRecord;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class TestHoodieRecordSizing {
-
-  private static List<HoodieInstant> setupHoodieInstants() {
-    List<HoodieInstant> instants = new ArrayList<>();
-    instants.add(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "ts1"));
-    instants.add(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "ts2"));
-    instants.add(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "ts3"));
-    instants.add(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "ts4"));
-    instants.add(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "ts5"));
-    Collections.reverse(instants);
-    return instants;
-  }
-
-  private static List<HoodieWriteStat> generateCommitStatWith(int totalRecordsWritten, int totalBytesWritten) {
-    List<HoodieWriteStat> writeStatsList = generateFakeHoodieWriteStat(5);
-    // clear all record and byte stats except for last entry.
-    for (int i = 0; i < writeStatsList.size() - 1; i++) {
-      HoodieWriteStat writeStat = writeStatsList.get(i);
-      writeStat.setNumWrites(0);
-      writeStat.setTotalWriteBytes(0);
-    }
-    HoodieWriteStat lastWriteStat = writeStatsList.get(writeStatsList.size() - 1);
-    lastWriteStat.setTotalWriteBytes(totalBytesWritten);
-    lastWriteStat.setNumWrites(totalRecordsWritten);
-    return writeStatsList;
-  }
-
-  private static HoodieCommitMetadata generateCommitMetadataWith(int totalRecordsWritten, int totalBytesWritten) {
-    List<HoodieWriteStat> fakeHoodieWriteStats = generateCommitStatWith(totalRecordsWritten, totalBytesWritten);
-    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
-    fakeHoodieWriteStats.forEach(stat -> commitMetadata.addWriteStat(stat.getPartitionPath(), stat));
-    return commitMetadata;
-  }
-
-  /*
-   * This needs to be a stack so we test all cases when either/both recordsWritten ,bytesWritten is zero before a non
-   * zero averageRecordSize can be computed.
-   */
-  private static LinkedList<Option<byte[]>> generateCommitMetadataList() throws IOException {
-    LinkedList<Option<byte[]>> commits = new LinkedList<>();
-    // First commit with non zero records and bytes
-    commits.push(Option.of(generateCommitMetadataWith(2000, 10000).toJsonString().getBytes(StandardCharsets.UTF_8)));
-    // Second commit with non zero records and bytes
-    commits.push(Option.of(generateCommitMetadataWith(1500, 7500).toJsonString().getBytes(StandardCharsets.UTF_8)));
-    // Third commit with both zero records and zero bytes
-    commits.push(Option.of(generateCommitMetadataWith(0, 0).toJsonString().getBytes(StandardCharsets.UTF_8)));
-    // Fourth commit with zero records
-    commits.push(Option.of(generateCommitMetadataWith(0, 1500).toJsonString().getBytes(StandardCharsets.UTF_8)));
-    // Fifth commit with zero bytes
-    commits.push(Option.of(generateCommitMetadataWith(2500, 0).toJsonString().getBytes(StandardCharsets.UTF_8)));
-    return commits;
-  }
-
-  @Test
-  public void testAverageBytesPerRecordForNonEmptyCommitTimeLine() throws Exception {
-    HoodieTimeline commitTimeLine = mock(HoodieTimeline.class);
-    when(commitTimeLine.empty()).thenReturn(false);
-    when(commitTimeLine.getReverseOrderedInstants()).thenReturn(setupHoodieInstants().stream());
-    LinkedList<Option<byte[]>> commits = generateCommitMetadataList();
-    when(commitTimeLine.getInstantDetails(any(HoodieInstant.class))).thenAnswer(invocationOnMock -> commits.pop());
-    long expectAvgSize = (long) Math.ceil((1.0 * 7500) / 1500);
-    long actualAvgSize = averageBytesPerRecord(commitTimeLine, 1234);
-    assertEquals(expectAvgSize, actualAvgSize);
-  }
-
-  @Test
-  public void testAverageBytesPerRecordForEmptyCommitTimeLine() {
-    HoodieTimeline commitTimeLine = mock(HoodieTimeline.class);
-    when(commitTimeLine.empty()).thenReturn(true);
-    long expectAvgSize = 2345;
-    long actualAvgSize = averageBytesPerRecord(commitTimeLine, 2345);
-    assertEquals(expectAvgSize, actualAvgSize);
-  }
-}
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
index 0926a37..91b3bb0 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
@@ -18,9 +18,13 @@
 
 package org.apache.hudi.table.action.commit;
 
+import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
@@ -37,12 +41,21 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.junit.jupiter.api.Test;
 
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 
 import scala.Tuple2;
 
+import static org.apache.hudi.common.testutils.HoodieTestUtils.generateFakeHoodieWriteStat;
+import static org.apache.hudi.table.action.commit.UpsertPartitioner.averageBytesPerRecord;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class TestUpsertPartitioner extends HoodieClientTestBase {
 
@@ -79,6 +92,84 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
     return partitioner;
   }
 
+  private static List<HoodieInstant> setupHoodieInstants() {
+    List<HoodieInstant> instants = new ArrayList<>();
+    instants.add(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "ts1"));
+    instants.add(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "ts2"));
+    instants.add(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "ts3"));
+    instants.add(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "ts4"));
+    instants.add(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "ts5"));
+    Collections.reverse(instants);
+    return instants;
+  }
+
+  private static List<HoodieWriteStat> generateCommitStatWith(int totalRecordsWritten, int totalBytesWritten) {
+    List<HoodieWriteStat> writeStatsList = generateFakeHoodieWriteStat(5);
+    // clear all record and byte stats except for the last entry.
+    for (int i = 0; i < writeStatsList.size() - 1; i++) {
+      HoodieWriteStat writeStat = writeStatsList.get(i);
+      writeStat.setNumWrites(0);
+      writeStat.setTotalWriteBytes(0);
+    }
+    HoodieWriteStat lastWriteStat = writeStatsList.get(writeStatsList.size() - 1);
+    lastWriteStat.setTotalWriteBytes(totalBytesWritten);
+    lastWriteStat.setNumWrites(totalRecordsWritten);
+    return writeStatsList;
+  }
+
+  private static HoodieCommitMetadata generateCommitMetadataWith(int totalRecordsWritten, int totalBytesWritten) {
+    List<HoodieWriteStat> fakeHoodieWriteStats = generateCommitStatWith(totalRecordsWritten, totalBytesWritten);
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+    fakeHoodieWriteStats.forEach(stat -> commitMetadata.addWriteStat(stat.getPartitionPath(), stat));
+    return commitMetadata;
+  }
+
+  /*
+   * This needs to be a stack so we test all cases where recordsWritten and/or bytesWritten are zero
+   * before a non-zero averageRecordSize can be computed.
+   */
+  private static LinkedList<Option<byte[]>> generateCommitMetadataList() throws IOException {
+    LinkedList<Option<byte[]>> commits = new LinkedList<>();
+    // First commit with non-zero records and bytes
+    commits.push(Option.of(generateCommitMetadataWith(2000, 10000).toJsonString().getBytes(StandardCharsets.UTF_8)));
+    // Second commit with non-zero records and bytes
+    commits.push(Option.of(generateCommitMetadataWith(1500, 7500).toJsonString().getBytes(StandardCharsets.UTF_8)));
+    // Third commit with a small file
+    commits.push(Option.of(generateCommitMetadataWith(100, 500).toJsonString().getBytes(StandardCharsets.UTF_8)));
+    // Fourth commit with both zero records and zero bytes
+    commits.push(Option.of(generateCommitMetadataWith(0, 0).toJsonString().getBytes(StandardCharsets.UTF_8)));
+    // Fifth commit with zero records
+    commits.push(Option.of(generateCommitMetadataWith(0, 1500).toJsonString().getBytes(StandardCharsets.UTF_8)));
+    // Sixth commit with zero bytes
+    commits.push(Option.of(generateCommitMetadataWith(2500, 0).toJsonString().getBytes(StandardCharsets.UTF_8)));
+    return commits;
+  }
+
+  @Test
+  public void testAverageBytesPerRecordForNonEmptyCommitTimeLine() throws Exception {
+    HoodieTimeline commitTimeLine = mock(HoodieTimeline.class);
+    HoodieWriteConfig config = makeHoodieClientConfigBuilder()
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1000).build())
+        .build();
+    when(commitTimeLine.empty()).thenReturn(false);
+    when(commitTimeLine.getReverseOrderedInstants()).thenReturn(setupHoodieInstants().stream());
+    LinkedList<Option<byte[]>> commits = generateCommitMetadataList();
+    when(commitTimeLine.getInstantDetails(any(HoodieInstant.class))).thenAnswer(invocationOnMock -> commits.pop());
+    long expectAvgSize = (long) Math.ceil((1.0 * 7500) / 1500);
+    long actualAvgSize = averageBytesPerRecord(commitTimeLine, config);
+    assertEquals(expectAvgSize, actualAvgSize);
+  }
+
+  @Test
+  public void testAverageBytesPerRecordForEmptyCommitTimeLine() throws Exception {
+    HoodieTimeline commitTimeLine = mock(HoodieTimeline.class);
+    HoodieWriteConfig config = makeHoodieClientConfigBuilder().build();
+    when(commitTimeLine.empty()).thenReturn(true);
+    long expectAvgSize = config.getCopyOnWriteRecordSizeEstimate();
+    long actualAvgSize = averageBytesPerRecord(commitTimeLine, config);
+    assertEquals(expectAvgSize, actualAvgSize);
+  }
+
   @Test
   public void testUpsertPartitioner() throws Exception {
     final String testPartitionPath = "2016/09/26";
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieTestDataGenerator.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieTestDataGenerator.java
index 9f7ed23..a6de0f5 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieTestDataGenerator.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieTestDataGenerator.java
@@ -70,7 +70,9 @@ import java.util.stream.Stream;
 public class HoodieTestDataGenerator {
 
   // based on examination of sample file, the schema produces the following per record size
-  public static final int SIZE_PER_RECORD = 50 * 1024;
+  public static final int BYTES_PER_RECORD = (int) (1.2 * 1024);
+  // size of the default bloom filter with 60,000 entries and a 0.000000001 false-positive rate
+  public static final int BLOOM_FILTER_BYTES = 323495;
   private static Logger logger = LogManager.getLogger(HoodieTestDataGenerator.class);
   public static final String DEFAULT_FIRST_PARTITION_PATH = "2016/03/15";
   public static final String DEFAULT_SECOND_PARTITION_PATH = "2015/03/16";
@@ -144,6 +146,10 @@ public class HoodieTestDataGenerator {
     }
   }
 
+  public int getEstimatedFileSizeInBytes(int numOfRecords) {
+    return numOfRecords * BYTES_PER_RECORD + BLOOM_FILTER_BYTES;
+  }
+
   public TestRawTripPayload generateRandomValueAsPerSchema(String schemaStr, HoodieKey key, String commitTime, boolean isFlattened) throws IOException {
     if (TRIP_EXAMPLE_SCHEMA.equals(schemaStr)) {
       return generateRandomValue(key, commitTime, isFlattened);
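
The new size estimate is linear in the record count: numOfRecords * BYTES_PER_RECORD +
BLOOM_FILTER_BYTES. A quick check of the two call sites in
TestHoodieClientOnCopyOnWriteStorage above, assuming only the constants in this diff
(the demo class is illustrative):

    // Illustrative arithmetic for getEstimatedFileSizeInBytes.
    public class FileSizeEstimateDemo {
      static final int BYTES_PER_RECORD = (int) (1.2 * 1024); // 1228
      static final int BLOOM_FILTER_BYTES = 323495;

      static int estimatedFileSizeInBytes(int numOfRecords) {
        return numOfRecords * BYTES_PER_RECORD + BLOOM_FILTER_BYTES;
      }

      public static void main(String[] args) {
        System.out.println(estimatedFileSizeInBytes(150)); // 507695 -> compactionSmallFileSize
        System.out.println(estimatedFileSizeInBytes(200)); // 569095 -> limitFileSize
      }
    }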