Posted to commits@hudi.apache.org by yu...@apache.org on 2022/09/29 01:42:57 UTC

[hudi] 16/17: [HUDI-4734] Deltastreamer table config change validation (#6753)

This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a commit to branch release-0.12.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 9a3fcc8456796add306f6d3d2756afadf830f41a
Author: Jon Vexler <jo...@onehouse.ai>
AuthorDate: Wed Sep 28 17:12:27 2022 -0400

    [HUDI-4734] Deltastreamer table config change validation (#6753)
    
    
    Co-authored-by: sivabalan <n....@gmail.com>
---
 .../deltastreamer/HoodieDeltaStreamer.java         |  5 ++
 .../functional/TestHoodieDeltaStreamer.java        | 58 ++++++++++++++++------
 2 files changed, 49 insertions(+), 14 deletions(-)
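
In short: when the target table already exists, HoodieDeltaStreamer now validates
the resolved write properties against the table's persisted config via
HoodieWriterUtils.validateTableConfig, so a disallowed change (for example,
overriding the record key fields) fails the sync with a HoodieException instead
of silently writing. A user-level illustration, drawn from the new test below
(all identifiers come from the test itself, not new API):

    // Overriding the persisted record key field is rejected on the next sync.
    cfg.configs.add(HoodieTableConfig.RECORDKEY_FIELDS.key() + "=differentval");
    assertThrows(HoodieException.class, () -> new HoodieDeltaStreamer(cfg, jsc).sync());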

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 867aa05b30..74cb3e31df 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.utilities.deltastreamer;
 
 import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.HoodieWriterUtils;
 import org.apache.hudi.async.AsyncClusteringService;
 import org.apache.hudi.async.AsyncCompactService;
 import org.apache.hudi.async.HoodieAsyncService;
@@ -76,6 +77,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -651,6 +653,9 @@ public class HoodieDeltaStreamer implements Serializable {
                 + cfg.baseFileFormat);
         cfg.baseFileFormat = baseFileFormat;
         this.cfg.baseFileFormat = baseFileFormat;
+        Map<String, String> propsToValidate = new HashMap<>();
+        properties.get().forEach((k, v) -> propsToValidate.put(k.toString(), v.toString()));
+        HoodieWriterUtils.validateTableConfig(this.sparkSession, org.apache.hudi.HoodieConversionUtils.mapAsScalaImmutableMap(propsToValidate), meta.getTableConfig());
       } else {
         tableType = HoodieTableType.valueOf(cfg.tableType);
         if (cfg.baseFileFormat == null) {
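
For context, a minimal self-contained sketch of the validation pattern the hunk
above introduces (the wrapper class and method names are illustrative, not part
of the patch; the call signature is copied from the added lines):

    import org.apache.hudi.HoodieConversionUtils;
    import org.apache.hudi.HoodieWriterUtils;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.spark.sql.SparkSession;

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    class TableConfigValidationSketch {
      // Copy the resolved writer properties into a String map, convert it to a
      // Scala immutable Map, and let HoodieWriterUtils compare it against the
      // table's persisted config; a conflicting value raises a HoodieException.
      static void validateAgainstTableConfig(SparkSession spark, Properties props, HoodieTableMetaClient meta) {
        Map<String, String> propsToValidate = new HashMap<>();
        props.forEach((k, v) -> propsToValidate.put(k.toString(), v.toString()));
        HoodieWriterUtils.validateTableConfig(
            spark, HoodieConversionUtils.mapAsScalaImmutableMap(propsToValidate), meta.getTableConfig());
      }
    }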
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index d94ff1477a..12c4c6fe0e 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -611,25 +611,16 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
 
     // Initial bulk insert
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
-    new HoodieDeltaStreamer(cfg, jsc).sync();
-    TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
-    TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext);
-    TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
+    syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);
 
     // No new data => no commits.
     cfg.sourceLimit = 0;
-    new HoodieDeltaStreamer(cfg, jsc).sync();
-    TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
-    TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext);
-    TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
+    syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);
 
     // upsert() #1
     cfg.sourceLimit = 2000;
     cfg.operation = WriteOperationType.UPSERT;
-    new HoodieDeltaStreamer(cfg, jsc).sync();
-    TestHelpers.assertRecordCount(1950, tableBasePath, sqlContext);
-    TestHelpers.assertDistanceCount(1950, tableBasePath, sqlContext);
-    TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2);
+    syncAndAssertRecordCount(cfg, 1950, tableBasePath, "00001", 2);
     List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
     assertEquals(1950, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
 
@@ -663,6 +654,42 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     assertTrue(fieldNames.containsAll(expectedFieldNames));
   }
 
+  @Test
+  public void testModifiedTableConfigs() throws Exception {
+    String tableBasePath = dfsBasePath + "/test_table_modified_configs";
+
+    // Initial bulk insert
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
+    syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);
+
+    // No new data => no commits.
+    cfg.sourceLimit = 0;
+    syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);
+
+    // Add a disallowed config update to the record key field; the sync should throw an exception.
+    cfg.sourceLimit = 2000;
+    cfg.operation = WriteOperationType.UPSERT;
+    cfg.configs.add(HoodieTableConfig.RECORDKEY_FIELDS.key() + "=differentval");
+    assertThrows(HoodieException.class, () -> syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1));
+    List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
+    assertEquals(1000, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
+
+    // Now perform the upsert with the original config; the commit should go through.
+    HoodieDeltaStreamer.Config newCfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
+    newCfg.sourceLimit = 2000;
+    newCfg.operation = WriteOperationType.UPSERT;
+    syncAndAssertRecordCount(newCfg, 1950, tableBasePath, "00001", 2);
+    List<Row> counts2 = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
+    assertEquals(1950, counts2.stream().mapToLong(entry -> entry.getLong(1)).sum());
+  }
+
+  private void syncAndAssertRecordCount(HoodieDeltaStreamer.Config cfg, int expectedRecords, String tableBasePath, String expectedCommitTime, int expectedTotalCommits) throws Exception {
+    new HoodieDeltaStreamer(cfg, jsc).sync();
+    TestHelpers.assertRecordCount(expectedRecords, tableBasePath, sqlContext);
+    TestHelpers.assertDistanceCount(expectedRecords, tableBasePath, sqlContext);
+    TestHelpers.assertCommitMetadata(expectedCommitTime, tableBasePath, dfs, expectedTotalCommits);
+  }
+
   @ParameterizedTest
   @MethodSource("schemaEvolArgs")
   public void testSchemaEvolution(String tableType, boolean useUserProvidedSchema, boolean useSchemaPostProcessor) throws Exception {
@@ -1418,7 +1446,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
 
   @Test
   public void testPayloadClassUpdate() throws Exception {
-    String dataSetBasePath = dfsBasePath + "/test_dataset_mor";
+    String dataSetBasePath = dfsBasePath + "/test_dataset_mor_payload_class_update";
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT,
         Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, false,
         true, false, null, "MERGE_ON_READ");
@@ -1572,6 +1600,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
       populateCommonProps(parquetProps, dfsBasePath);
     }
 
+    parquetProps.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName());
+
     parquetProps.setProperty("include", "base.properties");
     parquetProps.setProperty("hoodie.embed.timeline.server", "false");
     parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
@@ -2122,7 +2152,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     // No records should match the HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION.
     TestHelpers.assertNoPartitionMatch(tableBasePath, sqlContext, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
   }
-  
+
   void testDeltaStreamerWithSpecifiedOperation(final String tableBasePath, WriteOperationType operationType) throws Exception {
     // Initial insert
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
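
A brief usage note on the helper the test diff introduces: syncAndAssertRecordCount
wraps one DeltaStreamer sync plus the three assertions that previously appeared
inline after every sync. For example, the refactored upsert step above reduces to:

    // One sync, then assert 1950 records, 1950 distance entries, a latest
    // commit time of "00001", and 2 commits in total.
    syncAndAssertRecordCount(cfg, 1950, tableBasePath, "00001", 2);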