You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/01/20 03:54:32 UTC

[GitHub] [hudi] stayrascal commented on a change in pull request #4141: [HUDI-2815] Support partial update for streaming change logs

stayrascal commented on a change in pull request #4141:
URL: https://github.com/apache/hudi/pull/4141#discussion_r788331396



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateWithLatestAvroPayload.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+
+import static org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro;
+
+/**
+ * The only difference with {@link DefaultHoodieRecordPayload} is that support update partial fields
+ * in latest record which value is not null to existing record instead of all fields.
+ *
+ * <p> Assuming a {@link GenericRecord} has three fields: a int , b int, c int. The first record value: 1, 2, 3.
+ * The second record value is: 4, 5, null, the field c value is null. After call the combineAndGetUpdateValue method,
+ * we will get final record value: 4, 5, 3, field c value will not be overwritten because its value is null in latest record.
+ */
+public class PartialUpdateWithLatestAvroPayload extends DefaultHoodieRecordPayload {
+

Review comment:
       Hi @danny0405 , if I didn't understand wrongly, the `preCombine` method will be called during deduplicate record by `flushBucket` or `flushRemaining` in `SteamWriteFucntion`, which will only deduplicate the records(new created/updated record) in the buffer. 
   
   If we only overwrite `preCombine`, which will only update/merge the records in the buffer, but if there is a record with same recordKey existed in base/log file, the record will be overwrote by the  new merged record from buffer, right?
   
   For example, in COW mode, we might still need to overwrite `combineAndGetUpdateValue` method, because it will be called by `HoodieMergeHandle.write(GenericRecord oldRecord)`, this method will merge the new merged record with old records.
   ```
   public void write(GenericRecord oldRecord) {
       String key = KeyGenUtils.getRecordKeyFromGenericRecord(oldRecord, keyGeneratorOpt);
       boolean copyOldRecord = true;
       if (keyToNewRecords.containsKey(key)) {
         // If we have duplicate records that we are updating, then the hoodie record will be deflated after
         // writing the first record. So make a copy of the record to be merged
         HoodieRecord<T> hoodieRecord = new HoodieRecord<>(keyToNewRecords.get(key));
         try {
           Option<IndexedRecord> combinedAvroRecord =
               hoodieRecord.getData().combineAndGetUpdateValue(oldRecord,
                 useWriterSchema ? tableSchemaWithMetaFields : tableSchema,
                   config.getPayloadConfig().getProps());
   
           if (combinedAvroRecord.isPresent() && combinedAvroRecord.get().equals(IGNORE_RECORD)) {
             // If it is an IGNORE_RECORD, just copy the old record, and do not update the new record.
             copyOldRecord = true;
           } else if (writeUpdateRecord(hoodieRecord, oldRecord, combinedAvroRecord)) {
             /*
              * ONLY WHEN 1) we have an update for this key AND 2) We are able to successfully
              * write the the combined new
              * value
              *
              * We no longer need to copy the old record over.
              */
             copyOldRecord = false;
           }
           writtenRecordKeys.add(key);
         } catch (Exception e) {
           throw new HoodieUpsertException("Failed to combine/merge new record with old value in storage, for new record {"
               + keyToNewRecords.get(key) + "}, old value {" + oldRecord + "}", e);
         }
       }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org