You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2021/01/27 18:10:04 UTC
[hudi] branch master updated: [HUDI-1234] Insert new records to
data files without merging for "Insert" operation. (#2111)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2ee1c3f [HUDI-1234] Insert new records to data files without merging for "Insert" operation. (#2111)
2ee1c3f is described below
commit 2ee1c3fb0cb9740dc6d924f5e6e638aec19ed9b3
Author: SteNicholas <pr...@163.com>
AuthorDate: Thu Jan 28 02:09:51 2021 +0800
[HUDI-1234] Insert new records to data files without merging for "Insert" operation. (#2111)
* Added HoodieConcatHandle to skip merging for "insert" operation when the corresponding config is set
Co-authored-by: Sivabalan Narayanan <si...@uber.com>
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 15 +++
.../java/org/apache/hudi/io/HoodieMergeHandle.java | 34 +++++-
.../apache/hudi/io/storage/HoodieConcatHandle.java | 94 ++++++++++++++++
.../org/apache/hudi/table/WorkloadProfile.java | 18 ++-
.../commit/BaseSparkCommitActionExecutor.java | 5 +-
.../table/action/commit/UpsertPartitioner.java | 2 +-
.../TestHoodieClientOnCopyOnWriteStorage.java | 124 +++++++++++++++++----
7 files changed, 266 insertions(+), 26 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index d8135d4..3fc5a2d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -131,6 +131,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private static final String MERGE_DATA_VALIDATION_CHECK_ENABLED = "hoodie.merge.data.validation.enabled";
private static final String DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED = "false";
+ // Allow duplicates with inserts while merging with existing records
+ private static final String MERGE_ALLOW_DUPLICATE_ON_INSERTS = "hoodie.merge.allow.duplicate.on.inserts";
+ private static final String DEFAULT_MERGE_ALLOW_DUPLICATE_ON_INSERTS = "false";
+
/**
* HUDI-858 : There are users who had been directly using RDD APIs and have relied on a behavior in 0.4.x to allow
* multiple write operations (upsert/buk-insert/...) to be executed within a single commit.
@@ -330,6 +334,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return Boolean.parseBoolean(props.getProperty(MERGE_DATA_VALIDATION_CHECK_ENABLED));
}
+ public boolean allowDuplicateInserts() {
+ return Boolean.parseBoolean(props.getProperty(MERGE_ALLOW_DUPLICATE_ON_INSERTS));
+ }
+
public EngineType getEngineType() {
return engineType;
}
@@ -1180,6 +1188,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return this;
}
+ public Builder withMergeAllowDuplicateOnInserts(boolean routeInsertsToNewFiles) {
+ props.setProperty(MERGE_ALLOW_DUPLICATE_ON_INSERTS, String.valueOf(routeInsertsToNewFiles));
+ return this;
+ }
+
public Builder withProperties(Properties properties) {
this.props.putAll(properties);
return this;
@@ -1234,6 +1247,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
BULKINSERT_SORT_MODE, DEFAULT_BULKINSERT_SORT_MODE);
setDefaultOnCondition(props, !props.containsKey(MERGE_DATA_VALIDATION_CHECK_ENABLED),
MERGE_DATA_VALIDATION_CHECK_ENABLED, DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED);
+ setDefaultOnCondition(props, !props.containsKey(MERGE_ALLOW_DUPLICATE_ON_INSERTS),
+ MERGE_ALLOW_DUPLICATE_ON_INSERTS, DEFAULT_MERGE_ALLOW_DUPLICATE_ON_INSERTS);
// Make sure the props is propagated
setDefaultOnCondition(props, !isIndexConfigSet, HoodieIndexConfig.newBuilder().withEngineType(engineType).fromProperties(props).build());
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 4f5b82a..d5df2cb 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -58,17 +58,45 @@ import java.util.Map;
import java.util.Set;
@SuppressWarnings("Duplicates")
+/**
+ * Handle to merge incoming records to those in storage.
+ * <p>
+ * Simplified Logic:
+ * For every existing record
+ * Check if there is a new record coming in. If yes, merge two records and write to file
+ * else write the record as is
+ * For all pending records from incoming batch, write to file.
+ *
+ * Illustration with simple data.
+ * Incoming data:
+ * rec1_2, rec4_2, rec5_1, rec6_1
+ * Existing data:
+ * rec1_1, rec2_1, rec3_1, rec4_1
+ *
+ * For every existing record, merge w/ incoming if required and write to storage.
+ * => rec1_1 and rec1_2 are merged to write rec1_2 to storage
+ * => rec2_1 is written as is
+ * => rec3_1 is written as is
+ * => rec4_2 and rec4_1 are merged to write rec4_2 to storage
+ * Write all pending records from incoming set to storage
+ * => rec5_1 and rec6_1
+ *
+ * Final snapshot in storage
+ * rec1_2, rec2_1, rec3_1, rec4_2, rec5_1, rec6_1
+ *
+ * </p>
+ */
public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieWriteHandle<T, I, K, O> {
private static final Logger LOG = LogManager.getLogger(HoodieMergeHandle.class);
protected Map<String, HoodieRecord<T>> keyToNewRecords;
protected Set<String> writtenRecordKeys;
- private HoodieFileWriter<IndexedRecord> fileWriter;
+ protected HoodieFileWriter<IndexedRecord> fileWriter;
- private Path newFilePath;
+ protected Path newFilePath;
private Path oldFilePath;
- private long recordsWritten = 0;
+ protected long recordsWritten = 0;
private long recordsDeleted = 0;
private long updatedRecordsWritten = 0;
protected long insertRecordsWritten = 0;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieConcatHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieConcatHandle.java
new file mode 100644
index 0000000..ea56689
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieConcatHandle.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Handle to concatenate new records to old records w/o any merging. If Operation is set to Inserts, and if {@link HoodieWriteConfig#allowDuplicateInserts()}
+ * is set, this handle will be used instead of {@link HoodieMergeHandle}.
+ *
+ * Simplified Logic:
+ * For every existing record
+ * Write the record as is
+ * For all incoming records, write to file as is.
+ *
+ * Illustration with simple data.
+ * Incoming data:
+ * rec1_2, rec4_2, rec5_1, rec6_1
+ * Existing data:
+ * rec1_1, rec2_1, rec3_1, rec4_1
+ *
+ * For every existing record, write to storage as is.
+ * => rec1_1, rec2_1, rec3_1 and rec4_1 are written to storage
+ * Write all records from incoming set to storage
+ * => rec1_2, rec4_2, rec5_1 and rec6_1
+ *
+ * Final snapshot in storage
+ * rec1_1, rec2_1, rec3_1, rec4_1, rec1_2, rec4_2, rec5_1, rec6_1
+ *
+ * Users should ensure there are no duplicates when "insert" operation is used and if the respective config is enabled. So, above scenario should not
+ * happen and every batch should have new records to be inserted. Above example is for illustration purposes only.
+ */
+public class HoodieConcatHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieMergeHandle<T, I, K, O> {
+
+ private static final Logger LOG = LogManager.getLogger(HoodieConcatHandle.class);
+
+ public HoodieConcatHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, Iterator recordItr,
+ String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) {
+ super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier);
+ }
+
+ public HoodieConcatHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, Map keyToNewRecords, String partitionPath, String fileId,
+ HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier) {
+ super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, fileId, dataFileToBeMerged, taskContextSupplier);
+ }
+
+ /**
+ * Write old record as is w/o merging with incoming record.
+ */
+ @Override
+ public void write(GenericRecord oldRecord) {
+ String key = oldRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+ try {
+ fileWriter.writeAvro(key, oldRecord);
+ } catch (IOException | RuntimeException e) {
+ String errMsg = String.format("Failed to write old record into new file for key %s from old file %s to new file %s with writerSchema %s",
+ key, getOldFilePath(), newFilePath, writerSchemaWithMetafields.toString(true));
+ LOG.debug("Old record is " + oldRecord);
+ throw new HoodieUpsertException(errMsg, e);
+ }
+ recordsWritten++;
+ }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadProfile.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadProfile.java
index a56710b..7700e95 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadProfile.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/WorkloadProfile.java
@@ -18,6 +18,7 @@
package org.apache.hudi.table;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.collection.Pair;
import java.io.Serializable;
@@ -41,11 +42,21 @@ public class WorkloadProfile implements Serializable {
*/
protected final WorkloadStat globalStat;
+ /**
+ * Write operation type.
+ */
+ private WriteOperationType operationType;
+
public WorkloadProfile(Pair<HashMap<String, WorkloadStat>, WorkloadStat> profile) {
this.partitionPathStatMap = profile.getLeft();
this.globalStat = profile.getRight();
}
+ public WorkloadProfile(Pair<HashMap<String, WorkloadStat>, WorkloadStat> profile, WriteOperationType operationType) {
+ this(profile);
+ this.operationType = operationType;
+ }
+
public WorkloadStat getGlobalStat() {
return globalStat;
}
@@ -62,11 +73,16 @@ public class WorkloadProfile implements Serializable {
return partitionPathStatMap.get(partitionPath);
}
+ public WriteOperationType getOperationType() {
+ return operationType;
+ }
+
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("WorkloadProfile {");
sb.append("globalStat=").append(globalStat).append(", ");
- sb.append("partitionStat=").append(partitionPathStatMap);
+ sb.append("partitionStat=").append(partitionPathStatMap).append(", ");
+ sb.append("operationType=").append(operationType);
sb.append('}');
return sb.toString();
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index 2fabbbf..5a4d79c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -43,6 +43,7 @@ import org.apache.hudi.execution.SparkLazyInsertIterable;
import org.apache.hudi.io.CreateHandleFactory;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.HoodieSortedMergeHandle;
+import org.apache.hudi.io.storage.HoodieConcatHandle;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
@@ -119,7 +120,7 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
WorkloadProfile profile = null;
if (isWorkloadProfileNeeded()) {
context.setJobStatus(this.getClass().getSimpleName(), "Building workload profile");
- profile = new WorkloadProfile(buildProfile(inputRecordsRDD));
+ profile = new WorkloadProfile(buildProfile(inputRecordsRDD), operationType);
LOG.info("Workload profile :" + profile);
saveWorkloadProfileMetadataToInflight(profile, instantTime);
}
@@ -320,6 +321,8 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
if (table.requireSortedRecords()) {
return new HoodieSortedMergeHandle<>(config, instantTime, (HoodieSparkTable) table, recordItr, partitionPath, fileId, taskContextSupplier);
+ } else if (!WriteOperationType.isChangingRecords(operationType) && config.allowDuplicateInserts()) {
+ return new HoodieConcatHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier);
} else {
return new HoodieMergeHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier);
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index 000cfc7..ee153c8 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -190,7 +190,7 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
for (SmallFile smallFile : smallFiles) {
long recordsToAppend = Math.min((config.getParquetMaxFileSize() - smallFile.sizeBytes) / averageRecordSize,
totalUnassignedInserts);
- if (recordsToAppend > 0) {
+ if (recordsToAppend > 0 && totalUnassignedInserts > 0) {
// create a new bucket or re-use an existing bucket
int bucket;
if (updateLocationToBucket.containsKey(smallFile.location.getFileId())) {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index 48a4d12..b4a392e 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -79,7 +79,9 @@ import org.apache.spark.sql.Row;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
@@ -128,6 +130,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
}
};
+ private static Stream<Arguments> configParams() {
+ return Arrays.stream(new Boolean[][] {{true},{false}}).map(Arguments::of);
+ }
+
private HoodieTestTable testTable;
@BeforeEach
@@ -451,7 +457,62 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
}
/**
- * Tesst deletion of records.
+ * Test Insert API for HoodieConcatHandle.
+ */
+ @Test
+ public void testInsertsWithHoodieConcatHandle() throws Exception {
+ testHoodieConcatHandle(getConfig(), false);
+ }
+
+ /**
+ * Test InsertPrepped API for HoodieConcatHandle.
+ */
+ @Test
+ public void testInsertsPreppedWithHoodieConcatHandle() throws Exception {
+ testHoodieConcatHandle(getConfig(), true);
+ }
+
+ /**
+ * Test HoodieConcatHandle w/ {@link AbstractHoodieWriteClient#insert(Object, String)} API.
+ *
+ * @param config Write Config
+ * @throws Exception in case of error
+ */
+ private void testHoodieConcatHandle(HoodieWriteConfig config, boolean isPrepped)
+ throws Exception {
+ // Force using older timeline layout
+ HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
+ .withProps(config.getProps()).withMergeAllowDuplicateOnInserts(true).withTimelineLayoutVersion(
+ VERSION_0).build();
+
+ HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(), metaClient.getTableType(),
+ metaClient.getTableConfig().getTableName(), metaClient.getArchivePath(),
+ metaClient.getTableConfig().getPayloadClass(), VERSION_0);
+ SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);
+
+ // Write 1 (only inserts)
+ String newCommitTime = "001";
+ String initCommitTime = "000";
+ int numRecords = 200;
+ insertFirstBatch(hoodieWriteConfig, client, newCommitTime, initCommitTime, numRecords, SparkRDDWriteClient::insert,
+ isPrepped, true, numRecords);
+
+ // Write 2 (records for existing keys, written with the insert API)
+ String prevCommitTime = newCommitTime;
+ newCommitTime = "004";
+ numRecords = 100;
+ String commitTimeBetweenPrevAndNew = "002";
+
+ final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
+ generateWrapRecordsFn(isPrepped, hoodieWriteConfig, dataGen::generateUniqueUpdates);
+
+ writeBatch(client, newCommitTime, prevCommitTime, Option.of(Arrays.asList(commitTimeBetweenPrevAndNew)), initCommitTime,
+ numRecords, recordGenFunction, SparkRDDWriteClient::insert, true, numRecords, 300,
+ 2);
+ }
+
+ /**
+ * Tests deletion of records.
*/
@Test
public void testDeletes() throws Exception {
@@ -877,13 +938,13 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
/**
* Test scenario of new file-group getting added during insert().
*/
- @Test
- public void testSmallInsertHandlingForInserts() throws Exception {
-
+ @ParameterizedTest
+ @MethodSource("configParams")
+ public void testSmallInsertHandlingForInserts(boolean mergeAllowDuplicateInserts) throws Exception {
final String testPartitionPath = "2016/09/26";
final int insertSplitLimit = 100;
// setup the small file handling params
- HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit); // hold upto 200 records max
+ HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit, false, mergeAllowDuplicateInserts); // hold upto 200 records max
dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
SparkRDDWriteClient client = getHoodieWriteClient(config, false);
@@ -894,10 +955,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
Set<String> keys1 = recordsToRecordKeySet(inserts1);
JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 1);
List<WriteStatus> statuses = client.insert(insertRecordsRDD1, commitTime1).collect();
-
assertNoWriteErrors(statuses);
assertPartitionMetadata(new String[] {testPartitionPath}, fs);
-
assertEquals(1, statuses.size(), "Just 1 file needs to be added.");
String file1 = statuses.get(0).getFileId();
assertEquals(100,
@@ -912,14 +971,13 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
JavaRDD<HoodieRecord> insertRecordsRDD2 = jsc.parallelize(inserts2, 1);
statuses = client.insert(insertRecordsRDD2, commitTime2).collect();
assertNoWriteErrors(statuses);
-
assertEquals(1, statuses.size(), "Just 1 file needs to be updated.");
assertEquals(file1, statuses.get(0).getFileId(), "Existing file should be expanded");
assertEquals(commitTime1, statuses.get(0).getStat().getPrevCommit(), "Existing file should be expanded");
+
Path newFile = new Path(basePath, statuses.get(0).getStat().getPath());
assertEquals(140, readRowKeysFromParquet(hadoopConf, newFile).size(),
"file should contain 140 records");
-
List<GenericRecord> records = ParquetUtils.readAvroRecords(hadoopConf, newFile);
for (GenericRecord record : records) {
String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
@@ -933,11 +991,15 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
// Lots of inserts such that file1 is updated and expanded, a new file2 is created.
String commitTime3 = "003";
client.startCommitWithTime(commitTime3);
- List<HoodieRecord> insert3 = dataGen.generateInserts(commitTime3, 200);
- JavaRDD<HoodieRecord> insertRecordsRDD3 = jsc.parallelize(insert3, 1);
+ List<HoodieRecord> inserts3 = dataGen.generateInserts(commitTime3, 200);
+ JavaRDD<HoodieRecord> insertRecordsRDD3 = jsc.parallelize(inserts3, 1);
statuses = client.insert(insertRecordsRDD3, commitTime3).collect();
assertNoWriteErrors(statuses);
assertEquals(2, statuses.size(), "2 files needs to be committed.");
+ assertEquals(340,
+ readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath())).size()
+ + readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(1).getStat().getPath())).size(),
+ "file should contain 340 records");
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
HoodieTable table = getHoodieTable(metaClient, config);
@@ -948,11 +1010,9 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
int totalInserts = 0;
for (HoodieBaseFile file : files) {
assertEquals(commitTime3, file.getCommitTime(), "All files must be at commit 3");
- records = ParquetUtils.readAvroRecords(hadoopConf, new Path(file.getPath()));
- totalInserts += records.size();
+ totalInserts += ParquetUtils.readAvroRecords(hadoopConf, new Path(file.getPath())).size();
}
- assertEquals(totalInserts, inserts1.size() + inserts2.size() + insert3.size(),
- "Total number of records must add up");
+ assertEquals(totalInserts, inserts1.size() + inserts2.size() + inserts3.size(), "Total number of records must add up");
}
/**
@@ -1040,7 +1100,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
testClustering(clusteringConfig);
}
-
+
private void testClustering(HoodieClusteringConfig clusteringConfig) throws Exception {
// create config to not update small files.
HoodieWriteConfig config = getSmallInsertWriteConfig(2000, false, 10);
@@ -1642,22 +1702,45 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
* Build Hoodie Write Config for small data file sizes.
*/
private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean useNullSchema) {
- return getSmallInsertWriteConfig(insertSplitSize, useNullSchema, dataGen.getEstimatedFileSizeInBytes(150));
+ return getSmallInsertWriteConfig(insertSplitSize, useNullSchema, false);
+ }
+
+ /**
+ * Build Hoodie Write Config for small data file sizes.
+ */
+ private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean useNullSchema, boolean mergeAllowDuplicateInserts) {
+ return getSmallInsertWriteConfig(insertSplitSize, useNullSchema, dataGen.getEstimatedFileSizeInBytes(150), mergeAllowDuplicateInserts);
}
/**
* Build Hoodie Write Config for specified small file sizes.
*/
private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean useNullSchema, long smallFileSize) {
+ return getSmallInsertWriteConfig(insertSplitSize, useNullSchema, smallFileSize, false);
+ }
+
+ /**
+ * Build Hoodie Write Config for specified small file sizes.
+ */
+ private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean useNullSchema, long smallFileSize, boolean mergeAllowDuplicateInserts) {
String schemaStr = useNullSchema ? NULL_SCHEMA : TRIP_EXAMPLE_SCHEMA;
- return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize);
+ return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, mergeAllowDuplicateInserts);
}
private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize) {
- return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, new Properties());
+ return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, false);
+ }
+
+ private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize, boolean mergeAllowDuplicateInserts) {
+ return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, mergeAllowDuplicateInserts, new Properties());
}
-
+
private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize, Properties props) {
+ return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, false, props);
+ }
+
+ private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize, boolean mergeAllowDuplicateInserts,
+ Properties props) {
HoodieWriteConfig.Builder builder = getConfigBuilder(schemaStr);
return builder
.withCompactionConfig(
@@ -1668,6 +1751,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
HoodieStorageConfig.newBuilder()
.hfileMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200))
.parquetMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200)).build())
+ .withMergeAllowDuplicateOnInserts(mergeAllowDuplicateInserts)
.withProps(props)
.build();
}