Posted to commits@hudi.apache.org by si...@apache.org on 2020/07/14 02:34:20 UTC
[hudi] branch master updated: [HUDI-1068] Fixing deletes in global bloom when update partition path is set (#1793)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 21bb1b5 [HUDI-1068] Fixing deletes in global bloom when update partition path is set (#1793)
21bb1b5 is described below
commit 21bb1b505a0a433bdbe367f7f252afdddafafc96
Author: Sivabalan Narayanan <si...@uber.com>
AuthorDate: Mon Jul 13 22:34:07 2020 -0400
[HUDI-1068] Fixing deletes in global bloom when update partition path is set (#1793)
---
.../hudi/index/bloom/HoodieGlobalBloomIndex.java | 8 +-
.../hudi/index/simple/HoodieGlobalSimpleIndex.java | 8 +-
.../TestHoodieClientOnCopyOnWriteStorage.java | 209 +++++++++++++++++----
.../hudi/testutils/HoodieClientTestUtils.java | 21 +++
4 files changed, 200 insertions(+), 46 deletions(-)
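For context, the delete-then-insert behavior fixed here only takes effect when the update-partition-path flags are enabled on a global index. Below is a minimal sketch of such a configuration; the index builder calls mirror the ones used in the new test further down, while the class name, base path, and the omitted table settings (schema, table name, parallelism) are illustrative only and not part of this commit.

import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex.IndexType;

public class UpdatePartitionPathConfigSketch {
  // Builds a write config with the update-partition-path flags turned on.
  // basePath is a placeholder; real writers would also set schema and table name.
  public static HoodieWriteConfig build(String basePath) {
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withIndexConfig(HoodieIndexConfig.newBuilder()
            .withIndexType(IndexType.GLOBAL_BLOOM) // or IndexType.GLOBAL_SIMPLE
            .withBloomIndexUpdatePartitionPath(true)
            .withGlobalSimpleIndexUpdatePartitionPath(true)
            .build())
        .build();
  }
}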
diff --git a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
index 1e57a38..4f93b30 100644
--- a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
+++ b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
@@ -123,11 +123,13 @@ public class HoodieGlobalBloomIndex<T extends HoodieRecordPayload> extends Hoodi
if (config.getBloomIndexUpdatePartitionPath()
&& !recordLocationHoodieKeyPair.get()._2.getPartitionPath().equals(hoodieRecord.getPartitionPath())) {
// Create an empty record to delete the record in the old partition
- HoodieRecord<T> emptyRecord = new HoodieRecord(recordLocationHoodieKeyPair.get()._2,
+ HoodieRecord<T> deleteRecord = new HoodieRecord(recordLocationHoodieKeyPair.get()._2,
new EmptyHoodieRecordPayload());
+ deleteRecord.setCurrentLocation(recordLocationHoodieKeyPair.get()._1());
+ deleteRecord.seal();
// Tag the incoming record for inserting to the new partition
- HoodieRecord<T> taggedRecord = HoodieIndexUtils.getTaggedRecord(hoodieRecord, Option.empty());
- return Arrays.asList(emptyRecord, taggedRecord).iterator();
+ HoodieRecord<T> insertRecord = HoodieIndexUtils.getTaggedRecord(hoodieRecord, Option.empty());
+ return Arrays.asList(deleteRecord, insertRecord).iterator();
} else {
// Ignore the incoming record's partition, regardless of whether it differs from its old partition or not.
// When it differs, the record will still be updated at its old partition.
diff --git a/hudi-client/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java
index bb1d8d6..990f02d 100644
--- a/hudi-client/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java
+++ b/hudi-client/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java
@@ -130,10 +130,12 @@ public class HoodieGlobalSimpleIndex<T extends HoodieRecordPayload> extends Hood
HoodieRecordLocation location = partitionPathLocationPair.get().getRight();
if (config.getGlobalSimpleIndexUpdatePartitionPath() && !(inputRecord.getPartitionPath().equals(partitionPath))) {
// Create an empty record to delete the record in the old partition
- HoodieRecord<T> emptyRecord = new HoodieRecord(new HoodieKey(inputRecord.getRecordKey(), partitionPath), new EmptyHoodieRecordPayload());
+ HoodieRecord<T> deleteRecord = new HoodieRecord(new HoodieKey(inputRecord.getRecordKey(), partitionPath), new EmptyHoodieRecordPayload());
+ deleteRecord.setCurrentLocation(location);
+ deleteRecord.seal();
// Tag the incoming record for inserting to the new partition
- HoodieRecord<T> taggedRecord = (HoodieRecord<T>) HoodieIndexUtils.getTaggedRecord(inputRecord, Option.empty());
- taggedRecords = Arrays.asList(emptyRecord, taggedRecord);
+ HoodieRecord<T> insertRecord = (HoodieRecord<T>) HoodieIndexUtils.getTaggedRecord(inputRecord, Option.empty());
+ taggedRecords = Arrays.asList(deleteRecord, insertRecord);
} else {
// Ignore the incoming record's partition, regardless of whether it differs from its old partition or not.
// When it differs, the record will still be updated at its old partition.
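Both hunks above apply the same pattern: when the incoming record's partition differs from the one in storage, the index now emits a delete record tagged with the existing file location (so the delete actually reaches the old file group) plus an untagged insert for the new partition. A consolidated sketch of that pattern follows; the class and method names and the variables incoming, oldPartitionPath and oldLocation are illustrative stand-ins for the values each index looks up, not a verbatim copy of either class.

import java.util.Arrays;
import java.util.List;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.index.HoodieIndexUtils;

final class PartitionChangeSketch {
  @SuppressWarnings("unchecked")
  static <T extends HoodieRecordPayload> List<HoodieRecord<T>> onPartitionChange(
      HoodieRecord<T> incoming, String oldPartitionPath, HoodieRecordLocation oldLocation) {
    // Empty payload acts as a delete marker for the copy sitting in the old partition.
    HoodieRecord<T> deleteRecord = new HoodieRecord(
        new HoodieKey(incoming.getRecordKey(), oldPartitionPath), new EmptyHoodieRecordPayload());
    // The fix: tag the delete with its existing location so it is routed to the old file group.
    deleteRecord.setCurrentLocation(oldLocation);
    deleteRecord.seal();
    // The incoming record stays untagged, so it is written as an insert into its new partition.
    HoodieRecord<T> insertRecord = (HoodieRecord<T>) HoodieIndexUtils.getTaggedRecord(incoming, Option.empty());
    return Arrays.asList(deleteRecord, insertRecord);
  }
}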
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index 1f86bb26..5518a3f 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -37,6 +37,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
@@ -55,7 +56,11 @@ import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import java.io.FileInputStream;
import java.io.IOException;
@@ -73,6 +78,9 @@ import java.util.stream.Collectors;
import static org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion.VERSION_0;
import static org.apache.hudi.common.util.ParquetUtils.readRowKeysFromParquet;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
import static org.apache.hudi.testutils.HoodieTestDataGenerator.NULL_SCHEMA;
import static org.apache.hudi.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -143,7 +151,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
* @throws Exception in case of failure
*/
private void testAutoCommit(Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
- boolean isPrepped) throws Exception {
+ boolean isPrepped) throws Exception {
// Set autoCommit false
HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
@@ -275,12 +283,12 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
/**
* Test one of HoodieWriteClient upsert(Prepped) APIs.
*
- * @param config Write Config
+ * @param config Write Config
* @param writeFn One of Hoodie Write Function API
* @throws Exception in case of error
*/
private void testUpsertsInternal(HoodieWriteConfig config,
- Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPrepped)
+ Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPrepped)
throws Exception {
// Force using older timeline layout
HoodieWriteConfig hoodieWriteConfig = getConfigBuilder().withProps(config.getProps()).withTimelineLayoutVersion(
@@ -400,56 +408,177 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
}
/**
- * Test update of a record to different partition with Global Index.
+ * Tests that when update partition path is set in global bloom, the existing record in the old partition is deleted appropriately.
*/
- @Test
- public void testUpsertToDiffPartitionGlobalIndex() throws Exception {
- HoodieWriteClient client = getHoodieWriteClient(getConfig(IndexType.GLOBAL_BLOOM), false);
- /**
- * Write 1 (inserts and deletes) Write actual 200 insert records and ignore 100 delete records
- */
- String newCommitTime = "001";
- List<HoodieRecord> inserts1 = dataGen.generateInserts(newCommitTime, 100);
+ @ParameterizedTest
+ @EnumSource(value = IndexType.class, names = {"GLOBAL_BLOOM", "GLOBAL_SIMPLE"})
+ public void testUpsertsUpdatePartitionPathGlobalBloom(IndexType indexType) throws Exception {
+ testUpsertsUpdatePartitionPath(indexType, getConfig(),
+ HoodieWriteClient::upsert);
+ }
- // Write 1 (only inserts)
+ /**
+ * This test ensures that with a global index, when update partition path is set to true in the config, an incoming record whose partition
+ * differs from what is in storage is handled appropriately, i.e. the old record is deleted from the old partition and the new one is
+ * inserted into the new partition.
+ * Test structure:
+ * 1. Insert a first batch of records.
+ * 2. Insert a second batch with a larger number of records so that a new file group is created per partition.
+ * 3. Issue upserts to the records from batch 1 with different partition paths. This should ensure the records from batch 1 are deleted
+ * and new records are upserted to the new partition.
+ *
+ * @param indexType index type to be tested for
+ * @param config instance of {@link HoodieWriteConfig} to use
+ * @param writeFn write function to be used for testing
+ */
+ private void testUpsertsUpdatePartitionPath(IndexType indexType, HoodieWriteConfig config,
+ Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn)
+ throws Exception {
+ // instantiate client
+
+ HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
+ .withProps(config.getProps())
+ .withCompactionConfig(
+ HoodieCompactionConfig.newBuilder().compactionSmallFileSize(10000).build())
+ .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType)
+ .withBloomIndexUpdatePartitionPath(true)
+ .withGlobalSimpleIndexUpdatePartitionPath(true)
+ .build()).withTimelineLayoutVersion(VERSION_0).build();
+ HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(), metaClient.getTableType(),
+ metaClient.getTableConfig().getTableName(), metaClient.getArchivePath(),
+ metaClient.getTableConfig().getPayloadClass(), VERSION_0);
+ HoodieWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);
+
+ // Write 1
+ String newCommitTime = "001";
+ int numRecords = 10;
client.startCommitWithTime(newCommitTime);
- JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(inserts1, 1);
- JavaRDD<WriteStatus> result = client.insert(writeRecords, newCommitTime);
+ List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
+ Set<Pair<String, String>> expectedPartitionPathRecKeyPairs = new HashSet<>();
+ // populate expected partition path and record keys
+ for (HoodieRecord rec : records) {
+ expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+ }
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime);
List<WriteStatus> statuses = result.collect();
- assertNoWriteErrors(statuses);
- // check the partition metadata is written out
- assertPartitionMetadataForRecords(inserts1, fs);
- String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
- for (int i = 0; i < fullPartitionPaths.length; i++) {
- fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+ // Check the entire dataset has all records
+ String[] fullPartitionPaths = getFullPartitionPaths();
+ assertPartitionPathRecordKeys(expectedPartitionPathRecKeyPairs, fullPartitionPaths);
+
+ // verify one basefile per partition
+ Map<String, Integer> baseFileCounts = getBaseFileCounts(fullPartitionPaths);
+ for (Map.Entry<String, Integer> entry : baseFileCounts.entrySet()) {
+ assertEquals(1, entry.getValue());
}
- assertEquals(100, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
- "Must contain 100 records");
+ assertTrue(baseFileCounts.entrySet().stream().allMatch(entry -> entry.getValue() == 1));
- /**
- * Write 2. Updates with different partition
- */
- newCommitTime = "004";
+ // Write 2
+ newCommitTime = "002";
+ numRecords = 20; // so that a new file id is created
client.startCommitWithTime(newCommitTime);
- List<HoodieRecord> updates1 = dataGen.generateUpdatesWithDiffPartition(newCommitTime, inserts1);
- JavaRDD<HoodieRecord> updateRecords = jsc.parallelize(updates1, 1);
+ List<HoodieRecord> recordsSecondBatch = dataGen.generateInserts(newCommitTime, numRecords);
+ // populate expected partition path and record keys
+ for (HoodieRecord rec : recordsSecondBatch) {
+ expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+ }
+ writeRecords = jsc.parallelize(recordsSecondBatch, 1);
+ result = writeFn.apply(client, writeRecords, newCommitTime);
+ result.collect();
- JavaRDD<WriteStatus> result1 = client.upsert(updateRecords, newCommitTime);
- List<WriteStatus> statuses1 = result1.collect();
- assertNoWriteErrors(statuses1);
+ // Check the entire dataset has all records
+ fullPartitionPaths = getFullPartitionPaths();
+ assertPartitionPathRecordKeys(expectedPartitionPathRecKeyPairs, fullPartitionPaths);
+
+ // verify that at least one partition now has more than one base file
+ // we can't guarantee how records are distributed across partitions, so only assert that at least one partition has more than one base file.
+ baseFileCounts = getBaseFileCounts(fullPartitionPaths);
+ assertTrue(baseFileCounts.entrySet().stream().filter(entry -> entry.getValue() > 1).count() >= 1,
+ "Atleast one partition should have more than 1 base file after 2nd batch of writes");
+
+ // Write 3 (upserts to records from batch 1 with diff partition path)
+ newCommitTime = "003";
+
+ // update to diff partition paths
+ List<HoodieRecord> recordsToUpsert = new ArrayList<>();
+ for (HoodieRecord rec : records) {
+ // remove older entry from expected partition path record key pairs
+ expectedPartitionPathRecKeyPairs
+ .remove(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+ String partitionPath = rec.getPartitionPath();
+ String newPartitionPath = null;
+ if (partitionPath.equalsIgnoreCase(DEFAULT_FIRST_PARTITION_PATH)) {
+ newPartitionPath = DEFAULT_SECOND_PARTITION_PATH;
+ } else if (partitionPath.equalsIgnoreCase(DEFAULT_SECOND_PARTITION_PATH)) {
+ newPartitionPath = DEFAULT_THIRD_PARTITION_PATH;
+ } else if (partitionPath.equalsIgnoreCase(DEFAULT_THIRD_PARTITION_PATH)) {
+ newPartitionPath = DEFAULT_FIRST_PARTITION_PATH;
+ } else {
+ throw new IllegalStateException("Unknown partition path " + rec.getPartitionPath());
+ }
+ recordsToUpsert.add(
+ new HoodieRecord(new HoodieKey(rec.getRecordKey(), newPartitionPath),
+ rec.getData()));
+ // populate expected partition path and record keys
+ expectedPartitionPathRecKeyPairs.add(Pair.of(newPartitionPath, rec.getRecordKey()));
+ }
- // check the partition metadata is written out
- assertPartitionMetadataForRecords(updates1, fs);
- // Check the entire dataset has all records still
- fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+ writeRecords = jsc.parallelize(recordsToUpsert, 1);
+ result = writeFn.apply(client, writeRecords, newCommitTime);
+ statuses = result.collect();
+
+ // Check the entire dataset has all records
+ fullPartitionPaths = getFullPartitionPaths();
+ assertPartitionPathRecordKeys(expectedPartitionPathRecKeyPairs, fullPartitionPaths);
+ }
+
+ private void assertPartitionPathRecordKeys(Set<Pair<String, String>> expectedPartitionPathRecKeyPairs, String[] fullPartitionPaths) {
+ Dataset<Row> rows = getAllRows(fullPartitionPaths);
+ List<Pair<String, String>> actualPartitionPathRecKeyPairs = getActualPartitionPathAndRecordKeys(rows);
+ // verify that all partition path / record key pairs match
+ assertActualAndExpectedPartitionPathRecordKeyMatches(expectedPartitionPathRecKeyPairs, actualPartitionPathRecKeyPairs);
+ }
+
+ private List<Pair<String, String>> getActualPartitionPathAndRecordKeys(Dataset<org.apache.spark.sql.Row> rows) {
+ List<Pair<String, String>> actualPartitionPathRecKeyPairs = new ArrayList<>();
+ for (Row row : rows.collectAsList()) {
+ actualPartitionPathRecKeyPairs
+ .add(Pair.of(row.getAs("_hoodie_partition_path"), row.getAs("_row_key")));
+ }
+ return actualPartitionPathRecKeyPairs;
+ }
+
+ private Dataset<org.apache.spark.sql.Row> getAllRows(String[] fullPartitionPaths) {
+ return HoodieClientTestUtils
+ .read(jsc, basePath, sqlContext, fs, fullPartitionPaths);
+ }
+
+ private String[] getFullPartitionPaths() {
+ String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
for (int i = 0; i < fullPartitionPaths.length; i++) {
fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
}
- assertEquals(100, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
- "Must contain 100 records");
+ return fullPartitionPaths;
+ }
+
+ private Map<String, Integer> getBaseFileCounts(String[] fullPartitionPaths) {
+ return HoodieClientTestUtils.getBaseFileCountForPaths(basePath, fs, fullPartitionPaths);
+ }
+
+ private void assertActualAndExpectedPartitionPathRecordKeyMatches(Set<Pair<String, String>> expectedPartitionPathRecKeyPairs,
+ List<Pair<String, String>> actualPartitionPathRecKeyPairs) {
+ // verify that all partition path / record key pairs match
+ assertEquals(expectedPartitionPathRecKeyPairs.size(), actualPartitionPathRecKeyPairs.size());
+ for (Pair<String, String> entry : actualPartitionPathRecKeyPairs) {
+ assertTrue(expectedPartitionPathRecKeyPairs.contains(entry));
+ }
+
+ for (Pair<String, String> entry : expectedPartitionPathRecKeyPairs) {
+ assertTrue(actualPartitionPathRecKeyPairs.contains(entry));
+ }
}
/**
@@ -715,7 +844,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
}
private Pair<Set<String>, List<HoodieRecord>> testUpdates(String instantTime, HoodieWriteClient client,
- int sizeToInsertAndUpdate, int expectedTotalRecords)
+ int sizeToInsertAndUpdate, int expectedTotalRecords)
throws IOException {
client.startCommitWithTime(instantTime);
List<HoodieRecord> inserts = dataGen.generateInserts(instantTime, sizeToInsertAndUpdate);
@@ -740,7 +869,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
}
private void testDeletes(HoodieWriteClient client, List<HoodieRecord> previousRecords, int sizeToDelete,
- String existingFile, String instantTime, int exepctedRecords, List<String> keys) {
+ String existingFile, String instantTime, int exepctedRecords, List<String> keys) {
client.startCommitWithTime(instantTime);
List<HoodieKey> hoodieKeysToDelete = HoodieClientTestUtils
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
index 145cde5..1bb8275 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
@@ -67,6 +67,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
@@ -223,6 +224,26 @@ public class HoodieClientTestUtils {
}
}
+ /**
+ * Find the total number of base files for the passed-in paths.
+ */
+ public static Map<String, Integer> getBaseFileCountForPaths(String basePath, FileSystem fs,
+ String... paths) {
+ Map<String, Integer> toReturn = new HashMap<>();
+ try {
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
+ for (String path : paths) {
+ BaseFileOnlyView fileSystemView = new HoodieTableFileSystemView(metaClient,
+ metaClient.getCommitsTimeline().filterCompletedInstants(), fs.globStatus(new Path(path)));
+ List<HoodieBaseFile> latestFiles = fileSystemView.getLatestBaseFiles().collect(Collectors.toList());
+ toReturn.put(path, latestFiles.size());
+ }
+ return toReturn;
+ } catch (Exception e) {
+ throw new HoodieException("Error reading hoodie table as a dataframe", e);
+ }
+ }
+
public static String writeParquetFile(String basePath, String partitionPath, String filename,
List<HoodieRecord> records, Schema schema, BloomFilter filter, boolean createCommitTime) throws IOException {
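To close the loop on the new test utility: the test builds one glob pattern per partition and asserts on the per-path base file counts returned by getBaseFileCountForPaths. A small standalone sketch of that usage is below; the class and method names are illustrative, and basePath, fs, and partitionPaths are assumed to come from an already initialized test table rather than from this commit.

import java.util.Map;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import static org.junit.jupiter.api.Assertions.assertEquals;

final class BaseFileCountUsageSketch {
  // basePath, fs and partitionPaths are assumed to describe an existing Hudi test table.
  static void assertOneBaseFilePerPartition(String basePath, FileSystem fs, String[] partitionPaths) {
    String[] globbedPaths = new String[partitionPaths.length];
    for (int i = 0; i < partitionPaths.length; i++) {
      globbedPaths[i] = String.format("%s/%s/*", basePath, partitionPaths[i]);
    }
    Map<String, Integer> baseFileCounts =
        HoodieClientTestUtils.getBaseFileCountForPaths(basePath, fs, globbedPaths);
    // After a single insert batch the test expects exactly one base file per partition.
    for (Map.Entry<String, Integer> entry : baseFileCounts.entrySet()) {
      assertEquals(1, entry.getValue().intValue());
    }
  }
}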