You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/02/17 12:01:58 UTC

[GitHub] [hudi] nsivabalan commented on a change in pull request #4838: Fix HoodieIncr source checkpoint not progressing

nsivabalan commented on a change in pull request #4838:
URL: https://github.com/apache/hudi/pull/4838#discussion_r808974095



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
##########
@@ -106,8 +110,8 @@ public S3EventsHoodieIncrSource(
             sparkContext, srcPath, numInstantsPerFetch, beginInstant, missingCheckpointStrategy);
 
     if (queryTypeAndInstantEndpts.getValue().getKey().equals(queryTypeAndInstantEndpts.getValue().getValue())) {
-      LOG.warn("Already caught up. Begin Checkpoint was :" + queryTypeAndInstantEndpts.getKey());
-      return Pair.of(Option.empty(), queryTypeAndInstantEndpts.getKey());
+      LOG.warn("Already caught up. Begin Checkpoint was :" + queryTypeAndInstantEndpts.getValue().getKey());

Review comment:
       probably we can make this a separate PR and add tests. I can take it up if need be. Can you remove that from this PR.  

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
##########
@@ -174,7 +178,27 @@ public S3EventsHoodieIncrSource(
     }
     Option<Dataset<Row>> dataset = Option.empty();
     if (!cloudFiles.isEmpty()) {
+      Log.info("Number of files to be processed=" + cloudFiles.size());
       dataset = Option.of(sparkSession.read().format(fileFormat).load(cloudFiles.toArray(new String[0])));
+      if (props.containsKey(HoodieIncrSource.Config.DROP_COLUMNS)) {
+        String dropColumns = props.getString(HoodieIncrSource.Config.DROP_COLUMNS);
+        dataset = dataset.map(ds -> ds.drop(dropColumns.split(",")));
+      }
+      if (props.containsKey(HoodieIncrSource.Config.CAST_TO_STRING)) {

Review comment:
       in general this should go in as a general feature support to any detlastreamer sources. Lets think about what it takes to add such feature. If its not feasible, we can add it just incr source. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org