Posted to commits@hudi.apache.org by "nsivabalan (via GitHub)" <gi...@apache.org> on 2023/06/24 20:16:57 UTC

[GitHub] [hudi] nsivabalan commented on a diff in pull request #9041: [HUDI-6431] Support update partition path in record-level index

nsivabalan commented on code in PR #9041:
URL: https://github.com/apache/hudi/pull/9041#discussion_r1240924843


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -310,6 +312,56 @@ public static <R> HoodieData<HoodieRecord<R>> mergeForPartitionUpdates(
             return Arrays.asList(deleteRecord, getTaggedRecord(merged, Option.empty())).iterator();
           }
         });
-    return taggedUpdatingRecords.union(newRecords);
+    return taggedUpdatingRecords.union(taggedNewRecords);
+  }
+
+  public static <R> HoodieData<HoodieRecord<R>> tagGlobalLocationBackToRecords(
+      HoodieData<HoodieRecord<R>> incomingRecords,
+      HoodiePairData<String, HoodieRecordGlobalLocation> keyAndExistingLocations,
+      boolean mayContainDuplicateLookup,
+      boolean shouldUpdatePartitionPath,
+      HoodieWriteConfig config,
+      HoodieTable table) {
+    final HoodieRecordMerger merger = config.getRecordMerger();
+
+    HoodiePairData<String, HoodieRecord<R>> keyAndIncomingRecords =
+        incomingRecords.mapToPair(record -> Pair.of(record.getRecordKey(), record));
+
+    // Pair of incoming record and the global location if meant for merged lookup in later stage
+    HoodieData<Pair<HoodieRecord<R>, Option<HoodieRecordGlobalLocation>>> incomingRecordsAndLocations
+        = keyAndIncomingRecords.leftOuterJoin(keyAndExistingLocations).values()
+        .map(v -> {
+          final HoodieRecord<R> incomingRecord = v.getLeft();
+          Option<HoodieRecordGlobalLocation> currentLocOpt = Option.ofNullable(v.getRight().orElse(null));
+          if (currentLocOpt.isPresent()) {
+            HoodieRecordGlobalLocation currentLoc = currentLocOpt.get();
+            boolean shouldPerformMergedLookUp = mayContainDuplicateLookup
+                || !Objects.equals(incomingRecord.getPartitionPath(), currentLoc.getPartitionPath());
+            if (shouldUpdatePartitionPath && shouldPerformMergedLookUp) {
+              return Pair.of(incomingRecord, currentLocOpt);
+            } else {
+              // - When update partition path is set to false,
+              //   the incoming record will be tagged to the existing record's partition regardless of being equal or not.
+              // - When update partition path is set to true,
+              //   the incoming record will be tagged to the existing record's partition
+              //   when partition is not updated and the look-up won't have duplicates (e.g. COW, or using RLI).
+              return Pair.of((HoodieRecord<R>) getTaggedRecord(
+                      createNewHoodieRecord(incomingRecord, currentLoc, merger), Option.of(currentLoc)),
+                  Option.empty());
+            }
+          } else {
+            return Pair.of(getTaggedRecord(incomingRecord, Option.empty()), Option.empty());
+          }
+        });
+    return shouldUpdatePartitionPath
+        ? mergeForPartitionUpdatesIfNeeded(incomingRecordsAndLocations, config, table)
+        : incomingRecordsAndLocations.map(Pair::getLeft);
+  }
+
+  public static HoodieRecord createNewHoodieRecord(HoodieRecord oldRecord, HoodieRecordGlobalLocation location, HoodieRecordMerger merger) {
+    HoodieKey recordKey = new HoodieKey(oldRecord.getRecordKey(), location.getPartitionPath());
+    return merger.getRecordType() == HoodieRecordType.AVRO

Review Comment:
   Shouldn't we set the location in the HoodieRecord before returning?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -310,6 +312,56 @@ public static <R> HoodieData<HoodieRecord<R>> mergeForPartitionUpdates(
             return Arrays.asList(deleteRecord, getTaggedRecord(merged, Option.empty())).iterator();
           }
         });
-    return taggedUpdatingRecords.union(newRecords);
+    return taggedUpdatingRecords.union(taggedNewRecords);
+  }
+
+  public static <R> HoodieData<HoodieRecord<R>> tagGlobalLocationBackToRecords(
+      HoodieData<HoodieRecord<R>> incomingRecords,
+      HoodiePairData<String, HoodieRecordGlobalLocation> keyAndExistingLocations,
+      boolean mayContainDuplicateLookup,
+      boolean shouldUpdatePartitionPath,
+      HoodieWriteConfig config,
+      HoodieTable table) {
+    final HoodieRecordMerger merger = config.getRecordMerger();
+
+    HoodiePairData<String, HoodieRecord<R>> keyAndIncomingRecords =
+        incomingRecords.mapToPair(record -> Pair.of(record.getRecordKey(), record));
+
+    // Pair of incoming record and the global location if meant for merged lookup in later stage
+    HoodieData<Pair<HoodieRecord<R>, Option<HoodieRecordGlobalLocation>>> incomingRecordsAndLocations
+        = keyAndIncomingRecords.leftOuterJoin(keyAndExistingLocations).values()
+        .map(v -> {
+          final HoodieRecord<R> incomingRecord = v.getLeft();
+          Option<HoodieRecordGlobalLocation> currentLocOpt = Option.ofNullable(v.getRight().orElse(null));
+          if (currentLocOpt.isPresent()) {
+            HoodieRecordGlobalLocation currentLoc = currentLocOpt.get();
+            boolean shouldPerformMergedLookUp = mayContainDuplicateLookup
+                || !Objects.equals(incomingRecord.getPartitionPath(), currentLoc.getPartitionPath());
+            if (shouldUpdatePartitionPath && shouldPerformMergedLookUp) {
+              return Pair.of(incomingRecord, currentLocOpt);
+            } else {
+              // - When update partition path is set to false,
+              //   the incoming record will be tagged to the existing record's partition regardless of being equal or not.
+              // - When update partition path is set to true,
+              //   the incoming record will be tagged to the existing record's partition
+              //   when partition is not updated and the look-up won't have duplicates (e.g. COW, or using RLI).
+              return Pair.of((HoodieRecord<R>) getTaggedRecord(
+                      createNewHoodieRecord(incomingRecord, currentLoc, merger), Option.of(currentLoc)),
+                  Option.empty());
+            }
+          } else {
+            return Pair.of(getTaggedRecord(incomingRecord, Option.empty()), Option.empty());

Review Comment:
   Maybe we don't even need to call getTaggedRecord here; just `Pair.of(incomingRecord, Option.empty())` would do.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -310,6 +312,56 @@ public static <R> HoodieData<HoodieRecord<R>> mergeForPartitionUpdates(
             return Arrays.asList(deleteRecord, getTaggedRecord(merged, Option.empty())).iterator();
           }
         });
-    return taggedUpdatingRecords.union(newRecords);
+    return taggedUpdatingRecords.union(taggedNewRecords);
+  }
+
+  public static <R> HoodieData<HoodieRecord<R>> tagGlobalLocationBackToRecords(
+      HoodieData<HoodieRecord<R>> incomingRecords,
+      HoodiePairData<String, HoodieRecordGlobalLocation> keyAndExistingLocations,
+      boolean mayContainDuplicateLookup,
+      boolean shouldUpdatePartitionPath,
+      HoodieWriteConfig config,
+      HoodieTable table) {
+    final HoodieRecordMerger merger = config.getRecordMerger();
+
+    HoodiePairData<String, HoodieRecord<R>> keyAndIncomingRecords =
+        incomingRecords.mapToPair(record -> Pair.of(record.getRecordKey(), record));
+
+    // Pair of incoming record and the global location if meant for merged lookup in later stage
+    HoodieData<Pair<HoodieRecord<R>, Option<HoodieRecordGlobalLocation>>> incomingRecordsAndLocations
+        = keyAndIncomingRecords.leftOuterJoin(keyAndExistingLocations).values()
+        .map(v -> {
+          final HoodieRecord<R> incomingRecord = v.getLeft();
+          Option<HoodieRecordGlobalLocation> currentLocOpt = Option.ofNullable(v.getRight().orElse(null));
+          if (currentLocOpt.isPresent()) {
+            HoodieRecordGlobalLocation currentLoc = currentLocOpt.get();
+            boolean shouldPerformMergedLookUp = mayContainDuplicateLookup
+                || !Objects.equals(incomingRecord.getPartitionPath(), currentLoc.getPartitionPath());
+            if (shouldUpdatePartitionPath && shouldPerformMergedLookUp) {
+              return Pair.of(incomingRecord, currentLocOpt);
+            } else {
+              // - When update partition path is set to false,
+              //   the incoming record will be tagged to the existing record's partition regardless of being equal or not.
+              // - When update partition path is set to true,
+              //   the incoming record will be tagged to the existing record's partition
+              //   when partition is not updated and the look-up won't have duplicates (e.g. COW, or using RLI).
+              return Pair.of((HoodieRecord<R>) getTaggedRecord(
+                      createNewHoodieRecord(incomingRecord, currentLoc, merger), Option.of(currentLoc)),

Review Comment:
   Minor: the last arg could just be currentLocOpt.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -310,6 +312,56 @@ public static <R> HoodieData<HoodieRecord<R>> mergeForPartitionUpdates(
             return Arrays.asList(deleteRecord, getTaggedRecord(merged, Option.empty())).iterator();
           }
         });
-    return taggedUpdatingRecords.union(newRecords);
+    return taggedUpdatingRecords.union(taggedNewRecords);
+  }
+
+  public static <R> HoodieData<HoodieRecord<R>> tagGlobalLocationBackToRecords(
+      HoodieData<HoodieRecord<R>> incomingRecords,
+      HoodiePairData<String, HoodieRecordGlobalLocation> keyAndExistingLocations,
+      boolean mayContainDuplicateLookup,
+      boolean shouldUpdatePartitionPath,
+      HoodieWriteConfig config,
+      HoodieTable table) {
+    final HoodieRecordMerger merger = config.getRecordMerger();
+
+    HoodiePairData<String, HoodieRecord<R>> keyAndIncomingRecords =
+        incomingRecords.mapToPair(record -> Pair.of(record.getRecordKey(), record));
+
+    // Pair of incoming record and the global location if meant for merged lookup in later stage
+    HoodieData<Pair<HoodieRecord<R>, Option<HoodieRecordGlobalLocation>>> incomingRecordsAndLocations
+        = keyAndIncomingRecords.leftOuterJoin(keyAndExistingLocations).values()
+        .map(v -> {
+          final HoodieRecord<R> incomingRecord = v.getLeft();
+          Option<HoodieRecordGlobalLocation> currentLocOpt = Option.ofNullable(v.getRight().orElse(null));
+          if (currentLocOpt.isPresent()) {
+            HoodieRecordGlobalLocation currentLoc = currentLocOpt.get();
+            boolean shouldPerformMergedLookUp = mayContainDuplicateLookup
+                || !Objects.equals(incomingRecord.getPartitionPath(), currentLoc.getPartitionPath());
+            if (shouldUpdatePartitionPath && shouldPerformMergedLookUp) {
+              return Pair.of(incomingRecord, currentLocOpt);
+            } else {
+              // - When update partition path is set to false,
+              //   the incoming record will be tagged to the existing record's partition regardless of being equal or not.
+              // - When update partition path is set to true,
+              //   the incoming record will be tagged to the existing record's partition
+              //   when partition is not updated and the look-up won't have duplicates (e.g. COW, or using RLI).
+              return Pair.of((HoodieRecord<R>) getTaggedRecord(

Review Comment:
   Should this be
   ```
   getTaggedRecord(createNewHoodieRecord(incomingRecord, currentLoc, merger), Option.empty())
   ```
   
   i.e. with the last arg as Option.empty(), so that within mergeForPartitionUpdatesIfNeeded we don't need to load those partition keys?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -310,6 +312,56 @@ public static <R> HoodieData<HoodieRecord<R>> mergeForPartitionUpdates(
             return Arrays.asList(deleteRecord, getTaggedRecord(merged, Option.empty())).iterator();
           }
         });
-    return taggedUpdatingRecords.union(newRecords);
+    return taggedUpdatingRecords.union(taggedNewRecords);
+  }
+
+  public static <R> HoodieData<HoodieRecord<R>> tagGlobalLocationBackToRecords(
+      HoodieData<HoodieRecord<R>> incomingRecords,
+      HoodiePairData<String, HoodieRecordGlobalLocation> keyAndExistingLocations,
+      boolean mayContainDuplicateLookup,
+      boolean shouldUpdatePartitionPath,
+      HoodieWriteConfig config,
+      HoodieTable table) {
+    final HoodieRecordMerger merger = config.getRecordMerger();
+
+    HoodiePairData<String, HoodieRecord<R>> keyAndIncomingRecords =
+        incomingRecords.mapToPair(record -> Pair.of(record.getRecordKey(), record));
+
+    // Pair of incoming record and the global location if meant for merged lookup in later stage
+    HoodieData<Pair<HoodieRecord<R>, Option<HoodieRecordGlobalLocation>>> incomingRecordsAndLocations
+        = keyAndIncomingRecords.leftOuterJoin(keyAndExistingLocations).values()
+        .map(v -> {
+          final HoodieRecord<R> incomingRecord = v.getLeft();
+          Option<HoodieRecordGlobalLocation> currentLocOpt = Option.ofNullable(v.getRight().orElse(null));
+          if (currentLocOpt.isPresent()) {
+            HoodieRecordGlobalLocation currentLoc = currentLocOpt.get();
+            boolean shouldPerformMergedLookUp = mayContainDuplicateLookup
+                || !Objects.equals(incomingRecord.getPartitionPath(), currentLoc.getPartitionPath());

Review Comment:
   Would this also work for the following scenario?
   A record moved from P1 -> P2, and now we get a new batch where it moves back to P1.
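
   To make the scenario concrete, here is a minimal sketch, assuming a plain key -> partition map stands in for the index and that the index is updated after each batch commits. `PartitionMoveSketch` and its method are hypothetical stand-ins, not Hudi classes; only the boolean condition is copied from the diff above.

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Objects;

   // Hypothetical, simplified model: the index is a plain key -> partition map.
   class PartitionMoveSketch {

     // Same condition as in the diff: a merged lookup is needed when the lookup may
     // contain duplicates or the incoming partition differs from the indexed one.
     static boolean shouldPerformMergedLookUp(boolean mayContainDuplicateLookup,
                                              String incomingPartition,
                                              String indexedPartition) {
       return mayContainDuplicateLookup
           || !Objects.equals(incomingPartition, indexedPartition);
     }

     public static void main(String[] args) {
       Map<String, String> index = new HashMap<>();
       index.put("key1", "P1");

       // Batch 1: the record moves P1 -> P2.
       boolean firstBatch = shouldPerformMergedLookUp(false, "P2", index.get("key1"));
       index.put("key1", "P2"); // assumption: the index reflects the move after commit

       // Batch 2: the record moves back P2 -> P1.
       boolean secondBatch = shouldPerformMergedLookUp(false, "P1", index.get("key1"));

       System.out.println("batch 1 needs merged lookup: " + firstBatch);
       System.out.println("batch 2 needs merged lookup: " + secondBatch);
     }
   }
   ```

   Under these assumptions both batches take the merged-lookup path, since each time the incoming partition differs from the indexed one. Whether the real index is guaranteed to reflect the first move by the time the second batch arrives is the open part of the question.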
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -310,6 +312,56 @@ public static <R> HoodieData<HoodieRecord<R>> mergeForPartitionUpdates(
             return Arrays.asList(deleteRecord, getTaggedRecord(merged, Option.empty())).iterator();
           }
         });
-    return taggedUpdatingRecords.union(newRecords);
+    return taggedUpdatingRecords.union(taggedNewRecords);
+  }
+
+  public static <R> HoodieData<HoodieRecord<R>> tagGlobalLocationBackToRecords(
+      HoodieData<HoodieRecord<R>> incomingRecords,
+      HoodiePairData<String, HoodieRecordGlobalLocation> keyAndExistingLocations,
+      boolean mayContainDuplicateLookup,
+      boolean shouldUpdatePartitionPath,
+      HoodieWriteConfig config,
+      HoodieTable table) {
+    final HoodieRecordMerger merger = config.getRecordMerger();
+
+    HoodiePairData<String, HoodieRecord<R>> keyAndIncomingRecords =
+        incomingRecords.mapToPair(record -> Pair.of(record.getRecordKey(), record));
+
+    // Pair of incoming record and the global location if meant for merged lookup in later stage
+    HoodieData<Pair<HoodieRecord<R>, Option<HoodieRecordGlobalLocation>>> incomingRecordsAndLocations
+        = keyAndIncomingRecords.leftOuterJoin(keyAndExistingLocations).values()
+        .map(v -> {
+          final HoodieRecord<R> incomingRecord = v.getLeft();
+          Option<HoodieRecordGlobalLocation> currentLocOpt = Option.ofNullable(v.getRight().orElse(null));
+          if (currentLocOpt.isPresent()) {
+            HoodieRecordGlobalLocation currentLoc = currentLocOpt.get();
+            boolean shouldPerformMergedLookUp = mayContainDuplicateLookup
+                || !Objects.equals(incomingRecord.getPartitionPath(), currentLoc.getPartitionPath());
+            if (shouldUpdatePartitionPath && shouldPerformMergedLookUp) {
+              return Pair.of(incomingRecord, currentLocOpt);
+            } else {
+              // - When update partition path is set to false,
+              //   the incoming record will be tagged to the existing record's partition regardless of being equal or not.
+              // - When update partition path is set to true,
+              //   the incoming record will be tagged to the existing record's partition
+              //   when partition is not updated and the look-up won't have duplicates (e.g. COW, or using RLI).
+              return Pair.of((HoodieRecord<R>) getTaggedRecord(
+                      createNewHoodieRecord(incomingRecord, currentLoc, merger), Option.of(currentLoc)),
+                  Option.empty());
+            }
+          } else {
+            return Pair.of(getTaggedRecord(incomingRecord, Option.empty()), Option.empty());
+          }
+        });
+    return shouldUpdatePartitionPath
+        ? mergeForPartitionUpdatesIfNeeded(incomingRecordsAndLocations, config, table)

Review Comment:
   Looks like we call mergeForPartitionUpdatesIfNeeded based only on shouldUpdatePartitionPath.
   In the case of RLI, even if shouldUpdatePartitionPath is set to true, if the incoming record's location does not differ from the existing location, we should avoid the snapshot load, right?
   
   Can you help me understand whether we are good in this case?
   Within mergeForPartitionUpdatesIfNeeded, I see that we call getExistingRecords() irrespective of these conditions.
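
   A rough sketch of the kind of guard I have in mind, assuming plain Java collections stand in for HoodieData and a String partition stands in for HoodieRecordGlobalLocation. `MergeGuardSketch`, `Tagged`, and the `loader` function are all hypothetical; this is not how mergeForPartitionUpdatesIfNeeded is actually written.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.LinkedHashSet;
   import java.util.List;
   import java.util.Optional;
   import java.util.Set;
   import java.util.function.Function;

   // Hypothetical, simplified model of "load existing records only when needed".
   class MergeGuardSketch {

     // Stand-in for Pair<HoodieRecord<R>, Option<HoodieRecordGlobalLocation>>.
     static final class Tagged {
       final String recordKey;
       final Optional<String> partitionToLookUp;

       Tagged(String recordKey, Optional<String> partitionToLookUp) {
         this.recordKey = recordKey;
         this.partitionToLookUp = partitionToLookUp;
       }
     }

     // Collects the distinct partitions that actually need a merged lookup.
     static Set<String> partitionsToLoad(List<Tagged> input) {
       Set<String> partitions = new LinkedHashSet<>();
       for (Tagged t : input) {
         t.partitionToLookUp.ifPresent(partitions::add);
       }
       return partitions;
     }

     // Calls the (expensive) loader only when at least one record needs it.
     static List<String> loadExistingIfNeeded(List<Tagged> input,
                                              Function<Set<String>, List<String>> loader) {
       Set<String> partitions = partitionsToLoad(input);
       if (partitions.isEmpty()) {
         return new ArrayList<>(); // nothing to merge, so skip the load entirely
       }
       return loader.apply(partitions);
     }

     public static void main(String[] args) {
       int[] loadCalls = {0};
       Function<Set<String>, List<String>> loader = partitions -> {
         loadCalls[0]++;
         return new ArrayList<>(partitions);
       };

       // No record changed partition: every location is empty.
       List<Tagged> unchanged = Arrays.asList(
           new Tagged("k1", Optional.empty()),
           new Tagged("k2", Optional.empty()));
       loadExistingIfNeeded(unchanged, loader);
       System.out.println("loads after unchanged batch: " + loadCalls[0]);

       // One record carries a location that needs a merged lookup.
       List<Tagged> moved = Arrays.asList(
           new Tagged("k1", Optional.of("P1")),
           new Tagged("k2", Optional.empty()));
       loadExistingIfNeeded(moved, loader);
       System.out.println("loads after batch with a move: " + loadCalls[0]);
     }
   }
   ```

   With distributed data an emptiness check is itself a job, so the real fix may look different; the sketch only illustrates that the load can be keyed off the records that carry a location rather than off shouldUpdatePartitionPath alone.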



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java:
##########
@@ -72,85 +68,37 @@ public <R> HoodieData<HoodieRecord<R>> tagLocation(
   protected <R> HoodieData<HoodieRecord<R>> tagLocationInternal(
       HoodieData<HoodieRecord<R>> inputRecords, HoodieEngineContext context,
       HoodieTable hoodieTable) {
-
-    HoodiePairData<String, HoodieRecord<R>> keyedInputRecords =
-        inputRecords.mapToPair(entry -> new ImmutablePair<>(entry.getRecordKey(), entry));
-    HoodiePairData<HoodieKey, HoodieRecordLocation> allRecordLocationsInTable =
-        fetchAllRecordLocations(context, hoodieTable, config.getGlobalSimpleIndexParallelism());
-    return getTaggedRecords(keyedInputRecords, allRecordLocationsInTable, hoodieTable);
+    List<Pair<String, HoodieBaseFile>> latestBaseFiles = getAllBaseFilesInTable(context, hoodieTable);

Review Comment:
   We could move L 71 into fetchRecordGlobalLocations(), right?


