Posted to commits@hudi.apache.org by yi...@apache.org on 2023/02/07 17:58:33 UTC

[hudi] 02/02: [HUDI-5718] Unsupported Operation Exception for compaction (#7874)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 820006e025aff01b2e91450d7d8e36efd981cfae
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Tue Feb 7 23:41:50 2023 +0800

    [HUDI-5718] Unsupported Operation Exception for compaction (#7874)
---
 .../table/action/compact/TestHoodieCompactor.java  | 118 +++++++++++++++------
 .../table/log/HoodieMergedLogRecordScanner.java    |   3 +-
 2 files changed, 87 insertions(+), 34 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
index c6cd554e289..c0e62631664 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
@@ -21,7 +21,9 @@ package org.apache.hudi.table.action.compact;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -37,7 +39,6 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieMemoryConfig;
-import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.index.HoodieIndex;
@@ -53,6 +54,7 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -113,7 +115,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
   }
 
   @Test
-  public void testCompactionEmpty() throws Exception {
+  public void testCompactionEmpty() {
     HoodieWriteConfig config = getConfig();
     metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTable table = HoodieSparkTable.create(getConfig(), context, metaClient);
@@ -169,41 +171,45 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
       writeClient.insert(recordsRDD, newCommitTime).collect();
 
       // Update all the 100 records
-      HoodieTable table = HoodieSparkTable.create(config, context);
       newCommitTime = "101";
+      updateRecords(config, newCommitTime, records);
 
-      List<HoodieRecord> updatedRecords = dataGen.generateUpdates(newCommitTime, records);
-      JavaRDD<HoodieRecord> updatedRecordsRDD = jsc.parallelize(updatedRecords, 1);
-      HoodieIndex index = new HoodieBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance());
-      JavaRDD<HoodieRecord> updatedTaggedRecordsRDD = tagLocation(index, updatedRecordsRDD, table);
+      assertLogFilesNumEqualsTo(config, 1);
 
+      String compactionInstantTime = "102";
+      HoodieData<WriteStatus> result = compact(writeClient, compactionInstantTime);
+
+      verifyCompaction(result);
+    }
+  }
+
+  @Test
+  public void testSpillingWhenCompaction() throws Exception {
+    // insert 100 records
+    HoodieWriteConfig config = getConfigBuilder()
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build())
+        .withMemoryConfig(HoodieMemoryConfig.newBuilder()
+            .withMaxMemoryMaxSize(1L, 1L).build()) // force spill
+        .build();
+    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
+      String newCommitTime = "100";
       writeClient.startCommitWithTime(newCommitTime);
-      writeClient.upsertPreppedRecords(updatedTaggedRecordsRDD, newCommitTime).collect();
-      metaClient.reloadActiveTimeline();
-
-      // Verify that all data file has one log file
-      table = HoodieSparkTable.create(config, context);
-      for (String partitionPath : dataGen.getPartitionPaths()) {
-        List<FileSlice> groupedLogFiles =
-            table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
-        for (FileSlice fileSlice : groupedLogFiles) {
-          assertEquals(1, fileSlice.getLogFiles().count(), "There should be 1 log file written for every data file");
-        }
-      }
 
-      // Do a compaction
-      table = HoodieSparkTable.create(config, context);
-      String compactionInstantTime = "102";
-      table.scheduleCompaction(context, compactionInstantTime, Option.empty());
-      table.getMetaClient().reloadActiveTimeline();
-      HoodieData<WriteStatus> result = (HoodieData<WriteStatus>) table.compact(
-          context, compactionInstantTime).getWriteStatuses();
-
-      // Verify that all partition paths are present in the WriteStatus result
-      for (String partitionPath : dataGen.getPartitionPaths()) {
-        List<WriteStatus> writeStatuses = result.collectAsList();
-        assertTrue(writeStatuses.stream()
-            .filter(writeStatus -> writeStatus.getStat().getPartitionPath().contentEquals(partitionPath)).count() > 0);
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
+      JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
+      writeClient.insert(recordsRDD, newCommitTime).collect();
+
+      // trigger 2 rounds of updates, each followed by a compaction
+      for (int i = 1; i < 5; i += 2) {
+        // Update all the 100 records
+        newCommitTime = "10" + i;
+        updateRecords(config, newCommitTime, records);
+
+        assertLogFilesNumEqualsTo(config, 1);
+
+        HoodieData<WriteStatus> result = compact(writeClient, "10" + (i + 1));
+
+        verifyCompaction(result);
       }
     }
   }
@@ -212,4 +218,52 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
   protected HoodieTableType getTableType() {
     return HoodieTableType.MERGE_ON_READ;
   }
+
+  private void updateRecords(HoodieWriteConfig config, String newCommitTime, List<HoodieRecord> records) throws IOException {
+    HoodieTable table = HoodieSparkTable.create(config, context);
+    List<HoodieRecord> updatedRecords = dataGen.generateUpdates(newCommitTime, records);
+    JavaRDD<HoodieRecord> updatedRecordsRDD = jsc.parallelize(updatedRecords, 1);
+    HoodieIndex index = new HoodieBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance());
+    JavaRDD<HoodieRecord> updatedTaggedRecordsRDD = tagLocation(index, updatedRecordsRDD, table);
+
+    writeClient.startCommitWithTime(newCommitTime);
+    writeClient.upsertPreppedRecords(updatedTaggedRecordsRDD, newCommitTime).collect();
+    metaClient.reloadActiveTimeline();
+  }
+
+  /**
+   * Verify that every data file has {@code expected} log files.
+   *
+   * @param config   The writer config
+   * @param expected The expected number of log files
+   */
+  private void assertLogFilesNumEqualsTo(HoodieWriteConfig config, int expected) {
+    HoodieTable table = HoodieSparkTable.create(config, context);
+    for (String partitionPath : dataGen.getPartitionPaths()) {
+      List<FileSlice> groupedLogFiles =
+          table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
+      for (FileSlice fileSlice : groupedLogFiles) {
+        assertEquals(expected, fileSlice.getLogFiles().count(), "There should be " + expected + " log file(s) written for every data file");
+      }
+    }
+  }
+
+  /**
+   * Do a compaction.
+   */
+  private HoodieData<WriteStatus> compact(SparkRDDWriteClient writeClient, String compactionInstantTime) {
+    writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
+    JavaRDD<WriteStatus> writeStatusJavaRDD = (JavaRDD<WriteStatus>) writeClient.compact(compactionInstantTime).getWriteStatuses();
+    return HoodieListData.eager(writeStatusJavaRDD.collect());
+  }
+
+  /**
+   * Verify that all partition paths are present in the WriteStatus result.
+   */
+  private void verifyCompaction(HoodieData<WriteStatus> result) {
+    for (String partitionPath : dataGen.getPartitionPaths()) {
+      List<WriteStatus> writeStatuses = result.collectAsList();
+      assertTrue(writeStatuses.stream().anyMatch(writeStatus -> writeStatus.getStat().getPartitionPath().contentEquals(partitionPath)));
+    }
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index e5ce343eb39..1a256956bfc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -47,7 +47,6 @@ import org.apache.log4j.Logger;
 import javax.annotation.concurrent.NotThreadSafe;
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -215,7 +214,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
   }
 
   public Map<String, HoodieRecord> getRecords() {
-    return Collections.unmodifiableMap(records);
+    return records;
   }
 
   public HoodieRecordType getRecordType() {
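
For context, the second hunk is the actual fix: HoodieMergedLogRecordScanner#getRecords() no longer wraps the merged record map in Collections.unmodifiableMap, presumably because something on the compaction path mutates the map returned by getRecords() and was hitting the UnsupportedOperationException named in the commit title. Below is a minimal, standalone Java sketch of that plain-JDK behavior; it is illustrative only (not Hudi code), and the class and variable names are made up for the example.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Illustrative only: shows the plain-Java behavior behind the fix, not Hudi internals.
public class UnmodifiableMapSketch {
  public static void main(String[] args) {
    Map<String, String> records = new HashMap<>();
    records.put("key1", "merged-record-1");

    // Before the fix: getRecords() handed out an unmodifiable view of the merged records.
    Map<String, String> view = Collections.unmodifiableMap(records);
    try {
      view.remove("key1"); // any mutation through the view is rejected
    } catch (UnsupportedOperationException e) {
      System.out.println("Mutation via unmodifiable view rejected: " + e);
    }

    // After the fix: the backing map itself is returned, so a caller that
    // consumes or removes entries while merging/compacting can do so.
    records.remove("key1");
    System.out.println("Backing map after removal: " + records);
  }
}

The new testSpillingWhenCompaction case in the first hunk appears to exercise this path by forcing the merged records to spill (withMaxMemoryMaxSize(1L, 1L), commented "force spill") and then running two update-plus-compaction rounds.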