Posted to commits@hudi.apache.org by si...@apache.org on 2020/07/14 02:34:20 UTC

[hudi] branch master updated: [HUDI-1068] Fixing deletes in global bloom when update partition path is set (#1793)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 21bb1b5  [HUDI-1068] Fixing deletes in global bloom when update partition path is set (#1793)
21bb1b5 is described below

commit 21bb1b505a0a433bdbe367f7f252afdddafafc96
Author: Sivabalan Narayanan <si...@uber.com>
AuthorDate: Mon Jul 13 22:34:07 2020 -0400

    [HUDI-1068] Fixing deletes in global bloom when update partition path is set (#1793)
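
With either global index, when update partition path is enabled and an incoming record's partition path
differs from the one already in storage, the index emits two records: a delete (empty payload) for the old
partition and an insert for the new one. The change in this commit is that the delete record is now tagged
with its current file location and sealed, so it is applied against the existing file group in the old
partition instead of being handled as a fresh insert, which is presumably what previously left the old copy
of the record in place. Below is a minimal config sketch, assembled from the test changes in this commit,
for turning the behaviour on (other write config options are omitted; basePath is a placeholder for the
table base path):

    import org.apache.hudi.config.HoodieIndexConfig;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.index.HoodieIndex;

    public class UpdatePartitionPathConfigSketch {
      static HoodieWriteConfig buildConfig(String basePath) {
        return HoodieWriteConfig.newBuilder()
            .withPath(basePath)
            .withIndexConfig(HoodieIndexConfig.newBuilder()
                .withIndexType(HoodieIndex.IndexType.GLOBAL_BLOOM)   // or GLOBAL_SIMPLE
                .withBloomIndexUpdatePartitionPath(true)             // honored by GLOBAL_BLOOM
                .withGlobalSimpleIndexUpdatePartitionPath(true)      // honored by GLOBAL_SIMPLE
                .build())
            .build();
      }
    }
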
---
 .../hudi/index/bloom/HoodieGlobalBloomIndex.java   |   8 +-
 .../hudi/index/simple/HoodieGlobalSimpleIndex.java |   8 +-
 .../TestHoodieClientOnCopyOnWriteStorage.java      | 209 +++++++++++++++++----
 .../hudi/testutils/HoodieClientTestUtils.java      |  21 +++
 4 files changed, 200 insertions(+), 46 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
index 1e57a38..4f93b30 100644
--- a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
+++ b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
@@ -123,11 +123,13 @@ public class HoodieGlobalBloomIndex<T extends HoodieRecordPayload> extends Hoodi
         if (config.getBloomIndexUpdatePartitionPath()
             && !recordLocationHoodieKeyPair.get()._2.getPartitionPath().equals(hoodieRecord.getPartitionPath())) {
           // Create an empty record to delete the record in the old partition
-          HoodieRecord<T> emptyRecord = new HoodieRecord(recordLocationHoodieKeyPair.get()._2,
+          HoodieRecord<T> deleteRecord = new HoodieRecord(recordLocationHoodieKeyPair.get()._2,
               new EmptyHoodieRecordPayload());
+          deleteRecord.setCurrentLocation(recordLocationHoodieKeyPair.get()._1());
+          deleteRecord.seal();
           // Tag the incoming record for inserting to the new partition
-          HoodieRecord<T> taggedRecord = HoodieIndexUtils.getTaggedRecord(hoodieRecord, Option.empty());
-          return Arrays.asList(emptyRecord, taggedRecord).iterator();
+          HoodieRecord<T> insertRecord = HoodieIndexUtils.getTaggedRecord(hoodieRecord, Option.empty());
+          return Arrays.asList(deleteRecord, insertRecord).iterator();
         } else {
           // Ignore the incoming record's partition, regardless of whether it differs from its old partition or not.
           // When it differs, the record will still be updated at its old partition.
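
Both HoodieGlobalBloomIndex (above) and HoodieGlobalSimpleIndex (next file) now follow the same two-record
pattern. The sketch below restates that pattern as a standalone method with hypothetical parameter names,
assuming the existing record's key and file location have already been obtained from the index lookup; it is
illustrative only, not code from this commit:

    import java.util.Arrays;
    import java.util.Iterator;

    import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
    import org.apache.hudi.common.model.HoodieKey;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.HoodieRecordLocation;
    import org.apache.hudi.common.model.HoodieRecordPayload;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.index.HoodieIndexUtils;

    public class UpdatePartitionPathSketch {

      @SuppressWarnings("unchecked")
      static <T extends HoodieRecordPayload> Iterator<HoodieRecord<T>> handlePartitionChange(
          HoodieKey oldKey, HoodieRecordLocation oldLocation, HoodieRecord<T> incoming) {
        // Delete the record in the old partition: an empty payload tagged with the record's current
        // file location, so the write is routed to the existing file group (the setCurrentLocation and
        // seal calls are what this commit adds).
        HoodieRecord<T> deleteRecord = new HoodieRecord(oldKey, new EmptyHoodieRecordPayload());
        deleteRecord.setCurrentLocation(oldLocation);
        deleteRecord.seal();
        // Insert the incoming record into its new partition; it carries no location, so it is
        // treated as an insert there.
        HoodieRecord<T> insertRecord = (HoodieRecord<T>) HoodieIndexUtils.getTaggedRecord(incoming, Option.empty());
        return Arrays.asList(deleteRecord, insertRecord).iterator();
      }
    }
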
diff --git a/hudi-client/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java
index bb1d8d6..990f02d 100644
--- a/hudi-client/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java
+++ b/hudi-client/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java
@@ -130,10 +130,12 @@ public class HoodieGlobalSimpleIndex<T extends HoodieRecordPayload> extends Hood
             HoodieRecordLocation location = partitionPathLocationPair.get().getRight();
             if (config.getGlobalSimpleIndexUpdatePartitionPath() && !(inputRecord.getPartitionPath().equals(partitionPath))) {
               // Create an empty record to delete the record in the old partition
-              HoodieRecord<T> emptyRecord = new HoodieRecord(new HoodieKey(inputRecord.getRecordKey(), partitionPath), new EmptyHoodieRecordPayload());
+              HoodieRecord<T> deleteRecord = new HoodieRecord(new HoodieKey(inputRecord.getRecordKey(), partitionPath), new EmptyHoodieRecordPayload());
+              deleteRecord.setCurrentLocation(location);
+              deleteRecord.seal();
               // Tag the incoming record for inserting to the new partition
-              HoodieRecord<T> taggedRecord = (HoodieRecord<T>) HoodieIndexUtils.getTaggedRecord(inputRecord, Option.empty());
-              taggedRecords = Arrays.asList(emptyRecord, taggedRecord);
+              HoodieRecord<T> insertRecord = (HoodieRecord<T>) HoodieIndexUtils.getTaggedRecord(inputRecord, Option.empty());
+              taggedRecords = Arrays.asList(deleteRecord, insertRecord);
             } else {
               // Ignore the incoming record's partition, regardless of whether it differs from its old partition or not.
               // When it differs, the record will still be updated at its old partition.
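
As background (an assumption about the surrounding write path, not something this commit changes):
EmptyHoodieRecordPayload acts as a delete because its merge hooks produce no record, so once the tagged
delete above reaches the old file group, the existing row is dropped when the file slice is rewritten.
A minimal sketch of that behaviour:

    import java.io.IOException;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.IndexedRecord;
    import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
    import org.apache.hudi.common.util.Option;

    public class EmptyPayloadDeleteSketch {
      // Returns true when merging an existing Avro record with an empty payload produces nothing,
      // i.e. the record is deleted from the rewritten base file.
      static boolean isDeletedOnMerge(IndexedRecord currentValue, Schema schema) throws IOException {
        Option<IndexedRecord> merged =
            new EmptyHoodieRecordPayload().combineAndGetUpdateValue(currentValue, schema);
        return !merged.isPresent();
      }
    }
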
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index 1f86bb26..5518a3f 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -37,6 +37,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ParquetUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCommitException;
@@ -55,7 +56,11 @@ import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -73,6 +78,9 @@ import java.util.stream.Collectors;
 import static org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion.VERSION_0;
 import static org.apache.hudi.common.util.ParquetUtils.readRowKeysFromParquet;
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
 import static org.apache.hudi.testutils.HoodieTestDataGenerator.NULL_SCHEMA;
 import static org.apache.hudi.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -143,7 +151,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
    * @throws Exception in case of failure
    */
   private void testAutoCommit(Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
-                              boolean isPrepped) throws Exception {
+      boolean isPrepped) throws Exception {
     // Set autoCommit false
     HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
     try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
@@ -275,12 +283,12 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   /**
    * Test one of HoodieWriteClient upsert(Prepped) APIs.
    *
-   * @param config  Write Config
+   * @param config Write Config
    * @param writeFn One of Hoodie Write Function API
    * @throws Exception in case of error
    */
   private void testUpsertsInternal(HoodieWriteConfig config,
-                                   Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPrepped)
+      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPrepped)
       throws Exception {
     // Force using older timeline layout
     HoodieWriteConfig hoodieWriteConfig = getConfigBuilder().withProps(config.getProps()).withTimelineLayoutVersion(
@@ -400,56 +408,177 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   }
 
   /**
-   * Test update of a record to different partition with Global Index.
+   * Tests that when update partition path is set with a global index, the existing record in the old partition is deleted appropriately.
    */
-  @Test
-  public void testUpsertToDiffPartitionGlobalIndex() throws Exception {
-    HoodieWriteClient client = getHoodieWriteClient(getConfig(IndexType.GLOBAL_BLOOM), false);
-    /**
-     * Write 1 (inserts and deletes) Write actual 200 insert records and ignore 100 delete records
-     */
-    String newCommitTime = "001";
-    List<HoodieRecord> inserts1 = dataGen.generateInserts(newCommitTime, 100);
+  @ParameterizedTest
+  @EnumSource(value = IndexType.class, names = {"GLOBAL_BLOOM", "GLOBAL_SIMPLE"})
+  public void testUpsertsUpdatePartitionPathGlobalBloom(IndexType indexType) throws Exception {
+    testUpsertsUpdatePartitionPath(indexType, getConfig(),
+        HoodieWriteClient::upsert);
+  }
 
-    // Write 1 (only inserts)
+  /**
+   * This test ensures that with a global index, when update partition path is set to true in the config, an incoming record whose
+   * partition differs from what is in storage is handled appropriately: the old record is deleted from the old partition and the new
+   * one is inserted into the new partition.
+   * Test structure:
+   * 1. Insert a first batch of records.
+   * 2. Insert a second batch with a larger number of records so that a new file group is created per partition.
+   * 3. Issue upserts to records from batch 1 with a different partition path. This should ensure the batch 1 records are deleted from
+   * their old partitions and upserted to the new ones.
+   *
+   * @param indexType index type to be tested for
+   * @param config instance of {@link HoodieWriteConfig} to use
+   * @param writeFn write function to be used for testing
+   */
+  private void testUpsertsUpdatePartitionPath(IndexType indexType, HoodieWriteConfig config,
+      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn)
+      throws Exception {
+    // instantiate write config and client
+
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
+        .withProps(config.getProps())
+        .withCompactionConfig(
+            HoodieCompactionConfig.newBuilder().compactionSmallFileSize(10000).build())
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType)
+            .withBloomIndexUpdatePartitionPath(true)
+            .withGlobalSimpleIndexUpdatePartitionPath(true)
+            .build()).withTimelineLayoutVersion(VERSION_0).build();
+    HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(), metaClient.getTableType(),
+        metaClient.getTableConfig().getTableName(), metaClient.getArchivePath(),
+        metaClient.getTableConfig().getPayloadClass(), VERSION_0);
+    HoodieWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);
+
+    // Write 1
+    String newCommitTime = "001";
+    int numRecords = 10;
     client.startCommitWithTime(newCommitTime);
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(inserts1, 1);
 
-    JavaRDD<WriteStatus> result = client.insert(writeRecords, newCommitTime);
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
+    Set<Pair<String, String>> expectedPartitionPathRecKeyPairs = new HashSet<>();
+    // populate expected partition path and record keys
+    for (HoodieRecord rec : records) {
+      expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+    }
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime);
     List<WriteStatus> statuses = result.collect();
-    assertNoWriteErrors(statuses);
 
-    // check the partition metadata is written out
-    assertPartitionMetadataForRecords(inserts1, fs);
-    String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
-    for (int i = 0; i < fullPartitionPaths.length; i++) {
-      fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+    // Check the entire dataset has all records
+    String[] fullPartitionPaths = getFullPartitionPaths();
+    assertPartitionPathRecordKeys(expectedPartitionPathRecKeyPairs, fullPartitionPaths);
+
+    // verify there is exactly one base file per partition
+    Map<String, Integer> baseFileCounts = getBaseFileCounts(fullPartitionPaths);
+    for (Map.Entry<String, Integer> entry : baseFileCounts.entrySet()) {
+      assertEquals(1, entry.getValue());
     }
-    assertEquals(100, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
-        "Must contain 100 records");
+    assertTrue(baseFileCounts.entrySet().stream().allMatch(entry -> entry.getValue() == 1));
 
-    /**
-     * Write 2. Updates with different partition
-     */
-    newCommitTime = "004";
+    // Write 2
+    newCommitTime = "002";
+    numRecords = 20; // so that a new file id is created
     client.startCommitWithTime(newCommitTime);
 
-    List<HoodieRecord> updates1 = dataGen.generateUpdatesWithDiffPartition(newCommitTime, inserts1);
-    JavaRDD<HoodieRecord> updateRecords = jsc.parallelize(updates1, 1);
+    List<HoodieRecord> recordsSecondBatch = dataGen.generateInserts(newCommitTime, numRecords);
+    // populate expected partition path and record keys
+    for (HoodieRecord rec : recordsSecondBatch) {
+      expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+    }
+    writeRecords = jsc.parallelize(recordsSecondBatch, 1);
+    result = writeFn.apply(client, writeRecords, newCommitTime);
+    result.collect();
 
-    JavaRDD<WriteStatus> result1 = client.upsert(updateRecords, newCommitTime);
-    List<WriteStatus> statuses1 = result1.collect();
-    assertNoWriteErrors(statuses1);
+    // Check the entire dataset has all records
+    fullPartitionPaths = getFullPartitionPaths();
+    assertPartitionPathRecordKeys(expectedPartitionPathRecKeyPairs, fullPartitionPaths);
+
+    // verify that partitions now have more than one base file
+    // we can't guarantee how records are distributed across partitions, so verify that at least one partition has more than one base file.
+    baseFileCounts = getBaseFileCounts(fullPartitionPaths);
+    assertTrue(baseFileCounts.entrySet().stream().filter(entry -> entry.getValue() > 1).count() >= 1,
+        "At least one partition should have more than 1 base file after 2nd batch of writes");
+
+    // Write 3 (upserts to records from batch 1 with diff partition path)
+    newCommitTime = "003";
+
+    // update to diff partition paths
+    List<HoodieRecord> recordsToUpsert = new ArrayList<>();
+    for (HoodieRecord rec : records) {
+      // remove older entry from expected partition path record key pairs
+      expectedPartitionPathRecKeyPairs
+          .remove(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+      String partitionPath = rec.getPartitionPath();
+      String newPartitionPath = null;
+      if (partitionPath.equalsIgnoreCase(DEFAULT_FIRST_PARTITION_PATH)) {
+        newPartitionPath = DEFAULT_SECOND_PARTITION_PATH;
+      } else if (partitionPath.equalsIgnoreCase(DEFAULT_SECOND_PARTITION_PATH)) {
+        newPartitionPath = DEFAULT_THIRD_PARTITION_PATH;
+      } else if (partitionPath.equalsIgnoreCase(DEFAULT_THIRD_PARTITION_PATH)) {
+        newPartitionPath = DEFAULT_FIRST_PARTITION_PATH;
+      } else {
+        throw new IllegalStateException("Unknown partition path " + rec.getPartitionPath());
+      }
+      recordsToUpsert.add(
+          new HoodieRecord(new HoodieKey(rec.getRecordKey(), newPartitionPath),
+              rec.getData()));
+      // populate expected partition path and record keys
+      expectedPartitionPathRecKeyPairs.add(Pair.of(newPartitionPath, rec.getRecordKey()));
+    }
 
-    // check the partition metadata is written out
-    assertPartitionMetadataForRecords(updates1, fs);
-    // Check the entire dataset has all records still
-    fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+    writeRecords = jsc.parallelize(recordsToUpsert, 1);
+    result = writeFn.apply(client, writeRecords, newCommitTime);
+    statuses = result.collect();
+
+    // Check the entire dataset has all records
+    fullPartitionPaths = getFullPartitionPaths();
+    assertPartitionPathRecordKeys(expectedPartitionPathRecKeyPairs, fullPartitionPaths);
+  }
+
+  private void assertPartitionPathRecordKeys(Set<Pair<String, String>> expectedPartitionPathRecKeyPairs, String[] fullPartitionPaths) {
+    Dataset<Row> rows = getAllRows(fullPartitionPaths);
+    List<Pair<String, String>> actualPartitionPathRecKeyPairs = getActualPartitionPathAndRecordKeys(rows);
+    // verify all partition path / record key pairs match
+    assertActualAndExpectedPartitionPathRecordKeyMatches(expectedPartitionPathRecKeyPairs, actualPartitionPathRecKeyPairs);
+  }
+
+  private List<Pair<String, String>> getActualPartitionPathAndRecordKeys(Dataset<org.apache.spark.sql.Row> rows) {
+    List<Pair<String, String>> actualPartitionPathRecKeyPairs = new ArrayList<>();
+    for (Row row : rows.collectAsList()) {
+      actualPartitionPathRecKeyPairs
+          .add(Pair.of(row.getAs("_hoodie_partition_path"), row.getAs("_row_key")));
+    }
+    return actualPartitionPathRecKeyPairs;
+  }
+
+  private Dataset<org.apache.spark.sql.Row> getAllRows(String[] fullPartitionPaths) {
+    return HoodieClientTestUtils
+        .read(jsc, basePath, sqlContext, fs, fullPartitionPaths);
+  }
+
+  private String[] getFullPartitionPaths() {
+    String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
     for (int i = 0; i < fullPartitionPaths.length; i++) {
       fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
     }
-    assertEquals(100, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
-        "Must contain 100 records");
+    return fullPartitionPaths;
+  }
+
+  private Map<String, Integer> getBaseFileCounts(String[] fullPartitionPaths) {
+    return HoodieClientTestUtils.getBaseFileCountForPaths(basePath, fs, fullPartitionPaths);
+  }
+
+  private void assertActualAndExpectedPartitionPathRecordKeyMatches(Set<Pair<String, String>> expectedPartitionPathRecKeyPairs,
+      List<Pair<String, String>> actualPartitionPathRecKeyPairs) {
+    // verify all partition path / record key pairs match
+    assertEquals(expectedPartitionPathRecKeyPairs.size(), actualPartitionPathRecKeyPairs.size());
+    for (Pair<String, String> entry : actualPartitionPathRecKeyPairs) {
+      assertTrue(expectedPartitionPathRecKeyPairs.contains(entry));
+    }
+
+    for (Pair<String, String> entry : expectedPartitionPathRecKeyPairs) {
+      assertTrue(actualPartitionPathRecKeyPairs.contains(entry));
+    }
   }
 
   /**
@@ -715,7 +844,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   }
 
   private Pair<Set<String>, List<HoodieRecord>> testUpdates(String instantTime, HoodieWriteClient client,
-                                                            int sizeToInsertAndUpdate, int expectedTotalRecords)
+      int sizeToInsertAndUpdate, int expectedTotalRecords)
       throws IOException {
     client.startCommitWithTime(instantTime);
     List<HoodieRecord> inserts = dataGen.generateInserts(instantTime, sizeToInsertAndUpdate);
@@ -740,7 +869,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   }
 
   private void testDeletes(HoodieWriteClient client, List<HoodieRecord> previousRecords, int sizeToDelete,
-                           String existingFile, String instantTime, int exepctedRecords, List<String> keys) {
+      String existingFile, String instantTime, int exepctedRecords, List<String> keys) {
     client.startCommitWithTime(instantTime);
 
     List<HoodieKey> hoodieKeysToDelete = HoodieClientTestUtils
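
Write 3 of the new test moves every batch-1 record to the next partition before upserting. A hypothetical
helper equivalent to that inline rotation (not part of the commit) may make the test easier to read:

    import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
    import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
    import static org.apache.hudi.testutils.HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;

    public class PartitionRotationSketch {
      // Rotation used in write 3: first -> second -> third -> first, so every upserted record
      // lands in a partition different from the one it was originally inserted into.
      static String rotatePartitionPath(String partitionPath) {
        if (partitionPath.equalsIgnoreCase(DEFAULT_FIRST_PARTITION_PATH)) {
          return DEFAULT_SECOND_PARTITION_PATH;
        } else if (partitionPath.equalsIgnoreCase(DEFAULT_SECOND_PARTITION_PATH)) {
          return DEFAULT_THIRD_PARTITION_PATH;
        } else if (partitionPath.equalsIgnoreCase(DEFAULT_THIRD_PARTITION_PATH)) {
          return DEFAULT_FIRST_PARTITION_PATH;
        }
        throw new IllegalStateException("Unknown partition path " + partitionPath);
      }
    }
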
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
index 145cde5..1bb8275 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
@@ -67,6 +67,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
@@ -223,6 +224,26 @@ public class HoodieClientTestUtils {
     }
   }
 
+  /**
+   * Finds the count of latest base files for each of the passed-in paths.
+   */
+  public static Map<String, Integer> getBaseFileCountForPaths(String basePath, FileSystem fs,
+      String... paths) {
+    Map<String, Integer> toReturn = new HashMap<>();
+    try {
+      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
+      for (String path : paths) {
+        BaseFileOnlyView fileSystemView = new HoodieTableFileSystemView(metaClient,
+            metaClient.getCommitsTimeline().filterCompletedInstants(), fs.globStatus(new Path(path)));
+        List<HoodieBaseFile> latestFiles = fileSystemView.getLatestBaseFiles().collect(Collectors.toList());
+        toReturn.put(path, latestFiles.size());
+      }
+      return toReturn;
+    } catch (Exception e) {
+      throw new HoodieException("Error reading base file counts for hoodie table", e);
+    }
+  }
+
   public static String writeParquetFile(String basePath, String partitionPath, String filename,
                                         List<HoodieRecord> records, Schema schema, BloomFilter filter, boolean createCommitTime) throws IOException {
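
A usage sketch for the new helper, mirroring how the test above consumes it; the arguments are assumed to
come from the surrounding test fixture and the assertion is JUnit 5:

    import java.util.Map;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hudi.testutils.HoodieClientTestUtils;

    import static org.junit.jupiter.api.Assertions.assertTrue;

    public class BaseFileCountAssertionSketch {
      // Asserts that every partition glob currently resolves to exactly one latest base file,
      // as expected right after the first batch of inserts in the test above.
      static void assertOneBaseFilePerPartition(String basePath, FileSystem fs, String[] fullPartitionPaths) {
        Map<String, Integer> baseFileCounts =
            HoodieClientTestUtils.getBaseFileCountForPaths(basePath, fs, fullPartitionPaths);
        assertTrue(baseFileCounts.values().stream().allMatch(count -> count == 1),
            "Expected exactly one base file per partition");
      }
    }
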