Posted to commits@hudi.apache.org by yi...@apache.org on 2023/02/07 17:58:31 UTC

[hudi] branch release-0.13.0 updated (721b91304ca -> 820006e025a)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a change to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git


    from 721b91304ca Bumping release candidate number 2
     new b711e39c940 [MINOR] Added safety-net check to catch any potential issue to deduce parallelism from the incoming `Dataset` appropriately (#7873)
     new 820006e025a [HUDI-5718] Unsupported Operation Exception for compaction (#7874)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../hudi/HoodieDatasetBulkInsertHelper.scala       |  11 ++
 .../table/action/compact/TestHoodieCompactor.java  | 118 +++++++++++++++------
 .../table/log/HoodieMergedLogRecordScanner.java    |   3 +-
 3 files changed, 98 insertions(+), 34 deletions(-)


[hudi] 01/02: [MINOR] Added safety-net check to catch any potential issue to deduce parallelism from the incoming `Dataset` appropriately (#7873)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit b711e39c9400c8595733d57e889eecb0e88a4b99
Author: Alexey Kudinkin <al...@gmail.com>
AuthorDate: Tue Feb 7 07:41:28 2023 -0800

    [MINOR] Added safety-net check to catch any potential issue to deduce parallelism from the incoming `Dataset` appropriately (#7873)
---
 .../scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index a6488b07b51..e239db1b5a5 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -203,6 +203,17 @@ object HoodieDatasetBulkInsertHelper
       .values
   }
 
+  override protected def deduceShuffleParallelism(input: DataFrame, configuredParallelism: Int): Int = {
+    val deduceParallelism = super.deduceShuffleParallelism(input, configuredParallelism)
+    // NOTE: In case parallelism deduction failed to accurately deduce parallelism level of the
+    //       incoming dataset we fallback to default parallelism level set for this Spark session
+    if (deduceParallelism > 0) {
+      deduceParallelism
+    } else {
+      input.sparkSession.sparkContext.defaultParallelism
+    }
+  }
+
   private def dropPartitionColumns(df: DataFrame, config: HoodieWriteConfig): DataFrame = {
     val partitionPathFields = getPartitionPathFields(config).toSet
     val nestedPartitionPathFields = partitionPathFields.filter(f => f.contains('.'))
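
[Editor's note] The Scala hunk above adds a safety net: when the deduced shuffle parallelism is not positive, it falls back to the Spark session's default parallelism. The standalone Java sketch below (not part of the commit) mirrors that pattern; the deduction step shown here (the input Dataset's partition count) is only an assumption for illustration, since the real deduction happens in the parent helper invoked via super.deduceShuffleParallelism.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ParallelismFallbackSketch {

  // Mirrors the fallback added above: keep the deduced value when it is positive,
  // otherwise fall back to the Spark session's default parallelism.
  static int deduceShuffleParallelism(Dataset<Row> input, int configuredParallelism) {
    int deduced = configuredParallelism > 0
        ? configuredParallelism
        : input.rdd().getNumPartitions();   // stand-in for the real deduction logic
    return deduced > 0
        ? deduced
        : input.sparkSession().sparkContext().defaultParallelism();
  }

  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[*]")
        .appName("parallelism-fallback-sketch")
        .getOrCreate();
    // An empty DataFrame may report zero partitions, which exercises the fallback.
    Dataset<Row> empty = spark.emptyDataFrame();
    System.out.println(deduceShuffleParallelism(empty, 0));
    spark.stop();
  }
}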


[hudi] 02/02: [HUDI-5718] Unsupported Operation Exception for compaction (#7874)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 820006e025aff01b2e91450d7d8e36efd981cfae
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Tue Feb 7 23:41:50 2023 +0800

    [HUDI-5718] Unsupported Operation Exception for compaction (#7874)
---
 .../table/action/compact/TestHoodieCompactor.java  | 118 +++++++++++++++------
 .../table/log/HoodieMergedLogRecordScanner.java    |   3 +-
 2 files changed, 87 insertions(+), 34 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
index c6cd554e289..c0e62631664 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
@@ -21,7 +21,9 @@ package org.apache.hudi.table.action.compact;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -37,7 +39,6 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieMemoryConfig;
-import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.index.HoodieIndex;
@@ -53,6 +54,7 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -113,7 +115,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
   }
 
   @Test
-  public void testCompactionEmpty() throws Exception {
+  public void testCompactionEmpty() {
     HoodieWriteConfig config = getConfig();
     metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTable table = HoodieSparkTable.create(getConfig(), context, metaClient);
@@ -169,41 +171,45 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
       writeClient.insert(recordsRDD, newCommitTime).collect();
 
       // Update all the 100 records
-      HoodieTable table = HoodieSparkTable.create(config, context);
       newCommitTime = "101";
+      updateRecords(config, newCommitTime, records);
 
-      List<HoodieRecord> updatedRecords = dataGen.generateUpdates(newCommitTime, records);
-      JavaRDD<HoodieRecord> updatedRecordsRDD = jsc.parallelize(updatedRecords, 1);
-      HoodieIndex index = new HoodieBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance());
-      JavaRDD<HoodieRecord> updatedTaggedRecordsRDD = tagLocation(index, updatedRecordsRDD, table);
+      assertLogFilesNumEqualsTo(config, 1);
 
+      String compactionInstantTime = "102";
+      HoodieData<WriteStatus> result = compact(writeClient, compactionInstantTime);
+
+      verifyCompaction(result);
+    }
+  }
+
+  @Test
+  public void testSpillingWhenCompaction() throws Exception {
+    // insert 100 records
+    HoodieWriteConfig config = getConfigBuilder()
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build())
+        .withMemoryConfig(HoodieMemoryConfig.newBuilder()
+            .withMaxMemoryMaxSize(1L, 1L).build()) // force spill
+        .build();
+    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
+      String newCommitTime = "100";
       writeClient.startCommitWithTime(newCommitTime);
-      writeClient.upsertPreppedRecords(updatedTaggedRecordsRDD, newCommitTime).collect();
-      metaClient.reloadActiveTimeline();
-
-      // Verify that all data file has one log file
-      table = HoodieSparkTable.create(config, context);
-      for (String partitionPath : dataGen.getPartitionPaths()) {
-        List<FileSlice> groupedLogFiles =
-            table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
-        for (FileSlice fileSlice : groupedLogFiles) {
-          assertEquals(1, fileSlice.getLogFiles().count(), "There should be 1 log file written for every data file");
-        }
-      }
 
-      // Do a compaction
-      table = HoodieSparkTable.create(config, context);
-      String compactionInstantTime = "102";
-      table.scheduleCompaction(context, compactionInstantTime, Option.empty());
-      table.getMetaClient().reloadActiveTimeline();
-      HoodieData<WriteStatus> result = (HoodieData<WriteStatus>) table.compact(
-          context, compactionInstantTime).getWriteStatuses();
-
-      // Verify that all partition paths are present in the WriteStatus result
-      for (String partitionPath : dataGen.getPartitionPaths()) {
-        List<WriteStatus> writeStatuses = result.collectAsList();
-        assertTrue(writeStatuses.stream()
-            .filter(writeStatus -> writeStatus.getStat().getPartitionPath().contentEquals(partitionPath)).count() > 0);
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
+      JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
+      writeClient.insert(recordsRDD, newCommitTime).collect();
+
+      // trigger 2 updates following with compaction
+      for (int i = 1; i < 5; i += 2) {
+        // Update all the 100 records
+        newCommitTime = "10" + i;
+        updateRecords(config, newCommitTime, records);
+
+        assertLogFilesNumEqualsTo(config, 1);
+
+        HoodieData<WriteStatus> result = compact(writeClient, "10" + (i + 1));
+
+        verifyCompaction(result);
       }
     }
   }
@@ -212,4 +218,52 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
   protected HoodieTableType getTableType() {
     return HoodieTableType.MERGE_ON_READ;
   }
+
+  private void updateRecords(HoodieWriteConfig config, String newCommitTime, List<HoodieRecord> records) throws IOException {
+    HoodieTable table = HoodieSparkTable.create(config, context);
+    List<HoodieRecord> updatedRecords = dataGen.generateUpdates(newCommitTime, records);
+    JavaRDD<HoodieRecord> updatedRecordsRDD = jsc.parallelize(updatedRecords, 1);
+    HoodieIndex index = new HoodieBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance());
+    JavaRDD<HoodieRecord> updatedTaggedRecordsRDD = tagLocation(index, updatedRecordsRDD, table);
+
+    writeClient.startCommitWithTime(newCommitTime);
+    writeClient.upsertPreppedRecords(updatedTaggedRecordsRDD, newCommitTime).collect();
+    metaClient.reloadActiveTimeline();
+  }
+
+  /**
+   * Verify that all data file has {@code expected} number of log files.
+   *
+   * @param config   The writer config
+   * @param expected The expected number of log files
+   */
+  private void assertLogFilesNumEqualsTo(HoodieWriteConfig config, int expected) {
+    HoodieTable table = HoodieSparkTable.create(config, context);
+    for (String partitionPath : dataGen.getPartitionPaths()) {
+      List<FileSlice> groupedLogFiles =
+          table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
+      for (FileSlice fileSlice : groupedLogFiles) {
+        assertEquals(expected, fileSlice.getLogFiles().count(), "There should be " + expected + " log file written for every data file");
+      }
+    }
+  }
+
+  /**
+   * Do a compaction.
+   */
+  private HoodieData<WriteStatus> compact(SparkRDDWriteClient writeClient, String compactionInstantTime) {
+    writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
+    JavaRDD<WriteStatus> writeStatusJavaRDD = (JavaRDD<WriteStatus>) writeClient.compact(compactionInstantTime).getWriteStatuses();
+    return HoodieListData.eager(writeStatusJavaRDD.collect());
+  }
+
+  /**
+   * Verify that all partition paths are present in the WriteStatus result.
+   */
+  private void verifyCompaction(HoodieData<WriteStatus> result) {
+    for (String partitionPath : dataGen.getPartitionPaths()) {
+      List<WriteStatus> writeStatuses = result.collectAsList();
+      assertTrue(writeStatuses.stream().anyMatch(writeStatus -> writeStatus.getStat().getPartitionPath().contentEquals(partitionPath)));
+    }
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index e5ce343eb39..1a256956bfc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -47,7 +47,6 @@ import org.apache.log4j.Logger;
 import javax.annotation.concurrent.NotThreadSafe;
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -215,7 +214,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
   }
 
   public Map<String, HoodieRecord> getRecords() {
-    return Collections.unmodifiableMap(records);
+    return records;
   }
 
   public HoodieRecordType getRecordType() {
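
[Editor's note] The one-line change above returns the scanner's backing map directly instead of an unmodifiable view. Per the commit title, the wrapper caused an UnsupportedOperationException during compaction; the exact failing call site is not visible in this diff, so the self-contained Java sketch below (not Hudi code) only demonstrates the wrapper behavior a mutating caller would trip over.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class UnmodifiableMapSketch {
  public static void main(String[] args) {
    Map<String, String> records = new HashMap<>();
    records.put("key-1", "record-1");

    // A view created by Collections.unmodifiableMap rejects every mutating call.
    Map<String, String> view = Collections.unmodifiableMap(records);
    try {
      view.remove("key-1");
    } catch (UnsupportedOperationException e) {
      System.out.println("mutation rejected by the unmodifiable view");
    }

    // Handing out the backing map directly, as the patched getRecords() now does,
    // lets the caller mutate it.
    records.remove("key-1");
    System.out.println("backing map empty: " + records.isEmpty());
  }
}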