You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2022/09/28 21:12:36 UTC

[hudi] branch master updated: [HUDI-4734] Deltastreamer table config change validation (#6753)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e4ab4f16e [HUDI-4734] Deltastreamer table config change validation (#6753)
5e4ab4f16e is described below

commit 5e4ab4f16e3770531953495af663ea0e01c0e598
Author: Jon Vexler <jo...@onehouse.ai>
AuthorDate: Wed Sep 28 17:12:27 2022 -0400

    [HUDI-4734] Deltastreamer table config change validation (#6753)
    
    
    Co-authored-by: sivabalan <n....@gmail.com>
---
 .../deltastreamer/HoodieDeltaStreamer.java         |  5 ++
 .../functional/TestHoodieDeltaStreamer.java        | 58 ++++++++++++++++------
 2 files changed, 49 insertions(+), 14 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 867aa05b30..74cb3e31df 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.utilities.deltastreamer;
 
 import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.HoodieWriterUtils;
 import org.apache.hudi.async.AsyncClusteringService;
 import org.apache.hudi.async.AsyncCompactService;
 import org.apache.hudi.async.HoodieAsyncService;
@@ -76,6 +77,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -651,6 +653,9 @@ public class HoodieDeltaStreamer implements Serializable {
                 + cfg.baseFileFormat);
         cfg.baseFileFormat = baseFileFormat;
         this.cfg.baseFileFormat = baseFileFormat;
+        Map<String,String> propsToValidate = new HashMap<>();
+        properties.get().forEach((k,v) -> propsToValidate.put(k.toString(),v.toString()));
+        HoodieWriterUtils.validateTableConfig(this.sparkSession, org.apache.hudi.HoodieConversionUtils.mapAsScalaImmutableMap(propsToValidate), meta.getTableConfig());
       } else {
         tableType = HoodieTableType.valueOf(cfg.tableType);
         if (cfg.baseFileFormat == null) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 035ad9b129..f05a36745b 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -612,25 +612,16 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
 
     // Initial bulk insert
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
-    new HoodieDeltaStreamer(cfg, jsc).sync();
-    TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
-    TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext);
-    TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
+    syncAndAssertRecordCount(cfg, 1000,  tableBasePath,  "00000",  1);
 
     // No new data => no commits.
     cfg.sourceLimit = 0;
-    new HoodieDeltaStreamer(cfg, jsc).sync();
-    TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
-    TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext);
-    TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
+    syncAndAssertRecordCount(cfg, 1000,  tableBasePath,  "00000",  1);
 
     // upsert() #1
     cfg.sourceLimit = 2000;
     cfg.operation = WriteOperationType.UPSERT;
-    new HoodieDeltaStreamer(cfg, jsc).sync();
-    TestHelpers.assertRecordCount(1950, tableBasePath, sqlContext);
-    TestHelpers.assertDistanceCount(1950, tableBasePath, sqlContext);
-    TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2);
+    syncAndAssertRecordCount(cfg,1950, tableBasePath, "00001", 2);
     List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
     assertEquals(1950, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
 
@@ -664,6 +655,43 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     assertTrue(fieldNames.containsAll(expectedFieldNames));
   }
 
+  @Test
+  public void testModifiedTableConfigs() throws Exception {
+    String tableBasePath = dfsBasePath + "/test_table_modified_configs";
+
+    // Initial bulk insert
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
+    syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);
+
+    // No new data => no commits.
+    cfg.sourceLimit = 0;
+    syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);
+
+    // Add a disallowed config update to the record key field; an exception should be thrown.
+    cfg.sourceLimit = 2000;
+    cfg.operation = WriteOperationType.UPSERT;
+    cfg.configs.add(HoodieTableConfig.RECORDKEY_FIELDS.key() + "=differentval");
+    assertThrows(HoodieException.class, () -> syncAndAssertRecordCount(cfg,1000,tableBasePath,"00000",1));
+    List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
+    assertEquals(1000, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
+
+
+    // Perform the upsert again, now with the original config; the commit should go through.
+    HoodieDeltaStreamer.Config newCfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
+    newCfg.sourceLimit = 2000;
+    newCfg.operation = WriteOperationType.UPSERT;
+    syncAndAssertRecordCount(newCfg, 1950, tableBasePath, "00001", 2);
+    List<Row> counts2 = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
+    assertEquals(1950, counts2.stream().mapToLong(entry -> entry.getLong(1)).sum());
+  }
+
+  private void syncAndAssertRecordCount(HoodieDeltaStreamer.Config cfg, Integer expected, String tableBasePath, String metadata, Integer totalCommits) throws Exception {
+    new HoodieDeltaStreamer(cfg, jsc).sync();
+    TestHelpers.assertRecordCount(expected, tableBasePath, sqlContext);
+    TestHelpers.assertDistanceCount(expected, tableBasePath, sqlContext);
+    TestHelpers.assertCommitMetadata(metadata, tableBasePath, dfs, totalCommits);
+  }
+
   @ParameterizedTest
   @MethodSource("schemaEvolArgs")
   public void testSchemaEvolution(String tableType, boolean useUserProvidedSchema, boolean useSchemaPostProcessor) throws Exception {
@@ -1419,7 +1447,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
 
   @Test
   public void testPayloadClassUpdate() throws Exception {
-    String dataSetBasePath = dfsBasePath + "/test_dataset_mor";
+    String dataSetBasePath = dfsBasePath + "/test_dataset_mor_payload_class_update";
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT,
         Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, false,
         true, false, null, "MERGE_ON_READ");
@@ -1592,6 +1620,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
       populateCommonProps(parquetProps, dfsBasePath);
     }
 
+    parquetProps.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName());
+
     parquetProps.setProperty("include", "base.properties");
     parquetProps.setProperty("hoodie.embed.timeline.server", "false");
     parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
@@ -2142,7 +2172,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     // No records should match the HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION.
     TestHelpers.assertNoPartitionMatch(tableBasePath, sqlContext, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
   }
-  
+
   void testDeltaStreamerWithSpecifiedOperation(final String tableBasePath, WriteOperationType operationType) throws Exception {
     // Initial insert
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);