You are viewing a plain text version of this content. The canonical link for it is available in the mailing list archive (link not preserved in this plain-text rendering).
Posted to commits@hudi.apache.org by co...@apache.org on 2022/09/05 11:33:08 UTC
[hudi] branch master updated: [HUDI-4775] Fixing incremental source for MOR table (#6587)
This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0f76d69e94 [HUDI-4775] Fixing incremental source for MOR table (#6587)
0f76d69e94 is described below
commit 0f76d69e942335440b91468acdc124a4359268a6
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Mon Sep 5 04:32:59 2022 -0700
[HUDI-4775] Fixing incremental source for MOR table (#6587)
* Fixing incremental source for MOR table
* Remove unused import
Co-authored-by: Sagar Sumit <sa...@gmail.com>
---
.../sources/helpers/IncrSourceHelper.java | 2 +-
.../utilities/sources/TestHoodieIncrSource.java | 39 +++++++++++++++++++---
2 files changed, 35 insertions(+), 6 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
index cbfb153ee9..d9415d036c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
@@ -73,7 +73,7 @@ public class IncrSourceHelper {
HoodieTableMetaClient srcMetaClient = HoodieTableMetaClient.builder().setConf(jssc.hadoopConfiguration()).setBasePath(srcBasePath).setLoadActiveTimelineOnLoad(true).build();
final HoodieTimeline activeCommitTimeline =
- srcMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
+ srcMetaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
String beginInstantTime = beginInstant.orElseGet(() -> {
if (missingCheckpointStrategy != null) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
index 57270bdf81..df790cf115 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
@@ -22,7 +22,9 @@ import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
@@ -30,23 +32,31 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
+import java.util.Arrays;
import java.util.List;
import java.util.Properties;
+import java.util.stream.Stream;
+import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
+import static org.apache.hudi.common.testutils.HoodieTestUtils.RAW_TRIPS_TEST_NAME;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -55,20 +65,39 @@ public class TestHoodieIncrSource extends SparkClientFunctionalTestHarness {
private HoodieTestDataGenerator dataGen;
private HoodieTableMetaClient metaClient;
+ private HoodieTableType tableType = COPY_ON_WRITE;
@BeforeEach
public void setUp() throws IOException {
dataGen = new HoodieTestDataGenerator();
- metaClient = getHoodieMetaClient(hadoopConf(), basePath());
}
- @Test
- public void testHoodieIncrSource() throws IOException {
+ @Override
+ public HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, String basePath, Properties props) throws IOException {
+ props = HoodieTableMetaClient.withPropertyBuilder()
+ .setTableName(RAW_TRIPS_TEST_NAME)
+ .setTableType(tableType)
+ .setPayloadClass(HoodieAvroPayload.class)
+ .fromProperties(props)
+ .build();
+ return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, props);
+ }
+
+ private static Stream<Arguments> tableTypeParams() {
+ return Arrays.stream(new HoodieTableType[][] {{HoodieTableType.COPY_ON_WRITE}, {HoodieTableType.MERGE_ON_READ}}).map(Arguments::of);
+ }
+
+ @ParameterizedTest
+ @MethodSource("tableTypeParams")
+ public void testHoodieIncrSource(HoodieTableType tableType) throws IOException {
+ this.tableType = tableType;
+ metaClient = getHoodieMetaClient(hadoopConf(), basePath());
HoodieWriteConfig writeConfig = getConfigBuilder(basePath(), metaClient)
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(true).withMaxNumDeltaCommitsBeforeCompaction(3).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
- .withMaxNumDeltaCommitsBeforeCompaction(1).build())
+ .enable(false).build())
.build();
SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig);