You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain-text copy.
Posted to commits@hudi.apache.org by co...@apache.org on 2022/09/05 11:33:08 UTC

[hudi] branch master updated: [HUDI-4775] Fixing incremental source for MOR table (#6587)

This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f76d69e94 [HUDI-4775] Fixing incremental source for MOR table (#6587)
0f76d69e94 is described below

commit 0f76d69e942335440b91468acdc124a4359268a6
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Mon Sep 5 04:32:59 2022 -0700

    [HUDI-4775] Fixing incremental source for MOR table (#6587)
    
    * Fixing incremental source for MOR table
    
    * Remove unused import
    
    Co-authored-by: Sagar Sumit <sa...@gmail.com>
---
 .../sources/helpers/IncrSourceHelper.java          |  2 +-
 .../utilities/sources/TestHoodieIncrSource.java    | 39 +++++++++++++++++++---
 2 files changed, 35 insertions(+), 6 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
index cbfb153ee9..d9415d036c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
@@ -73,7 +73,7 @@ public class IncrSourceHelper {
     HoodieTableMetaClient srcMetaClient = HoodieTableMetaClient.builder().setConf(jssc.hadoopConfiguration()).setBasePath(srcBasePath).setLoadActiveTimelineOnLoad(true).build();
 
     final HoodieTimeline activeCommitTimeline =
-        srcMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
+        srcMetaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
 
     String beginInstantTime = beginInstant.orElseGet(() -> {
       if (missingCheckpointStrategy != null) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
index 57270bdf81..df790cf115 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
@@ -22,7 +22,9 @@ import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
@@ -30,23 +32,31 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieArchivalConfig;
 import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
 
 import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
+import java.util.stream.Stream;
 
+import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
+import static org.apache.hudi.common.testutils.HoodieTestUtils.RAW_TRIPS_TEST_NAME;
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -55,20 +65,39 @@ public class TestHoodieIncrSource extends SparkClientFunctionalTestHarness {
 
   private HoodieTestDataGenerator dataGen;
   private HoodieTableMetaClient metaClient;
+  private HoodieTableType tableType = COPY_ON_WRITE;
 
   @BeforeEach
   public void setUp() throws IOException {
     dataGen = new HoodieTestDataGenerator();
-    metaClient = getHoodieMetaClient(hadoopConf(), basePath());
   }
 
-  @Test
-  public void testHoodieIncrSource() throws IOException {
+  @Override
+  public HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, String basePath, Properties props) throws IOException {
+    props = HoodieTableMetaClient.withPropertyBuilder()
+        .setTableName(RAW_TRIPS_TEST_NAME)
+        .setTableType(tableType)
+        .setPayloadClass(HoodieAvroPayload.class)
+        .fromProperties(props)
+        .build();
+    return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, props);
+  }
+
+  private static Stream<Arguments> tableTypeParams() {
+    return Arrays.stream(new HoodieTableType[][] {{HoodieTableType.COPY_ON_WRITE}, {HoodieTableType.MERGE_ON_READ}}).map(Arguments::of);
+  }
+
+  @ParameterizedTest
+  @MethodSource("tableTypeParams")
+  public void testHoodieIncrSource(HoodieTableType tableType) throws IOException {
+    this.tableType = tableType;
+    metaClient = getHoodieMetaClient(hadoopConf(), basePath());
     HoodieWriteConfig writeConfig = getConfigBuilder(basePath(), metaClient)
         .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
         .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(true).withMaxNumDeltaCommitsBeforeCompaction(3).build())
         .withMetadataConfig(HoodieMetadataConfig.newBuilder()
-            .withMaxNumDeltaCommitsBeforeCompaction(1).build())
+            .enable(false).build())
         .build();
 
     SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig);