Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/09/23 02:35:59 UTC

[GitHub] [hudi] nsivabalan commented on a diff in pull request #6753: [HUDI-4734] Deltastreamer table config change validation

nsivabalan commented on code in PR #6753:
URL: https://github.com/apache/hudi/pull/6753#discussion_r978224183


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java:
##########
@@ -435,92 +440,92 @@ public boolean equals(Object o) {
       }
       Config config = (Config) o;
       return sourceLimit == config.sourceLimit
-              && Objects.equals(targetBasePath, config.targetBasePath)
-              && Objects.equals(targetTableName, config.targetTableName)
-              && Objects.equals(tableType, config.tableType)
-              && Objects.equals(baseFileFormat, config.baseFileFormat)
-              && Objects.equals(propsFilePath, config.propsFilePath)
-              && Objects.equals(configs, config.configs)
-              && Objects.equals(sourceClassName, config.sourceClassName)
-              && Objects.equals(sourceOrderingField, config.sourceOrderingField)
-              && Objects.equals(payloadClassName, config.payloadClassName)
-              && Objects.equals(schemaProviderClassName, config.schemaProviderClassName)
-              && Objects.equals(transformerClassNames, config.transformerClassNames)
-              && operation == config.operation
-              && Objects.equals(filterDupes, config.filterDupes)
-              && Objects.equals(enableHiveSync, config.enableHiveSync)
-              && Objects.equals(enableMetaSync, config.enableMetaSync)
-              && Objects.equals(forceEmptyMetaSync, config.forceEmptyMetaSync)
-              && Objects.equals(syncClientToolClassNames, config.syncClientToolClassNames)
-              && Objects.equals(maxPendingCompactions, config.maxPendingCompactions)
-              && Objects.equals(maxPendingClustering, config.maxPendingClustering)
-              && Objects.equals(continuousMode, config.continuousMode)
-              && Objects.equals(minSyncIntervalSeconds, config.minSyncIntervalSeconds)
-              && Objects.equals(sparkMaster, config.sparkMaster)
-              && Objects.equals(commitOnErrors, config.commitOnErrors)
-              && Objects.equals(deltaSyncSchedulingWeight, config.deltaSyncSchedulingWeight)
-              && Objects.equals(compactSchedulingWeight, config.compactSchedulingWeight)
-              && Objects.equals(clusterSchedulingWeight, config.clusterSchedulingWeight)
-              && Objects.equals(deltaSyncSchedulingMinShare, config.deltaSyncSchedulingMinShare)
-              && Objects.equals(compactSchedulingMinShare, config.compactSchedulingMinShare)
-              && Objects.equals(clusterSchedulingMinShare, config.clusterSchedulingMinShare)
-              && Objects.equals(forceDisableCompaction, config.forceDisableCompaction)
-              && Objects.equals(checkpoint, config.checkpoint)
-              && Objects.equals(initialCheckpointProvider, config.initialCheckpointProvider)
-              && Objects.equals(help, config.help);
+          && Objects.equals(targetBasePath, config.targetBasePath)

Review Comment:
   Can you revert the unintended changes? This hunk only re-indents the existing `Objects.equals(...)` chain.



##########
packaging/hudi-hive-sync-bundle/pom.xml:
##########
@@ -72,6 +72,7 @@
                   <include>org.apache.hudi:hudi-hive-sync</include>
 
                   <include>com.beust:jcommander</include>
+                  <include>org.apache.avro:avro</include>

Review Comment:
   Let's revert this in this PR; we have a separate PR tackling this.



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java:
##########
@@ -1447,8 +1492,8 @@ public void testPayloadClassUpdate() throws Exception {
   public void testPartialPayloadClass() throws Exception {
     String dataSetBasePath = dfsBasePath + "/test_dataset_mor";
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT,
-          Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, false,
-          true, true, PartialUpdateAvroPayload.class.getName(), "MERGE_ON_READ");
+        Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, false,

Review Comment:
   Again, please revert the unintended changes; this is another indentation-only diff.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java:
##########
@@ -651,6 +656,9 @@ public DeltaSyncService(Config cfg, JavaSparkContext jssc, FileSystem fs, Config
                 + cfg.baseFileFormat);
         cfg.baseFileFormat = baseFileFormat;
         this.cfg.baseFileFormat = baseFileFormat;
+        HashMap<String,String> scalaprops = new HashMap<>();

Review Comment:
   We can probably name this "props" or "properties"; it only becomes a Scala map at L661.
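   
   For illustration, a minimal sketch of the rename (the entry shown is hypothetical, not taken from this PR):
   
   ```java
   import java.util.HashMap;
   import org.apache.hudi.common.table.HoodieTableConfig;
   
   // Hypothetical sketch: a plain Java map with a neutral name; it only
   // becomes a Scala map at the point of conversion (around L661).
   HashMap<String, String> props = new HashMap<>();
   props.put(HoodieTableConfig.BASE_FILE_FORMAT.key(), cfg.baseFileFormat);
   ```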



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java:
##########
@@ -664,6 +664,51 @@ public void testBulkInsertsAndUpsertsWithBootstrap() throws Exception {
     assertTrue(fieldNames.containsAll(expectedFieldNames));
   }
 
+  @Test
+  public void testModifiedTableConfigs() throws Exception {
+    String tableBasePath = dfsBasePath + "/test_table";
+
+    // Initial bulk insert
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
+    new HoodieDeltaStreamer(cfg, jsc).sync();
+    TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
+    TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext);
+    TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
+
+    // No new data => no commits.
+    cfg.sourceLimit = 0;
+    new HoodieDeltaStreamer(cfg, jsc).sync();
+    TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
+    TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext);
+    TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
+
+    // add disallowed config update to recordkey field. An exception should be thrown
+    cfg.sourceLimit = 2000;
+    cfg.operation = WriteOperationType.UPSERT;
+    cfg.configs.add(HoodieTableConfig.RECORDKEY_FIELDS.key() + "=differentval");
+    assertThrows(HoodieException.class, () -> new HoodieDeltaStreamer(cfg, jsc).sync());
+
+    TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);

Review Comment:
   We can probably write the test differently and reduce the number of lines. Let's sync up sometime and go over this.
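   
   As one possible direction (a sketch only, reusing the helpers this test already calls; the helper name and signature are mine, not from the PR):
   
   ```java
   // Hypothetical helper that collapses the repeated three-assertion block,
   // so each phase of the test becomes a single call.
   private void assertTableState(String basePath, int expectedRecords,
                                 String expectedCommit, int expectedCommits) {
     TestHelpers.assertRecordCount(expectedRecords, basePath, sqlContext);
     TestHelpers.assertDistanceCount(expectedRecords, basePath, sqlContext);
     TestHelpers.assertCommitMetadata(expectedCommit, basePath, dfs, expectedCommits);
   }
   ```
   
   With something like that, the initial bulk insert, the empty sync, and the post-exception check each shrink to a single line, plus the assertThrows for the disallowed recordkey change.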


