Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/08/31 17:10:59 UTC

[GitHub] [hudi] vinothchandar commented on a change in pull request #2056: [HUDI-1255]:Add new Payload(OverwriteMulColAvroPayload) for updating specified fields in storage

vinothchandar commented on a change in pull request #2056:
URL: https://github.com/apache/hudi/pull/2056#discussion_r480265954



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteMulColAvroPayload.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * subclass of OverwriteWithLatestAvroPayload used for delta streamer.
+ * <p>
+ * 1. preCombine - Picks the latest delta record for a key, based on an ordering field.
+ * 2. combineAndGetUpdateValue/getInsertValue - overwrite storage for specified fields
+ * that doesn't equal defaultValue.
+ */
+public class OverwriteMulColAvroPayload extends OverwriteWithLatestAvroPayload {

Review comment:
       Can we name this something like `OverwriteNonDefaultsWithLatestAvroPayload`?
   
   I'm trying to make sure the name captures the fact that the default value is used to decide whether we overwrite or not. Initially, I thought this was doing a partial merge, which just updates some columns (that is also something we should add, IMO). We will eventually need something like that to support a SQL MERGE.
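
   To illustrate the semantics the proposed name is meant to convey, here is a minimal sketch in which an incoming value overwrites the stored one only when it differs from the field's default. Plain maps stand in for Avro records, and `NonDefaultOverwriteSketch`, `merge`, and the `defaults` map are hypothetical names for illustration, not part of this PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

public class NonDefaultOverwriteSketch {

  // Hypothetical helper: maps stand in for Avro GenericRecords,
  // and `defaults` stands in for the per-field schema defaults.
  public static Map<String, Object> merge(Map<String, Object> current,
                                          Map<String, Object> incoming,
                                          Map<String, Object> defaults) {
    Map<String, Object> merged = new LinkedHashMap<>(current);
    for (Map.Entry<String, Object> entry : incoming.entrySet()) {
      Object defaultValue = defaults.get(entry.getKey());
      // Overwrite only when the incoming value differs from the field's default.
      if (!Objects.equals(entry.getValue(), defaultValue)) {
        merged.put(entry.getKey(), entry.getValue());
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    Map<String, Object> defaults = new LinkedHashMap<>();
    defaults.put("name", null);
    defaults.put("age", 0);

    Map<String, Object> current = new LinkedHashMap<>();
    current.put("name", "alice");
    current.put("age", 30);

    Map<String, Object> incoming = new LinkedHashMap<>();
    incoming.put("name", null); // equals the default, so the stored name is kept
    incoming.put("age", 31);    // differs from the default, so it overwrites

    System.out.println(merge(current, incoming, defaults));
  }
}
```

   A true partial merge, as mentioned above, would instead need an explicit list of columns to update, since a default-based check cannot distinguish "not provided" from "intentionally set to the default".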

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
##########
@@ -79,8 +79,18 @@ public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload
    * @param genericRecord instance of {@link GenericRecord} of interest.
    * @returns {@code true} if record represents a delete record. {@code false} otherwise.
    */
-  private boolean isDeleteRecord(GenericRecord genericRecord) {
+  public boolean isDeleteRecord(GenericRecord genericRecord) {
     Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
     return (deleteMarker instanceof Boolean && (boolean) deleteMarker);
   }
+
+  /**
+   *
+   * @param value value in Insert Value
+   * @param defaultValue defaultValue of the field
+   * @return {@code true} if value equals defaultValue {@code false} otherwise.
+   */
+  public Boolean fieldJudge(Object value,Object defaultValue) {
+    return  defaultValue == null ? value == defaultValue : value.toString().equals(defaultValue.toString());

Review comment:
       Can we just compare without the `.toString()`? Otherwise data type mismatches may be masked, e.g. `"1"` vs `1L`, which will both probably get the same string representation.
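
   A small sketch of the concern: comparing through `toString()` treats a `String` and a `Long` as equal, while a type-aware comparison does not. The class and method names here are hypothetical and only mirror the non-null branch of the comparison in the diff.

```java
import java.util.Objects;

public class ToStringComparisonSketch {

  // Mirrors the string-based comparison from the diff (non-null inputs only).
  public static boolean equalByString(Object value, Object defaultValue) {
    return value.toString().equals(defaultValue.toString());
  }

  // Type-aware alternative: values of different types are never equal.
  public static boolean equalByValue(Object value, Object defaultValue) {
    return Objects.equals(value, defaultValue);
  }

  public static void main(String[] args) {
    Object asString = "1";
    Object asLong = 1L;
    System.out.println(equalByString(asString, asLong)); // true: the mismatch is masked
    System.out.println(equalByValue(asString, asLong));  // false: the mismatch is visible
  }
}
```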

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
##########
@@ -79,8 +79,18 @@ public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload
    * @param genericRecord instance of {@link GenericRecord} of interest.
    * @returns {@code true} if record represents a delete record. {@code false} otherwise.
    */
-  private boolean isDeleteRecord(GenericRecord genericRecord) {
+  public boolean isDeleteRecord(GenericRecord genericRecord) {
     Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
     return (deleteMarker instanceof Boolean && (boolean) deleteMarker);
   }
+
+  /**
+   *
+   * @param value value in Insert Value
+   * @param defaultValue defaultValue of the field
+   * @return {@code true} if value equals defaultValue {@code false} otherwise.
+   */
+  public Boolean fieldJudge(Object value,Object defaultValue) {

Review comment:
       Would renaming this to `overwriteField()` be more understandable?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
##########
@@ -79,8 +79,18 @@ public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload
    * @param genericRecord instance of {@link GenericRecord} of interest.
    * @returns {@code true} if record represents a delete record. {@code false} otherwise.
    */
-  private boolean isDeleteRecord(GenericRecord genericRecord) {
+  public boolean isDeleteRecord(GenericRecord genericRecord) {
     Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
     return (deleteMarker instanceof Boolean && (boolean) deleteMarker);
   }
+
+  /**
+   *
+   * @param value value in Insert Value
+   * @param defaultValue defaultValue of the field
+   * @return {@code true} if value equals defaultValue {@code false} otherwise.
+   */
+  public Boolean fieldJudge(Object value,Object defaultValue) {

Review comment:
       Surprised checkstyle is happy with the missing space after the comma in `Object value,Object defaultValue`.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteMulColAvroPayload.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * subclass of OverwriteWithLatestAvroPayload used for delta streamer.
+ * <p>
+ * 1. preCombine - Picks the latest delta record for a key, based on an ordering field.
+ * 2. combineAndGetUpdateValue/getInsertValue - overwrite storage for specified fields
+ * that doesn't equal defaultValue.
+ */
+public class OverwriteMulColAvroPayload extends OverwriteWithLatestAvroPayload {
+
+  /**
+   * @param record      Generic record for the payload.
+   * @param orderingVal {@link Comparable} to be used in pre combine.
+   */
+  public OverwriteMulColAvroPayload(GenericRecord record, Comparable orderingVal) {
+    super(record, orderingVal);
+  }
+
+  public OverwriteMulColAvroPayload(Option<GenericRecord> record) {
+    this(record.isPresent() ? record.get() : null, (record1) -> 0); // natural order
+  }
+
+  @Override
+  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
+
+    Option<IndexedRecord> recordOption = getInsertValue(schema);
+    if (!recordOption.isPresent()) {
+      return Option.empty();
+    }
+
+    GenericRecord insertRecord = (GenericRecord) recordOption.get();
+    GenericRecord currentRecord = (GenericRecord) currentValue;
+
+    Object deleteMarker = insertRecord.get("_hoodie_is_deleted");

Review comment:
       This check can be done using the `isDeleteRecord()` method. We can make that `protected`. WDYT?
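
   Roughly what this could look like, as a sketch rather than the actual change: the parent exposes the delete check as `protected`, and the subclass calls it instead of reading `_hoodie_is_deleted` itself. A `Map` stands in for Avro's `GenericRecord`, and `BasePayload`, `NonDefaultsPayload`, and `shouldDelete` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

public class DeleteCheckSketch {

  // Stand-in for the parent payload class; a Map replaces GenericRecord.
  public static class BasePayload {
    // protected: visible to subclasses without becoming part of the public API.
    protected boolean isDeleteRecord(Map<String, Object> record) {
      Object deleteMarker = record.get("_hoodie_is_deleted");
      return deleteMarker instanceof Boolean && (boolean) deleteMarker;
    }
  }

  // Stand-in for the new payload class.
  public static class NonDefaultsPayload extends BasePayload {
    // Reuses the inherited check instead of duplicating the marker lookup.
    public boolean shouldDelete(Map<String, Object> incoming) {
      return isDeleteRecord(incoming);
    }
  }

  public static void main(String[] args) {
    Map<String, Object> deleted = new HashMap<>();
    deleted.put("_hoodie_is_deleted", true);

    Map<String, Object> live = new HashMap<>();
    live.put("id", 1);

    NonDefaultsPayload payload = new NonDefaultsPayload();
    System.out.println(payload.shouldDelete(deleted)); // true
    System.out.println(payload.shouldDelete(live));    // false
  }
}
```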

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteMulColAvroPayload.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * subclass of OverwriteWithLatestAvroPayload used for delta streamer.
+ * <p>
+ * 1. preCombine - Picks the latest delta record for a key, based on an ordering field.
+ * 2. combineAndGetUpdateValue/getInsertValue - overwrite storage for specified fields
+ * that doesn't equal defaultValue.
+ */
+public class OverwriteMulColAvroPayload extends OverwriteWithLatestAvroPayload {
+
+  /**
+   * @param record      Generic record for the payload.
+   * @param orderingVal {@link Comparable} to be used in pre combine.
+   */
+  public OverwriteMulColAvroPayload(GenericRecord record, Comparable orderingVal) {
+    super(record, orderingVal);
+  }
+
+  public OverwriteMulColAvroPayload(Option<GenericRecord> record) {
+    this(record.isPresent() ? record.get() : null, (record1) -> 0); // natural order

Review comment:
       Can we call `super(..)` here?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
##########
@@ -79,8 +79,18 @@ public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload
    * @param genericRecord instance of {@link GenericRecord} of interest.
    * @returns {@code true} if record represents a delete record. {@code false} otherwise.
    */
-  private boolean isDeleteRecord(GenericRecord genericRecord) {
+  public boolean isDeleteRecord(GenericRecord genericRecord) {
     Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
     return (deleteMarker instanceof Boolean && (boolean) deleteMarker);
   }
+
+  /**
+   *
+   * @param value value in Insert Value
+   * @param defaultValue defaultValue of the field
+   * @return {@code true} if value equals defaultValue {@code false} otherwise.
+   */
+  public Boolean fieldJudge(Object value,Object defaultValue) {
+    return  defaultValue == null ? value == defaultValue : value.toString().equals(defaultValue.toString());

Review comment:
       Would `Objects.equals()` help us compare more easily? It can deal with either argument being null. We could then avoid this whole method.
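
   For reference, a short sketch of how `java.util.Objects.equals` behaves with nulls, next to a copy of the `fieldJudge` logic from the diff. The wrapper class and the `throwsNpe` helper are hypothetical and exist only to make the comparison runnable.

```java
import java.util.Objects;

public class NullSafeEqualsSketch {

  // Copy of the comparison from the diff, kept as-is for contrast.
  public static Boolean fieldJudge(Object value, Object defaultValue) {
    return defaultValue == null ? value == defaultValue : value.toString().equals(defaultValue.toString());
  }

  // Hypothetical helper: reports whether fieldJudge throws a NullPointerException.
  public static boolean throwsNpe(Object value, Object defaultValue) {
    try {
      fieldJudge(value, defaultValue);
      return false;
    } catch (NullPointerException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(Objects.equals(null, null)); // true
    System.out.println(Objects.equals(null, "x"));  // false
    System.out.println(Objects.equals("x", null));  // false
    System.out.println(Objects.equals("x", "x"));   // true

    // A null value with a non-null default makes fieldJudge call toString() on null.
    System.out.println(throwsNpe(null, "x"));       // true
  }
}
```

   Besides being shorter, `Objects.equals` avoids the `NullPointerException` that the current method would hit when the value is null and the default is not.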



