Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/04/15 02:11:26 UTC

[GitHub] [hudi] stayrascal commented on a diff in pull request #4724: [HUDI-2815] add partial overwrite payload to support partial overwrit…

stayrascal commented on code in PR #4724:
URL: https://github.com/apache/hudi/pull/4724#discussion_r851000714


##########
hudi-common/src/main/java/org/apache/hudi/common/model/PartialOverwriteWithLatestAvroPayload.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.function.BiFunction;
+
+import static org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro;
+
+/**
+ * The only difference from {@link OverwriteNonDefaultsWithLatestAvroPayload} is that it supports
+ * merging the latest non-null partial fields with the old record instead of replacing the whole record.
+ * During preCombine, it also merges the non-null fields of multiple records with the same record key instead of choosing the latest record based on the ordering field.
+ *
+ * <p> Regarding #combineAndGetUpdateValue, assume a {@link GenericRecord} has row schema: (f0 int, f1 int, f2 int).
+ * The first record value is: (1, 2, 3), the second record value is: (4, 5, null) with the field f2 value as null.
+ * Calling the #combineAndGetUpdateValue method of the two records returns record: (4, 5, 3).
+ * Note that field f2 value is ignored because it is null. </p>
+ *
+ * <p> Regarding #preCombine, assume a {@link GenericRecord} has row schema: (f0 int, f1 int, f2 int, o1 int),
+ * and two {@link PartialOverwriteWithLatestAvroPayload} instances with different ordering values.
+ * The first record value is (1, null, 1, 1) with the field f1 value as null, the second value is: (2, 2, null, 2) with the f2 value as null.
+ * Calling the #preCombine method of the two records returns record: (2, 2, 1, 2).
+ * Note:
+ * <ol>
+ *   <li>the field f0 value is 2 because the ordering value of the second record is bigger.</li>
+ *   <li>the field f1 value is 2 because the f1 value of the first record is null.</li>
+ *   <li>the field f2 value is 1 because the f2 value of the second record is null.</li>
+ *   <li>the field o1 value is 2 because the ordering value of the second record is bigger.</li>
+ * </ol>
+ *
+ * </p>
+ */
+public class PartialOverwriteWithLatestAvroPayload extends OverwriteWithLatestAvroPayload {
+
+  public PartialOverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal) {
+    super(record, orderingVal);
+  }
+
+  public PartialOverwriteWithLatestAvroPayload(Option<GenericRecord> record) {
+    super(record); // natural order
+  }
+
+  @Override
+  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
+    if (recordBytes.length == 0) {
+      return Option.empty();
+    }
+
+    GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);
+    GenericRecord currentRecord = (GenericRecord) currentValue;
+    if (isDeleteRecord(incomingRecord)) {
+      return Option.empty();
+    }
+    return Option.of(overwriteWithNonNullValue(schema, currentRecord, incomingRecord));
+  }
+
+  @Override
+  public int compareTo(OverwriteWithLatestAvroPayload oldValue) {
+    return this.orderingVal.compareTo(oldValue.orderingVal);
+  }
+
+  @Override
+  public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue, Properties properties, Schema schema) {
+    if (null == schema) {
+      // using default preCombine logic
+      return super.preCombine(oldValue);
+    }
+
+    try {
+      Option<IndexedRecord> incomingOption = this.getInsertValue(schema);
+      Option<IndexedRecord> oldRecordOption = oldValue.getInsertValue(schema);
+
+      if (incomingOption.isPresent() && oldRecordOption.isPresent()) {
+        boolean inComingRecordIsLatest = this.compareTo(oldValue) >= 0;
+        // ordering two records by ordering value
+        GenericRecord firstRecord = (GenericRecord) (inComingRecordIsLatest ? oldRecordOption.get() : incomingOption.get());
+        GenericRecord secondRecord = (GenericRecord) (inComingRecordIsLatest ? incomingOption.get() : oldRecordOption.get());
+        GenericRecord mergedRecord = overwriteWithNonNullValue(schema, firstRecord, secondRecord);
+        return new PartialOverwriteWithLatestAvroPayload(mergedRecord, inComingRecordIsLatest ? this.orderingVal : oldValue.orderingVal);
+      } else {
+        return super.preCombine(oldValue);
+      }
+    } catch (IOException e) {
+      return super.preCombine(oldValue);
+    }
+  }
+
+  private GenericRecord mergeRecord(Schema schema, GenericRecord first, GenericRecord second, BiFunction<Object, Object, Object> mergeFunc) {
+    schema.getFields().forEach(field -> {
+      Object firstValue = first.get(field.name());
+      Object secondValue = second.get(field.name());
+      first.put(field.name(), mergeFunc.apply(firstValue, secondValue));
+    });
+    return first;
+  }
+
+  /**
+   * Merges two records. Each field of the merged record takes its value from secondRecord if that value is not null; otherwise it takes the value from firstRecord.
+   *
+   * @param schema record schema used to loop over the fields
+   * @param firstRecord the base record that needs to be updated
+   * @param secondRecord the new record that provides the new field values
+   * @return the merged record
+   */
+  private GenericRecord overwriteWithNonNullValue(Schema schema, GenericRecord firstRecord, GenericRecord secondRecord) {

Review Comment:
   Hi @alvarolemos, thanks a lot for the useful suggestion. I also considered abstracting the merge logic, either with an abstract `merge` method or by passing a `merge` function into a generic method, and I chose the latter. The reasons are as follows:
   - `preCombine` and `combineAndGetUpdateValue` might need different merge/combine logic, so implementing a single abstract `merge` function might not be enough for both cases. For example, these two methods in `OverwriteWithLatestAvroPayload` have different merge/combine logic.
   - In the current implementation, `mergeRecord` is already a generic method, even though it is private for now: it does not care about the detailed merge logic, and it can be changed to protected/public scope if needed. `overwriteWithNonNullValue`, by contrast, is the merge implementation of the current payload. It is a wrapper around `mergeFunc`, and we can create two such wrappers for the `preCombine` and `combineAndGetUpdateValue` scenarios if needed, which is similar to what you mentioned about implementing the detailed `mergeFunc` logic in a subclass. A subclass can still inherit from this class, implement its own `mergeFunc` logic, and pass it to `mergeRecord`.
   - Another reason I did not create an abstract class for now is that there would be only one subclass. We can refactor if more cases need to inherit from this class; for now, let's keep it as simple as possible.
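
   To make the second point concrete, here is a minimal sketch of the "generic method plus wrapper" shape. It is not the PR's code: it uses plain `Map<String, Object>` values as a stand-in for Avro `GenericRecord` so that it runs without Hudi or Avro on the classpath, the class name `MergeSketch` and the `record` helper are hypothetical, and the lambda inside `overwriteWithNonNullValue` is an assumption based on the Javadoc (the body of that method is not shown in the diff above). Unlike the PR's `mergeRecord`, the stand-in builds a new map instead of updating `first` in place.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

public class MergeSketch {

  // Generic merge: walks the field list and delegates each per-field decision to mergeFunc.
  // It knows nothing about the concrete merge rule.
  static Map<String, Object> mergeRecord(List<String> fields,
                                         Map<String, Object> first,
                                         Map<String, Object> second,
                                         BiFunction<Object, Object, Object> mergeFunc) {
    Map<String, Object> merged = new LinkedHashMap<>();
    for (String field : fields) {
      merged.put(field, mergeFunc.apply(first.get(field), second.get(field)));
    }
    return merged;
  }

  // Wrapper holding one concrete rule (assumed from the Javadoc):
  // take the value from the second record unless it is null.
  static Map<String, Object> overwriteWithNonNullValue(List<String> fields,
                                                       Map<String, Object> first,
                                                       Map<String, Object> second) {
    return mergeRecord(fields, first, second, (a, b) -> b != null ? b : a);
  }

  // Hypothetical helper: builds a record from positional values (nulls allowed).
  static Map<String, Object> record(List<String> fields, Object... values) {
    Map<String, Object> rec = new LinkedHashMap<>();
    for (int i = 0; i < fields.size(); i++) {
      rec.put(fields.get(i), values[i]);
    }
    return rec;
  }

  public static void main(String[] args) {
    // Same values as the preCombine example in the class Javadoc.
    List<String> fields = List.of("f0", "f1", "f2", "o1");
    Map<String, Object> first = record(fields, 1, null, 1, 1);
    Map<String, Object> second = record(fields, 2, 2, null, 2);
    System.out.println(overwriteWithNonNullValue(fields, first, second));
    // prints {f0=2, f1=2, f2=1, o1=2}
  }
}
```

   A different rule for `preCombine` or `combineAndGetUpdateValue` would then be another small wrapper that passes a different lambda to `mergeRecord`, without touching the loop itself.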



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org