Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/01/11 22:41:13 UTC

[GitHub] [incubator-hudi] nsivabalan commented on a change in pull request #1187: [HUDI-499] Allow update partition path with GLOBAL_BLOOM

nsivabalan commented on a change in pull request #1187: [HUDI-499] Allow update partition path with GLOBAL_BLOOM
URL: https://github.com/apache/incubator-hudi/pull/1187#discussion_r365545798
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
 ##########
 @@ -114,14 +117,23 @@ public HoodieGlobalBloomIndex(HoodieWriteConfig config) {
         keyLocationPairRDD.mapToPair(p -> new Tuple2<>(p._1.getRecordKey(), new Tuple2<>(p._2, p._1)));
 
     // Here as the recordRDD might have more data than rowKeyRDD (some rowKeys' fileId is null), so we do left outer join.
-    return incomingRowKeyRecordPairRDD.leftOuterJoin(existingRecordKeyToRecordLocationHoodieKeyMap).values().map(record -> {
+    return incomingRowKeyRecordPairRDD.leftOuterJoin(existingRecordKeyToRecordLocationHoodieKeyMap).values().flatMap(record -> {
       final HoodieRecord<T> hoodieRecord = record._1;
       final Optional<Tuple2<HoodieRecordLocation, HoodieKey>> recordLocationHoodieKeyPair = record._2;
       if (recordLocationHoodieKeyPair.isPresent()) {
         // Record key matched to file
-        return getTaggedRecord(new HoodieRecord<>(recordLocationHoodieKeyPair.get()._2, hoodieRecord.getData()), Option.ofNullable(recordLocationHoodieKeyPair.get()._1));
+        if (config.getBloomIndexShouldUpdatePartitionPath()) {
+          HoodieRecord<T> emptyRecord = new HoodieRecord(recordLocationHoodieKeyPair.get()._2,
 
 Review comment:
   I think we need an if-else case here. Let me explain my understanding; correct me if it's wrong.
   The scenario we are trying to tackle is this: a record was inserted into PartitionPath1, and an update for it is then sent to PartitionPath2. With the newly added config value set to true, we want the update operation to insert the record into PartitionPath2 and also delete it from PartitionPath1.
   
   So, in this case, after the left outer join, here are the values for the record:
   record._1 -> the incoming record: recordKey1, PartitionPath2
   record._2 -> Tuple2<HoodieRecordLocation, HoodieKey> from the index lookup; it should refer to recordKey1, PartitionPath1.
   
   `
         if (recordLocationHoodieKeyPair.isPresent()) {
           HoodieRecordLocation existingLocation = recordLocationHoodieKeyPair.get()._1;
           HoodieKey existingKey = recordLocationHoodieKeyPair.get()._2;
   
           if (existingKey.getPartitionPath().equals(hoodieRecord.getPartitionPath())) {
             // Same partition path: record key matched to file, tag as an update at the existing location.
             return Collections.singletonList(getTaggedRecord(new HoodieRecord<>(existingKey, hoodieRecord.getData()), Option.ofNullable(existingLocation))).iterator();
           } else if (config.getBloomIndexShouldUpdatePartitionPath()) {
             // Partition path changed and the config is enabled: need to add two records,
             // one delete in the old partition path and an insert into the new one.
             HoodieRecord<T> emptyRecord = new HoodieRecord(existingKey,
                 new EmptyHoodieRecordPayload());
             HoodieRecord<T> taggedRecord = getTaggedRecord(hoodieRecord, Option.empty());
             return Arrays.asList(emptyRecord, taggedRecord).iterator();
           } else {
             // Partition path changed but the config is disabled: keep the existing behavior
             // and tag the update against the old partition path.
             return Collections.singletonList(getTaggedRecord(new HoodieRecord<>(existingKey, hoodieRecord.getData()), Option.ofNullable(existingLocation))).iterator();
           }
         } else {
           // No existing location for this record key: treat as an insert.
           // flatMap expects an iterator, so single records are wrapped as well.
           return Collections.singletonList(getTaggedRecord(hoodieRecord, Option.empty())).iterator();
         }
   `
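
To make the three outcomes easier to check, here is a minimal, self-contained sketch of the same decision logic in plain Java, without Spark or Hudi on the classpath. Everything in it is a hypothetical stand-in for illustration: `PartitionPathTaggingSketch`, `TaggedRecord`, `tag`, and the "INSERT"/"UPDATE"/"DELETE" labels are not Hudi classes or APIs, and the `updatePartitionPath` flag only plays the role of the new config value discussed above.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

class PartitionPathTaggingSketch {

  /** Hypothetical stand-in for a tagged record; not a Hudi class. */
  static final class TaggedRecord {
    final String recordKey;
    final String partitionPath;
    final String action; // "INSERT", "UPDATE" or "DELETE" (illustrative labels only)

    TaggedRecord(String recordKey, String partitionPath, String action) {
      this.recordKey = recordKey;
      this.partitionPath = partitionPath;
      this.action = action;
    }

    @Override
    public String toString() {
      return action + "(" + recordKey + ", " + partitionPath + ")";
    }
  }

  /**
   * Decides which records to emit for one row of the left outer join.
   *
   * @param existingPartition partition found by the index lookup, empty if the key is new
   * @param updatePartitionPath assumed to mirror the new config flag
   */
  static List<TaggedRecord> tag(String recordKey, String incomingPartition,
                                Optional<String> existingPartition, boolean updatePartitionPath) {
    if (!existingPartition.isPresent()) {
      // Key not found by the index: plain insert into the incoming partition.
      return Collections.singletonList(new TaggedRecord(recordKey, incomingPartition, "INSERT"));
    }
    String oldPartition = existingPartition.get();
    if (oldPartition.equals(incomingPartition) || !updatePartitionPath) {
      // Same partition, or the flag is off: update in the partition where the record already lives.
      return Collections.singletonList(new TaggedRecord(recordKey, oldPartition, "UPDATE"));
    }
    // Partition changed and the flag is on: delete from the old partition, insert into the new one.
    return Arrays.asList(
        new TaggedRecord(recordKey, oldPartition, "DELETE"),
        new TaggedRecord(recordKey, incomingPartition, "INSERT"));
  }

  public static void main(String[] args) {
    // The scenario from the comment: inserted in PartitionPath1, update sent to PartitionPath2.
    System.out.println(tag("recordKey1", "PartitionPath2", Optional.of("PartitionPath1"), true));
    System.out.println(tag("recordKey1", "PartitionPath2", Optional.of("PartitionPath1"), false));
    System.out.println(tag("recordKey1", "PartitionPath2", Optional.empty(), true));
  }
}
```

Returning a list from every branch, including the single-record ones, is what lets the caller switch from map to flatMap: only the partition-change case with the flag enabled produces two records.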
