Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/07/04 20:05:04 UTC

[GitHub] [hudi] nsivabalan opened a new pull request #1793: [HUDI-1068] Fixing deletes in global bloom

nsivabalan opened a new pull request #1793:
URL: https://github.com/apache/hudi/pull/1793


   ## What is the purpose of the pull request
   
   - When the partition path of an existing record is updated via the global bloom index, the record emitted to delete the old copy is left without a location. As a result, the latest file id in the old partition gets picked up, whereas the delete should be tagged with the fileId in which the existing record is actually found.
   
   ## Brief change log
   
   - Fix the global bloom index to ensure the HoodieRecordLocation is set on the record that needs to be deleted when the partition path is updated.
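
   In code, the fix boils down to tagging the delete record with the existing record's location before sealing it. A minimal sketch, following the names used in `HoodieGlobalBloomIndex` (see the diff discussed below):

   ```java
   // The key was found in a different partition than the incoming record:
   // emit a delete for the old partition and tag it with the existing record's
   // location, so it is routed to the file group that actually holds the old record.
   HoodieRecord<T> deleteRecord = new HoodieRecord<>(recordLocationHoodieKeyPair.get()._2,
       new EmptyHoodieRecordPayload());
   deleteRecord.setCurrentLocation(recordLocationHoodieKeyPair.get()._1());
   deleteRecord.seal();
   ```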
   
   ## Verify this pull request
   
   *(Please pick either of the following options)*
   
   This change added tests and can be verified as follows:
   
     - *Added tests in TestHoodieClientOnCopyOnWriteStorage to verify the change.*
   
   ## Committer checklist
   
    - [ ] Has a corresponding JIRA in PR title & commit
    
    - [ ] Commit message is descriptive of the change
    
    - [ ] CI is green
   
    - [ ] Necessary doc changes done or have another open PR
          
    - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] xushiyan commented on a change in pull request #1793: [HUDI-1068] Fixing deletes in global bloom

Posted by GitBox <gi...@apache.org>.
xushiyan commented on a change in pull request #1793:
URL: https://github.com/apache/hudi/pull/1793#discussion_r453215643



##########
File path: hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
##########
@@ -123,11 +123,13 @@ public HoodieGlobalBloomIndex(HoodieWriteConfig config) {
         if (config.getBloomIndexUpdatePartitionPath()
             && !recordLocationHoodieKeyPair.get()._2.getPartitionPath().equals(hoodieRecord.getPartitionPath())) {
           // Create an empty record to delete the record in the old partition
-          HoodieRecord<T> emptyRecord = new HoodieRecord(recordLocationHoodieKeyPair.get()._2,
+          HoodieRecord<T> deleteRecord = new HoodieRecord(recordLocationHoodieKeyPair.get()._2,
               new EmptyHoodieRecordPayload());
+          deleteRecord.setCurrentLocation(recordLocationHoodieKeyPair.get()._1());
+          deleteRecord.seal();
           // Tag the incoming record for inserting to the new partition
-          HoodieRecord<T> taggedRecord = HoodieIndexUtils.getTaggedRecord(hoodieRecord, Option.empty());
-          return Arrays.asList(emptyRecord, taggedRecord).iterator();
+          HoodieRecord<T> insetRecord = HoodieIndexUtils.getTaggedRecord(hoodieRecord, Option.empty());
+          return Arrays.asList(deleteRecord, insetRecord).iterator();

Review comment:
       nit: typo `insertRecord`

##########
File path: hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -399,56 +405,190 @@ public void testDeletes() throws Exception {
   }
 
   /**
-   * Test update of a record to different partition with Global Index.
+   * Tests when update partition path is set in global bloom, existing record in old partition is deleted appropriately.
    */
   @Test
-  public void testUpsertToDiffPartitionGlobalIndex() throws Exception {
-    HoodieWriteClient client = getHoodieWriteClient(getConfig(IndexType.GLOBAL_BLOOM), false);
-    /**
-     * Write 1 (inserts and deletes) Write actual 200 insert records and ignore 100 delete records
-     */
-    String newCommitTime = "001";
-    List<HoodieRecord> inserts1 = dataGen.generateInserts(newCommitTime, 100);
+  public void testUpsertsUpdatePartitionPathRegularGlobalBloom() throws Exception {
+    testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_BLOOM, getConfig(),
+        HoodieWriteClient::upsert);
+  }
 
-    // Write 1 (only inserts)
+  /**
+   * Tests when update partition path is set in simple global bloom, existing record in old partition is deleted appropriately.
+   */
+  @Test
+  public void testUpsertsUpdatePartitionPathSimpleGlobalBloom() throws Exception {
+    testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_SIMPLE, getConfig(),
+        HoodieWriteClient::upsert);
+  }
+
+  /**
+   * This test ensures in a global bloom when update partition path is set to true in config, if an incoming record has mismatched partition
+   * compared to whats in storage, then appropriate actions are taken. i.e. old record is deleted in old partition and new one is inserted
+   * in the new partition.
+   * test structure:
+   * 1. insert 1 batch
+   * 2. insert 2nd batch with larger no of records so that a new file group is created for partitions
+   * 3. issue upserts to records from batch 1 with different partition path. This should ensure records from batch 1 are deleted and new
+   * records are upserted to the new partition
+   *
+   * @param indexType index type to be tested for
+   * @param config instance of {@link HoodieWriteConfig} to use
+   * @param writeFn write function to be used for testing
+   */
+  private void testUpsertsUpdatePartitionPathGlobalBloom(IndexType indexType, HoodieWriteConfig config,
+      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn)
+      throws Exception {
+    // instantiate client
+
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
+        .withProps(config.getProps())
+        .withCompactionConfig(
+            HoodieCompactionConfig.newBuilder().compactionSmallFileSize(10000).build())
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType)
+            .withBloomIndexUpdatePartitionPath(true)
+            .withGlobalSimpleIndexUpdatePartitionPath(true)
+            .build()).withTimelineLayoutVersion(VERSION_0).build();
+    HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(), metaClient.getTableType(),
+        metaClient.getTableConfig().getTableName(), metaClient.getArchivePath(),
+        metaClient.getTableConfig().getPayloadClass(), VERSION_0);
+    HoodieWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);
+
+    // Write 1
+    String newCommitTime = "001";
+    int numRecords = 10;
     client.startCommitWithTime(newCommitTime);
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(inserts1, 1);
 
-    JavaRDD<WriteStatus> result = client.insert(writeRecords, newCommitTime);
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
+    List<Pair<String, String>> expectedPartitionPathRecKeyPairs = new ArrayList<>();
+    // populate expected partition path and record keys
+    for (HoodieRecord rec : records) {
+      expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+    }
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime);
     List<WriteStatus> statuses = result.collect();
-    assertNoWriteErrors(statuses);
 
-    // check the partition metadata is written out
-    assertPartitionMetadataForRecords(inserts1, fs);
-    String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
-    for (int i = 0; i < fullPartitionPaths.length; i++) {
-      fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+    // Check the entire dataset has all records
+    String[] fullPartitionPaths = getFullPartitionPaths();
+    assertPartitionPathRecordKeys(fullPartitionPaths, expectedPartitionPathRecKeyPairs);
+
+    // verify one basefile per partition
+    Map<String, Integer> baseFileCounts = getBaseFileCounts(fullPartitionPaths);
+    for (Map.Entry<String, Integer> entry : baseFileCounts.entrySet()) {
+      assertEquals(1, entry.getValue());
     }
-    assertEquals(100, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
-        "Must contain 100 records");
 
-    /**
-     * Write 2. Updates with different partition
-     */
-    newCommitTime = "004";
+    // Write 2
+    newCommitTime = "002";
+    numRecords = 20; // so that a new file id is created
     client.startCommitWithTime(newCommitTime);
 
-    List<HoodieRecord> updates1 = dataGen.generateUpdatesWithDiffPartition(newCommitTime, inserts1);
-    JavaRDD<HoodieRecord> updateRecords = jsc.parallelize(updates1, 1);
+    List<HoodieRecord> recordsSecondBatch = dataGen.generateInserts(newCommitTime, numRecords);
+    // populate expected partition path and record keys
+    for (HoodieRecord rec : recordsSecondBatch) {
+      expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+    }
+    writeRecords = jsc.parallelize(recordsSecondBatch, 1);
+    result = writeFn.apply(client, writeRecords, newCommitTime);
+    result.collect();
 
-    JavaRDD<WriteStatus> result1 = client.upsert(updateRecords, newCommitTime);
-    List<WriteStatus> statuses1 = result1.collect();
-    assertNoWriteErrors(statuses1);
+    // Check the entire dataset has all records
+    fullPartitionPaths = getFullPartitionPaths();
+    assertPartitionPathRecordKeys(fullPartitionPaths, expectedPartitionPathRecKeyPairs);
+
+    // verify that there are more than 1 basefiles per partition
+    // we can't guarantee randomness in partitions where records are distributed. So, verify atleast one partition has more than 1 basefile.
+    baseFileCounts = getBaseFileCounts(fullPartitionPaths);
+    boolean hasMoreThanOneBaseFile = false;
+    for (Map.Entry<String, Integer> entry : baseFileCounts.entrySet()) {
+      if (entry.getValue() > 1) {
+        hasMoreThanOneBaseFile = true;
+        break;
+      }
+    }
+    assertTrue(hasMoreThanOneBaseFile, "Atleast one partition should have more than 1 base file after 2nd batch of writes");
+
+    // Write 3 (upserts to records from batch 1 with diff partition path)
+    newCommitTime = "003";
+
+    // update to diff partition paths
+    List<HoodieRecord> recordsToUpsert = new ArrayList<>();
+    for (HoodieRecord rec : records) {
+      // remove older entry from expected partition path record key pairs
+      expectedPartitionPathRecKeyPairs
+          .remove(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+      String partitionPath = rec.getPartitionPath();
+      String newPartitionPath = null;
+      if (partitionPath.equalsIgnoreCase(DEFAULT_FIRST_PARTITION_PATH)) {
+        newPartitionPath = DEFAULT_SECOND_PARTITION_PATH;
+      } else if (partitionPath.equalsIgnoreCase(DEFAULT_SECOND_PARTITION_PATH)) {
+        newPartitionPath = DEFAULT_THIRD_PARTITION_PATH;
+      } else if (partitionPath.equalsIgnoreCase(DEFAULT_THIRD_PARTITION_PATH)) {
+        newPartitionPath = DEFAULT_FIRST_PARTITION_PATH;
+      } else {
+        throw new IllegalStateException("Unknown partition path " + rec.getPartitionPath());
+      }
+      recordsToUpsert.add(
+          new HoodieRecord(new HoodieKey(rec.getRecordKey(), newPartitionPath),
+              rec.getData()));
+      // populate expected partition path and record keys
+      expectedPartitionPathRecKeyPairs.add(Pair.of(newPartitionPath, rec.getRecordKey()));
+    }
 
-    // check the partition metadata is written out
-    assertPartitionMetadataForRecords(updates1, fs);
-    // Check the entire dataset has all records still
-    fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+    writeRecords = jsc.parallelize(recordsToUpsert, 1);
+    result = writeFn.apply(client, writeRecords, newCommitTime);
+    statuses = result.collect();
+
+    // Check the entire dataset has all records
+    fullPartitionPaths = getFullPartitionPaths();
+    assertPartitionPathRecordKeys(fullPartitionPaths, expectedPartitionPathRecKeyPairs);
+  }
+
+  private void assertPartitionPathRecordKeys(String[] fullPartitionPaths, List<Pair<String, String>> expectedPartitionPathRecKeyPairs) {

Review comment:
       to align with junit assertions, can we make the expected variable 1st argument?
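
       A minimal sketch of the suggested signature, with the expected value first (helper names taken from the quoted diff):

       ```java
       private void assertPartitionPathRecordKeys(List<Pair<String, String>> expectedPartitionPathRecKeyPairs,
           String[] fullPartitionPaths) {
         Dataset<Row> rows = getAllRows(fullPartitionPaths);
         List<Pair<String, String>> actualPartitionPathRecKeyPairs = getActualPartitionPathAndRecordKeys(rows);
         assertActualAndExpectedPartitionPathRecordKeyMatches(expectedPartitionPathRecKeyPairs, actualPartitionPathRecKeyPairs);
       }
       ```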

##########
File path: hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -399,56 +405,190 @@ public void testDeletes() throws Exception {
+  private void assertPartitionPathRecordKeys(String[] fullPartitionPaths, List<Pair<String, String>> expectedPartitionPathRecKeyPairs) {
+    Dataset<Row> rows = getAllRows(fullPartitionPaths);
+    List<Pair<String, String>> actualPartitionPathRecKeyPairs = getActualPartitionPathAndRecordKeys(rows);
+    // verify all partitionpath, record key matches
+    assertActualAndExpectedPartitionPathRecordKeyMatches(expectedPartitionPathRecKeyPairs, actualPartitionPathRecKeyPairs);
+  }
+
+  private List<Pair<String, String>> getActualPartitionPathAndRecordKeys(Dataset<org.apache.spark.sql.Row> rows) {
+    List<Pair<String, String>> actualPartitionPathRecKeyPairs = new ArrayList<>();
+    for (Row row : rows.collectAsList()) {
+      actualPartitionPathRecKeyPairs
+          .add(Pair.of(row.getAs("_hoodie_partition_path"), row.getAs("_row_key")));
+    }
+    return actualPartitionPathRecKeyPairs;
+  }
+
+  private Dataset<org.apache.spark.sql.Row> getAllRows(String[] fullPartitionPaths) {
+    return HoodieClientTestUtils
+        .read(jsc, basePath, sqlContext, fs, fullPartitionPaths);
+  }
+
+  private String[] getFullPartitionPaths() {
+    String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
     for (int i = 0; i < fullPartitionPaths.length; i++) {
       fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
     }
-    assertEquals(100, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
-        "Must contain 100 records");
+    return fullPartitionPaths;
+  }
+
+  private Map<String, Integer> getBaseFileCounts(String[] fullPartitionPaths) {
+    return HoodieClientTestUtils.getBaseFileCountForPaths(basePath, fs, fullPartitionPaths);
+  }
+
+  private void assertActualAndExpectedPartitionPathRecordKeyMatches(List<Pair<String, String>> expectedPartitionPathRecKeyPairs,
+      List<Pair<String, String>> actualPartitionPathRecKeyPairs) {
+    // verify all partitionpath, record key matches
+    assertEquals(expectedPartitionPathRecKeyPairs.size(), actualPartitionPathRecKeyPairs.size());
+    for (Pair<String, String> entry : actualPartitionPathRecKeyPairs) {
+      assertTrue(expectedPartitionPathRecKeyPairs.contains(entry));
+    }
+
+    for (Pair<String, String> entry : expectedPartitionPathRecKeyPairs) {
+      assertTrue(actualPartitionPathRecKeyPairs.contains(entry));
+    }

Review comment:
       Looks like `Set<Pair<String, String>>` is better suited for `expectedPartitionPathRecKeyPairs`? so these can be simplified as `assertEquals(expected, actual);`
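
       A rough sketch of the simplification, assuming both sides are collected into sets (order-insensitive, and the two containment loops go away):

       ```java
       // java.util.Set / java.util.HashSet assumed to be imported
       Set<Pair<String, String>> expected = new HashSet<>(expectedPartitionPathRecKeyPairs);
       Set<Pair<String, String>> actual = new HashSet<>(actualPartitionPathRecKeyPairs);
       assertEquals(expected, actual);
       ```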

##########
File path: hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -399,56 +405,190 @@ public void testDeletes() throws Exception {
   }
 
   /**
-   * Test update of a record to different partition with Global Index.
+   * Tests when update partition path is set in global bloom, existing record in old partition is deleted appropriately.
    */
   @Test
-  public void testUpsertToDiffPartitionGlobalIndex() throws Exception {
-    HoodieWriteClient client = getHoodieWriteClient(getConfig(IndexType.GLOBAL_BLOOM), false);
-    /**
-     * Write 1 (inserts and deletes) Write actual 200 insert records and ignore 100 delete records
-     */
-    String newCommitTime = "001";
-    List<HoodieRecord> inserts1 = dataGen.generateInserts(newCommitTime, 100);
+  public void testUpsertsUpdatePartitionPathRegularGlobalBloom() throws Exception {
+    testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_BLOOM, getConfig(),
+        HoodieWriteClient::upsert);
+  }
 
-    // Write 1 (only inserts)
+  /**
+   * Tests when update partition path is set in simple global bloom, existing record in old partition is deleted appropriately.
+   */
+  @Test
+  public void testUpsertsUpdatePartitionPathSimpleGlobalBloom() throws Exception {
+    testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_SIMPLE, getConfig(),
+        HoodieWriteClient::upsert);
+  }

Review comment:
       these 2 can be combined into a @ParameterizedTest with IndexType as argument
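
       A minimal sketch of the combined test, assuming JUnit 5's `@ParameterizedTest` with `@EnumSource` (the private helper keeps its current signature):

       ```java
       // requires org.junit.jupiter.params.ParameterizedTest
       // and org.junit.jupiter.params.provider.EnumSource
       @ParameterizedTest
       @EnumSource(value = IndexType.class, names = {"GLOBAL_BLOOM", "GLOBAL_SIMPLE"})
       public void testUpsertsUpdatePartitionPath(IndexType indexType) throws Exception {
         testUpsertsUpdatePartitionPathGlobalBloom(indexType, getConfig(), HoodieWriteClient::upsert);
       }
       ```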

##########
File path: hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -399,56 +405,190 @@ public void testDeletes() throws Exception {
+    // Write 2
+    newCommitTime = "002";
+    numRecords = 20; // so that a new file id is created
     client.startCommitWithTime(newCommitTime);
 
-    List<HoodieRecord> updates1 = dataGen.generateUpdatesWithDiffPartition(newCommitTime, inserts1);
-    JavaRDD<HoodieRecord> updateRecords = jsc.parallelize(updates1, 1);
+    List<HoodieRecord> recordsSecondBatch = dataGen.generateInserts(newCommitTime, numRecords);
+    // populate expected partition path and record keys
+    for (HoodieRecord rec : recordsSecondBatch) {
+      expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+    }
+    writeRecords = jsc.parallelize(recordsSecondBatch, 1);
+    result = writeFn.apply(client, writeRecords, newCommitTime);
+    result.collect();

Review comment:
       result.collect() return value not used?
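
       One way to address this, reusing the `assertNoWriteErrors` check applied to the first write (a sketch of the suggestion, not the author's change):

       ```java
       statuses = result.collect();
       assertNoWriteErrors(statuses);
       ```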

##########
File path: hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -399,56 +405,190 @@ public void testDeletes() throws Exception {
+    // Write 3 (upserts to records from batch 1 with diff partition path)
+    newCommitTime = "003";
+
+    // update to diff partition paths
+    List<HoodieRecord> recordsToUpsert = new ArrayList<>();
+    for (HoodieRecord rec : records) {
+      // remove older entry from expected partition path record key pairs
+      expectedPartitionPathRecKeyPairs
+          .remove(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));

Review comment:
       can this be replaced by using a new variable for `expectedPartitionPathRecKeyPairs`? a bit hard to track what is contained.
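
       A possible shape for that, deriving a separate expectation list for write 3; `newPartitionPathFor` is a hypothetical helper wrapping the partition-rotation if/else chain from the quoted diff:

       ```java
       List<Pair<String, String>> expectedAfterPartitionUpdate = new ArrayList<>(expectedPartitionPathRecKeyPairs);
       for (HoodieRecord rec : records) {
         String newPartitionPath = newPartitionPathFor(rec.getPartitionPath()); // hypothetical helper
         expectedAfterPartitionUpdate.remove(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
         expectedAfterPartitionUpdate.add(Pair.of(newPartitionPath, rec.getRecordKey()));
         recordsToUpsert.add(new HoodieRecord(new HoodieKey(rec.getRecordKey(), newPartitionPath), rec.getData()));
       }
       ```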

##########
File path: hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -399,56 +405,190 @@ public void testDeletes() throws Exception {
+    // verify that there are more than 1 basefiles per partition
+    // we can't guarantee randomness in partitions where records are distributed. So, verify atleast one partition has more than 1 basefile.
+    baseFileCounts = getBaseFileCounts(fullPartitionPaths);
+    boolean hasMoreThanOneBaseFile = false;
+    for (Map.Entry<String, Integer> entry : baseFileCounts.entrySet()) {
+      if (entry.getValue() > 1) {
+        hasMoreThanOneBaseFile = true;
+        break;
+      }
+    }
+    assertTrue(hasMoreThanOneBaseFile, "Atleast one partition should have more than 1 base file after 2nd batch of writes");

Review comment:
       these lines can be simplified to `assertTrue(entrySet().stream().filter().count()>=1)`
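
       A minimal sketch of the stream-based form (names reused from the quoted diff):

       ```java
       assertTrue(baseFileCounts.values().stream().anyMatch(count -> count > 1),
           "At least one partition should have more than 1 base file after 2nd batch of writes");
       ```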

##########
File path: hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -399,56 +405,190 @@ public void testDeletes() throws Exception {
+    // verify one basefile per partition
+    Map<String, Integer> baseFileCounts = getBaseFileCounts(fullPartitionPaths);
+    for (Map.Entry<String, Integer> entry : baseFileCounts.entrySet()) {
+      assertEquals(1, entry.getValue());
     }

Review comment:
       assertAll() works better for iterating assertions
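
       A minimal sketch of what that could look like, assuming JUnit 5's `assertAll` over a stream of executables (so every mismatching partition is reported instead of stopping at the first failure):

       ```java
       // requires org.junit.jupiter.api.Assertions.assertAll
       // and org.junit.jupiter.api.function.Executable
       assertAll(baseFileCounts.entrySet().stream()
           .map(entry -> (Executable) () -> assertEquals(1, (int) entry.getValue(),
               "Expected exactly one base file in partition " + entry.getKey())));
       ```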







[GitHub] [hudi] nsivabalan commented on a change in pull request #1793: [HUDI-1068] Fixing deletes in global bloom

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1793:
URL: https://github.com/apache/hudi/pull/1793#discussion_r450809602



##########
File path: hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -399,56 +405,173 @@ public void testDeletes() throws Exception {
   }
 
   /**
-   * Test update of a record to different partition with Global Index.
+   * Tests when update partition path is set in global bloom, existing record in old partition
+   * is deleted appropriately.
+   * @throws Exception
    */
   @Test
-  public void testUpsertToDiffPartitionGlobalIndex() throws Exception {
-    HoodieWriteClient client = getHoodieWriteClient(getConfig(IndexType.GLOBAL_BLOOM), false);
-    /**
-     * Write 1 (inserts and deletes) Write actual 200 insert records and ignore 100 delete records
-     */
-    String newCommitTime = "001";
-    List<HoodieRecord> inserts1 = dataGen.generateInserts(newCommitTime, 100);
+  public void testUpsertsUpdatePartitionPathRegularGlobalBloom() throws Exception {
+    testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_BLOOM, getConfig(),
+        HoodieWriteClient::upsert);
+  }
+
+  /**
+   * Tests when update partition path is set in simple global bloom, existing record in
+   * old partition is deleted appropriately.
+   * @throws Exception
+   */
+  @Test
+  public void testUpsertsUpdatePartitionPathSimpleGlobalBloom() throws Exception {
+    testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_SIMPLE, getConfig(),
+        HoodieWriteClient::upsert);
+  }
+
+  private void testUpsertsUpdatePartitionPathGlobalBloom(IndexType indexType,
+      HoodieWriteConfig config,
+      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn)
+      throws Exception {
+    // Force using older timeline layout
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
+        .withProps(config.getProps())
+        .withCompactionConfig(
+            HoodieCompactionConfig.newBuilder().compactionSmallFileSize(10000).build())
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType)
+            .withBloomIndexUpdatePartitionPath(true)
+            .withGlobalSimpleIndexUpdatePartitionPath(true).build())
+        .withTimelineLayoutVersion(
+            VERSION_0).build();
+    HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(),
+        metaClient.getTableType(),
+        metaClient.getTableConfig().getTableName(), metaClient.getArchivePath(),
+        metaClient.getTableConfig().getPayloadClass(), VERSION_0);
+    HoodieWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);
 
     // Write 1 (only inserts)
+    String newCommitTime = "001";
+    int numRecords = 10;
     client.startCommitWithTime(newCommitTime);
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(inserts1, 1);
 
-    JavaRDD<WriteStatus> result = client.insert(writeRecords, newCommitTime);
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
+    List<Pair<String, String>> expectedPartitionPathRecKeyPairs = new ArrayList<>();
+    for (HoodieRecord rec : records) {
+      expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+    }
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime);
     List<WriteStatus> statuses = result.collect();
-    assertNoWriteErrors(statuses);
 
-    // check the partition metadata is written out
-    assertPartitionMetadataForRecords(inserts1, fs);
+    // Check the entire dataset has all records still
     String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
     for (int i = 0; i < fullPartitionPaths.length; i++) {
       fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
     }
-    assertEquals(100, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
-        "Must contain 100 records");
+    Dataset<Row> rows = HoodieClientTestUtils
+        .read(jsc, basePath, sqlContext, fs, fullPartitionPaths);
+    List<Pair<String, String>> actualPartitionPathRecKeyPairs = new ArrayList<>();
+    for (Row row : rows.collectAsList()) {
+      actualPartitionPathRecKeyPairs
+          .add(Pair.of(row.getAs("_hoodie_partition_path"), row.getAs("_row_key")));
+    }
 
-    /**
-     * Write 2. Updates with different partition
-     */
-    newCommitTime = "004";
+    // verify all partitionpath, record key matches
+    assertEquals(expectedPartitionPathRecKeyPairs.size(), actualPartitionPathRecKeyPairs.size());
+    for (Pair<String, String> entry : actualPartitionPathRecKeyPairs) {
+      assertTrue(expectedPartitionPathRecKeyPairs.contains(entry));
+    }
+
+    for (Pair<String, String> entry : expectedPartitionPathRecKeyPairs) {
+      assertTrue(actualPartitionPathRecKeyPairs.contains(entry));
+    }
+
+    newCommitTime = "002";
+    numRecords = 20; // so that a new file id is created
     client.startCommitWithTime(newCommitTime);
 
-    List<HoodieRecord> updates1 = dataGen.generateUpdatesWithDiffPartition(newCommitTime, inserts1);
-    JavaRDD<HoodieRecord> updateRecords = jsc.parallelize(updates1, 1);
+    List<HoodieRecord> recordsSecondBatch = dataGen.generateInserts(newCommitTime, numRecords);
+    for (HoodieRecord rec : recordsSecondBatch) {
+      expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+    }
+    writeRecords = jsc.parallelize(recordsSecondBatch, 1);
+    result = writeFn.apply(client, writeRecords, newCommitTime);
+    result.collect();
+
+    // Check the entire dataset has all records still
+    fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+    for (int i = 0; i < fullPartitionPaths.length; i++) {
+      fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+    }
+
+    rows = HoodieClientTestUtils
+        .read(jsc, basePath, sqlContext, fs, fullPartitionPaths);
+    actualPartitionPathRecKeyPairs = new ArrayList<>();
+    for (Row row : rows.collectAsList()) {
+      actualPartitionPathRecKeyPairs
+          .add(Pair.of(row.getAs("_hoodie_partition_path"), row.getAs("_row_key")));
+    }
 
-    JavaRDD<WriteStatus> result1 = client.upsert(updateRecords, newCommitTime);
-    List<WriteStatus> statuses1 = result1.collect();
-    assertNoWriteErrors(statuses1);
+    // verify all partitionpath, record key matches
+    assertEquals(expectedPartitionPathRecKeyPairs.size(), actualPartitionPathRecKeyPairs.size());
+    for (Pair<String, String> entry : actualPartitionPathRecKeyPairs) {
+      assertTrue(expectedPartitionPathRecKeyPairs.contains(entry));
+    }
+
+    for (Pair<String, String> entry : expectedPartitionPathRecKeyPairs) {
+      assertTrue(actualPartitionPathRecKeyPairs.contains(entry));
+    }
+
+    // Write 2 (updates)
+    newCommitTime = "003";
+    records = records.subList(5, 10);
+
+    // update to diff partition paths
+    List<HoodieRecord> recordsToUpsert = new ArrayList<>();
+    for (HoodieRecord rec : records) {
+      expectedPartitionPathRecKeyPairs
+          .remove(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+      String partitionPath = rec.getPartitionPath();
+      String newPartitionPath = null;
+      if (partitionPath.equalsIgnoreCase(DEFAULT_FIRST_PARTITION_PATH)) {
+        newPartitionPath = DEFAULT_SECOND_PARTITION_PATH;
+      } else if (partitionPath.equalsIgnoreCase(DEFAULT_SECOND_PARTITION_PATH)) {
+        newPartitionPath = DEFAULT_THIRD_PARTITION_PATH;
+      } else if (partitionPath.equalsIgnoreCase(DEFAULT_THIRD_PARTITION_PATH)) {
+        newPartitionPath = DEFAULT_FIRST_PARTITION_PATH;
+      } else {
+        throw new IllegalStateException("Unknown partition path " + rec.getPartitionPath());
+      }
+      recordsToUpsert.add(
+          new HoodieRecord(new HoodieKey(rec.getRecordKey(), newPartitionPath),
+              rec.getData()));
+      expectedPartitionPathRecKeyPairs.add(Pair.of(newPartitionPath, rec.getRecordKey()));
+    }
+
+    writeRecords = jsc.parallelize(recordsToUpsert, 1);
+    result = writeFn.apply(client, writeRecords, newCommitTime);
+    statuses = result.collect();
 
-    // check the partition metadata is written out
-    assertPartitionMetadataForRecords(updates1, fs);
     // Check the entire dataset has all records still
     fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
     for (int i = 0; i < fullPartitionPaths.length; i++) {
       fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
     }
-    assertEquals(100, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
-        "Must contain 100 records");
+
+    rows = HoodieClientTestUtils
+        .read(jsc, basePath, sqlContext, fs, fullPartitionPaths);
+    actualPartitionPathRecKeyPairs = new ArrayList<>();
+    for (Row row : rows.collectAsList()) {
+      actualPartitionPathRecKeyPairs
+          .add(Pair.of(row.getAs("_hoodie_partition_path"), row.getAs("_row_key")));
+    }
+
+    // verify all partitionpath, record key matches

Review comment:
       I have refactored the test now. You can check it out. I have also added assertions to verify base file counts.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on pull request #1793: [HUDI-1068] Fixing deletes in global bloom

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1793:
URL: https://github.com/apache/hudi/pull/1793#issuecomment-655880737


   @nsivabalan can we also address the simple global index?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] xushiyan commented on a change in pull request #1793: [HUDI-1068] Fixing deletes in global bloom

Posted by GitBox <gi...@apache.org>.
xushiyan commented on a change in pull request #1793:
URL: https://github.com/apache/hudi/pull/1793#discussion_r449819653



##########
File path: hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
##########
@@ -125,6 +125,9 @@ public HoodieGlobalBloomIndex(HoodieWriteConfig config) {
           // Create an empty record to delete the record in the old partition
           HoodieRecord<T> emptyRecord = new HoodieRecord(recordLocationHoodieKeyPair.get()._2,
               new EmptyHoodieRecordPayload());
+          emptyRecord.unseal();
+          emptyRecord.setCurrentLocation(recordLocationHoodieKeyPair.get()._1());
+          emptyRecord.seal();

Review comment:
       It seems that we don't need to unseal(), since `sealed` is `false` when the record is created via this constructor:
   https://github.com/apache/hudi/blob/574dcf920c4677513f6fdc8d441bb42827afa5a2/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java#L67-L73
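
       For context, a minimal sketch of the flow this enables (paraphrased, not the exact Hudi source; it assumes the two-arg HoodieRecord constructor leaves the record unsealed, per the lines linked above):

           // freshly constructed records start out unsealed, so no unseal() call is needed
           HoodieRecord<T> deleteRecord = new HoodieRecord<>(key, new EmptyHoodieRecordPayload());
           deleteRecord.setCurrentLocation(existingLocation); // tag the file group that holds the old copy
           deleteRecord.seal();                               // freeze the record before handing it to the write path

       Here `key` and `existingLocation` are stand-ins for recordLocationHoodieKeyPair.get()._2 and ._1() from the diff.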

##########
File path: hudi-client/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java
##########
@@ -131,6 +131,9 @@ public HoodieGlobalSimpleIndex(HoodieWriteConfig config) {
             if (config.getGlobalSimpleIndexUpdatePartitionPath() && !(inputRecord.getPartitionPath().equals(partitionPath))) {
               // Create an empty record to delete the record in the old partition
               HoodieRecord<T> emptyRecord = new HoodieRecord(new HoodieKey(inputRecord.getRecordKey(), partitionPath), new EmptyHoodieRecordPayload());
+              emptyRecord.unseal();
+              emptyRecord.setCurrentLocation(location);
+              emptyRecord.seal();

Review comment:
       ditto

##########
File path: hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -399,56 +405,173 @@ public void testDeletes() throws Exception {
   }
 
   /**
-   * Test update of a record to different partition with Global Index.
+   * Tests when update partition path is set in global bloom, existing record in old partition
+   * is deleted appropriately.
+   * @throws Exception

Review comment:
       minor: `@throws Exception` looks redundant without further info




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan merged pull request #1793: [HUDI-1068] Fixing deletes in global bloom

Posted by GitBox <gi...@apache.org>.
nsivabalan merged pull request #1793:
URL: https://github.com/apache/hudi/pull/1793


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on a change in pull request #1793: [HUDI-1068] Fixing deletes in global bloom

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1793:
URL: https://github.com/apache/hudi/pull/1793#discussion_r450811380



##########
File path: hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -399,56 +405,173 @@ public void testDeletes() throws Exception {
   }
 
   /**
-   * Test update of a record to different partition with Global Index.
+   * Tests when update partition path is set in global bloom, existing record in old partition
+   * is deleted appropriately.
+   * @throws Exception
    */
   @Test
-  public void testUpsertToDiffPartitionGlobalIndex() throws Exception {
-    HoodieWriteClient client = getHoodieWriteClient(getConfig(IndexType.GLOBAL_BLOOM), false);
-    /**
-     * Write 1 (inserts and deletes) Write actual 200 insert records and ignore 100 delete records
-     */
-    String newCommitTime = "001";
-    List<HoodieRecord> inserts1 = dataGen.generateInserts(newCommitTime, 100);
+  public void testUpsertsUpdatePartitionPathRegularGlobalBloom() throws Exception {
+    testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_BLOOM, getConfig(),
+        HoodieWriteClient::upsert);
+  }
+
+  /**
+   * Tests when update partition path is set in simple global bloom, existing record in
+   * old partition is deleted appropriately.
+   * @throws Exception
+   */
+  @Test
+  public void testUpsertsUpdatePartitionPathSimpleGlobalBloom() throws Exception {
+    testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_SIMPLE, getConfig(),
+        HoodieWriteClient::upsert);
+  }
+
+  private void testUpsertsUpdatePartitionPathGlobalBloom(IndexType indexType,
+      HoodieWriteConfig config,
+      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn)
+      throws Exception {
+    // Force using older timeline layout
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
+        .withProps(config.getProps())
+        .withCompactionConfig(
+            HoodieCompactionConfig.newBuilder().compactionSmallFileSize(10000).build())
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType)
+            .withBloomIndexUpdatePartitionPath(true)
+            .withGlobalSimpleIndexUpdatePartitionPath(true).build())
+        .withTimelineLayoutVersion(
+            VERSION_0).build();
+    HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(),
+        metaClient.getTableType(),
+        metaClient.getTableConfig().getTableName(), metaClient.getArchivePath(),
+        metaClient.getTableConfig().getPayloadClass(), VERSION_0);
+    HoodieWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);
 
     // Write 1 (only inserts)
+    String newCommitTime = "001";
+    int numRecords = 10;
     client.startCommitWithTime(newCommitTime);
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(inserts1, 1);
 
-    JavaRDD<WriteStatus> result = client.insert(writeRecords, newCommitTime);
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
+    List<Pair<String, String>> expectedPartitionPathRecKeyPairs = new ArrayList<>();
+    for (HoodieRecord rec : records) {
+      expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+    }
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime);
     List<WriteStatus> statuses = result.collect();
-    assertNoWriteErrors(statuses);
 
-    // check the partition metadata is written out
-    assertPartitionMetadataForRecords(inserts1, fs);
+    // Check the entire dataset has all records still
     String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
     for (int i = 0; i < fullPartitionPaths.length; i++) {
       fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
     }
-    assertEquals(100, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
-        "Must contain 100 records");
+    Dataset<Row> rows = HoodieClientTestUtils
+        .read(jsc, basePath, sqlContext, fs, fullPartitionPaths);
+    List<Pair<String, String>> actualPartitionPathRecKeyPairs = new ArrayList<>();
+    for (Row row : rows.collectAsList()) {
+      actualPartitionPathRecKeyPairs
+          .add(Pair.of(row.getAs("_hoodie_partition_path"), row.getAs("_row_key")));
+    }
 
-    /**
-     * Write 2. Updates with different partition
-     */
-    newCommitTime = "004";
+    // verify all partitionpath, record key matches
+    assertEquals(expectedPartitionPathRecKeyPairs.size(), actualPartitionPathRecKeyPairs.size());
+    for (Pair<String, String> entry : actualPartitionPathRecKeyPairs) {
+      assertTrue(expectedPartitionPathRecKeyPairs.contains(entry));
+    }
+
+    for (Pair<String, String> entry : expectedPartitionPathRecKeyPairs) {
+      assertTrue(actualPartitionPathRecKeyPairs.contains(entry));
+    }
+
+    newCommitTime = "002";
+    numRecords = 20; // so that a new file id is created
     client.startCommitWithTime(newCommitTime);
 
-    List<HoodieRecord> updates1 = dataGen.generateUpdatesWithDiffPartition(newCommitTime, inserts1);
-    JavaRDD<HoodieRecord> updateRecords = jsc.parallelize(updates1, 1);
+    List<HoodieRecord> recordsSecondBatch = dataGen.generateInserts(newCommitTime, numRecords);
+    for (HoodieRecord rec : recordsSecondBatch) {
+      expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+    }
+    writeRecords = jsc.parallelize(recordsSecondBatch, 1);
+    result = writeFn.apply(client, writeRecords, newCommitTime);
+    result.collect();
+
+    // Check the entire dataset has all records still
+    fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+    for (int i = 0; i < fullPartitionPaths.length; i++) {
+      fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+    }
+
+    rows = HoodieClientTestUtils
+        .read(jsc, basePath, sqlContext, fs, fullPartitionPaths);
+    actualPartitionPathRecKeyPairs = new ArrayList<>();
+    for (Row row : rows.collectAsList()) {
+      actualPartitionPathRecKeyPairs
+          .add(Pair.of(row.getAs("_hoodie_partition_path"), row.getAs("_row_key")));
+    }
 
-    JavaRDD<WriteStatus> result1 = client.upsert(updateRecords, newCommitTime);
-    List<WriteStatus> statuses1 = result1.collect();
-    assertNoWriteErrors(statuses1);
+    // verify all partitionpath, record key matches
+    assertEquals(expectedPartitionPathRecKeyPairs.size(), actualPartitionPathRecKeyPairs.size());
+    for (Pair<String, String> entry : actualPartitionPathRecKeyPairs) {
+      assertTrue(expectedPartitionPathRecKeyPairs.contains(entry));
+    }
+
+    for (Pair<String, String> entry : expectedPartitionPathRecKeyPairs) {
+      assertTrue(actualPartitionPathRecKeyPairs.contains(entry));
+    }
+
+    // Write 2 (updates)
+    newCommitTime = "003";
+    records = records.subList(5, 10);
+
+    // update to diff partition paths
+    List<HoodieRecord> recordsToUpsert = new ArrayList<>();
+    for (HoodieRecord rec : records) {
+      expectedPartitionPathRecKeyPairs
+          .remove(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+      String partitionPath = rec.getPartitionPath();
+      String newPartitionPath = null;
+      if (partitionPath.equalsIgnoreCase(DEFAULT_FIRST_PARTITION_PATH)) {
+        newPartitionPath = DEFAULT_SECOND_PARTITION_PATH;
+      } else if (partitionPath.equalsIgnoreCase(DEFAULT_SECOND_PARTITION_PATH)) {
+        newPartitionPath = DEFAULT_THIRD_PARTITION_PATH;
+      } else if (partitionPath.equalsIgnoreCase(DEFAULT_THIRD_PARTITION_PATH)) {
+        newPartitionPath = DEFAULT_FIRST_PARTITION_PATH;
+      } else {
+        throw new IllegalStateException("Unknown partition path " + rec.getPartitionPath());
+      }
+      recordsToUpsert.add(
+          new HoodieRecord(new HoodieKey(rec.getRecordKey(), newPartitionPath),
+              rec.getData()));
+      expectedPartitionPathRecKeyPairs.add(Pair.of(newPartitionPath, rec.getRecordKey()));
+    }
+
+    writeRecords = jsc.parallelize(recordsToUpsert, 1);
+    result = writeFn.apply(client, writeRecords, newCommitTime);
+    statuses = result.collect();
 
-    // check the partition metadata is written out
-    assertPartitionMetadataForRecords(updates1, fs);
     // Check the entire dataset has all records still
     fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
     for (int i = 0; i < fullPartitionPaths.length; i++) {
       fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
     }
-    assertEquals(100, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
-        "Must contain 100 records");
+
+    rows = HoodieClientTestUtils
+        .read(jsc, basePath, sqlContext, fs, fullPartitionPaths);
+    actualPartitionPathRecKeyPairs = new ArrayList<>();
+    for (Row row : rows.collectAsList()) {
+      actualPartitionPathRecKeyPairs
+          .add(Pair.of(row.getAs("_hoodie_partition_path"), row.getAs("_row_key")));
+    }
+
+    // verify all partitionpath, record key matches

Review comment:
       We do assert on the expected entry count, so duplicates should be caught by that assertion.
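
       To spell the reasoning out (a sketch based on the assertions already in the test, assuming the expected list itself holds distinct pairs):

           // a duplicate row in `actual` either inflates its size past `expected`'s,
           // or displaces an expected pair; one of the assertions below then fails
           assertEquals(expectedPartitionPathRecKeyPairs.size(), actualPartitionPathRecKeyPairs.size());
           for (Pair<String, String> entry : actualPartitionPathRecKeyPairs) {
             assertTrue(expectedPartitionPathRecKeyPairs.contains(entry));
           }
           for (Pair<String, String> entry : expectedPartitionPathRecKeyPairs) {
             assertTrue(actualPartitionPathRecKeyPairs.contains(entry));
           }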




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on pull request #1793: [HUDI-1068] Fixing deletes in global bloom

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #1793:
URL: https://github.com/apache/hudi/pull/1793#issuecomment-657622530


   @xushiyan: addressed all comments.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on a change in pull request #1793: [HUDI-1068] Fixing deletes in global bloom

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1793:
URL: https://github.com/apache/hudi/pull/1793#discussion_r450812695



##########
File path: hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -399,56 +405,173 @@ public void testDeletes() throws Exception {
   }
 
   /**
-   * Test update of a record to different partition with Global Index.
+   * Tests when update partition path is set in global bloom, existing record in old partition
+   * is deleted appropriately.
+   * @throws Exception
    */
   @Test
-  public void testUpsertToDiffPartitionGlobalIndex() throws Exception {
-    HoodieWriteClient client = getHoodieWriteClient(getConfig(IndexType.GLOBAL_BLOOM), false);
-    /**
-     * Write 1 (inserts and deletes) Write actual 200 insert records and ignore 100 delete records
-     */
-    String newCommitTime = "001";
-    List<HoodieRecord> inserts1 = dataGen.generateInserts(newCommitTime, 100);
+  public void testUpsertsUpdatePartitionPathRegularGlobalBloom() throws Exception {
+    testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_BLOOM, getConfig(),
+        HoodieWriteClient::upsert);
+  }
+
+  /**
+   * Tests when update partition path is set in simple global bloom, existing record in
+   * old partition is deleted appropriately.
+   * @throws Exception
+   */
+  @Test
+  public void testUpsertsUpdatePartitionPathSimpleGlobalBloom() throws Exception {
+    testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_SIMPLE, getConfig(),
+        HoodieWriteClient::upsert);
+  }
+
+  private void testUpsertsUpdatePartitionPathGlobalBloom(IndexType indexType,
+      HoodieWriteConfig config,
+      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn)
+      throws Exception {
+    // Force using older timeline layout

Review comment:
       I don't know, actually. I tried removing VERSION_0 and the test fails. If we know why testUpsertsInternal in the same class does it (@bvaradar), we might have an answer.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on a change in pull request #1793: [HUDI-1068] Fixing deletes in global bloom

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1793:
URL: https://github.com/apache/hudi/pull/1793#discussion_r453713674



##########
File path: hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -399,56 +405,190 @@ public void testDeletes() throws Exception {
   }
 
   /**
-   * Test update of a record to different partition with Global Index.
+   * Tests when update partition path is set in global bloom, existing record in old partition is deleted appropriately.
    */
   @Test
-  public void testUpsertToDiffPartitionGlobalIndex() throws Exception {
-    HoodieWriteClient client = getHoodieWriteClient(getConfig(IndexType.GLOBAL_BLOOM), false);
-    /**
-     * Write 1 (inserts and deletes) Write actual 200 insert records and ignore 100 delete records
-     */
-    String newCommitTime = "001";
-    List<HoodieRecord> inserts1 = dataGen.generateInserts(newCommitTime, 100);
+  public void testUpsertsUpdatePartitionPathRegularGlobalBloom() throws Exception {
+    testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_BLOOM, getConfig(),
+        HoodieWriteClient::upsert);
+  }
 
-    // Write 1 (only inserts)
+  /**
+   * Tests when update partition path is set in simple global bloom, existing record in old partition is deleted appropriately.
+   */
+  @Test
+  public void testUpsertsUpdatePartitionPathSimpleGlobalBloom() throws Exception {
+    testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_SIMPLE, getConfig(),
+        HoodieWriteClient::upsert);
+  }
+
+  /**
+   * This test ensures in a global bloom when update partition path is set to true in config, if an incoming record has mismatched partition
+   * compared to whats in storage, then appropriate actions are taken. i.e. old record is deleted in old partition and new one is inserted
+   * in the new partition.
+   * test structure:
+   * 1. insert 1 batch
+   * 2. insert 2nd batch with larger no of records so that a new file group is created for partitions
+   * 3. issue upserts to records from batch 1 with different partition path. This should ensure records from batch 1 are deleted and new
+   * records are upserted to the new partition
+   *
+   * @param indexType index type to be tested for
+   * @param config instance of {@link HoodieWriteConfig} to use
+   * @param writeFn write function to be used for testing
+   */
+  private void testUpsertsUpdatePartitionPathGlobalBloom(IndexType indexType, HoodieWriteConfig config,
+      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn)
+      throws Exception {
+    // instantiate client
+
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
+        .withProps(config.getProps())
+        .withCompactionConfig(
+            HoodieCompactionConfig.newBuilder().compactionSmallFileSize(10000).build())
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType)
+            .withBloomIndexUpdatePartitionPath(true)
+            .withGlobalSimpleIndexUpdatePartitionPath(true)
+            .build()).withTimelineLayoutVersion(VERSION_0).build();
+    HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(), metaClient.getTableType(),
+        metaClient.getTableConfig().getTableName(), metaClient.getArchivePath(),
+        metaClient.getTableConfig().getPayloadClass(), VERSION_0);
+    HoodieWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);
+
+    // Write 1
+    String newCommitTime = "001";
+    int numRecords = 10;
     client.startCommitWithTime(newCommitTime);
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(inserts1, 1);
 
-    JavaRDD<WriteStatus> result = client.insert(writeRecords, newCommitTime);
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
+    List<Pair<String, String>> expectedPartitionPathRecKeyPairs = new ArrayList<>();
+    // populate expected partition path and record keys
+    for (HoodieRecord rec : records) {
+      expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+    }
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime);
     List<WriteStatus> statuses = result.collect();
-    assertNoWriteErrors(statuses);
 
-    // check the partition metadata is written out
-    assertPartitionMetadataForRecords(inserts1, fs);
-    String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
-    for (int i = 0; i < fullPartitionPaths.length; i++) {
-      fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+    // Check the entire dataset has all records
+    String[] fullPartitionPaths = getFullPartitionPaths();
+    assertPartitionPathRecordKeys(fullPartitionPaths, expectedPartitionPathRecKeyPairs);
+
+    // verify one basefile per partition
+    Map<String, Integer> baseFileCounts = getBaseFileCounts(fullPartitionPaths);
+    for (Map.Entry<String, Integer> entry : baseFileCounts.entrySet()) {
+      assertEquals(1, entry.getValue());
     }
-    assertEquals(100, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
-        "Must contain 100 records");
 
-    /**
-     * Write 2. Updates with different partition
-     */
-    newCommitTime = "004";
+    // Write 2
+    newCommitTime = "002";
+    numRecords = 20; // so that a new file id is created
     client.startCommitWithTime(newCommitTime);
 
-    List<HoodieRecord> updates1 = dataGen.generateUpdatesWithDiffPartition(newCommitTime, inserts1);
-    JavaRDD<HoodieRecord> updateRecords = jsc.parallelize(updates1, 1);
+    List<HoodieRecord> recordsSecondBatch = dataGen.generateInserts(newCommitTime, numRecords);
+    // populate expected partition path and record keys
+    for (HoodieRecord rec : recordsSecondBatch) {
+      expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+    }
+    writeRecords = jsc.parallelize(recordsSecondBatch, 1);
+    result = writeFn.apply(client, writeRecords, newCommitTime);
+    result.collect();

Review comment:
       Yes, we just need to trigger the action.
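
       In other words (a sketch; the returned statuses are intentionally unused for this batch):

           JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime);
           // the write is lazy until a Spark action runs; collect() only forces execution here
           result.collect();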

##########
File path: hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -399,56 +405,190 @@ public void testDeletes() throws Exception {
   }
 
   /**
-   * Test update of a record to different partition with Global Index.
+   * Tests when update partition path is set in global bloom, existing record in old partition is deleted appropriately.
    */
   @Test
-  public void testUpsertToDiffPartitionGlobalIndex() throws Exception {
-    HoodieWriteClient client = getHoodieWriteClient(getConfig(IndexType.GLOBAL_BLOOM), false);
-    /**
-     * Write 1 (inserts and deletes) Write actual 200 insert records and ignore 100 delete records
-     */
-    String newCommitTime = "001";
-    List<HoodieRecord> inserts1 = dataGen.generateInserts(newCommitTime, 100);
+  public void testUpsertsUpdatePartitionPathRegularGlobalBloom() throws Exception {
+    testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_BLOOM, getConfig(),
+        HoodieWriteClient::upsert);
+  }
 
-    // Write 1 (only inserts)
+  /**
+   * Tests when update partition path is set in simple global bloom, existing record in old partition is deleted appropriately.
+   */
+  @Test
+  public void testUpsertsUpdatePartitionPathSimpleGlobalBloom() throws Exception {
+    testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_SIMPLE, getConfig(),
+        HoodieWriteClient::upsert);
+  }
+
+  /**
+   * This test ensures in a global bloom when update partition path is set to true in config, if an incoming record has mismatched partition
+   * compared to whats in storage, then appropriate actions are taken. i.e. old record is deleted in old partition and new one is inserted
+   * in the new partition.
+   * test structure:
+   * 1. insert 1 batch
+   * 2. insert 2nd batch with larger no of records so that a new file group is created for partitions
+   * 3. issue upserts to records from batch 1 with different partition path. This should ensure records from batch 1 are deleted and new
+   * records are upserted to the new partition
+   *
+   * @param indexType index type to be tested for
+   * @param config instance of {@link HoodieWriteConfig} to use
+   * @param writeFn write function to be used for testing
+   */
+  private void testUpsertsUpdatePartitionPathGlobalBloom(IndexType indexType, HoodieWriteConfig config,
+      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn)
+      throws Exception {
+    // instantiate client
+
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
+        .withProps(config.getProps())
+        .withCompactionConfig(
+            HoodieCompactionConfig.newBuilder().compactionSmallFileSize(10000).build())
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType)
+            .withBloomIndexUpdatePartitionPath(true)
+            .withGlobalSimpleIndexUpdatePartitionPath(true)
+            .build()).withTimelineLayoutVersion(VERSION_0).build();
+    HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(), metaClient.getTableType(),
+        metaClient.getTableConfig().getTableName(), metaClient.getArchivePath(),
+        metaClient.getTableConfig().getPayloadClass(), VERSION_0);
+    HoodieWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);
+
+    // Write 1
+    String newCommitTime = "001";
+    int numRecords = 10;
     client.startCommitWithTime(newCommitTime);
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(inserts1, 1);
 
-    JavaRDD<WriteStatus> result = client.insert(writeRecords, newCommitTime);
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
+    List<Pair<String, String>> expectedPartitionPathRecKeyPairs = new ArrayList<>();
+    // populate expected partition path and record keys
+    for (HoodieRecord rec : records) {
+      expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+    }
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime);
     List<WriteStatus> statuses = result.collect();
-    assertNoWriteErrors(statuses);
 
-    // check the partition metadata is written out
-    assertPartitionMetadataForRecords(inserts1, fs);
-    String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
-    for (int i = 0; i < fullPartitionPaths.length; i++) {
-      fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+    // Check the entire dataset has all records
+    String[] fullPartitionPaths = getFullPartitionPaths();
+    assertPartitionPathRecordKeys(fullPartitionPaths, expectedPartitionPathRecKeyPairs);
+
+    // verify one basefile per partition
+    Map<String, Integer> baseFileCounts = getBaseFileCounts(fullPartitionPaths);
+    for (Map.Entry<String, Integer> entry : baseFileCounts.entrySet()) {
+      assertEquals(1, entry.getValue());
     }
-    assertEquals(100, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
-        "Must contain 100 records");
 
-    /**
-     * Write 2. Updates with different partition
-     */
-    newCommitTime = "004";
+    // Write 2
+    newCommitTime = "002";
+    numRecords = 20; // so that a new file id is created
     client.startCommitWithTime(newCommitTime);
 
-    List<HoodieRecord> updates1 = dataGen.generateUpdatesWithDiffPartition(newCommitTime, inserts1);
-    JavaRDD<HoodieRecord> updateRecords = jsc.parallelize(updates1, 1);
+    List<HoodieRecord> recordsSecondBatch = dataGen.generateInserts(newCommitTime, numRecords);
+    // populate expected partition path and record keys
+    for (HoodieRecord rec : recordsSecondBatch) {
+      expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+    }
+    writeRecords = jsc.parallelize(recordsSecondBatch, 1);
+    result = writeFn.apply(client, writeRecords, newCommitTime);
+    result.collect();
 
-    JavaRDD<WriteStatus> result1 = client.upsert(updateRecords, newCommitTime);
-    List<WriteStatus> statuses1 = result1.collect();
-    assertNoWriteErrors(statuses1);
+    // Check the entire dataset has all records
+    fullPartitionPaths = getFullPartitionPaths();
+    assertPartitionPathRecordKeys(fullPartitionPaths, expectedPartitionPathRecKeyPairs);
+
+    // verify that there are more than 1 basefiles per partition
+    // we can't guarantee randomness in partitions where records are distributed. So, verify atleast one partition has more than 1 basefile.
+    baseFileCounts = getBaseFileCounts(fullPartitionPaths);
+    boolean hasMoreThanOneBaseFile = false;
+    for (Map.Entry<String, Integer> entry : baseFileCounts.entrySet()) {
+      if (entry.getValue() > 1) {
+        hasMoreThanOneBaseFile = true;
+        break;
+      }
+    }
+    assertTrue(hasMoreThanOneBaseFile, "Atleast one partition should have more than 1 base file after 2nd batch of writes");
+
+    // Write 3 (upserts to records from batch 1 with diff partition path)
+    newCommitTime = "003";
+
+    // update to diff partition paths
+    List<HoodieRecord> recordsToUpsert = new ArrayList<>();
+    for (HoodieRecord rec : records) {
+      // remove older entry from expected partition path record key pairs
+      expectedPartitionPathRecKeyPairs
+          .remove(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));

Review comment:
       I don't quite get you. This tracks all partition path / record key pairs seen so far, and in this block, for records whose partition path is getting updated, we need to remove those entries from the expected list and add the updated entries. So I don't quite see how introducing a new variable would simplify things. Would you mind clarifying?

##########
File path: hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -399,56 +405,190 @@ public void testDeletes() throws Exception {
   }
 
   /**
-   * Test update of a record to different partition with Global Index.
+   * Tests when update partition path is set in global bloom, existing record in old partition is deleted appropriately.
    */
   @Test
-  public void testUpsertToDiffPartitionGlobalIndex() throws Exception {
-    HoodieWriteClient client = getHoodieWriteClient(getConfig(IndexType.GLOBAL_BLOOM), false);
-    /**
-     * Write 1 (inserts and deletes) Write actual 200 insert records and ignore 100 delete records
-     */
-    String newCommitTime = "001";
-    List<HoodieRecord> inserts1 = dataGen.generateInserts(newCommitTime, 100);
+  public void testUpsertsUpdatePartitionPathRegularGlobalBloom() throws Exception {
+    testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_BLOOM, getConfig(),
+        HoodieWriteClient::upsert);
+  }
 
-    // Write 1 (only inserts)
+  /**
+   * Tests when update partition path is set in simple global bloom, existing record in old partition is deleted appropriately.
+   */
+  @Test
+  public void testUpsertsUpdatePartitionPathSimpleGlobalBloom() throws Exception {
+    testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_SIMPLE, getConfig(),
+        HoodieWriteClient::upsert);
+  }
+
+  /**
+   * This test ensures in a global bloom when update partition path is set to true in config, if an incoming record has mismatched partition
+   * compared to whats in storage, then appropriate actions are taken. i.e. old record is deleted in old partition and new one is inserted
+   * in the new partition.
+   * test structure:
+   * 1. insert 1 batch
+   * 2. insert 2nd batch with larger no of records so that a new file group is created for partitions
+   * 3. issue upserts to records from batch 1 with different partition path. This should ensure records from batch 1 are deleted and new
+   * records are upserted to the new partition
+   *
+   * @param indexType index type to be tested for
+   * @param config instance of {@link HoodieWriteConfig} to use
+   * @param writeFn write function to be used for testing
+   */
+  private void testUpsertsUpdatePartitionPathGlobalBloom(IndexType indexType, HoodieWriteConfig config,
+      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn)
+      throws Exception {
+    // instantiate client
+
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
+        .withProps(config.getProps())
+        .withCompactionConfig(
+            HoodieCompactionConfig.newBuilder().compactionSmallFileSize(10000).build())
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType)
+            .withBloomIndexUpdatePartitionPath(true)
+            .withGlobalSimpleIndexUpdatePartitionPath(true)
+            .build()).withTimelineLayoutVersion(VERSION_0).build();
+    HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(), metaClient.getTableType(),
+        metaClient.getTableConfig().getTableName(), metaClient.getArchivePath(),
+        metaClient.getTableConfig().getPayloadClass(), VERSION_0);
+    HoodieWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);
+
+    // Write 1
+    String newCommitTime = "001";
+    int numRecords = 10;
     client.startCommitWithTime(newCommitTime);
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(inserts1, 1);
 
-    JavaRDD<WriteStatus> result = client.insert(writeRecords, newCommitTime);
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
+    List<Pair<String, String>> expectedPartitionPathRecKeyPairs = new ArrayList<>();
+    // populate expected partition path and record keys
+    for (HoodieRecord rec : records) {
+      expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+    }
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime);
     List<WriteStatus> statuses = result.collect();
-    assertNoWriteErrors(statuses);
 
-    // check the partition metadata is written out
-    assertPartitionMetadataForRecords(inserts1, fs);
-    String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
-    for (int i = 0; i < fullPartitionPaths.length; i++) {
-      fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+    // Check the entire dataset has all records
+    String[] fullPartitionPaths = getFullPartitionPaths();
+    assertPartitionPathRecordKeys(fullPartitionPaths, expectedPartitionPathRecKeyPairs);
+
+    // verify one basefile per partition
+    Map<String, Integer> baseFileCounts = getBaseFileCounts(fullPartitionPaths);
+    for (Map.Entry<String, Integer> entry : baseFileCounts.entrySet()) {
+      assertEquals(1, entry.getValue());
     }
-    assertEquals(100, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
-        "Must contain 100 records");
 
-    /**
-     * Write 2. Updates with different partition
-     */
-    newCommitTime = "004";
+    // Write 2
+    newCommitTime = "002";
+    numRecords = 20; // so that a new file id is created
     client.startCommitWithTime(newCommitTime);
 
-    List<HoodieRecord> updates1 = dataGen.generateUpdatesWithDiffPartition(newCommitTime, inserts1);
-    JavaRDD<HoodieRecord> updateRecords = jsc.parallelize(updates1, 1);
+    List<HoodieRecord> recordsSecondBatch = dataGen.generateInserts(newCommitTime, numRecords);
+    // populate expected partition path and record keys
+    for (HoodieRecord rec : recordsSecondBatch) {
+      expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+    }
+    writeRecords = jsc.parallelize(recordsSecondBatch, 1);
+    result = writeFn.apply(client, writeRecords, newCommitTime);
+    result.collect();
 
-    JavaRDD<WriteStatus> result1 = client.upsert(updateRecords, newCommitTime);
-    List<WriteStatus> statuses1 = result1.collect();
-    assertNoWriteErrors(statuses1);
+    // Check the entire dataset has all records
+    fullPartitionPaths = getFullPartitionPaths();
+    assertPartitionPathRecordKeys(fullPartitionPaths, expectedPartitionPathRecKeyPairs);
+
+    // verify that there are more than 1 basefiles per partition
+    // we can't guarantee randomness in partitions where records are distributed. So, verify atleast one partition has more than 1 basefile.
+    baseFileCounts = getBaseFileCounts(fullPartitionPaths);
+    boolean hasMoreThanOneBaseFile = false;
+    for (Map.Entry<String, Integer> entry : baseFileCounts.entrySet()) {
+      if (entry.getValue() > 1) {
+        hasMoreThanOneBaseFile = true;
+        break;
+      }
+    }
+    assertTrue(hasMoreThanOneBaseFile, "Atleast one partition should have more than 1 base file after 2nd batch of writes");
+
+    // Write 3 (upserts to records from batch 1 with diff partition path)
+    newCommitTime = "003";
+
+    // update to diff partition paths
+    List<HoodieRecord> recordsToUpsert = new ArrayList<>();
+    for (HoodieRecord rec : records) {
+      // remove older entry from expected partition path record key pairs
+      expectedPartitionPathRecKeyPairs
+          .remove(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+      String partitionPath = rec.getPartitionPath();
+      String newPartitionPath = null;
+      if (partitionPath.equalsIgnoreCase(DEFAULT_FIRST_PARTITION_PATH)) {
+        newPartitionPath = DEFAULT_SECOND_PARTITION_PATH;
+      } else if (partitionPath.equalsIgnoreCase(DEFAULT_SECOND_PARTITION_PATH)) {
+        newPartitionPath = DEFAULT_THIRD_PARTITION_PATH;
+      } else if (partitionPath.equalsIgnoreCase(DEFAULT_THIRD_PARTITION_PATH)) {
+        newPartitionPath = DEFAULT_FIRST_PARTITION_PATH;
+      } else {
+        throw new IllegalStateException("Unknown partition path " + rec.getPartitionPath());
+      }
+      recordsToUpsert.add(
+          new HoodieRecord(new HoodieKey(rec.getRecordKey(), newPartitionPath),
+              rec.getData()));
+      // populate expected partition path and record keys
+      expectedPartitionPathRecKeyPairs.add(Pair.of(newPartitionPath, rec.getRecordKey()));
+    }
 
-    // check the partition metadata is written out
-    assertPartitionMetadataForRecords(updates1, fs);
-    // Check the entire dataset has all records still
-    fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+    writeRecords = jsc.parallelize(recordsToUpsert, 1);
+    result = writeFn.apply(client, writeRecords, newCommitTime);
+    statuses = result.collect();
+
+    // Check the entire dataset has all records
+    fullPartitionPaths = getFullPartitionPaths();
+    assertPartitionPathRecordKeys(fullPartitionPaths, expectedPartitionPathRecKeyPairs);
+  }
+
+  private void assertPartitionPathRecordKeys(String[] fullPartitionPaths, List<Pair<String, String>> expectedPartitionPathRecKeyPairs) {
+    Dataset<Row> rows = getAllRows(fullPartitionPaths);
+    List<Pair<String, String>> actualPartitionPathRecKeyPairs = getActualPartitionPathAndRecordKeys(rows);
+    // verify all partitionpath, record key matches
+    assertActualAndExpectedPartitionPathRecordKeyMatches(expectedPartitionPathRecKeyPairs, actualPartitionPathRecKeyPairs);
+  }
+
+  private List<Pair<String, String>> getActualPartitionPathAndRecordKeys(Dataset<org.apache.spark.sql.Row> rows) {
+    List<Pair<String, String>> actualPartitionPathRecKeyPairs = new ArrayList<>();
+    for (Row row : rows.collectAsList()) {
+      actualPartitionPathRecKeyPairs
+          .add(Pair.of(row.getAs("_hoodie_partition_path"), row.getAs("_row_key")));
+    }
+    return actualPartitionPathRecKeyPairs;
+  }
+
+  private Dataset<org.apache.spark.sql.Row> getAllRows(String[] fullPartitionPaths) {
+    return HoodieClientTestUtils
+        .read(jsc, basePath, sqlContext, fs, fullPartitionPaths);
+  }
+
+  private String[] getFullPartitionPaths() {
+    String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
     for (int i = 0; i < fullPartitionPaths.length; i++) {
       fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
     }
-    assertEquals(100, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
-        "Must contain 100 records");
+    return fullPartitionPaths;
+  }
+
+  private Map<String, Integer> getBaseFileCounts(String[] fullPartitionPaths) {
+    return HoodieClientTestUtils.getBaseFileCountForPaths(basePath, fs, fullPartitionPaths);
+  }
+
+  private void assertActualAndExpectedPartitionPathRecordKeyMatches(List<Pair<String, String>> expectedPartitionPathRecKeyPairs,
+      List<Pair<String, String>> actualPartitionPathRecKeyPairs) {
+    // verify all partitionpath, record key matches
+    assertEquals(expectedPartitionPathRecKeyPairs.size(), actualPartitionPathRecKeyPairs.size());
+    for (Pair<String, String> entry : actualPartitionPathRecKeyPairs) {
+      assertTrue(expectedPartitionPathRecKeyPairs.contains(entry));
+    }
+
+    for (Pair<String, String> entry : expectedPartitionPathRecKeyPairs) {
+      assertTrue(actualPartitionPathRecKeyPairs.contains(entry));
+    }

Review comment:
       I will fix the expected entries to be a Set, but can't do the same for the actual list, since we want to capture any duplicates. So we can't really use assertEquals(set, list).
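
       Roughly what I have in mind (a sketch; `Pair` is assumed to have a sane equals/hashCode, which the existing contains() checks already rely on):

           Set<Pair<String, String>> expected = new HashSet<>(expectedPartitionPathRecKeyPairs);
           List<Pair<String, String>> actual = actualPartitionPathRecKeyPairs; // keep a List so duplicate rows stay visible
           // a Set never equals a List, so assertEquals(expected, actual) would always fail;
           // check sizes and per-entry containment instead
           assertEquals(expected.size(), actual.size());
           actual.forEach(entry -> assertTrue(expected.contains(entry)));
           expected.forEach(entry -> assertTrue(actual.contains(entry)));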




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on a change in pull request #1793: [HUDI-1068] Fixing deletes in global bloom

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1793:
URL: https://github.com/apache/hudi/pull/1793#discussion_r450160809



##########
File path: hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
##########
@@ -125,6 +125,9 @@ public HoodieGlobalBloomIndex(HoodieWriteConfig config) {
           // Create an empty record to delete the record in the old partition
           HoodieRecord<T> emptyRecord = new HoodieRecord(recordLocationHoodieKeyPair.get()._2,

Review comment:
       and rename `taggedRecord` to something like `insertRecord`; this makes what this block is doing clearer.

##########
File path: hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -399,56 +405,173 @@ public void testDeletes() throws Exception {
   }
 
   /**
-   * Test update of a record to different partition with Global Index.
+   * Tests when update partition path is set in global bloom, existing record in old partition
+   * is deleted appropriately.
+   * @throws Exception
    */
   @Test
-  public void testUpsertToDiffPartitionGlobalIndex() throws Exception {
-    HoodieWriteClient client = getHoodieWriteClient(getConfig(IndexType.GLOBAL_BLOOM), false);
-    /**
-     * Write 1 (inserts and deletes) Write actual 200 insert records and ignore 100 delete records
-     */
-    String newCommitTime = "001";
-    List<HoodieRecord> inserts1 = dataGen.generateInserts(newCommitTime, 100);
+  public void testUpsertsUpdatePartitionPathRegularGlobalBloom() throws Exception {
+    testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_BLOOM, getConfig(),
+        HoodieWriteClient::upsert);
+  }
+
+  /**
+   * Tests when update partition path is set in simple global bloom, existing record in
+   * old partition is deleted appropriately.
+   * @throws Exception
+   */
+  @Test
+  public void testUpsertsUpdatePartitionPathSimpleGlobalBloom() throws Exception {
+    testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_SIMPLE, getConfig(),
+        HoodieWriteClient::upsert);
+  }
+
+  private void testUpsertsUpdatePartitionPathGlobalBloom(IndexType indexType,
+      HoodieWriteConfig config,
+      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn)
+      throws Exception {
+    // Force using older timeline layout
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
+        .withProps(config.getProps())
+        .withCompactionConfig(
+            HoodieCompactionConfig.newBuilder().compactionSmallFileSize(10000).build())
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType)
+            .withBloomIndexUpdatePartitionPath(true)
+            .withGlobalSimpleIndexUpdatePartitionPath(true).build())
+        .withTimelineLayoutVersion(
+            VERSION_0).build();
+    HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(),
+        metaClient.getTableType(),
+        metaClient.getTableConfig().getTableName(), metaClient.getArchivePath(),
+        metaClient.getTableConfig().getPayloadClass(), VERSION_0);
+    HoodieWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);
 
     // Write 1 (only inserts)
+    String newCommitTime = "001";
+    int numRecords = 10;
     client.startCommitWithTime(newCommitTime);
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(inserts1, 1);
 
-    JavaRDD<WriteStatus> result = client.insert(writeRecords, newCommitTime);
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
+    List<Pair<String, String>> expectedPartitionPathRecKeyPairs = new ArrayList<>();
+    for (HoodieRecord rec : records) {
+      expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+    }
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime);
     List<WriteStatus> statuses = result.collect();
-    assertNoWriteErrors(statuses);
 
-    // check the partition metadata is written out
-    assertPartitionMetadataForRecords(inserts1, fs);
+    // Check the entire dataset has all records still
     String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
     for (int i = 0; i < fullPartitionPaths.length; i++) {
       fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
     }
-    assertEquals(100, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
-        "Must contain 100 records");
+    Dataset<Row> rows = HoodieClientTestUtils
+        .read(jsc, basePath, sqlContext, fs, fullPartitionPaths);
+    List<Pair<String, String>> actualPartitionPathRecKeyPairs = new ArrayList<>();
+    for (Row row : rows.collectAsList()) {
+      actualPartitionPathRecKeyPairs
+          .add(Pair.of(row.getAs("_hoodie_partition_path"), row.getAs("_row_key")));
+    }
 
-    /**
-     * Write 2. Updates with different partition
-     */
-    newCommitTime = "004";
+    // verify all partitionpath, record key matches
+    assertEquals(expectedPartitionPathRecKeyPairs.size(), actualPartitionPathRecKeyPairs.size());
+    for (Pair<String, String> entry : actualPartitionPathRecKeyPairs) {
+      assertTrue(expectedPartitionPathRecKeyPairs.contains(entry));
+    }
+
+    for (Pair<String, String> entry : expectedPartitionPathRecKeyPairs) {
+      assertTrue(actualPartitionPathRecKeyPairs.contains(entry));
+    }
+
+    newCommitTime = "002";
+    numRecords = 20; // so that a new file id is created
     client.startCommitWithTime(newCommitTime);
 
-    List<HoodieRecord> updates1 = dataGen.generateUpdatesWithDiffPartition(newCommitTime, inserts1);
-    JavaRDD<HoodieRecord> updateRecords = jsc.parallelize(updates1, 1);
+    List<HoodieRecord> recordsSecondBatch = dataGen.generateInserts(newCommitTime, numRecords);
+    for (HoodieRecord rec : recordsSecondBatch) {
+      expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+    }
+    writeRecords = jsc.parallelize(recordsSecondBatch, 1);
+    result = writeFn.apply(client, writeRecords, newCommitTime);
+    result.collect();
+
+    // Check the entire dataset has all records still
+    fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+    for (int i = 0; i < fullPartitionPaths.length; i++) {
+      fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+    }
+
+    rows = HoodieClientTestUtils
+        .read(jsc, basePath, sqlContext, fs, fullPartitionPaths);
+    actualPartitionPathRecKeyPairs = new ArrayList<>();
+    for (Row row : rows.collectAsList()) {
+      actualPartitionPathRecKeyPairs
+          .add(Pair.of(row.getAs("_hoodie_partition_path"), row.getAs("_row_key")));
+    }
 
-    JavaRDD<WriteStatus> result1 = client.upsert(updateRecords, newCommitTime);
-    List<WriteStatus> statuses1 = result1.collect();
-    assertNoWriteErrors(statuses1);
+    // verify all partitionpath, record key matches
+    assertEquals(expectedPartitionPathRecKeyPairs.size(), actualPartitionPathRecKeyPairs.size());
+    for (Pair<String, String> entry : actualPartitionPathRecKeyPairs) {
+      assertTrue(expectedPartitionPathRecKeyPairs.contains(entry));
+    }
+
+    for (Pair<String, String> entry : expectedPartitionPathRecKeyPairs) {
+      assertTrue(actualPartitionPathRecKeyPairs.contains(entry));
+    }
+
+    // Write 2 (updates)
+    newCommitTime = "003";
+    records = records.subList(5, 10);
+
+    // update to diff partition paths
+    List<HoodieRecord> recordsToUpsert = new ArrayList<>();
+    for (HoodieRecord rec : records) {
+      expectedPartitionPathRecKeyPairs
+          .remove(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+      String partitionPath = rec.getPartitionPath();
+      String newPartitionPath = null;
+      if (partitionPath.equalsIgnoreCase(DEFAULT_FIRST_PARTITION_PATH)) {
+        newPartitionPath = DEFAULT_SECOND_PARTITION_PATH;
+      } else if (partitionPath.equalsIgnoreCase(DEFAULT_SECOND_PARTITION_PATH)) {
+        newPartitionPath = DEFAULT_THIRD_PARTITION_PATH;
+      } else if (partitionPath.equalsIgnoreCase(DEFAULT_THIRD_PARTITION_PATH)) {
+        newPartitionPath = DEFAULT_FIRST_PARTITION_PATH;
+      } else {
+        throw new IllegalStateException("Unknown partition path " + rec.getPartitionPath());
+      }
+      recordsToUpsert.add(
+          new HoodieRecord(new HoodieKey(rec.getRecordKey(), newPartitionPath),
+              rec.getData()));
+      expectedPartitionPathRecKeyPairs.add(Pair.of(newPartitionPath, rec.getRecordKey()));
+    }
+
+    writeRecords = jsc.parallelize(recordsToUpsert, 1);
+    result = writeFn.apply(client, writeRecords, newCommitTime);
+    statuses = result.collect();
 
-    // check the partition metadata is written out
-    assertPartitionMetadataForRecords(updates1, fs);
     // Check the entire dataset has all records still
     fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
     for (int i = 0; i < fullPartitionPaths.length; i++) {
       fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
     }
-    assertEquals(100, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
-        "Must contain 100 records");
+
+    rows = HoodieClientTestUtils
+        .read(jsc, basePath, sqlContext, fs, fullPartitionPaths);
+    actualPartitionPathRecKeyPairs = new ArrayList<>();
+    for (Row row : rows.collectAsList()) {
+      actualPartitionPathRecKeyPairs
+          .add(Pair.of(row.getAs("_hoodie_partition_path"), row.getAs("_row_key")));
+    }
+
+    // verify all partitionpath, record key matches

Review comment:
       Can we explicitly check for duplicates?
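
       For illustration, a minimal sketch of such an explicit duplicate check on the `actualPartitionPathRecKeyPairs` list collected above (not part of this PR; it assumes JUnit 5 assertions and a `java.util.HashSet` import, and relies on the same `Pair#equals` the test already exercises via `contains`/`remove`):

       ```java
       // Sketch only, not part of this PR: verify there are no duplicate
       // (partition path, record key) pairs in what was read back.
       Set<Pair<String, String>> uniquePairs = new HashSet<>(actualPartitionPathRecKeyPairs);
       assertEquals(actualPartitionPathRecKeyPairs.size(), uniquePairs.size(),
           "Found duplicate (partition path, record key) entries after the write");
       ```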

##########
File path: hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
##########
@@ -125,6 +125,9 @@ public HoodieGlobalBloomIndex(HoodieWriteConfig config) {
           // Create an empty record to delete the record in the old partition
           HoodieRecord<T> emptyRecord = new HoodieRecord(recordLocationHoodieKeyPair.get()._2,

Review comment:
       Let's rename this to `deleteRecord`.

##########
File path: hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -399,56 +405,173 @@ public void testDeletes() throws Exception {
   }
 
   /**
-   * Test update of a record to different partition with Global Index.
+   * Tests when update partition path is set in global bloom, existing record in old partition
+   * is deleted appropriately.
+   * @throws Exception
    */
   @Test
-  public void testUpsertToDiffPartitionGlobalIndex() throws Exception {
-    HoodieWriteClient client = getHoodieWriteClient(getConfig(IndexType.GLOBAL_BLOOM), false);
-    /**
-     * Write 1 (inserts and deletes) Write actual 200 insert records and ignore 100 delete records
-     */
-    String newCommitTime = "001";
-    List<HoodieRecord> inserts1 = dataGen.generateInserts(newCommitTime, 100);
+  public void testUpsertsUpdatePartitionPathRegularGlobalBloom() throws Exception {
+    testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_BLOOM, getConfig(),
+        HoodieWriteClient::upsert);
+  }
+
+  /**
+   * Tests when update partition path is set in simple global bloom, existing record in
+   * old partition is deleted appropriately.
+   * @throws Exception
+   */
+  @Test
+  public void testUpsertsUpdatePartitionPathSimpleGlobalBloom() throws Exception {
+    testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_SIMPLE, getConfig(),
+        HoodieWriteClient::upsert);
+  }
+
+  private void testUpsertsUpdatePartitionPathGlobalBloom(IndexType indexType,
+      HoodieWriteConfig config,
+      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn)
+      throws Exception {
+    // Force using older timeline layout

Review comment:
       Why is forcing the older timeline layout (VERSION_0) relevant for this test?

##########
File path: hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -399,56 +405,173 @@ public void testDeletes() throws Exception {
   }
 
   /**
-   * Test update of a record to different partition with Global Index.
+   * Tests when update partition path is set in global bloom, existing record in old partition
+   * is deleted appropriately.
+   * @throws Exception
    */
   @Test
-  public void testUpsertToDiffPartitionGlobalIndex() throws Exception {
-    HoodieWriteClient client = getHoodieWriteClient(getConfig(IndexType.GLOBAL_BLOOM), false);
-    /**
-     * Write 1 (inserts and deletes) Write actual 200 insert records and ignore 100 delete records
-     */
-    String newCommitTime = "001";
-    List<HoodieRecord> inserts1 = dataGen.generateInserts(newCommitTime, 100);
+  public void testUpsertsUpdatePartitionPathRegularGlobalBloom() throws Exception {
+    testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_BLOOM, getConfig(),
+        HoodieWriteClient::upsert);
+  }
+
+  /**
+   * Tests when update partition path is set in simple global bloom, existing record in
+   * old partition is deleted appropriately.
+   * @throws Exception
+   */
+  @Test
+  public void testUpsertsUpdatePartitionPathSimpleGlobalBloom() throws Exception {
+    testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_SIMPLE, getConfig(),
+        HoodieWriteClient::upsert);
+  }
+
+  private void testUpsertsUpdatePartitionPathGlobalBloom(IndexType indexType,
+      HoodieWriteConfig config,
+      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn)
+      throws Exception {
+    // Force using older timeline layout
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
+        .withProps(config.getProps())
+        .withCompactionConfig(
+            HoodieCompactionConfig.newBuilder().compactionSmallFileSize(10000).build())
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType)
+            .withBloomIndexUpdatePartitionPath(true)
+            .withGlobalSimpleIndexUpdatePartitionPath(true).build())
+        .withTimelineLayoutVersion(
+            VERSION_0).build();
+    HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(),
+        metaClient.getTableType(),
+        metaClient.getTableConfig().getTableName(), metaClient.getArchivePath(),
+        metaClient.getTableConfig().getPayloadClass(), VERSION_0);
+    HoodieWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);
 
     // Write 1 (only inserts)
+    String newCommitTime = "001";
+    int numRecords = 10;
     client.startCommitWithTime(newCommitTime);
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(inserts1, 1);
 
-    JavaRDD<WriteStatus> result = client.insert(writeRecords, newCommitTime);
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
+    List<Pair<String, String>> expectedPartitionPathRecKeyPairs = new ArrayList<>();
+    for (HoodieRecord rec : records) {
+      expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+    }
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime);
     List<WriteStatus> statuses = result.collect();
-    assertNoWriteErrors(statuses);
 
-    // check the partition metadata is written out
-    assertPartitionMetadataForRecords(inserts1, fs);
+    // Check the entire dataset has all records still
     String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
     for (int i = 0; i < fullPartitionPaths.length; i++) {
       fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
     }
-    assertEquals(100, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
-        "Must contain 100 records");
+    Dataset<Row> rows = HoodieClientTestUtils
+        .read(jsc, basePath, sqlContext, fs, fullPartitionPaths);
+    List<Pair<String, String>> actualPartitionPathRecKeyPairs = new ArrayList<>();
+    for (Row row : rows.collectAsList()) {
+      actualPartitionPathRecKeyPairs
+          .add(Pair.of(row.getAs("_hoodie_partition_path"), row.getAs("_row_key")));
+    }
 
-    /**
-     * Write 2. Updates with different partition
-     */
-    newCommitTime = "004";
+    // verify all partitionpath, record key matches
+    assertEquals(expectedPartitionPathRecKeyPairs.size(), actualPartitionPathRecKeyPairs.size());
+    for (Pair<String, String> entry : actualPartitionPathRecKeyPairs) {
+      assertTrue(expectedPartitionPathRecKeyPairs.contains(entry));
+    }
+
+    for (Pair<String, String> entry : expectedPartitionPathRecKeyPairs) {
+      assertTrue(actualPartitionPathRecKeyPairs.contains(entry));
+    }
+
+    newCommitTime = "002";
+    numRecords = 20; // so that a new file id is created
     client.startCommitWithTime(newCommitTime);
 
-    List<HoodieRecord> updates1 = dataGen.generateUpdatesWithDiffPartition(newCommitTime, inserts1);
-    JavaRDD<HoodieRecord> updateRecords = jsc.parallelize(updates1, 1);
+    List<HoodieRecord> recordsSecondBatch = dataGen.generateInserts(newCommitTime, numRecords);
+    for (HoodieRecord rec : recordsSecondBatch) {
+      expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+    }
+    writeRecords = jsc.parallelize(recordsSecondBatch, 1);
+    result = writeFn.apply(client, writeRecords, newCommitTime);
+    result.collect();
+
+    // Check the entire dataset has all records still
+    fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+    for (int i = 0; i < fullPartitionPaths.length; i++) {
+      fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+    }
+
+    rows = HoodieClientTestUtils
+        .read(jsc, basePath, sqlContext, fs, fullPartitionPaths);
+    actualPartitionPathRecKeyPairs = new ArrayList<>();
+    for (Row row : rows.collectAsList()) {
+      actualPartitionPathRecKeyPairs
+          .add(Pair.of(row.getAs("_hoodie_partition_path"), row.getAs("_row_key")));
+    }
 
-    JavaRDD<WriteStatus> result1 = client.upsert(updateRecords, newCommitTime);
-    List<WriteStatus> statuses1 = result1.collect();
-    assertNoWriteErrors(statuses1);
+    // verify all partitionpath, record key matches
+    assertEquals(expectedPartitionPathRecKeyPairs.size(), actualPartitionPathRecKeyPairs.size());
+    for (Pair<String, String> entry : actualPartitionPathRecKeyPairs) {
+      assertTrue(expectedPartitionPathRecKeyPairs.contains(entry));
+    }
+
+    for (Pair<String, String> entry : expectedPartitionPathRecKeyPairs) {
+      assertTrue(actualPartitionPathRecKeyPairs.contains(entry));
+    }
+
+    // Write 2 (updates)
+    newCommitTime = "003";
+    records = records.subList(5, 10);
+
+    // update to diff partition paths
+    List<HoodieRecord> recordsToUpsert = new ArrayList<>();
+    for (HoodieRecord rec : records) {
+      expectedPartitionPathRecKeyPairs
+          .remove(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+      String partitionPath = rec.getPartitionPath();
+      String newPartitionPath = null;
+      if (partitionPath.equalsIgnoreCase(DEFAULT_FIRST_PARTITION_PATH)) {
+        newPartitionPath = DEFAULT_SECOND_PARTITION_PATH;
+      } else if (partitionPath.equalsIgnoreCase(DEFAULT_SECOND_PARTITION_PATH)) {
+        newPartitionPath = DEFAULT_THIRD_PARTITION_PATH;
+      } else if (partitionPath.equalsIgnoreCase(DEFAULT_THIRD_PARTITION_PATH)) {
+        newPartitionPath = DEFAULT_FIRST_PARTITION_PATH;
+      } else {
+        throw new IllegalStateException("Unknown partition path " + rec.getPartitionPath());
+      }
+      recordsToUpsert.add(
+          new HoodieRecord(new HoodieKey(rec.getRecordKey(), newPartitionPath),
+              rec.getData()));
+      expectedPartitionPathRecKeyPairs.add(Pair.of(newPartitionPath, rec.getRecordKey()));
+    }
+
+    writeRecords = jsc.parallelize(recordsToUpsert, 1);
+    result = writeFn.apply(client, writeRecords, newCommitTime);
+    statuses = result.collect();
 
-    // check the partition metadata is written out
-    assertPartitionMetadataForRecords(updates1, fs);
     // Check the entire dataset has all records still
     fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
     for (int i = 0; i < fullPartitionPaths.length; i++) {
       fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
     }
-    assertEquals(100, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
-        "Must contain 100 records");
+
+    rows = HoodieClientTestUtils
+        .read(jsc, basePath, sqlContext, fs, fullPartitionPaths);
+    actualPartitionPathRecKeyPairs = new ArrayList<>();
+    for (Row row : rows.collectAsList()) {
+      actualPartitionPathRecKeyPairs
+          .add(Pair.of(row.getAs("_hoodie_partition_path"), row.getAs("_row_key")));
+    }
+
+    // verify all partitionpath, record key matches

Review comment:
       And also check that at least two files are present before triggering the update of the partition path.
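
       For illustration, a minimal sketch of that pre-condition check, written against the `getBaseFileCounts(fullPartitionPaths)` helper referenced in the updated test later in this thread; its `Map<String, Integer>` return type and the JUnit 5 `assertTrue` overload are assumptions here:

       ```java
       // Sketch only: before issuing the partition-path updates, assert that at least
       // one partition already has more than one base file, so the delete is exercised
       // against a file id that is not simply the latest one.
       Map<String, Integer> baseFileCounts = getBaseFileCounts(fullPartitionPaths);
       boolean hasMoreThanOneBaseFile =
           baseFileCounts.values().stream().anyMatch(count -> count > 1);
       assertTrue(hasMoreThanOneBaseFile,
           "At least one partition should have more than one base file before updating partition paths");
       ```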







[GitHub] [hudi] xushiyan commented on a change in pull request #1793: [HUDI-1068] Fixing deletes in global bloom

Posted by GitBox <gi...@apache.org>.
xushiyan commented on a change in pull request #1793:
URL: https://github.com/apache/hudi/pull/1793#discussion_r454059760



##########
File path: hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -399,56 +405,190 @@ public void testDeletes() throws Exception {
   }
 
   /**
-   * Test update of a record to different partition with Global Index.
+   * Tests when update partition path is set in global bloom, existing record in old partition is deleted appropriately.
    */
   @Test
-  public void testUpsertToDiffPartitionGlobalIndex() throws Exception {
-    HoodieWriteClient client = getHoodieWriteClient(getConfig(IndexType.GLOBAL_BLOOM), false);
-    /**
-     * Write 1 (inserts and deletes) Write actual 200 insert records and ignore 100 delete records
-     */
-    String newCommitTime = "001";
-    List<HoodieRecord> inserts1 = dataGen.generateInserts(newCommitTime, 100);
+  public void testUpsertsUpdatePartitionPathRegularGlobalBloom() throws Exception {
+    testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_BLOOM, getConfig(),
+        HoodieWriteClient::upsert);
+  }
 
-    // Write 1 (only inserts)
+  /**
+   * Tests when update partition path is set in simple global bloom, existing record in old partition is deleted appropriately.
+   */
+  @Test
+  public void testUpsertsUpdatePartitionPathSimpleGlobalBloom() throws Exception {
+    testUpsertsUpdatePartitionPathGlobalBloom(IndexType.GLOBAL_SIMPLE, getConfig(),
+        HoodieWriteClient::upsert);
+  }
+
+  /**
+   * This test ensures in a global bloom when update partition path is set to true in config, if an incoming record has mismatched partition
+   * compared to whats in storage, then appropriate actions are taken. i.e. old record is deleted in old partition and new one is inserted
+   * in the new partition.
+   * test structure:
+   * 1. insert 1 batch
+   * 2. insert 2nd batch with larger no of records so that a new file group is created for partitions
+   * 3. issue upserts to records from batch 1 with different partition path. This should ensure records from batch 1 are deleted and new
+   * records are upserted to the new partition
+   *
+   * @param indexType index type to be tested for
+   * @param config instance of {@link HoodieWriteConfig} to use
+   * @param writeFn write function to be used for testing
+   */
+  private void testUpsertsUpdatePartitionPathGlobalBloom(IndexType indexType, HoodieWriteConfig config,
+      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn)
+      throws Exception {
+    // instantiate client
+
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
+        .withProps(config.getProps())
+        .withCompactionConfig(
+            HoodieCompactionConfig.newBuilder().compactionSmallFileSize(10000).build())
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType)
+            .withBloomIndexUpdatePartitionPath(true)
+            .withGlobalSimpleIndexUpdatePartitionPath(true)
+            .build()).withTimelineLayoutVersion(VERSION_0).build();
+    HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(), metaClient.getTableType(),
+        metaClient.getTableConfig().getTableName(), metaClient.getArchivePath(),
+        metaClient.getTableConfig().getPayloadClass(), VERSION_0);
+    HoodieWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);
+
+    // Write 1
+    String newCommitTime = "001";
+    int numRecords = 10;
     client.startCommitWithTime(newCommitTime);
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(inserts1, 1);
 
-    JavaRDD<WriteStatus> result = client.insert(writeRecords, newCommitTime);
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
+    List<Pair<String, String>> expectedPartitionPathRecKeyPairs = new ArrayList<>();
+    // populate expected partition path and record keys
+    for (HoodieRecord rec : records) {
+      expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+    }
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime);
     List<WriteStatus> statuses = result.collect();
-    assertNoWriteErrors(statuses);
 
-    // check the partition metadata is written out
-    assertPartitionMetadataForRecords(inserts1, fs);
-    String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
-    for (int i = 0; i < fullPartitionPaths.length; i++) {
-      fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+    // Check the entire dataset has all records
+    String[] fullPartitionPaths = getFullPartitionPaths();
+    assertPartitionPathRecordKeys(fullPartitionPaths, expectedPartitionPathRecKeyPairs);
+
+    // verify one basefile per partition
+    Map<String, Integer> baseFileCounts = getBaseFileCounts(fullPartitionPaths);
+    for (Map.Entry<String, Integer> entry : baseFileCounts.entrySet()) {
+      assertEquals(1, entry.getValue());
     }
-    assertEquals(100, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
-        "Must contain 100 records");
 
-    /**
-     * Write 2. Updates with different partition
-     */
-    newCommitTime = "004";
+    // Write 2
+    newCommitTime = "002";
+    numRecords = 20; // so that a new file id is created
     client.startCommitWithTime(newCommitTime);
 
-    List<HoodieRecord> updates1 = dataGen.generateUpdatesWithDiffPartition(newCommitTime, inserts1);
-    JavaRDD<HoodieRecord> updateRecords = jsc.parallelize(updates1, 1);
+    List<HoodieRecord> recordsSecondBatch = dataGen.generateInserts(newCommitTime, numRecords);
+    // populate expected partition path and record keys
+    for (HoodieRecord rec : recordsSecondBatch) {
+      expectedPartitionPathRecKeyPairs.add(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));
+    }
+    writeRecords = jsc.parallelize(recordsSecondBatch, 1);
+    result = writeFn.apply(client, writeRecords, newCommitTime);
+    result.collect();
 
-    JavaRDD<WriteStatus> result1 = client.upsert(updateRecords, newCommitTime);
-    List<WriteStatus> statuses1 = result1.collect();
-    assertNoWriteErrors(statuses1);
+    // Check the entire dataset has all records
+    fullPartitionPaths = getFullPartitionPaths();
+    assertPartitionPathRecordKeys(fullPartitionPaths, expectedPartitionPathRecKeyPairs);
+
+    // verify that there are more than 1 basefiles per partition
+    // we can't guarantee randomness in partitions where records are distributed. So, verify atleast one partition has more than 1 basefile.
+    baseFileCounts = getBaseFileCounts(fullPartitionPaths);
+    boolean hasMoreThanOneBaseFile = false;
+    for (Map.Entry<String, Integer> entry : baseFileCounts.entrySet()) {
+      if (entry.getValue() > 1) {
+        hasMoreThanOneBaseFile = true;
+        break;
+      }
+    }
+    assertTrue(hasMoreThanOneBaseFile, "Atleast one partition should have more than 1 base file after 2nd batch of writes");
+
+    // Write 3 (upserts to records from batch 1 with diff partition path)
+    newCommitTime = "003";
+
+    // update to diff partition paths
+    List<HoodieRecord> recordsToUpsert = new ArrayList<>();
+    for (HoodieRecord rec : records) {
+      // remove older entry from expected partition path record key pairs
+      expectedPartitionPathRecKeyPairs
+          .remove(Pair.of(rec.getPartitionPath(), rec.getRecordKey()));

Review comment:
       Ah, OK, I see. Thanks for clarifying.







[GitHub] [hudi] vinothchandar commented on pull request #1793: [HUDI-1068] Fixing deletes in global bloom

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1793:
URL: https://github.com/apache/hudi/pull/1793#issuecomment-656738766


   @nsivabalan feel free to merge after you resolve conflicts & squash.

