Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/02/24 12:51:14 UTC

[GitHub] [hudi] bvaradar commented on a change in pull request #2388: [HUDI-1353] add incremental timeline support for pending clustering ops

bvaradar commented on a change in pull request #2388:
URL: https://github.com/apache/hudi/pull/2388#discussion_r581902402



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java
##########
@@ -57,35 +57,38 @@ public static TimelineDiffResult getNewInstantsForIncrementalSync(HoodieTimeline
       List<HoodieInstant> newInstants = new ArrayList<>();
 
       // Check If any pending compaction is lost. If so, do not allow incremental timeline sync
-      List<Pair<HoodieInstant, HoodieInstant>> compactionInstants = getPendingCompactionTransitions(oldT, newT);
-      List<HoodieInstant> lostPendingCompactions = compactionInstants.stream()
+      List<Pair<HoodieInstant, HoodieInstant>> viewChangingInstants = getPendingActionTransitions(oldT, newT);
+      List<HoodieInstant> lostPendingActions = viewChangingInstants.stream()
           .filter(instantPair -> instantPair.getValue() == null).map(Pair::getKey).collect(Collectors.toList());
-      if (!lostPendingCompactions.isEmpty()) {
-        // If a compaction is unscheduled, fall back to complete refresh of fs view since some log files could have been
+      if (!lostPendingActions.isEmpty()) {
+        // If a compaction/clustering is unscheduled, fall back to complete refresh of fs view since some log files could have been
         // moved. Its unsafe to incrementally sync in that case.
-        LOG.warn("Some pending compactions are no longer in new timeline (unscheduled ?). They are :"
-            + lostPendingCompactions);
+        LOG.warn("Some pending view changing instants are no longer in new timeline (unscheduled ?). They are :"
+            + lostPendingActions);
         return TimelineDiffResult.UNSAFE_SYNC_RESULT;
       }
-      List<HoodieInstant> finishedCompactionInstants = compactionInstants.stream()
-          .filter(instantPair -> instantPair.getValue().getAction().equals(HoodieTimeline.COMMIT_ACTION)
-              && instantPair.getValue().isCompleted())
+      List<HoodieInstant> finishedViewChangingInstants = viewChangingInstants.stream()

Review comment:
      Can you construct a timeline and call timeline.viewAlteringInstants() instead, to avoid duplicating the logic?
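One way to follow this suggestion is to let the timeline own the rule for which completed actions finish a view-changing operation. Per the hunk above, a compaction completes as a commit and a clustering as a replacecommit. The diff helper then asks the timeline instead of repeating the action comparisons. This is a minimal sketch that assumes hypothetical stand-ins (SimpleInstant, SimpleTimeline, filterCompletedViewChangingInstants()). It does not use Hudi's actual classes or the viewAlteringInstants() method named in the comment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for HoodieInstant: action name, timestamp and completion flag.
final class SimpleInstant {
  final String action;
  final String timestamp;
  final boolean completed;

  SimpleInstant(String action, String timestamp, boolean completed) {
    this.action = action;
    this.timestamp = timestamp;
    this.completed = completed;
  }
}

// Hypothetical stand-in for HoodieTimeline. The rule "which completed actions finish a
// view-changing operation" lives here once, instead of in every caller.
final class SimpleTimeline {
  static final String COMMIT_ACTION = "commit";
  static final String REPLACE_COMMIT_ACTION = "replacecommit";

  private final List<SimpleInstant> instants;

  SimpleTimeline(List<SimpleInstant> instants) {
    this.instants = new ArrayList<>(instants);
  }

  // Completed commit and replacecommit instants, i.e. the forms a finished
  // compaction or clustering takes.
  SimpleTimeline filterCompletedViewChangingInstants() {
    return new SimpleTimeline(instants.stream()
        .filter(i -> i.completed)
        .filter(i -> COMMIT_ACTION.equals(i.action) || REPLACE_COMMIT_ACTION.equals(i.action))
        .collect(Collectors.toList()));
  }

  boolean containsTimestamp(String timestamp) {
    return instants.stream().anyMatch(i -> i.timestamp.equals(timestamp));
  }
}

final class DiffSketch {
  // Old pending compaction/clustering instants that the new timeline shows as finished.
  static List<SimpleInstant> finishedViewChangingInstants(
      List<SimpleInstant> oldPending, SimpleTimeline newTimeline) {
    SimpleTimeline finished = newTimeline.filterCompletedViewChangingInstants();
    return oldPending.stream()
        .filter(i -> finished.containsTimestamp(i.timestamp))
        .collect(Collectors.toList());
  }
}
```

Matching on the timestamp alone works here because the caller passes in only instants that were pending compactions or clusterings in the old timeline.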

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java
##########
@@ -57,35 +57,38 @@ public static TimelineDiffResult getNewInstantsForIncrementalSync(HoodieTimeline
       List<HoodieInstant> newInstants = new ArrayList<>();
 
       // Check If any pending compaction is lost. If so, do not allow incremental timeline sync
-      List<Pair<HoodieInstant, HoodieInstant>> compactionInstants = getPendingCompactionTransitions(oldT, newT);
-      List<HoodieInstant> lostPendingCompactions = compactionInstants.stream()
+      List<Pair<HoodieInstant, HoodieInstant>> viewChangingInstants = getPendingActionTransitions(oldT, newT);
+      List<HoodieInstant> lostPendingActions = viewChangingInstants.stream()
           .filter(instantPair -> instantPair.getValue() == null).map(Pair::getKey).collect(Collectors.toList());
-      if (!lostPendingCompactions.isEmpty()) {
-        // If a compaction is unscheduled, fall back to complete refresh of fs view since some log files could have been
+      if (!lostPendingActions.isEmpty()) {
+        // If a compaction/clustering is unscheduled, fall back to complete refresh of fs view since some log files could have been
         // moved. Its unsafe to incrementally sync in that case.
-        LOG.warn("Some pending compactions are no longer in new timeline (unscheduled ?). They are :"
-            + lostPendingCompactions);
+        LOG.warn("Some pending view changing instants are no longer in new timeline (unscheduled ?). They are :"
+            + lostPendingActions);
         return TimelineDiffResult.UNSAFE_SYNC_RESULT;
       }
-      List<HoodieInstant> finishedCompactionInstants = compactionInstants.stream()
-          .filter(instantPair -> instantPair.getValue().getAction().equals(HoodieTimeline.COMMIT_ACTION)
-              && instantPair.getValue().isCompleted())
+      List<HoodieInstant> finishedViewChangingInstants = viewChangingInstants.stream()
+          .filter(instantPair -> instantPair.getValue().isCompleted()
+              && (HoodieTimeline.COMMIT_ACTION.equals(instantPair.getValue().getAction())
+              || HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instantPair.getValue().getAction())))
           .map(Pair::getKey).collect(Collectors.toList());
 
       newT.getInstants().filter(instant -> !oldTimelineInstants.contains(instant)).forEach(newInstants::add);
-      return new TimelineDiffResult(newInstants, finishedCompactionInstants, true);
+      return new TimelineDiffResult(newInstants, finishedViewChangingInstants, true);
     } else {
       // One or more timelines is empty
       LOG.warn("One or more timelines is empty");
       return TimelineDiffResult.UNSAFE_SYNC_RESULT;
     }
   }
 
-  private static List<Pair<HoodieInstant, HoodieInstant>> getPendingCompactionTransitions(HoodieTimeline oldTimeline,
-      HoodieTimeline newTimeline) {
+  private static List<Pair<HoodieInstant, HoodieInstant>> getPendingActionTransitions(HoodieTimeline oldTimeline,
+                                                                                      HoodieTimeline newTimeline) {
     Set<HoodieInstant> newTimelineInstants = newTimeline.getInstants().collect(Collectors.toSet());
+    
+    List<Pair<HoodieInstant, HoodieInstant>> allTransitions = new ArrayList<>();
 
-    return oldTimeline.filterPendingCompactionTimeline().getInstants().map(instant -> {
+    allTransitions.addAll(oldTimeline.filterPendingCompactionTimeline().getInstants().map(instant -> {

Review comment:
      Instead of oldTimeline.filterPendingCompactionTimeline().getInstants(), is there scope to use
   oldTimeline.viewChangingInstants() and consolidate this statement with the one below, where we handle replace commits?
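Here is a minimal sketch of that consolidation. A single stream over the old timeline's pending view-changing instants (pending compaction or replacecommit) replaces the separate compaction and replace-commit passes. SimpleInstant, TransitionSketch and the COMPLETES_AS action mapping are hypothetical stand-ins, not Hudi APIs. The sketch also simplifies instant states to pending and completed.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for HoodieInstant.
final class SimpleInstant {
  final String action;
  final String timestamp;
  final boolean completed;

  SimpleInstant(String action, String timestamp, boolean completed) {
    this.action = action;
    this.timestamp = timestamp;
    this.completed = completed;
  }
}

final class TransitionSketch {
  // Assumed mapping: pending view-changing action -> action it carries once completed.
  static final Map<String, String> COMPLETES_AS = Map.of(
      "compaction", "commit",
      "replacecommit", "replacecommit");

  // One pass over the old timeline covers both compaction and clustering. Each pending
  // instant is paired with its successor in the new timeline, or with null if it vanished.
  static List<Map.Entry<SimpleInstant, SimpleInstant>> pendingActionTransitions(
      List<SimpleInstant> oldTimeline, List<SimpleInstant> newTimeline) {
    return oldTimeline.stream()
        .filter(i -> !i.completed && COMPLETES_AS.containsKey(i.action))
        .<Map.Entry<SimpleInstant, SimpleInstant>>map(
            i -> new SimpleEntry<>(i, findSuccessor(i, newTimeline)))
        .collect(Collectors.toList());
  }

  // The same instant if it is still pending, its completed form if it finished, else null.
  private static SimpleInstant findSuccessor(SimpleInstant pending, List<SimpleInstant> newTimeline) {
    String completedAction = COMPLETES_AS.get(pending.action);
    for (SimpleInstant candidate : newTimeline) {
      if (!candidate.timestamp.equals(pending.timestamp)) {
        continue;
      }
      boolean stillPending = !candidate.completed && candidate.action.equals(pending.action);
      boolean finished = candidate.completed && candidate.action.equals(completedAction);
      if (stillPending || finished) {
        return candidate;
      }
    }
    return null;
  }
}
```

A null successor corresponds to a "lost" pending action. In getNewInstantsForIncrementalSync, that is the case that falls back to UNSAFE_SYNC_RESULT.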

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
##########
@@ -223,7 +223,7 @@ void resetFileGroupsInPendingClustering(Map<HoodieFileGroupId, HoodieInstant> fg
   @Override
   void addFileGroupsInPendingClustering(Stream<Pair<HoodieFileGroupId, HoodieInstant>> fileGroups) {
     fileGroups.forEach(fileGroupInstantPair -> {
-      ValidationUtils.checkArgument(fgIdToPendingClustering.containsKey(fileGroupInstantPair.getLeft()),
+      ValidationUtils.checkArgument(!fgIdToPendingClustering.containsKey(fileGroupInstantPair.getLeft()),

Review comment:
      This is a bug in 0.7, right? Won't it fail when RocksDBFileSystemView is used?
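The corrected precondition says a file group may be registered for pending clustering only if it is not already registered. Below is a minimal in-memory sketch of that invariant. It uses a plain HashMap with String ids and a hypothetical PendingClusteringIndex class instead of Hudi's HoodieFileGroupId, HoodieInstant and ValidationUtils.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory index of file groups that belong to a pending clustering plan.
final class PendingClusteringIndex {
  private final Map<String, String> fgIdToPendingClustering = new HashMap<>();

  // Registers each (fileGroupId, clusteringInstantTs) pair. A file group may be part of at
  // most one pending clustering, so a duplicate registration is rejected. Entries earlier
  // in the list are kept even if a later entry is rejected.
  void addFileGroupsInPendingClustering(List<Map.Entry<String, String>> fileGroups) {
    for (Map.Entry<String, String> fg : fileGroups) {
      if (fgIdToPendingClustering.containsKey(fg.getKey())) {
        throw new IllegalArgumentException("File group " + fg.getKey()
            + " is already in pending clustering " + fgIdToPendingClustering.get(fg.getKey()));
      }
      fgIdToPendingClustering.put(fg.getKey(), fg.getValue());
    }
  }

  boolean isPendingClustering(String fileGroupId) {
    return fgIdToPendingClustering.containsKey(fileGroupId);
  }
}
```

In this sketch, reverting to the pre-fix condition (throwing when the key is absent) would make the very first registration of any file group throw, because the map starts empty.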

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java
##########
@@ -108,17 +110,22 @@ private void runIncrementalSync(HoodieTimeline timeline, TimelineDiffResult diff
     LOG.info("Timeline Diff Result is :" + diffResult);
 
     // First remove pending compaction instants which were completed
-    diffResult.getFinishedCompactionInstants().stream().forEach(instant -> {
+    diffResult.getFinishedViewChangingInstants().stream().forEach(instant -> {
       try {
-        removePendingCompactionInstant(timeline, instant);
+        if (HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction())) {

Review comment:
      Can we introduce something like HoodieInstant.isAction(String action) instead of checking action names directly here? There could be many occurrences like this.
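Below is a minimal sketch of the suggested helper. It assumes a hypothetical SimpleInstant class in place of HoodieInstant. isAction(String) is the name proposed in the comment, and as far as this thread shows it is not an existing Hudi method. The dispatch loop mirrors the branch in runIncrementalSync, with the actual view updates replaced by recorded strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for HoodieInstant with the helper proposed in the review.
final class SimpleInstant {
  static final String COMPACTION_ACTION = "compaction";
  static final String REPLACE_COMMIT_ACTION = "replacecommit";

  private final String action;
  private final String timestamp;

  SimpleInstant(String action, String timestamp) {
    this.action = Objects.requireNonNull(action);
    this.timestamp = Objects.requireNonNull(timestamp);
  }

  // Callers ask the instant about its action instead of comparing action strings themselves.
  boolean isAction(String expectedAction) {
    return action.equals(expectedAction);
  }

  String getTimestamp() {
    return timestamp;
  }
}

final class IncrementalSyncSketch {
  // Routes each finished view-changing instant to a cleanup step; the steps are only
  // recorded as strings here instead of mutating a file-system view.
  static List<String> handleFinished(List<SimpleInstant> finished) {
    List<String> steps = new ArrayList<>();
    for (SimpleInstant instant : finished) {
      if (instant.isAction(SimpleInstant.COMPACTION_ACTION)) {
        steps.add("remove pending compaction " + instant.getTimestamp());
      } else if (instant.isAction(SimpleInstant.REPLACE_COMMIT_ACTION)) {
        steps.add("remove pending clustering " + instant.getTimestamp());
      }
    }
    return steps;
  }
}
```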

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
##########
@@ -161,7 +161,7 @@ public RemoteHoodieTableFileSystemView(String server, int port, HoodieTableMetaC
 
     // Adding mandatory parameters - Last instants affecting file-slice
     timeline.lastInstant().ifPresent(instant -> builder.addParameter(LAST_INSTANT_TS, instant.getTimestamp()));
-    builder.addParameter(TIMELINE_HASH, timeline.getTimelineHash());
+    builder.addParameter(TIMELINE_HASH, timeline.filterCompletedAndCompactionInstants().getTimelineHash());

Review comment:
      Shouldn't this be filterViewChangingInstants?
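The point behind this question is that the hash the remote view sends as TIMELINE_HASH should cover the same instants that can change the file-system view. Below is a minimal sketch of that difference. It uses a hypothetical instant model and a SHA-256 fingerprint, not Hudi's getTimelineHash implementation. A hash over completed and compaction instants stays the same when a clustering is merely scheduled. A view-changing filter that also keeps pending replacecommit instants produces a different hash.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical stand-in for HoodieInstant.
final class SimpleInstant {
  final String action;
  final String timestamp;
  final boolean completed;

  SimpleInstant(String action, String timestamp, boolean completed) {
    this.action = action;
    this.timestamp = timestamp;
    this.completed = completed;
  }

  @Override
  public String toString() {
    return timestamp + "_" + action + (completed ? "" : ".pending");
  }
}

final class TimelineHashSketch {
  // Completed instants plus compaction instants in any state.
  static final Predicate<SimpleInstant> COMPLETED_AND_COMPACTION =
      i -> i.completed || i.action.equals("compaction");

  // Additionally keeps pending replacecommit (clustering) instants, which also alter the view.
  static final Predicate<SimpleInstant> VIEW_CHANGING =
      COMPLETED_AND_COMPACTION.or(i -> i.action.equals("replacecommit"));

  // SHA-256 over the filtered instants, rendered as a hex string.
  static String hash(List<SimpleInstant> timeline, Predicate<SimpleInstant> filter) {
    String joined = timeline.stream().filter(filter)
        .map(SimpleInstant::toString).collect(Collectors.joining(","));
    try {
      byte[] digest = MessageDigest.getInstance("SHA-256")
          .digest(joined.getBytes(StandardCharsets.UTF_8));
      return new BigInteger(1, digest).toString(16);
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("SHA-256 is unavailable", e);
    }
  }
}
```

In this model, a client and server would disagree about pending clustering only when the hash uses the view-changing filter. That is the staleness signal the comment is asking about.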

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java
##########
@@ -77,7 +77,7 @@ public HoodieFileGroup(HoodieFileGroupId fileGroupId, HoodieTimeline timeline) {
     this.fileGroupId = fileGroupId;
     this.fileSlices = new TreeMap<>(HoodieFileGroup.getReverseCommitTimeComparator());
     this.timeline = timeline;
-    this.lastInstant = timeline.lastInstant();
+    this.lastInstant = timeline.filterCompletedAndCompactionInstants().lastInstant();

Review comment:
      We should avoid this. FileGroup should just act on the timeline it is given, which keeps the two composable. Can you elaborate on why lastInstant must include only completed instants?
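Below is a minimal sketch of the composable design described in this comment. The file group takes lastInstant from exactly the timeline it is handed. A caller that wants only completed instants filters the timeline before constructing the file group. SimpleTimeline, FileGroupSketch and filterCompletedInstants() are hypothetical stand-ins loosely modeled on HoodieTimeline and HoodieFileGroup.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Hypothetical stand-in for HoodieInstant.
final class SimpleInstant {
  final String action;
  final String timestamp;
  final boolean completed;

  SimpleInstant(String action, String timestamp, boolean completed) {
    this.action = action;
    this.timestamp = timestamp;
    this.completed = completed;
  }
}

// Hypothetical stand-in for HoodieTimeline; instants are assumed to be sorted by timestamp.
final class SimpleTimeline {
  private final List<SimpleInstant> instants;

  SimpleTimeline(List<SimpleInstant> instants) {
    this.instants = new ArrayList<>(instants);
  }

  SimpleTimeline filterCompletedInstants() {
    return new SimpleTimeline(
        instants.stream().filter(i -> i.completed).collect(Collectors.toList()));
  }

  Optional<SimpleInstant> lastInstant() {
    return instants.isEmpty() ? Optional.empty() : Optional.of(instants.get(instants.size() - 1));
  }
}

// Hypothetical stand-in for HoodieFileGroup: it never re-filters the timeline it receives.
final class FileGroupSketch {
  private final String fileGroupId;
  private final SimpleTimeline timeline;
  private final Optional<SimpleInstant> lastInstant;

  FileGroupSketch(String fileGroupId, SimpleTimeline timeline) {
    this.fileGroupId = fileGroupId;
    this.timeline = timeline;
    this.lastInstant = timeline.lastInstant();
  }

  String getFileGroupId() {
    return fileGroupId;
  }

  SimpleTimeline getTimeline() {
    return timeline;
  }

  Optional<SimpleInstant> getLastInstant() {
    return lastInstant;
  }
}
```

The same file group class serves both kinds of caller. The decision about which instants count stays outside it.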




----------------------------------------------------------------
This is an automated message from the Apache Git Service.