You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2020/06/09 03:29:38 UTC
[hudi] branch master updated: HUDI-494 fix incorrect record size
estimation
This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 22cd824 HUDI-494 fix incorrect record size estimation
22cd824 is described below
commit 22cd824d993bf43d88121ea89bad3a1f23a28518
Author: garyli1019 <ya...@gmail.com>
AuthorDate: Thu May 14 20:20:44 2020 -0700
HUDI-494 fix incorrect record size estimation
---
.../apache/hudi/config/HoodieCompactionConfig.java | 13 +++
.../org/apache/hudi/config/HoodieWriteConfig.java | 4 +
.../apache/hudi/table/HoodieCopyOnWriteTable.java | 31 ------
.../table/action/commit/UpsertPartitioner.java | 9 +-
.../TestHoodieClientOnCopyOnWriteStorage.java | 8 +-
.../apache/hudi/table/TestHoodieRecordSizing.java | 116 ---------------------
.../table/action/commit/TestUpsertPartitioner.java | 91 ++++++++++++++++
.../hudi/testutils/HoodieTestDataGenerator.java | 8 +-
8 files changed, 125 insertions(+), 155 deletions(-)
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index 5e295ac..f89fc06 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -54,6 +54,12 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
public static final String PARQUET_SMALL_FILE_LIMIT_BYTES = "hoodie.parquet.small.file.limit";
// By default, treat any file <= 100MB as a small file.
public static final String DEFAULT_PARQUET_SMALL_FILE_LIMIT_BYTES = String.valueOf(104857600);
+ // Hudi will use the previous commit to calculate the estimated record size by totalBytesWritten/totalRecordsWritten.
+ // If the previous commit is too small to make an accurate estimation, Hudi will search commits in the reverse order,
+ // until it finds a commit that has totalBytesWritten larger than (PARQUET_SMALL_FILE_LIMIT_BYTES * RECORD_SIZE_ESTIMATION_THRESHOLD)
+ public static final String RECORD_SIZE_ESTIMATION_THRESHOLD_PROP = "hoodie.record.size.estimation.threshold";
+ public static final String DEFAULT_RECORD_SIZE_ESTIMATION_THRESHOLD = "1.0";
+
/**
* Configs related to specific table types.
*/
@@ -173,6 +179,11 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
return this;
}
+ public Builder compactionRecordSizeEstimateThreshold(double threshold) {
+ props.setProperty(RECORD_SIZE_ESTIMATION_THRESHOLD_PROP, String.valueOf(threshold));
+ return this;
+ }
+
public Builder insertSplitSize(int insertSplitSize) {
props.setProperty(COPY_ON_WRITE_TABLE_INSERT_SPLIT_SIZE, String.valueOf(insertSplitSize));
return this;
@@ -254,6 +265,8 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
DEFAULT_MIN_COMMITS_TO_KEEP);
setDefaultOnCondition(props, !props.containsKey(PARQUET_SMALL_FILE_LIMIT_BYTES), PARQUET_SMALL_FILE_LIMIT_BYTES,
DEFAULT_PARQUET_SMALL_FILE_LIMIT_BYTES);
+ setDefaultOnCondition(props, !props.containsKey(RECORD_SIZE_ESTIMATION_THRESHOLD_PROP), RECORD_SIZE_ESTIMATION_THRESHOLD_PROP,
+ DEFAULT_RECORD_SIZE_ESTIMATION_THRESHOLD);
setDefaultOnCondition(props, !props.containsKey(COPY_ON_WRITE_TABLE_INSERT_SPLIT_SIZE),
COPY_ON_WRITE_TABLE_INSERT_SPLIT_SIZE, DEFAULT_COPY_ON_WRITE_TABLE_INSERT_SPLIT_SIZE);
setDefaultOnCondition(props, !props.containsKey(COPY_ON_WRITE_TABLE_AUTO_SPLIT_INSERTS),
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index d6527fa..d899257 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -272,6 +272,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return Integer.parseInt(props.getProperty(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT_BYTES));
}
+ public double getRecordSizeEstimationThreshold() {
+ return Double.parseDouble(props.getProperty(HoodieCompactionConfig.RECORD_SIZE_ESTIMATION_THRESHOLD_PROP));
+ }
+
public int getCopyOnWriteInsertSplitSize() {
return Integer.parseInt(props.getProperty(HoodieCompactionConfig.COPY_ON_WRITE_TABLE_INSERT_SPLIT_SIZE));
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
index ed29180..974d847 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
@@ -29,14 +29,12 @@ import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.ParquetReaderIterator;
import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
@@ -299,33 +297,4 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
return sb.toString();
}
}
-
- /**
- * Obtains the average record size based on records written during previous commits. Used for estimating how many
- * records pack into one file.
- */
- protected static long averageBytesPerRecord(HoodieTimeline commitTimeline, int defaultRecordSizeEstimate) {
- long avgSize = defaultRecordSizeEstimate;
- try {
- if (!commitTimeline.empty()) {
- // Go over the reverse ordered commits to get a more recent estimate of average record size.
- Iterator<HoodieInstant> instants = commitTimeline.getReverseOrderedInstants().iterator();
- while (instants.hasNext()) {
- HoodieInstant instant = instants.next();
- HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
- .fromBytes(commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
- long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
- long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
- if (totalBytesWritten > 0 && totalRecordsWritten > 0) {
- avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten);
- break;
- }
- }
- }
- } catch (Throwable t) {
- // make this fail safe.
- LOG.error("Error trying to compute average bytes/record ", t);
- }
- return avgSize;
- }
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index 745388c..a598710 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -131,7 +131,7 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
Set<String> partitionPaths = profile.getPartitionPaths();
long averageRecordSize =
averageBytesPerRecord(table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
- config.getCopyOnWriteRecordSizeEstimate());
+ config);
LOG.info("AvgRecordSize => " + averageRecordSize);
Map<String, List<SmallFile>> partitionSmallFilesMap =
@@ -289,8 +289,9 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
* Obtains the average record size based on records written during previous commits. Used for estimating how many
* records pack into one file.
*/
- protected static long averageBytesPerRecord(HoodieTimeline commitTimeline, int defaultRecordSizeEstimate) {
- long avgSize = defaultRecordSizeEstimate;
+ protected static long averageBytesPerRecord(HoodieTimeline commitTimeline, HoodieWriteConfig hoodieWriteConfig) {
+ long avgSize = hoodieWriteConfig.getCopyOnWriteRecordSizeEstimate();
+ long fileSizeThreshold = (long) (hoodieWriteConfig.getRecordSizeEstimationThreshold() * hoodieWriteConfig.getParquetSmallFileLimit());
try {
if (!commitTimeline.empty()) {
// Go over the reverse ordered commits to get a more recent estimate of average record size.
@@ -301,7 +302,7 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
.fromBytes(commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
- if (totalBytesWritten > 0 && totalRecordsWritten > 0) {
+ if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) {
avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten);
break;
}
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index 8c2efed..8107cdf 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -1041,10 +1041,12 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
HoodieWriteConfig.Builder builder = getConfigBuilder(useNullSchema ? NULL_SCHEMA : TRIP_EXAMPLE_SCHEMA);
return builder
.withCompactionConfig(
- HoodieCompactionConfig.newBuilder().compactionSmallFileSize(HoodieTestDataGenerator.SIZE_PER_RECORD * 15)
- .insertSplitSize(insertSplitSize).build()) // tolerate upto 15 records
+ HoodieCompactionConfig.newBuilder()
+ .compactionSmallFileSize(dataGen.getEstimatedFileSizeInBytes(150))
+ .insertSplitSize(insertSplitSize).build())
.withStorageConfig(
- HoodieStorageConfig.newBuilder().limitFileSize(HoodieTestDataGenerator.SIZE_PER_RECORD * 20).build())
+ HoodieStorageConfig.newBuilder()
+ .limitFileSize(dataGen.getEstimatedFileSizeInBytes(200)).build())
.build();
}
}
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieRecordSizing.java b/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieRecordSizing.java
deleted file mode 100644
index 17b0050..0000000
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieRecordSizing.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table;
-
-import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieWriteStat;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.Option;
-
-import org.junit.jupiter.api.Test;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-
-import static org.apache.hudi.common.testutils.HoodieTestUtils.generateFakeHoodieWriteStat;
-import static org.apache.hudi.table.HoodieCopyOnWriteTable.averageBytesPerRecord;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class TestHoodieRecordSizing {
-
- private static List<HoodieInstant> setupHoodieInstants() {
- List<HoodieInstant> instants = new ArrayList<>();
- instants.add(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "ts1"));
- instants.add(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "ts2"));
- instants.add(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "ts3"));
- instants.add(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "ts4"));
- instants.add(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "ts5"));
- Collections.reverse(instants);
- return instants;
- }
-
- private static List<HoodieWriteStat> generateCommitStatWith(int totalRecordsWritten, int totalBytesWritten) {
- List<HoodieWriteStat> writeStatsList = generateFakeHoodieWriteStat(5);
- // clear all record and byte stats except for last entry.
- for (int i = 0; i < writeStatsList.size() - 1; i++) {
- HoodieWriteStat writeStat = writeStatsList.get(i);
- writeStat.setNumWrites(0);
- writeStat.setTotalWriteBytes(0);
- }
- HoodieWriteStat lastWriteStat = writeStatsList.get(writeStatsList.size() - 1);
- lastWriteStat.setTotalWriteBytes(totalBytesWritten);
- lastWriteStat.setNumWrites(totalRecordsWritten);
- return writeStatsList;
- }
-
- private static HoodieCommitMetadata generateCommitMetadataWith(int totalRecordsWritten, int totalBytesWritten) {
- List<HoodieWriteStat> fakeHoodieWriteStats = generateCommitStatWith(totalRecordsWritten, totalBytesWritten);
- HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
- fakeHoodieWriteStats.forEach(stat -> commitMetadata.addWriteStat(stat.getPartitionPath(), stat));
- return commitMetadata;
- }
-
- /*
- * This needs to be a stack so we test all cases when either/both recordsWritten ,bytesWritten is zero before a non
- * zero averageRecordSize can be computed.
- */
- private static LinkedList<Option<byte[]>> generateCommitMetadataList() throws IOException {
- LinkedList<Option<byte[]>> commits = new LinkedList<>();
- // First commit with non zero records and bytes
- commits.push(Option.of(generateCommitMetadataWith(2000, 10000).toJsonString().getBytes(StandardCharsets.UTF_8)));
- // Second commit with non zero records and bytes
- commits.push(Option.of(generateCommitMetadataWith(1500, 7500).toJsonString().getBytes(StandardCharsets.UTF_8)));
- // Third commit with both zero records and zero bytes
- commits.push(Option.of(generateCommitMetadataWith(0, 0).toJsonString().getBytes(StandardCharsets.UTF_8)));
- // Fourth commit with zero records
- commits.push(Option.of(generateCommitMetadataWith(0, 1500).toJsonString().getBytes(StandardCharsets.UTF_8)));
- // Fifth commit with zero bytes
- commits.push(Option.of(generateCommitMetadataWith(2500, 0).toJsonString().getBytes(StandardCharsets.UTF_8)));
- return commits;
- }
-
- @Test
- public void testAverageBytesPerRecordForNonEmptyCommitTimeLine() throws Exception {
- HoodieTimeline commitTimeLine = mock(HoodieTimeline.class);
- when(commitTimeLine.empty()).thenReturn(false);
- when(commitTimeLine.getReverseOrderedInstants()).thenReturn(setupHoodieInstants().stream());
- LinkedList<Option<byte[]>> commits = generateCommitMetadataList();
- when(commitTimeLine.getInstantDetails(any(HoodieInstant.class))).thenAnswer(invocationOnMock -> commits.pop());
- long expectAvgSize = (long) Math.ceil((1.0 * 7500) / 1500);
- long actualAvgSize = averageBytesPerRecord(commitTimeLine, 1234);
- assertEquals(expectAvgSize, actualAvgSize);
- }
-
- @Test
- public void testAverageBytesPerRecordForEmptyCommitTimeLine() {
- HoodieTimeline commitTimeLine = mock(HoodieTimeline.class);
- when(commitTimeLine.empty()).thenReturn(true);
- long expectAvgSize = 2345;
- long actualAvgSize = averageBytesPerRecord(commitTimeLine, 2345);
- assertEquals(expectAvgSize, actualAvgSize);
- }
-}
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
index 0926a37..91b3bb0 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
@@ -18,9 +18,13 @@
package org.apache.hudi.table.action.commit;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
@@ -37,12 +41,21 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.jupiter.api.Test;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
import scala.Tuple2;
+import static org.apache.hudi.common.testutils.HoodieTestUtils.generateFakeHoodieWriteStat;
+import static org.apache.hudi.table.action.commit.UpsertPartitioner.averageBytesPerRecord;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class TestUpsertPartitioner extends HoodieClientTestBase {
@@ -79,6 +92,84 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
return partitioner;
}
+ private static List<HoodieInstant> setupHoodieInstants() {
+ List<HoodieInstant> instants = new ArrayList<>();
+ instants.add(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "ts1"));
+ instants.add(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "ts2"));
+ instants.add(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "ts3"));
+ instants.add(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "ts4"));
+ instants.add(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "ts5"));
+ Collections.reverse(instants);
+ return instants;
+ }
+
+ private static List<HoodieWriteStat> generateCommitStatWith(int totalRecordsWritten, int totalBytesWritten) {
+ List<HoodieWriteStat> writeStatsList = generateFakeHoodieWriteStat(5);
+ // clear all record and byte stats except for last entry.
+ for (int i = 0; i < writeStatsList.size() - 1; i++) {
+ HoodieWriteStat writeStat = writeStatsList.get(i);
+ writeStat.setNumWrites(0);
+ writeStat.setTotalWriteBytes(0);
+ }
+ HoodieWriteStat lastWriteStat = writeStatsList.get(writeStatsList.size() - 1);
+ lastWriteStat.setTotalWriteBytes(totalBytesWritten);
+ lastWriteStat.setNumWrites(totalRecordsWritten);
+ return writeStatsList;
+ }
+
+ private static HoodieCommitMetadata generateCommitMetadataWith(int totalRecordsWritten, int totalBytesWritten) {
+ List<HoodieWriteStat> fakeHoodieWriteStats = generateCommitStatWith(totalRecordsWritten, totalBytesWritten);
+ HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+ fakeHoodieWriteStats.forEach(stat -> commitMetadata.addWriteStat(stat.getPartitionPath(), stat));
+ return commitMetadata;
+ }
+
+ /*
+ * This needs to be a stack so we test all cases when either or both of recordsWritten and bytesWritten are zero before a non
+ * zero averageRecordSize can be computed.
+ */
+ private static LinkedList<Option<byte[]>> generateCommitMetadataList() throws IOException {
+ LinkedList<Option<byte[]>> commits = new LinkedList<>();
+ // First commit with non zero records and bytes
+ commits.push(Option.of(generateCommitMetadataWith(2000, 10000).toJsonString().getBytes(StandardCharsets.UTF_8)));
+ // Second commit with non zero records and bytes
+ commits.push(Option.of(generateCommitMetadataWith(1500, 7500).toJsonString().getBytes(StandardCharsets.UTF_8)));
+ // Third commit with a small file
+ commits.push(Option.of(generateCommitMetadataWith(100, 500).toJsonString().getBytes(StandardCharsets.UTF_8)));
+ // Fourth commit with both zero records and zero bytes
+ commits.push(Option.of(generateCommitMetadataWith(0, 0).toJsonString().getBytes(StandardCharsets.UTF_8)));
+ // Fifth commit with zero records
+ commits.push(Option.of(generateCommitMetadataWith(0, 1500).toJsonString().getBytes(StandardCharsets.UTF_8)));
+ // Sixth commit with zero bytes
+ commits.push(Option.of(generateCommitMetadataWith(2500, 0).toJsonString().getBytes(StandardCharsets.UTF_8)));
+ return commits;
+ }
+
+ @Test
+ public void testAverageBytesPerRecordForNonEmptyCommitTimeLine() throws Exception {
+ HoodieTimeline commitTimeLine = mock(HoodieTimeline.class);
+ HoodieWriteConfig config = makeHoodieClientConfigBuilder()
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1000).build())
+ .build();
+ when(commitTimeLine.empty()).thenReturn(false);
+ when(commitTimeLine.getReverseOrderedInstants()).thenReturn(setupHoodieInstants().stream());
+ LinkedList<Option<byte[]>> commits = generateCommitMetadataList();
+ when(commitTimeLine.getInstantDetails(any(HoodieInstant.class))).thenAnswer(invocationOnMock -> commits.pop());
+ long expectAvgSize = (long) Math.ceil((1.0 * 7500) / 1500);
+ long actualAvgSize = averageBytesPerRecord(commitTimeLine, config);
+ assertEquals(expectAvgSize, actualAvgSize);
+ }
+
+ @Test
+ public void testAverageBytesPerRecordForEmptyCommitTimeLine() throws Exception {
+ HoodieTimeline commitTimeLine = mock(HoodieTimeline.class);
+ HoodieWriteConfig config = makeHoodieClientConfigBuilder().build();
+ when(commitTimeLine.empty()).thenReturn(true);
+ long expectAvgSize = config.getCopyOnWriteRecordSizeEstimate();
+ long actualAvgSize = averageBytesPerRecord(commitTimeLine, config);
+ assertEquals(expectAvgSize, actualAvgSize);
+ }
+
@Test
public void testUpsertPartitioner() throws Exception {
final String testPartitionPath = "2016/09/26";
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieTestDataGenerator.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieTestDataGenerator.java
index 9f7ed23..a6de0f5 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieTestDataGenerator.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieTestDataGenerator.java
@@ -70,7 +70,9 @@ import java.util.stream.Stream;
public class HoodieTestDataGenerator {
// based on examination of sample file, the schema produces the following per record size
- public static final int SIZE_PER_RECORD = 50 * 1024;
+ public static final int BYTES_PER_RECORD = (int) (1.2 * 1024);
+ // with default bloom filter with 60,000 entries and 0.000000001 FPRate
+ public static final int BLOOM_FILTER_BYTES = 323495;
private static Logger logger = LogManager.getLogger(HoodieTestDataGenerator.class);
public static final String DEFAULT_FIRST_PARTITION_PATH = "2016/03/15";
public static final String DEFAULT_SECOND_PARTITION_PATH = "2015/03/16";
@@ -144,6 +146,10 @@ public class HoodieTestDataGenerator {
}
}
+ public int getEstimatedFileSizeInBytes(int numOfRecords) {
+ return numOfRecords * BYTES_PER_RECORD + BLOOM_FILTER_BYTES;
+ }
+
public TestRawTripPayload generateRandomValueAsPerSchema(String schemaStr, HoodieKey key, String commitTime, boolean isFlattened) throws IOException {
if (TRIP_EXAMPLE_SCHEMA.equals(schemaStr)) {
return generateRandomValue(key, commitTime, isFlattened);