You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2022/09/28 21:12:36 UTC
[hudi] branch master updated: [HUDI-4734] Deltastreamer table config change validation (#6753)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 5e4ab4f16e [HUDI-4734] Deltastreamer table config change validation (#6753)
5e4ab4f16e is described below
commit 5e4ab4f16e3770531953495af663ea0e01c0e598
Author: Jon Vexler <jo...@onehouse.ai>
AuthorDate: Wed Sep 28 17:12:27 2022 -0400
[HUDI-4734] Deltastreamer table config change validation (#6753)
Co-authored-by: sivabalan <n....@gmail.com>
---
.../deltastreamer/HoodieDeltaStreamer.java | 5 ++
.../functional/TestHoodieDeltaStreamer.java | 58 ++++++++++++++++------
2 files changed, 49 insertions(+), 14 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 867aa05b30..74cb3e31df 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -19,6 +19,7 @@
package org.apache.hudi.utilities.deltastreamer;
import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.HoodieWriterUtils;
import org.apache.hudi.async.AsyncClusteringService;
import org.apache.hudi.async.AsyncCompactService;
import org.apache.hudi.async.HoodieAsyncService;
@@ -76,6 +77,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -651,6 +653,9 @@ public class HoodieDeltaStreamer implements Serializable {
+ cfg.baseFileFormat);
cfg.baseFileFormat = baseFileFormat;
this.cfg.baseFileFormat = baseFileFormat;
+ Map<String,String> propsToValidate = new HashMap<>();
+ properties.get().forEach((k,v) -> propsToValidate.put(k.toString(),v.toString()));
+ HoodieWriterUtils.validateTableConfig(this.sparkSession, org.apache.hudi.HoodieConversionUtils.mapAsScalaImmutableMap(propsToValidate), meta.getTableConfig());
} else {
tableType = HoodieTableType.valueOf(cfg.tableType);
if (cfg.baseFileFormat == null) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 035ad9b129..f05a36745b 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -612,25 +612,16 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
// Initial bulk insert
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
- new HoodieDeltaStreamer(cfg, jsc).sync();
- TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
- TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext);
- TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
+ syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);
// No new data => no commits.
cfg.sourceLimit = 0;
- new HoodieDeltaStreamer(cfg, jsc).sync();
- TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
- TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext);
- TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
+ syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);
// upsert() #1
cfg.sourceLimit = 2000;
cfg.operation = WriteOperationType.UPSERT;
- new HoodieDeltaStreamer(cfg, jsc).sync();
- TestHelpers.assertRecordCount(1950, tableBasePath, sqlContext);
- TestHelpers.assertDistanceCount(1950, tableBasePath, sqlContext);
- TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2);
+ syncAndAssertRecordCount(cfg,1950, tableBasePath, "00001", 2);
List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
assertEquals(1950, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
@@ -664,6 +655,43 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
assertTrue(fieldNames.containsAll(expectedFieldNames));
}
+ @Test
+ public void testModifiedTableConfigs() throws Exception {
+ String tableBasePath = dfsBasePath + "/test_table_modified_configs";
+ // Scenario: overriding the record key field table config on an existing table must make sync fail.
+ // Initial bulk insert: 1000 records are expected after the first commit ("00000").
+ HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
+ syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);
+
+ // sourceLimit = 0 => no new data, so the record count and commit are expected to stay the same.
+ cfg.sourceLimit = 0;
+ syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);
+
+ // Add a disallowed override of the record key field config; the sync is expected to throw HoodieException.
+ cfg.sourceLimit = 2000;
+ cfg.operation = WriteOperationType.UPSERT;
+ cfg.configs.add(HoodieTableConfig.RECORDKEY_FIELDS.key() + "=differentval");
+ assertThrows(HoodieException.class, () -> syncAndAssertRecordCount(cfg,1000,tableBasePath,"00000",1));
+ List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
+ assertEquals(1000, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
+
+
+ // Retry the upsert with a freshly built config (no override); this commit should go through.
+ HoodieDeltaStreamer.Config newCfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
+ newCfg.sourceLimit = 2000;
+ newCfg.operation = WriteOperationType.UPSERT;
+ syncAndAssertRecordCount(newCfg, 1950, tableBasePath, "00001", 2);
+ List<Row> counts2 = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
+ assertEquals(1950, counts2.stream().mapToLong(entry -> entry.getLong(1)).sum());
+ }
+ // Runs one HoodieDeltaStreamer sync with cfg, then asserts record count, distance count and commit metadata.
+ private void syncAndAssertRecordCount(HoodieDeltaStreamer.Config cfg, Integer expected, String tableBasePath, String metadata, Integer totalCommits) throws Exception {
+ new HoodieDeltaStreamer(cfg, jsc).sync();
+ TestHelpers.assertRecordCount(expected, tableBasePath, sqlContext);
+ TestHelpers.assertDistanceCount(expected, tableBasePath, sqlContext);
+ TestHelpers.assertCommitMetadata(metadata, tableBasePath, dfs, totalCommits);
+ }
+
@ParameterizedTest
@MethodSource("schemaEvolArgs")
public void testSchemaEvolution(String tableType, boolean useUserProvidedSchema, boolean useSchemaPostProcessor) throws Exception {
@@ -1419,7 +1447,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
@Test
public void testPayloadClassUpdate() throws Exception {
- String dataSetBasePath = dfsBasePath + "/test_dataset_mor";
+ String dataSetBasePath = dfsBasePath + "/test_dataset_mor_payload_class_update";
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT,
Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, false,
true, false, null, "MERGE_ON_READ");
@@ -1592,6 +1620,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
populateCommonProps(parquetProps, dfsBasePath);
}
+ parquetProps.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName());
+
parquetProps.setProperty("include", "base.properties");
parquetProps.setProperty("hoodie.embed.timeline.server", "false");
parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
@@ -2142,7 +2172,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
// No records should match the HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION.
TestHelpers.assertNoPartitionMatch(tableBasePath, sqlContext, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
}
-
+
void testDeltaStreamerWithSpecifiedOperation(final String tableBasePath, WriteOperationType operationType) throws Exception {
// Initial insert
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);