You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2021/01/27 18:10:04 UTC

[hudi] branch master updated: [HUDI-1234] Insert new records to data files without merging for "Insert" operation. (#2111)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ee1c3f  [HUDI-1234] Insert new records to data files without merging for "Insert" operation.  (#2111)
2ee1c3f is described below

commit 2ee1c3fb0cb9740dc6d924f5e6e638aec19ed9b3
Author: SteNicholas <pr...@163.com>
AuthorDate: Thu Jan 28 02:09:51 2021 +0800

    [HUDI-1234] Insert new records to data files without merging for "Insert" operation.  (#2111)
    
    * Added HoodieConcatHandle to skip merging for "insert" operation when the corresponding config is set
    
    Co-authored-by: Sivabalan Narayanan <si...@uber.com>
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  15 +++
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |  34 +++++-
 .../apache/hudi/io/storage/HoodieConcatHandle.java |  94 ++++++++++++++++
 .../org/apache/hudi/table/WorkloadProfile.java     |  18 ++-
 .../commit/BaseSparkCommitActionExecutor.java      |   5 +-
 .../table/action/commit/UpsertPartitioner.java     |   2 +-
 .../TestHoodieClientOnCopyOnWriteStorage.java      | 124 +++++++++++++++++----
 7 files changed, 266 insertions(+), 26 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index d8135d4..3fc5a2d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -131,6 +131,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   private static final String MERGE_DATA_VALIDATION_CHECK_ENABLED = "hoodie.merge.data.validation.enabled";
   private static final String DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED = "false";
 
+  // Allow duplicate keys on "insert": incoming records are written as is, without merging them with existing records
+  private static final String MERGE_ALLOW_DUPLICATE_ON_INSERTS = "hoodie.merge.allow.duplicate.on.inserts";
+  private static final String DEFAULT_MERGE_ALLOW_DUPLICATE_ON_INSERTS = "false";
+
   /**
    * HUDI-858 : There are users who had been directly using RDD APIs and have relied on a behavior in 0.4.x to allow
    * multiple write operations (upsert/buk-insert/...) to be executed within a single commit.
@@ -330,6 +334,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return Boolean.parseBoolean(props.getProperty(MERGE_DATA_VALIDATION_CHECK_ENABLED));
   }
 
+  public boolean allowDuplicateInserts() {
+    return Boolean.parseBoolean(props.getProperty(MERGE_ALLOW_DUPLICATE_ON_INSERTS));
+  }
+
   public EngineType getEngineType() {
     return engineType;
   }
@@ -1180,6 +1188,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    public Builder withMergeAllowDuplicateOnInserts(boolean routeInsertsToNewFiles) {
+      props.setProperty(MERGE_ALLOW_DUPLICATE_ON_INSERTS, String.valueOf(routeInsertsToNewFiles));
+      return this;
+    }
+
     public Builder withProperties(Properties properties) {
       this.props.putAll(properties);
       return this;
@@ -1234,6 +1247,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
           BULKINSERT_SORT_MODE, DEFAULT_BULKINSERT_SORT_MODE);
       setDefaultOnCondition(props, !props.containsKey(MERGE_DATA_VALIDATION_CHECK_ENABLED),
           MERGE_DATA_VALIDATION_CHECK_ENABLED, DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED);
+      setDefaultOnCondition(props, !props.containsKey(MERGE_ALLOW_DUPLICATE_ON_INSERTS),
+          MERGE_ALLOW_DUPLICATE_ON_INSERTS, DEFAULT_MERGE_ALLOW_DUPLICATE_ON_INSERTS);
 
       // Make sure the props is propagated
       setDefaultOnCondition(props, !isIndexConfigSet, HoodieIndexConfig.newBuilder().withEngineType(engineType).fromProperties(props).build());
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 4f5b82a..d5df2cb 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -58,17 +58,45 @@ import java.util.Map;
 import java.util.Set;
 
 @SuppressWarnings("Duplicates")
+/**
+ * Handle to merge incoming records to those in storage.
+ * <p>
+ * Simplified Logic:
+ * For every existing record
+ *     Check if there is a new record coming in. If yes, merge two records and write to file
+ *     else write the record as is
+ * For all pending records from incoming batch, write to file.
+ *
+ * Illustration with simple data.
+ * Incoming data:
+ *     rec1_2, rec4_2, rec5_1, rec6_1
+ * Existing data:
+ *     rec1_1, rec2_1, rec3_1, rec4_1
+ *
+ * For every existing record, merge w/ incoming if required and write to storage.
+ *    => rec1_1 and rec1_2 are merged to write rec1_2 to storage
+ *    => rec2_1 is written as is
+ *    => rec3_1 is written as is
+ *    => rec4_2 and rec4_1 are merged to write rec4_2 to storage
+ * Write all pending records from incoming set to storage
+ *    => rec5_1 and rec6_1
+ *
+ * Final snapshot in storage
+ * rec1_2, rec2_1, rec3_1, rec4_2, rec5_1, rec6_1
+ *
+ * </p>
+ */
 public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieWriteHandle<T, I, K, O> {
 
   private static final Logger LOG = LogManager.getLogger(HoodieMergeHandle.class);
 
   protected Map<String, HoodieRecord<T>> keyToNewRecords;
   protected Set<String> writtenRecordKeys;
-  private HoodieFileWriter<IndexedRecord> fileWriter;
+  protected HoodieFileWriter<IndexedRecord> fileWriter;
 
-  private Path newFilePath;
+  protected Path newFilePath;
   private Path oldFilePath;
-  private long recordsWritten = 0;
+  protected long recordsWritten = 0;
   private long recordsDeleted = 0;
   private long updatedRecordsWritten = 0;
   protected long insertRecordsWritten = 0;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieConcatHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieConcatHandle.java
new file mode 100644
index 0000000..ea56689
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieConcatHandle.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Handle to concatenate new records to old records w/o any merging. If Operation is set to Inserts, and if {@link HoodieWriteConfig#allowDuplicateInserts()}
+ * is set, this handle will be used instead of {@link HoodieMergeHandle}.
+ *
+ * Simplified Logic:
+ * For every existing record
+ *     Write the record as is
+ * For all incoming records, write to file as is.
+ *
+ * Illustration with simple data.
+ * Incoming data:
+ *     rec1_2, rec4_2, rec5_1, rec6_1
+ * Existing data:
+ *     rec1_1, rec2_1, rec3_1, rec4_1
+ *
+ * For every existing record, write to storage as is.
+ *    => rec1_1, rec2_1, rec3_1 and rec4_1 are written to storage
+ * Write all records from incoming set to storage
+ *    => rec1_2, rec4_2, rec5_1 and rec6_1
+ *
+ * Final snapshot in storage
+ * rec1_1, rec2_1, rec3_1, rec4_1, rec1_2, rec4_2, rec5_1, rec6_1
+ *
+ * Users should ensure there are no duplicates when the "insert" operation is used and the respective config is enabled. So, the above scenario should not
+ * happen and every batch should have new records to be inserted. The above example is for illustration purposes only.
+ */
+public class HoodieConcatHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieMergeHandle<T, I, K, O> {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieConcatHandle.class);
+
+  public HoodieConcatHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, Iterator recordItr,
+      String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) {
+    super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier);
+  }
+
+  public HoodieConcatHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, Map keyToNewRecords, String partitionPath, String fileId,
+      HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier) {
+    super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, fileId, dataFileToBeMerged, taskContextSupplier);
+  }
+
+  /**
+   * Writes the old record to the new file as is, w/o merging it with any incoming record; a failed write is rethrown as {@link HoodieUpsertException}.
+   */
+  @Override
+  public void write(GenericRecord oldRecord) {
+    String key = oldRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+    try {
+      fileWriter.writeAvro(key, oldRecord);
+    } catch (IOException | RuntimeException e) {
+      String errMsg = String.format("Failed to write old record into new file for key %s from old file %s to new file %s with writerSchema %s",
+          key, getOldFilePath(), newFilePath, writerSchemaWithMetafields.toString(true));
+      LOG.debug("Old record is " + oldRecord); // the record itself is logged at debug level only; the error message carries just the key
+      throw new HoodieUpsertException(errMsg, e);
+    }
+    recordsWritten++; // reached only when the write above did not throw
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadProfile.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadProfile.java
index a56710b..7700e95 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadProfile.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadProfile.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.collection.Pair;
 
 import java.io.Serializable;
@@ -41,11 +42,21 @@ public class WorkloadProfile implements Serializable {
    */
   protected final WorkloadStat globalStat;
 
+  /**
+   * Write operation type.
+   */
+  private WriteOperationType operationType;
+
   public WorkloadProfile(Pair<HashMap<String, WorkloadStat>, WorkloadStat> profile) {
     this.partitionPathStatMap = profile.getLeft();
     this.globalStat = profile.getRight();
   }
 
+  public WorkloadProfile(Pair<HashMap<String, WorkloadStat>, WorkloadStat> profile, WriteOperationType operationType) {
+    this(profile);
+    this.operationType = operationType;
+  }
+
   public WorkloadStat getGlobalStat() {
     return globalStat;
   }
@@ -62,11 +73,16 @@ public class WorkloadProfile implements Serializable {
     return partitionPathStatMap.get(partitionPath);
   }
 
+  public WriteOperationType getOperationType() {
+    return operationType;
+  }
+
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder("WorkloadProfile {");
     sb.append("globalStat=").append(globalStat).append(", ");
-    sb.append("partitionStat=").append(partitionPathStatMap);
+    sb.append("partitionStat=").append(partitionPathStatMap).append(", ");
+    sb.append("operationType=").append(operationType);
     sb.append('}');
     return sb.toString();
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index 2fabbbf..5a4d79c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -43,6 +43,7 @@ import org.apache.hudi.execution.SparkLazyInsertIterable;
 import org.apache.hudi.io.CreateHandleFactory;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.HoodieSortedMergeHandle;
+import org.apache.hudi.io.storage.HoodieConcatHandle;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.WorkloadProfile;
@@ -119,7 +120,7 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
     WorkloadProfile profile = null;
     if (isWorkloadProfileNeeded()) {
       context.setJobStatus(this.getClass().getSimpleName(), "Building workload profile");
-      profile = new WorkloadProfile(buildProfile(inputRecordsRDD));
+      profile = new WorkloadProfile(buildProfile(inputRecordsRDD), operationType);
       LOG.info("Workload profile :" + profile);
       saveWorkloadProfileMetadataToInflight(profile, instantTime);
     }
@@ -320,6 +321,8 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
   protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
     if (table.requireSortedRecords()) {
       return new HoodieSortedMergeHandle<>(config, instantTime, (HoodieSparkTable) table, recordItr, partitionPath, fileId, taskContextSupplier);
+    } else if (!WriteOperationType.isChangingRecords(operationType) && config.allowDuplicateInserts()) {
+      return new HoodieConcatHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier);
     } else {
       return new HoodieMergeHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier);
     }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index 000cfc7..ee153c8 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -190,7 +190,7 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
         for (SmallFile smallFile : smallFiles) {
           long recordsToAppend = Math.min((config.getParquetMaxFileSize() - smallFile.sizeBytes) / averageRecordSize,
               totalUnassignedInserts);
-          if (recordsToAppend > 0) {
+          if (recordsToAppend > 0 && totalUnassignedInserts > 0) {
             // create a new bucket or re-use an existing bucket
             int bucket;
             if (updateLocationToBucket.containsKey(smallFile.location.getFileId())) {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index 48a4d12..b4a392e 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -79,7 +79,9 @@ import org.apache.spark.sql.Row;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
@@ -128,6 +130,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     }
   };
 
+  private static Stream<Arguments> configParams() {
+    return Arrays.stream(new Boolean[][] {{true},{false}}).map(Arguments::of);
+  }
+
   private HoodieTestTable testTable;
 
   @BeforeEach
@@ -451,7 +457,62 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   }
 
   /**
-   * Tesst deletion of records.
+   * Test Insert API for HoodieConcatHandle.
+   */
+  @Test
+  public void testInsertsWithHoodieConcatHandle() throws Exception {
+    testHoodieConcatHandle(getConfig(), false);
+  }
+
+  /**
+   * Test InsertPrepped API for HoodieConcatHandle.
+   */
+  @Test
+  public void testInsertsPreppedWithHoodieConcatHandle() throws Exception {
+    testHoodieConcatHandle(getConfig(),  true);
+  }
+
+  /**
+   * Test HoodieConcatHandle w/ {@link AbstractHoodieWriteClient#insert(Object, String)} API.
+   *
+   * @param config Write Config
+   * @throws Exception in case of error
+   */
+  private void testHoodieConcatHandle(HoodieWriteConfig config, boolean isPrepped)
+      throws Exception {
+    // Force using older timeline layout
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
+        .withProps(config.getProps()).withMergeAllowDuplicateOnInserts(true).withTimelineLayoutVersion(
+            VERSION_0).build();
+
+    HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(), metaClient.getTableType(),
+        metaClient.getTableConfig().getTableName(), metaClient.getArchivePath(),
+        metaClient.getTableConfig().getPayloadClass(), VERSION_0);
+    SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);
+
+    // Write 1 (only inserts)
+    String newCommitTime = "001";
+    String initCommitTime = "000";
+    int numRecords = 200;
+    insertFirstBatch(hoodieWriteConfig, client, newCommitTime, initCommitTime, numRecords, SparkRDDWriteClient::insert,
+        isPrepped, true, numRecords);
+
+    // Write 2 (updates for existing keys, written through the insert API)
+    String prevCommitTime = newCommitTime;
+    newCommitTime = "004";
+    numRecords = 100;
+    String commitTimeBetweenPrevAndNew = "002";
+
+    final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
+        generateWrapRecordsFn(isPrepped, hoodieWriteConfig, dataGen::generateUniqueUpdates);
+
+    writeBatch(client, newCommitTime, prevCommitTime, Option.of(Arrays.asList(commitTimeBetweenPrevAndNew)), initCommitTime,
+        numRecords, recordGenFunction, SparkRDDWriteClient::insert, true, numRecords, 300,
+        2);
+  }
+
+  /**
+   * Tests deletion of records.
    */
   @Test
   public void testDeletes() throws Exception {
@@ -877,13 +938,13 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   /**
    * Test scenario of new file-group getting added during insert().
    */
-  @Test
-  public void testSmallInsertHandlingForInserts() throws Exception {
-
+  @ParameterizedTest
+  @MethodSource("configParams")
+  public void testSmallInsertHandlingForInserts(boolean mergeAllowDuplicateInserts) throws Exception {
     final String testPartitionPath = "2016/09/26";
     final int insertSplitLimit = 100;
     // setup the small file handling params
-    HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit); // hold upto 200 records max
+    HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit, false, mergeAllowDuplicateInserts); // hold upto 200 records max
     dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
     SparkRDDWriteClient client = getHoodieWriteClient(config, false);
 
@@ -894,10 +955,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     Set<String> keys1 = recordsToRecordKeySet(inserts1);
     JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 1);
     List<WriteStatus> statuses = client.insert(insertRecordsRDD1, commitTime1).collect();
-
     assertNoWriteErrors(statuses);
     assertPartitionMetadata(new String[] {testPartitionPath}, fs);
-
     assertEquals(1, statuses.size(), "Just 1 file needs to be added.");
     String file1 = statuses.get(0).getFileId();
     assertEquals(100,
@@ -912,14 +971,13 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     JavaRDD<HoodieRecord> insertRecordsRDD2 = jsc.parallelize(inserts2, 1);
     statuses = client.insert(insertRecordsRDD2, commitTime2).collect();
     assertNoWriteErrors(statuses);
-
     assertEquals(1, statuses.size(), "Just 1 file needs to be updated.");
     assertEquals(file1, statuses.get(0).getFileId(), "Existing file should be expanded");
     assertEquals(commitTime1, statuses.get(0).getStat().getPrevCommit(), "Existing file should be expanded");
+
     Path newFile = new Path(basePath, statuses.get(0).getStat().getPath());
     assertEquals(140, readRowKeysFromParquet(hadoopConf, newFile).size(),
         "file should contain 140 records");
-
     List<GenericRecord> records = ParquetUtils.readAvroRecords(hadoopConf, newFile);
     for (GenericRecord record : records) {
       String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
@@ -933,11 +991,15 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     // Lots of inserts such that file1 is updated and expanded, a new file2 is created.
     String commitTime3 = "003";
     client.startCommitWithTime(commitTime3);
-    List<HoodieRecord> insert3 = dataGen.generateInserts(commitTime3, 200);
-    JavaRDD<HoodieRecord> insertRecordsRDD3 = jsc.parallelize(insert3, 1);
+    List<HoodieRecord> inserts3 = dataGen.generateInserts(commitTime3, 200);
+    JavaRDD<HoodieRecord> insertRecordsRDD3 = jsc.parallelize(inserts3, 1);
     statuses = client.insert(insertRecordsRDD3, commitTime3).collect();
     assertNoWriteErrors(statuses);
     assertEquals(2, statuses.size(), "2 files needs to be committed.");
+    assertEquals(340,
+        readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath())).size()
+            + readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(1).getStat().getPath())).size(),
+        "file should contain 340 records");
 
     HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
     HoodieTable table = getHoodieTable(metaClient, config);
@@ -948,11 +1010,9 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     int totalInserts = 0;
     for (HoodieBaseFile file : files) {
       assertEquals(commitTime3, file.getCommitTime(), "All files must be at commit 3");
-      records = ParquetUtils.readAvroRecords(hadoopConf, new Path(file.getPath()));
-      totalInserts += records.size();
+      totalInserts += ParquetUtils.readAvroRecords(hadoopConf, new Path(file.getPath())).size();
     }
-    assertEquals(totalInserts, inserts1.size() + inserts2.size() + insert3.size(),
-        "Total number of records must add up");
+    assertEquals(totalInserts, inserts1.size() + inserts2.size() + inserts3.size(), "Total number of records must add up");
   }
 
   /**
@@ -1040,7 +1100,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
         .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
     testClustering(clusteringConfig);
   }
-  
+
   private void testClustering(HoodieClusteringConfig clusteringConfig) throws Exception {
     // create config to not update small files.
     HoodieWriteConfig config = getSmallInsertWriteConfig(2000, false, 10);
@@ -1642,22 +1702,45 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
    * Build Hoodie Write Config for small data file sizes.
    */
   private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean useNullSchema) {
-    return getSmallInsertWriteConfig(insertSplitSize, useNullSchema, dataGen.getEstimatedFileSizeInBytes(150));
+    return getSmallInsertWriteConfig(insertSplitSize, useNullSchema, false);
+  }
+
+  /**
+   * Build Hoodie Write Config for small data file sizes.
+   */
+  private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean useNullSchema, boolean mergeAllowDuplicateInserts) {
+    return getSmallInsertWriteConfig(insertSplitSize, useNullSchema, dataGen.getEstimatedFileSizeInBytes(150), mergeAllowDuplicateInserts);
   }
 
   /**
    * Build Hoodie Write Config for specified small file sizes.
    */
   private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean useNullSchema, long smallFileSize) {
+    return getSmallInsertWriteConfig(insertSplitSize, useNullSchema, smallFileSize, false);
+  }
+
+  /**
+   * Build Hoodie Write Config for specified small file sizes.
+   */
+  private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean useNullSchema, long smallFileSize, boolean mergeAllowDuplicateInserts) {
     String schemaStr = useNullSchema ? NULL_SCHEMA : TRIP_EXAMPLE_SCHEMA;
-    return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize);
+    return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, mergeAllowDuplicateInserts);
   }
 
   private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize) {
-    return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, new Properties());
+    return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, false);
+  }
+
+  private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize, boolean mergeAllowDuplicateInserts) {
+    return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, mergeAllowDuplicateInserts, new Properties());
   }
-  
+
   private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize, Properties props) {
+    return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, false, props);
+  }
+
+  private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize, boolean mergeAllowDuplicateInserts,
+      Properties props) {
     HoodieWriteConfig.Builder builder = getConfigBuilder(schemaStr);
     return builder
         .withCompactionConfig(
@@ -1668,6 +1751,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
             HoodieStorageConfig.newBuilder()
                 .hfileMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200))
                 .parquetMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200)).build())
+        .withMergeAllowDuplicateOnInserts(mergeAllowDuplicateInserts)
         .withProps(props)
         .build();
   }