Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/05/31 11:49:02 UTC

[GitHub] [hudi] wangxianghu commented on a change in pull request #2994: [HUDI-1931] BucketAssignFunction use wrong state

wangxianghu commented on a change in pull request #2994:
URL: https://github.com/apache/hudi/pull/2994#discussion_r642415071



##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
##########
@@ -125,12 +126,18 @@
    */
   private final boolean globalIndex;
 
+  private final double ttlTime;
+
+  private final SimpleDateFormat instantDateFormat;

Review comment:
       How about using `org.apache.hudi.common.table.timeline.HoodieActiveTimeline#COMMIT_FORMATTER` instead of `instantDateFormat`, to avoid importing `SimpleDateFormat` again?
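       For illustration, here is a minimal sketch of parsing an instant time with one shared formatter constant instead of a per-instance `SimpleDateFormat` field. It makes three assumptions. First, instants follow the `yyyyMMddHHmmss` pattern. Second, it uses `java.time.DateTimeFormatter`, which is immutable and safe to share, in place of the `COMMIT_FORMATTER` constant named above, whose exact type is not shown in this thread. Third, the class and method names are hypothetical.

   ```java
   import java.time.LocalDateTime;
   import java.time.ZoneId;
   import java.time.format.DateTimeFormatter;

   public final class InstantTimes {
     // Hypothetical shared constant; assumes instants look like "20210531114902".
     // DateTimeFormatter is immutable, so a single static instance can be shared
     // across threads, unlike SimpleDateFormat.
     public static final DateTimeFormatter INSTANT_FORMATTER =
         DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

     private InstantTimes() {}

     /** Converts an instant string to epoch milliseconds, interpreted in the given zone. */
     public static long toEpochMillis(String instant, ZoneId zone) {
       return LocalDateTime.parse(instant, INSTANT_FORMATTER)
           .atZone(zone)
           .toInstant()
           .toEpochMilli();
     }

     public static void main(String[] args) {
       System.out.println(toEpochMillis("20210531114902", ZoneId.of("UTC")));
     }
   }
   ```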

##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
##########
@@ -112,7 +113,7 @@
   /**
    * State to book-keep which partition is loaded into the index state {@code indexState}.
    */

Review comment:
       Please update the Javadoc as well.

##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
##########
@@ -271,34 +278,39 @@ private void loadRecords(String partitionPath) throws Exception {
     final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
     final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
     final int taskID = getRuntimeContext().getIndexOfThisSubtask();
+    final long currentTimeMillis = System.currentTimeMillis();
     for (HoodieBaseFile baseFile : latestBaseFiles) {
+      String commitTime = baseFile.getCommitTime();
+      long commitTimeLong = instantDateFormat.parse(commitTime).getTime();
+      // file was expired, needn't to fetch hoodieKeys
+      if (ttlTime > 0 && (commitTimeLong + ttlTime) < currentTimeMillis) {

Review comment:
       We cannot add and compare these values directly; they are not on the same base:
   ```
   commitTimeLong -> yyyyMMddHHmmss
   ttlTime -> number
   currentTimeMillis -> number
   ```
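       One way to put all three operands on the same base is sketched below, under stated assumptions:

       - The instant is parsed to epoch milliseconds using the pattern `yyyyMMddHHmmss` and an explicit zone.
       - The TTL is carried as a `java.time.Duration`, so its unit is explicit rather than a bare `double`.
       - `TtlCheck` and `isExpired` are hypothetical names, not Hudi APIs.

   ```java
   import java.time.Duration;
   import java.time.LocalDateTime;
   import java.time.ZoneId;
   import java.time.format.DateTimeFormatter;

   public final class TtlCheck {
     // Assumed instant pattern, e.g. "20210531114902".
     private static final DateTimeFormatter INSTANT_FORMATTER =
         DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

     private TtlCheck() {}

     /**
      * Returns true when a file committed at {@code commitTime} is older than {@code ttl}.
      * Every operand is converted to epoch milliseconds before the comparison.
      */
     public static boolean isExpired(String commitTime, Duration ttl, long nowMillis, ZoneId zone) {
       if (ttl.isZero() || ttl.isNegative()) {
         return false; // a non-positive TTL disables expiry, like the `ttlTime > 0` guard
       }
       long commitMillis = LocalDateTime.parse(commitTime, INSTANT_FORMATTER)
           .atZone(zone).toInstant().toEpochMilli();
       return commitMillis + ttl.toMillis() < nowMillis;
     }

     public static void main(String[] args) {
       ZoneId utc = ZoneId.of("UTC");
       long now = LocalDateTime.parse("20210531114902", INSTANT_FORMATTER)
           .atZone(utc).toInstant().toEpochMilli();
       System.out.println(isExpired("20210529114902", Duration.ofDays(1), now, utc)); // true
       System.out.println(isExpired("20210531104902", Duration.ofDays(1), now, utc)); // false
     }
   }
   ```

       Passing the zone explicitly avoids silently depending on the JVM default time zone when turning a wall-clock instant string into milliseconds.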

##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
##########
@@ -189,15 +194,17 @@ public void processElement(I value, Context ctx, Collector<O> out) throws Except
 
     // The dataset may be huge, thus the processing would block for long,
     // disabled by default.
-    if (bootstrapIndex && !partitionLoadState.contains(partitionPath)) {
+    if (bootstrapIndex && !Boolean.TRUE.equals(partitionLoadState.value())) {

Review comment:
       Could `Boolean.TRUE.equals(partitionLoadState.value())` be simplified to `partitionLoadState.value()`?
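       Before simplifying, check whether the state can be unset. A Flink `ValueState<Boolean>` returns `null` from `value()` when nothing has been stored for the current key. Auto-unboxing `null` throws a `NullPointerException`, whereas `Boolean.TRUE.equals(...)` treats `null` as false. The minimal sketch below contrasts the two expressions. It uses a plain nullable `Boolean` as a stand-in for the state value, and its class and method names are hypothetical.

   ```java
   public final class NullableFlagCheck {
     private NullableFlagCheck() {}

     // Null-safe: an unset flag (null) counts as "not loaded".
     public static boolean isLoadedNullSafe(Boolean stateValue) {
       return Boolean.TRUE.equals(stateValue);
     }

     // Relies on auto-unboxing: throws NullPointerException when stateValue is null.
     public static boolean isLoadedUnboxed(Boolean stateValue) {
       return stateValue;
     }

     public static void main(String[] args) {
       Boolean unset = null; // stand-in for ValueState#value() before any update
       System.out.println(isLoadedNullSafe(unset)); // false
       try {
         isLoadedUnboxed(unset);
       } catch (NullPointerException e) {
         System.out.println("unboxing null threw NullPointerException");
       }
     }
   }
   ```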



