Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/05/29 06:24:57 UTC

[GitHub] [hudi] xushiyan commented on a diff in pull request #5633: [HUDI-4123] Fix the exception due to SqlSource return null checkpoint

xushiyan commented on code in PR #5633:
URL: https://github.com/apache/hudi/pull/5633#discussion_r884217847


##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java:
##########
@@ -1954,11 +1954,13 @@ public void testSqlSourceSource() throws Exception {
     String tableBasePath = dfsBasePath + "/test_sql_source_table" + testNum++;
     HoodieDeltaStreamer deltaStreamer =
         new HoodieDeltaStreamer(TestHelpers.makeConfig(
-            tableBasePath, WriteOperationType.INSERT, SqlSource.class.getName(),
+            tableBasePath, WriteOperationType.BULK_INSERT, SqlSource.class.getName(),
             Collections.emptyList(), PROPS_FILENAME_TEST_SQL_SOURCE, false,
-            false, 1000, false, null, null, "timestamp", null, true), jsc);
+            false, 1000, false, null, null, "timestamp", "earliest", true), jsc);
     deltaStreamer.sync();
     TestHelpers.assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
+    deltaStreamer.sync();
+    TestHelpers.assertRecordCount(SQL_SOURCE_NUM_RECORDS * 2, tableBasePath, sqlContext);

Review Comment:
   I don't follow this change either. Why does the second sync expect twice the record count?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -605,15 +605,13 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSink(JavaRDD<HoodieRec
     long totalErrorRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalErrorRecords).sum().longValue();
     long totalRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalRecords).sum().longValue();
     boolean hasErrors = totalErrorRecords > 0;
-    long hiveSyncTimeMs = 0;
-    long metaSyncTimeMs = 0;
     if (!hasErrors || cfg.commitOnErrors) {
       HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
       if (checkpointStr != null) {
         checkpointCommitMetadata.put(CHECKPOINT_KEY, checkpointStr);
-      }
-      if (cfg.checkpoint != null) {
-        checkpointCommitMetadata.put(CHECKPOINT_RESET_KEY, cfg.checkpoint);
+        if (cfg.checkpoint != null) {
+          checkpointCommitMetadata.put(CHECKPOINT_RESET_KEY, cfg.checkpoint);
+        }

Review Comment:
   Can you explain the logic behind this change? Also, please update the PR description to clarify the problem and the change.
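
   For context, the visible part of this hunk appears to move the CHECKPOINT_RESET_KEY write inside the `checkpointStr != null` check, so the reset key is no longer recorded when the source returns a null checkpoint. Below is a minimal standalone sketch of that difference. It is not the DeltaSync code: the class name, the method names, and the key strings are placeholders chosen for illustration, and the real constants are defined in Hudi itself.

```java
import java.util.HashMap;
import java.util.Map;

public class CheckpointMetadataSketch {
  // Placeholder key strings for this sketch only; not the real Hudi constants.
  static final String CHECKPOINT_KEY = "sketch.checkpoint.key";
  static final String CHECKPOINT_RESET_KEY = "sketch.checkpoint.reset.key";

  // Shape of the removed lines: the reset key is written whenever
  // cfgCheckpoint is set, even if the source returned no checkpoint.
  static Map<String, String> metadataBeforeChange(String checkpointStr, String cfgCheckpoint) {
    Map<String, String> metadata = new HashMap<>();
    if (checkpointStr != null) {
      metadata.put(CHECKPOINT_KEY, checkpointStr);
    }
    if (cfgCheckpoint != null) {
      metadata.put(CHECKPOINT_RESET_KEY, cfgCheckpoint);
    }
    return metadata;
  }

  // Shape of the added lines: the reset key is written only when the
  // source also returned a non-null checkpoint.
  static Map<String, String> metadataAfterChange(String checkpointStr, String cfgCheckpoint) {
    Map<String, String> metadata = new HashMap<>();
    if (checkpointStr != null) {
      metadata.put(CHECKPOINT_KEY, checkpointStr);
      if (cfgCheckpoint != null) {
        metadata.put(CHECKPOINT_RESET_KEY, cfgCheckpoint);
      }
    }
    return metadata;
  }

  public static void main(String[] args) {
    // A source that returns a null checkpoint, with cfg.checkpoint set to "earliest".
    System.out.println("before: " + metadataBeforeChange(null, "earliest"));
    System.out.println("after:  " + metadataAfterChange(null, "earliest"));
  }
}
```

   With a null source checkpoint, the first variant still records the reset key while the second records nothing. Whether that is the intended fix is the question the PR description should answer.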


