Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/07/11 07:03:22 UTC

[GitHub] [hudi] shenh062326 opened a new pull request #1819: [HUDI-1058] Make delete marker configurable

shenh062326 opened a new pull request #1819:
URL: https://github.com/apache/hudi/pull/1819


   
   ## What is the purpose of the pull request
   
   Users can specify any boolean field as the delete marker; `_hoodie_is_deleted` remains the default.
   
   ## Brief change log
   
   *(for example:)*
     - *Modify AnnotationLocation checkstyle rule in checkstyle.xml*
   
   ## Verify this pull request
   
   *(Please pick either of the following options)*
   
   This pull request is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This pull request is already covered by existing tests, such as *(please describe tests)*.
   
   (or)
   
   This change added tests and can be verified as follows:
   
   *(example:)*
   
     - *Added integration tests for end-to-end.*
     - *Added HoodieClientWriteTest to verify the change.*
     - *Manually verified the change by running a job locally.*
   
   ## Committer checklist
   
    - [ ] Has a corresponding JIRA in PR title & commit
    
    - [ ] Commit message is descriptive of the change
    
    - [ ] CI is green
   
    - [ ] Necessary doc changes done or have another open PR
          
    - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] shenh062326 commented on pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
shenh062326 commented on pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#issuecomment-669627457


   It seems there are two implementations.
   (a) If we add new methods like the ones below to HoodieRecordPayload, then, since HoodieRecordPayload is an interface, it will be incompatible with custom payload implementations.
   ```
   public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, PayloadConfig payloadConfig) 
   public Option<IndexedRecord>  getInsertValue(Schema schema, PayloadConfig payloadConfig)
   ```
   
   (b) If we add these methods only to OverwriteWithLatestAvroPayload, then whenever we call HoodieRecordPayload.combineAndGetUpdateValue or HoodieRecordPayload.getInsertValue, we need to check whether the payload is an OverwriteWithLatestAvroPayload. If it is, we call getInsertValue(Schema schema, PayloadConfig payloadConfig); if it is not, we call getInsertValue(Schema schema). But there are many places that call HoodieRecordPayload.getInsertValue.
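A minimal sketch of a middle ground between (a) and (b), assuming Java 8 `default` interface methods are acceptable here: the new config-aware overload gets a default body that delegates to the existing method, so existing custom payloads keep compiling and callers never need an `instanceof` check. `PayloadConfig`, `RecordPayload`, `LegacyPayload` and `DeleteAwarePayload` are hypothetical names; a `Map` stands in for `IndexedRecord`, `java.util.Optional` for Hudi's `Option`, and the `Schema` parameter is omitted.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical holder for write-time settings, handed to payload methods at runtime.
final class PayloadConfig {
  private final String deleteField;

  PayloadConfig(String deleteField) {
    this.deleteField = deleteField;
  }

  String getDeleteField() {
    return deleteField;
  }
}

// Simplified stand-in for HoodieRecordPayload (Map replaces IndexedRecord, Schema omitted).
interface RecordPayload {
  // Existing method that every custom payload already implements.
  Optional<Map<String, Object>> getInsertValue();

  // New overload: the default body ignores the config and delegates,
  // so pre-existing implementations compile and behave as before.
  default Optional<Map<String, Object>> getInsertValue(PayloadConfig config) {
    return getInsertValue();
  }
}

// A pre-existing custom payload that knows nothing about PayloadConfig.
final class LegacyPayload implements RecordPayload {
  private final Map<String, Object> record;

  LegacyPayload(Map<String, Object> record) {
    this.record = record;
  }

  @Override
  public Optional<Map<String, Object>> getInsertValue() {
    return Optional.of(record);
  }
}

// A payload that opts in and honours the configured delete marker.
final class DeleteAwarePayload implements RecordPayload {
  private final Map<String, Object> record;

  DeleteAwarePayload(Map<String, Object> record) {
    this.record = record;
  }

  @Override
  public Optional<Map<String, Object>> getInsertValue() {
    // Old entry point falls back to the default marker field.
    return getInsertValue(new PayloadConfig("_hoodie_is_deleted"));
  }

  @Override
  public Optional<Map<String, Object>> getInsertValue(PayloadConfig config) {
    Object marker = record.get(config.getDeleteField());
    return Boolean.TRUE.equals(marker) ? Optional.empty() : Optional.of(record);
  }
}
```

With this shape, callers can always invoke the config-aware overload; a legacy payload simply ignores the config instead of requiring a type check at every call site.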
   
   





[GitHub] [hudi] shenh062326 commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
shenh062326 commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r463935979



##########
File path: hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java
##########
@@ -54,13 +57,15 @@ public void testActiveRecords() throws IOException {
     record1.put("id", "1");
     record1.put("partition", "partition0");
     record1.put("ts", 0L);
-    record1.put("_hoodie_is_deleted", false);
+    record1.put(defaultDeleteField, false);
+    record1.put(deleteField, true);
 
     GenericRecord record2 = new GenericData.Record(schema);
     record2.put("id", "2");
     record2.put("partition", "partition1");
     record2.put("ts", 1L);
-    record2.put("_hoodie_is_deleted", false);
+    record2.put(defaultDeleteField, false);
+    record2.put(deleteField, true);

Review comment:
       The same as above.







[GitHub] [hudi] xushiyan commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
xushiyan commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r453455700



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
##########
@@ -67,7 +74,8 @@ public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload
 
     GenericRecord genericRecord = (GenericRecord) recordOption.get();
     // combining strategy here trivially ignores currentValue on disk and writes this record
-    Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
+    String deleteField = userDefineDeleteField == null ? DEFAULT_DELETE_FIELD : userDefineDeleteField;
+    Object deleteMarker = genericRecord.get(deleteField);

Review comment:
       @shenh062326 Maybe we shouldn't do the check in the payload class itself. Maybe `org.apache.hudi.io.HoodieMergeHandle#write` is better for this job. After 
   https://github.com/apache/hudi/blob/2603cfb33e272632d7f36a53e1b13fe86dbb8627/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java#L222-L223
    we check it against the delete field defined in the configs and convert it to Option.empty() if appropriate. As this feature is config-related, whoever owns the configs should do it. Need more inputs from @nsivabalan 
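A minimal sketch of this suggestion, assuming the merge step already has the delete field from the write config: the payload's combined output is post-filtered, so a record whose marker is `true` becomes empty. `MergeStepSketch` and `applyDeleteMarker` are hypothetical names, and a `Map` stands in for the Avro record.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: the component that owns the write config applies the delete marker.
final class MergeStepSketch {
  // In Hudi this would come from the write config; here it is passed in directly.
  private final String deleteField;

  MergeStepSketch(String deleteField) {
    this.deleteField = deleteField;
  }

  // Turns the payload's combined record into "empty" (i.e. delete) when the marker is true.
  Optional<Map<String, Object>> applyDeleteMarker(Optional<Map<String, Object>> combined) {
    return combined.filter(rec -> !Boolean.TRUE.equals(rec.get(deleteField)));
  }
}
```

The payload classes then stay unaware of the config, which is the point of moving the check.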







[GitHub] [hudi] nsivabalan commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r455700501



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##########
@@ -184,6 +184,13 @@ object DataSourceWriteOptions {
   val PAYLOAD_CLASS_OPT_KEY = "hoodie.datasource.write.payload.class"
   val DEFAULT_PAYLOAD_OPT_VAL = classOf[OverwriteWithLatestAvroPayload].getName
 
+  /**
+   * Field used in OverwriteWithLatestAvroPayload combineAndGetUpdateValue, When two records have the same
+   * key value, we will check if the new record is deleted by the delete field.
+   */
+  val DELETE_FIELD_OPT_KEY = "hoodie.datasource.write.delete.field"
+  val DEFAULT_DELETE_FIELD_OPT_VAL = OverwriteWithLatestAvroPayload.DEFAULT_DELETE_FIELD

Review comment:
       You might as well define the default value in this class rather than in OverwriteWithLatestAvroPayload.

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##########
@@ -202,6 +202,10 @@ public Operation convert(String value) throws ParameterException {
         + " to break ties between records with same key in input data. Default: 'ts' holding unix timestamp of record")
     public String sourceOrderingField = "ts";
 
+    @Parameter(names = {"--source-delete-field"}, description = "Field within source record to decide"
+            + " is this record is deleted. Default: " + OverwriteWithLatestAvroPayload.DEFAULT_DELETE_FIELD)

Review comment:
       This leakage is what I wish to avoid. Why would these classes access OverwriteWithLatestAvroPayload? Moving it to DataSourceUtils or some config class makes sense.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
##########
@@ -36,6 +36,9 @@
 public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
     implements HoodieRecordPayload<OverwriteWithLatestAvroPayload> {
 
+  public static final String DEFAULT_DELETE_FIELD = "_hoodie_is_deleted";
+  private String userDefinedDeleteField = DEFAULT_DELETE_FIELD;

Review comment:
       Can we name it "isDeletedField" or "isDeletedMarkerField"? It may or may not be user defined, so the "userDefined" prefix is misleading.







[GitHub] [hudi] shenh062326 commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
shenh062326 commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r464012445



##########
File path: hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java
##########
@@ -54,13 +57,15 @@ public void testActiveRecords() throws IOException {
     record1.put("id", "1");
     record1.put("partition", "partition0");
     record1.put("ts", 0L);
-    record1.put("_hoodie_is_deleted", false);
+    record1.put(defaultDeleteMarkerField, false);

Review comment:
       I will fix the comment.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#issuecomment-668284682


   Folks, I see a performance/efficiency problem with the approach here. We added a string field to every payload object, which will increase the shuffle size in the write path: we would just be sending the same string with each and every object.
   
   Can we rework this so that we introduce a new `PayloadConfig` object that we can hand to the payload methods at runtime, instead of adding it to the constructor? We need to introduce new methods without breaking existing custom payload implementations.
   
   cc @nsivabalan @shenh062326 @xushiyan 
   





[GitHub] [hudi] nsivabalan commented on pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#issuecomment-658130555


   It looks like there are two options here.
   Option 1: Change the interface of combineAndGetUpdateValue and getInsertValue to take in the delete field.
   Check https://github.com/apache/hudi/pull/1792 for why we need to fix getInsertValue as well.
   Option 2: The approach taken in this patch, wherein we expose a setDeleteField method in OverwriteWithLatestAvroPayload that is called while generating HoodieRecords from GenericRecords.
   
   Since making changes to getInsertValue will touch a lot of files, I guess we can go with Option 2, with some changes.
   Instead of exposing a new method, why can't we add an overloaded constructor? When HoodieSparkSqlWriter calls into
   ```
   DataSourceUtils.createHoodieRecord(gr,
                   orderingVal, keyGenerator.getKey(gr), parameters(PAYLOAD_CLASS_OPT_KEY))
   ```
   we could also optionally pass in a delete field, which in turn will be passed into the constructor of OverwriteWithLatestAvroPayload. If the delete field is set by the user, we take that; otherwise we fall back to "_hoodie_is_deleted".
   
   Let me know your thoughts.
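A minimal sketch of the overloaded-constructor idea, assuming the field is initialized to the default so that only the extra constructor overrides it. `OverwritePayloadSketch` is a hypothetical, heavily simplified stand-in for OverwriteWithLatestAvroPayload (no ordering value or Avro bytes), with a `Map` in place of the record.

```java
import java.util.Map;

// Hypothetical, simplified stand-in for OverwriteWithLatestAvroPayload.
final class OverwritePayloadSketch {
  static final String DEFAULT_DELETE_FIELD = "_hoodie_is_deleted";

  private final Map<String, Object> record;
  // Initialized to the default; only the overloaded constructor may replace it.
  private String deleteMarkerField = DEFAULT_DELETE_FIELD;

  // Existing constructor: behaviour unchanged.
  OverwritePayloadSketch(Map<String, Object> record) {
    this.record = record;
  }

  // Overloaded constructor: a null field keeps the default.
  OverwritePayloadSketch(Map<String, Object> record, String deleteMarkerField) {
    this(record);
    if (deleteMarkerField != null) {
      this.deleteMarkerField = deleteMarkerField;
    }
  }

  String getDeleteMarkerField() {
    return deleteMarkerField;
  }

  boolean isDeleteRecord() {
    return Boolean.TRUE.equals(record.get(deleteMarkerField));
  }
}
```

Because `this(record)` runs the field initializer first, the two-argument constructor only has to overwrite the field when a value is supplied.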





[GitHub] [hudi] xushiyan commented on pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
xushiyan commented on pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#issuecomment-658217251


   @nsivabalan Actually, what I commented here is a third option:
   
   > @shenh062326 Maybe we shouldn't do the check in the payload class itself. Maybe `org.apache.hudi.io.HoodieMergeHandle#write` is better for this job. After
   > https://github.com/apache/hudi/blob/2603cfb33e272632d7f36a53e1b13fe86dbb8627/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java#L222-L223
   > 
   > 
   > we check `combinedAvroRecord` against the delete field defined in the configs and convert it to Option.empty() if appropriate. As this feature is config-related, whoever owns the configs should do it. Need more inputs from @nsivabalan
   
   Basically it is about shifting the responsibility of converting records to `Option.empty()` to `HoodieMergeHandle`. WDYT?





[GitHub] [hudi] bvaradar commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
bvaradar commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r457912958



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -337,9 +337,15 @@ private void refreshTimeline() throws IOException {
     }
 
     JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
+    if (writeClient == null) {
+      this.schemaProvider = schemaProvider;
+      setupWriteClient();

Review comment:
       +1, let's not create the write client here. As we are setting the field (including defaults) from DataSourceReadOptions, you can use the TypedProperties to get it.







[GitHub] [hudi] nsivabalan commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r457472660



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -337,9 +337,15 @@ private void refreshTimeline() throws IOException {
     }
 
     JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
+    if (writeClient == null) {
+      this.schemaProvider = schemaProvider;
+      setupWriteClient();

Review comment:
       All we need is a config here. I don't think we need to initialize writeClient here.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
##########
@@ -66,8 +74,9 @@ public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload
     }
 
     GenericRecord genericRecord = (GenericRecord) recordOption.get();
-    // combining strategy here trivially ignores currentValue on disk and writes this record
-    Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
+    // combining strategy here trivially ignores currentValue on disk and writes this record
+    String deleteField = isDeletedField == null ? "_hoodie_is_deleted" : isDeletedField;

Review comment:
       Sorry, I didn't notice the other constructor. We could then initialize isDeletedField = "_hoodie_is_deleted", so that one of the constructors can override the value.







[GitHub] [hudi] nsivabalan commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r457479621



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -337,9 +337,15 @@ private void refreshTimeline() throws IOException {
     }
 
     JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
+    if (writeClient == null) {
+      this.schemaProvider = schemaProvider;
+      setupWriteClient();

Review comment:
       Also, do we really need to instantiate the config? Since it is just one property, can't we read it directly from TypedProperties? @bvaradar: do you have any thoughts on this? Basically, we need to read just one config value for deleteMarker from the property set. This step comes a little before the point where we instantiate writeClient, so I'm wondering how to go about it.
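A minimal sketch of reading the single value without a write client, assuming TypedProperties can be treated as a `java.util.Properties` (it is used that way in this thread, but treat that as an assumption). The key and default are the ones proposed in the DataSourceOptions diff in this PR; `DeleteFieldConfig` and `resolve` are hypothetical names.

```java
import java.util.Properties;

// Hypothetical helper; key and default follow the DataSourceOptions diff in this PR.
final class DeleteFieldConfig {
  static final String DELETE_FIELD_OPT_KEY = "hoodie.datasource.write.delete.field";
  static final String DEFAULT_DELETE_FIELD_OPT_VAL = "_hoodie_is_deleted";

  private DeleteFieldConfig() {}

  // Reads the delete marker field straight from the properties, falling back to the
  // default when the key is missing or blank.
  static String resolve(Properties props) {
    String value = props.getProperty(DELETE_FIELD_OPT_KEY, DEFAULT_DELETE_FIELD_OPT_VAL).trim();
    return value.isEmpty() ? DEFAULT_DELETE_FIELD_OPT_VAL : value;
  }
}
```

This only needs the properties object that DeltaSync already holds, so nothing has to be constructed ahead of the write client.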







[GitHub] [hudi] shenh062326 commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
shenh062326 commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r463940587



##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestDeltaStreamerWithOverwriteLatestAvroPayload.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.functional;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
+import org.apache.hudi.utilities.sources.ParquetDFSSource;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+public class TestDeltaStreamerWithOverwriteLatestAvroPayload extends UtilitiesTestBase {
+  private static String PARQUET_SOURCE_ROOT;
+  private static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
+
+  @BeforeAll
+  public static void initClass() throws Exception {
+    UtilitiesTestBase.initClass(true);
+    PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
+
+    // prepare the configs.
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", dfs, dfsBasePath + "/base.properties");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/sql-transformer.properties", dfs,
+        dfsBasePath + "/sql-transformer.properties");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath + "/source.avsc");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source-flattened.avsc", dfs, dfsBasePath + "/source-flattened.avsc");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target.avsc", dfs, dfsBasePath + "/target.avsc");
+  }
+
+  @Test
+  public void testOverwriteLatestAvroPayload() throws Exception {
+    // test defaultDeleteField
+    this.testOverwriteLatestAvroPayload(null);
+
+    // test userDefinedDeleteField
+    this.testOverwriteLatestAvroPayload("user_defined_delete_field");
+  }
+
+  public void testOverwriteLatestAvroPayload(String deleteMarkerField) throws Exception {
+    String path = PARQUET_SOURCE_ROOT + "/1.parquet";
+    List<GenericRecord> records = HoodieTestDataGenerator.genericRecords(5, false, 0);
+    Helpers.saveParquetToDFS(records, new Path(path));
+
+    TypedProperties parquetProps = new TypedProperties();
+    parquetProps.setProperty("include", "base.properties");
+    parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
+    parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
+    parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", PARQUET_SOURCE_ROOT);
+    if (deleteMarkerField != null) {
+      parquetProps.setProperty(HoodieWriteConfig.DELETE_FIELD_PROP, deleteMarkerField);
+    }
+    Helpers.savePropsToDFS(parquetProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_PARQUET);
+
+    String tableBasePath = dfsBasePath + "/test_overwrite_lastest_avro_payload_table";
+
+    HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+        TestHoodieDeltaStreamer.TestHelpers.makeConfig(tableBasePath, HoodieDeltaStreamer.Operation.INSERT, ParquetDFSSource.class.getName(),
+            null, PROPS_FILENAME_TEST_PARQUET, false,
+            false, 100000, false, null, null, "timestamp"), jsc);
+    deltaStreamer.sync();
+    TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5, tableBasePath + "/*/*.parquet", sqlContext);
+
+    String path2 = PARQUET_SOURCE_ROOT + "/2.parquet";
+    List<GenericRecord> records2 = HoodieTestDataGenerator.genericRecords(2, true, 1);
+    Helpers.saveParquetToDFS(records2, new Path(path2));
+    deltaStreamer.sync();
+
+    TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(3, tableBasePath + "/*/*.parquet", sqlContext);

Review comment:
       Since the returned Spark Row is not easy to compare with GenericRecord, it is easier for us to compare only the key.







[GitHub] [hudi] shenh062326 commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
shenh062326 commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r463936014



##########
File path: hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -251,6 +253,10 @@ public int getMaxConsistencyCheckIntervalMs() {
     return Integer.parseInt(props.getProperty(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP));
   }
 
+  public String getDeleteField() {

Review comment:
       Thanks for your comments, I will fix it.







[GitHub] [hudi] nsivabalan commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r460415342



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
##########
@@ -36,6 +36,8 @@
 public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
     implements HoodieRecordPayload<OverwriteWithLatestAvroPayload> {
 
+  private String deletedField = "_hoodie_is_deleted";

Review comment:
       Can we name it something like "deleteMarkerField"? I feel "deletedField" suggests that the field itself is deleted.

##########
File path: hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
##########
@@ -100,6 +100,53 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
     }
   }
 
+  test("test OverwriteWithLatestAvroPayload with user defined delete field") {
+    val session = SparkSession.builder()
+      .appName("test_append_mode")
+      .master("local[2]")
+      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .getOrCreate()
+    val path = java.nio.file.Files.createTempDirectory("hoodie_test_path1")
+
+    try {
+      val sqlContext = session.sqlContext
+      val hoodieFooTableName = "hoodie_foo_tbl"
+
+      val keyField = "id"
+      val deleteField = "delete_field"
+
+      //create a new table
+      val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
+        HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
+        "hoodie.insert.shuffle.parallelism" -> "2",
+        "hoodie.upsert.shuffle.parallelism" -> "2",
+        DELETE_FIELD_OPT_KEY -> deleteField,
+        RECORDKEY_FIELD_OPT_KEY -> keyField)
+      val fooTableParams = HoodieSparkSqlWriter.parametersWithWriteDefaults(fooTableModifier)
+
+      val dataFrame = session.createDataFrame(Seq(
+        (12, "ming", 20.23, "2018-01-01T13:51:39.340396Z", false),
+        (34, "zhi", 21.323, "2018-01-01T13:52:39.340396Z", false)
+      )) toDF(keyField, "name", "weight", "ts", deleteField)
+
+      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, dataFrame)
+      val recordCount1 = sqlContext.read.format("org.apache.hudi").load(path.toString + "/*/*.parquet").count
+      assert(recordCount1 == 2, "result should be 2, but get " + recordCount1)
+
+      val dataFrame2 = session.createDataFrame(Seq(
+        (12, "ming", 20.23, "2018-01-01T13:53:39.340396Z", true),
+        (34, "zhi", 30.3, "2018-01-01T13:54:39.340396Z", false)
+      )) toDF(keyField, "name", "weight", "ts", deleteField)
+      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, dataFrame2)
+
+      val recordCount = sqlContext.read.format("org.apache.hudi").load(path.toString + "/*/*.parquet").count
+      assert(recordCount == 1, "result should be 1, but get " + recordCount)

Review comment:
       Can we also check that the remaining record actually matches the active one and not the deleted one?
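One way to assert on identity rather than count, sketched against a hypothetical in-memory model of the table (not Hudi's write path): key the surviving rows by `id`, then check that key 12 is gone and key 34 carries the values from the second batch. `UpsertSimulation` and its methods are illustrative names; in the Scala test the same checks would run on the rows read back from the Hudi table.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a table keyed by record key (not Hudi's write path).
final class UpsertSimulation {
  private UpsertSimulation() {}

  // Rows whose delete field is true remove their key; other rows overwrite by key.
  static void upsert(Map<Object, Map<String, Object>> table, List<Map<String, Object>> batch,
                     String keyField, String deleteField) {
    for (Map<String, Object> row : batch) {
      Object key = row.get(keyField);
      if (Boolean.TRUE.equals(row.get(deleteField))) {
        table.remove(key);
      } else {
        table.put(key, row);
      }
    }
  }

  // Builds one row with the same columns as the Scala test (ts omitted).
  static Map<String, Object> row(int id, String name, double weight, boolean deleted) {
    Map<String, Object> r = new HashMap<>();
    r.put("id", id);
    r.put("name", name);
    r.put("weight", weight);
    r.put("delete_field", deleted);
    return r;
  }
}
```

Applying the two batches from the test above, the expectation is a single surviving row with key 34 and weight 30.3, rather than just a count of 1.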

##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestDeltaStreamerWithOverwriteLatestAvroPayload.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.functional;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
+import org.apache.hudi.utilities.sources.ParquetDFSSource;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class TestDeltaStreamerWithOverwriteLatestAvroPayload extends UtilitiesTestBase {
+  private static String PARQUET_SOURCE_ROOT;
+  private static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
+
+  @BeforeAll
+  public static void initClass() throws Exception {
+    UtilitiesTestBase.initClass(true);
+    PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
+
+    // prepare the configs.
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", dfs, dfsBasePath + "/base.properties");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/sql-transformer.properties", dfs,
+        dfsBasePath + "/sql-transformer.properties");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath + "/source.avsc");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source-flattened.avsc", dfs, dfsBasePath + "/source-flattened.avsc");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target.avsc", dfs, dfsBasePath + "/target.avsc");
+  }
+
+  private static List<GenericRecord> genericRecords(int n, boolean isDeleteRecord, int instantTime) {

Review comment:
       Why not move this method to HoodieTestDataGenerator?

##########
File path: hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java
##########
@@ -54,16 +55,16 @@ public void testActiveRecords() throws IOException {
     record1.put("id", "1");
     record1.put("partition", "partition0");
     record1.put("ts", 0L);
-    record1.put("_hoodie_is_deleted", false);
+    record1.put(deleteField, false);

Review comment:
       Can we test both cases, i.e. a user-defined delete marker field and the default (non-user-defined) one?

##########
File path: hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
##########
@@ -100,6 +100,53 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
     }
   }
 
+  test("test OverwriteWithLatestAvroPayload with user defined delete field") {
+    val session = SparkSession.builder()
+      .appName("test_append_mode")
+      .master("local[2]")
+      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .getOrCreate()
+    val path = java.nio.file.Files.createTempDirectory("hoodie_test_path1")
+
+    try {
+      val sqlContext = session.sqlContext
+      val hoodieFooTableName = "hoodie_foo_tbl"
+
+      val keyField = "id"
+      val deleteField = "delete_field"
+
+      //create a new table
+      val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
+        HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
+        "hoodie.insert.shuffle.parallelism" -> "2",
+        "hoodie.upsert.shuffle.parallelism" -> "2",
+        DELETE_FIELD_OPT_KEY -> deleteField,
+        RECORDKEY_FIELD_OPT_KEY -> keyField)
+      val fooTableParams = HoodieSparkSqlWriter.parametersWithWriteDefaults(fooTableModifier)
+
+      val dataFrame = session.createDataFrame(Seq(
+        (12, "ming", 20.23, "2018-01-01T13:51:39.340396Z", false),

Review comment:
       Try to avoid hardcoding test records. If you generate them in a loop, it would be easy to scale the tests.
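The sketch below shows loop-based generation. It is written in Java, although the suite itself is Scala. `GeneratedRow` and `RowGenerator.generate` are hypothetical names. The row shape mirrors the hardcoded tuple: id, name, value, timestamp, delete flag.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** Hypothetical row type mirroring the (id, name, value, timestamp, deleteFlag) tuple. */
final class GeneratedRow {
  final int id;
  final String name;
  final double value;
  final String timestamp;
  final boolean deleted;

  GeneratedRow(int id, String name, double value, String timestamp, boolean deleted) {
    this.id = id;
    this.name = name;
    this.value = value;
    this.timestamp = timestamp;
    this.deleted = deleted;
  }
}

final class RowGenerator {
  private RowGenerator() {}

  /** Generates n rows with ids 0..n-1; the first numDeletes rows are flagged as deleted. */
  static List<GeneratedRow> generate(int n, int numDeletes) {
    return IntStream.range(0, n)
        .mapToObj(i -> new GeneratedRow(i, "name_" + i, 20.0 + i, "2018-01-01T13:51:39.340396Z", i < numDeletes))
        .collect(Collectors.toList());
  }
}
```

With this in place, scaling the test means changing `n` rather than adding more literals.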

##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestDeltaStreamerWithOverwriteLatestAvroPayload.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.functional;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
+import org.apache.hudi.utilities.sources.ParquetDFSSource;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class TestDeltaStreamerWithOverwriteLatestAvroPayload extends UtilitiesTestBase {
+  private static String PARQUET_SOURCE_ROOT;
+  private static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
+
+  @BeforeAll
+  public static void initClass() throws Exception {
+    UtilitiesTestBase.initClass(true);
+    PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
+
+    // prepare the configs.
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", dfs, dfsBasePath + "/base.properties");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/sql-transformer.properties", dfs,
+        dfsBasePath + "/sql-transformer.properties");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath + "/source.avsc");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source-flattened.avsc", dfs, dfsBasePath + "/source-flattened.avsc");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target.avsc", dfs, dfsBasePath + "/target.avsc");
+  }
+
+  private static List<GenericRecord> genericRecords(int n, boolean isDeleteRecord, int instantTime) {
+    return IntStream.range(0, n).boxed().map(i -> {
+      String partitionPath = "partitionPath1";
+      HoodieKey key = new HoodieKey("id_" + i, partitionPath);
+      HoodieTestDataGenerator.KeyPartition kp = new HoodieTestDataGenerator.KeyPartition();
+      kp.key = key;
+      kp.partitionPath = partitionPath;
+      return HoodieTestDataGenerator.generateGenericRecord(
+          key.getRecordKey(), "rider-" + instantTime, "driver-" + instantTime, instantTime, isDeleteRecord, false);
+    }).collect(Collectors.toList());
+  }
+
+  @Test
+  public void testOverwriteLatestAvroPayload() throws Exception {

Review comment:
       Can we test both the default and the user-defined fields here?







[GitHub] [hudi] shenh062326 commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
shenh062326 commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r461266911



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
##########
@@ -36,6 +36,8 @@
 public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
     implements HoodieRecordPayload<OverwriteWithLatestAvroPayload> {
 
+  private String deletedField = "_hoodie_is_deleted";

Review comment:
       Thanks for your comments; I will address them.

##########
File path: hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
##########
@@ -100,6 +100,53 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
     }
   }
 
+  test("test OverwriteWithLatestAvroPayload with user defined delete field") {
+    val session = SparkSession.builder()
+      .appName("test_append_mode")
+      .master("local[2]")
+      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .getOrCreate()
+    val path = java.nio.file.Files.createTempDirectory("hoodie_test_path1")
+
+    try {
+      val sqlContext = session.sqlContext
+      val hoodieFooTableName = "hoodie_foo_tbl"
+
+      val keyField = "id"
+      val deleteField = "delete_field"
+
+      //create a new table
+      val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
+        HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
+        "hoodie.insert.shuffle.parallelism" -> "2",
+        "hoodie.upsert.shuffle.parallelism" -> "2",
+        DELETE_FIELD_OPT_KEY -> deleteField,
+        RECORDKEY_FIELD_OPT_KEY -> keyField)
+      val fooTableParams = HoodieSparkSqlWriter.parametersWithWriteDefaults(fooTableModifier)
+
+      val dataFrame = session.createDataFrame(Seq(
+        (12, "ming", 20.23, "2018-01-01T13:51:39.340396Z", false),

Review comment:
       The records here don't need to scale, so I will change the Seq to contain only one element.

##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestDeltaStreamerWithOverwriteLatestAvroPayload.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.functional;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
+import org.apache.hudi.utilities.sources.ParquetDFSSource;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class TestDeltaStreamerWithOverwriteLatestAvroPayload extends UtilitiesTestBase {
+  private static String PARQUET_SOURCE_ROOT;
+  private static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
+
+  @BeforeAll
+  public static void initClass() throws Exception {
+    UtilitiesTestBase.initClass(true);
+    PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
+
+    // prepare the configs.
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", dfs, dfsBasePath + "/base.properties");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/sql-transformer.properties", dfs,
+        dfsBasePath + "/sql-transformer.properties");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath + "/source.avsc");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source-flattened.avsc", dfs, dfsBasePath + "/source-flattened.avsc");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target.avsc", dfs, dfsBasePath + "/target.avsc");
+  }
+
+  private static List<GenericRecord> genericRecords(int n, boolean isDeleteRecord, int instantTime) {
+    return IntStream.range(0, n).boxed().map(i -> {
+      String partitionPath = "partitionPath1";
+      HoodieKey key = new HoodieKey("id_" + i, partitionPath);
+      HoodieTestDataGenerator.KeyPartition kp = new HoodieTestDataGenerator.KeyPartition();
+      kp.key = key;
+      kp.partitionPath = partitionPath;
+      return HoodieTestDataGenerator.generateGenericRecord(
+          key.getRecordKey(), "rider-" + instantTime, "driver-" + instantTime, instantTime, isDeleteRecord, false);
+    }).collect(Collectors.toList());
+  }
+
+  @Test
+  public void testOverwriteLatestAvroPayload() throws Exception {

Review comment:
       Sure, I will add both.







[GitHub] [hudi] shenh062326 commented on pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
shenh062326 commented on pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#issuecomment-658495234


   @xushiyan It seems good for OverwriteWithLatestAvroPayload. But for AWSDmsAvroPayload, users need to define not only the delete column but also how its value is interpreted as a delete. Where should that new method live? If it stays in combineAndGetUpdateValue, then both combineAndGetUpdateValue and its caller will contain delete-handling logic. Won't that be duplicated?
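The sketch below illustrates the distinction being raised. It uses a hypothetical `DeleteRule` that pairs a field name with a predicate on that field's value. The op-code variant assumes AWS DMS-style output, where an `Op` column holding `"D"` marks a delete. Treat that as an assumption for illustration, not as a description of `AWSDmsAvroPayload`.

```java
import java.util.Map;
import java.util.Objects;
import java.util.function.Predicate;

/** Hypothetical rule: which field marks a delete, and how its value is interpreted. */
final class DeleteRule {
  private final String field;
  private final Predicate<Object> isDeleteValue;

  DeleteRule(String field, Predicate<Object> isDeleteValue) {
    this.field = Objects.requireNonNull(field, "field");
    this.isDeleteValue = Objects.requireNonNull(isDeleteValue, "isDeleteValue");
  }

  /** Boolean marker such as _hoodie_is_deleted: delete when the value is true. */
  static DeleteRule booleanMarker(String field) {
    return new DeleteRule(field, Boolean.TRUE::equals);
  }

  /** Op-code marker (assumed DMS-like): delete when the value equals deleteCode. */
  static DeleteRule opCode(String field, String deleteCode) {
    return new DeleteRule(field, Objects.requireNonNull(deleteCode, "deleteCode")::equals);
  }

  /** A missing field reads as null, which neither predicate treats as a delete. */
  boolean isDelete(Map<String, ?> record) {
    return isDeleteValue.test(record.get(field));
  }
}
```

Framed this way, a configurable field name alone covers only the boolean case. The op-code case also needs the interpretation rule, which is the extra piece whose placement the comment is asking about.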





[GitHub] [hudi] shenh062326 commented on pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
shenh062326 commented on pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#issuecomment-668930304


   Let me confirm the new implementation again: add a transient field to OverwriteWithLatestAvroPayload, like below:
   ```
     private transient String deleteMarkerField = null;
   
     public void setDeleteMarkerField(String deleteMarkerField) {
       this.deleteMarkerField = deleteMarkerField;
     }
   ```
   
   And set deleteMarkerField before HoodieMergeHandle.write calls OverwriteWithLatestAvroPayload.combineAndGetUpdateValue, right?
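One consequence of `transient` is worth checking: the field is skipped during serialization. A payload that is serialized and deserialized therefore comes back with `deleteMarkerField == null`, and the setter has to be called again before `combineAndGetUpdateValue`. The sketch below demonstrates this with plain Java serialization. `SketchPayload` and `SerializationRoundTrip` are hypothetical stand-ins, not Hudi classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical stand-in for a payload with a transient delete marker field. */
final class SketchPayload implements Serializable {
  private static final long serialVersionUID = 1L;
  static final String DEFAULT_DELETE_FIELD = "_hoodie_is_deleted";

  // transient: not written during serialization, so it is null after deserialization.
  private transient String deleteMarkerField = null;

  void setDeleteMarkerField(String deleteMarkerField) {
    this.deleteMarkerField = deleteMarkerField;
  }

  /** Falls back to the default when the setter was never called (or the value was lost). */
  String effectiveDeleteField() {
    return deleteMarkerField == null ? DEFAULT_DELETE_FIELD : deleteMarkerField;
  }
}

final class SerializationRoundTrip {
  private SerializationRoundTrip() {}

  /** Writes obj with Java serialization and reads it back as a new instance. */
  static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj);
    }
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      @SuppressWarnings("unchecked")
      T copy = (T) in.readObject();
      return copy;
    }
  }
}
```

This is why the setter has to run on the instance that is actually merged, which is the step the question above is confirming.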
   
   
   
   





[GitHub] [hudi] xushiyan commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
xushiyan commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r453455700



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
##########
@@ -67,7 +74,8 @@ public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload
 
     GenericRecord genericRecord = (GenericRecord) recordOption.get();
     // combining strategy here trivially ignores currentValue on disk and writes this record
-    Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
+    String deleteField = userDefineDeleteField == null ? DEFAULT_DELETE_FIELD : userDefineDeleteField;
+    Object deleteMarker = genericRecord.get(deleteField);

Review comment:
       @shenh062326 Maybe we shouldn't do the check in the payload class itself. Maybe `org.apache.hudi.io.HoodieMergeHandle#write` is better for this job: after getting `Option<IndexedRecord> combinedAvroRecord`, we check it against the delete field defined in the configs and convert it to Option.empty() if appropriate. As this feature is config-related, whoever owns the configs should do it. Need more input from @nsivabalan.
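The sketch below shows that writer-side check. It assumes `java.util.Optional` in place of Hudi's `Option` and `Map<String, ?>` in place of `IndexedRecord`. `MergeDeleteFilter` is a hypothetical name, not code from `HoodieMergeHandle`.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical post-combine filter owned by whoever holds the write config. */
final class MergeDeleteFilter {
  private final String deleteMarkerField;

  MergeDeleteFilter(String deleteMarkerField) {
    this.deleteMarkerField = deleteMarkerField;
  }

  /** Maps a combined record to empty when its configured delete marker is true. */
  <R extends Map<String, ?>> Optional<R> apply(Optional<R> combined) {
    return combined.filter(r -> !Boolean.TRUE.equals(r.get(deleteMarkerField)));
  }
}
```

With this design the payload only combines, and the delete decision is made from config in one place. The trade-off, raised later in this thread, is that other call sites such as `getInsertValue()` would need the same check.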







[GitHub] [hudi] shenh062326 commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
shenh062326 commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r464010724



##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -267,15 +268,15 @@ private boolean deleteAllInstantsOlderorEqualsInAuxMetaFolder(HoodieInstant thre
     return success;
   }
 
-  public void archive(List<HoodieInstant> instants) throws HoodieCommitException {
+  public void archive(JavaSparkContext jsc, List<HoodieInstant> instants) throws HoodieCommitException {
     try {
       HoodieTimeline commitTimeline = metaClient.getActiveTimeline().getAllCommitsTimeline().filterCompletedInstants();
       Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema();
       LOG.info("Wrapper schema " + wrapperSchema.toString());
       List<IndexedRecord> records = new ArrayList<>();
       for (HoodieInstant hoodieInstant : instants) {
         try {
-          deleteAnyLeftOverMarkerFiles(hoodieInstant);
+          deleteAnyLeftOverMarkerFiles(jsc, hoodieInstant);

Review comment:
       Sorry, this does not belong to this pull request; I will remove it.







[GitHub] [hudi] xushiyan commented on pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
xushiyan commented on pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#issuecomment-668073017


   @nsivabalan No, please feel free to merge. There have been thorough reviews :) Thanks.





[GitHub] [hudi] shenh062326 edited a comment on pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
shenh062326 edited a comment on pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#issuecomment-668542359


   > makes sense. sorry about the oversight.
   
   I can rework it.
   Let me first confirm the approach: add a PayloadConfig, then have OverwriteWithLatestAvroPayload.isDeleteRecord read deleteMarkerField from that PayloadConfig. Is that right?





[GitHub] [hudi] nsivabalan commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r463548033



##########
File path: hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
##########
@@ -177,6 +178,18 @@ public RawTripTestPayload generateRandomValueAsPerSchema(String schemaStr, Hoodi
     return null;
   }
 
+  public static List<GenericRecord> genericRecords(int n, boolean isDeleteRecord, int instantTime) {

Review comment:
       generateGenericRecords.

##########
File path: hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java
##########
@@ -37,14 +37,17 @@
 public class TestOverwriteWithLatestAvroPayload {
 
   private Schema schema;
+  String defaultDeleteField = "_hoodie_is_deleted";
+  String deleteField = "delete_field";

Review comment:
       deleteMarkerField

##########
File path: hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java
##########
@@ -54,13 +57,15 @@ public void testActiveRecords() throws IOException {
     record1.put("id", "1");
     record1.put("partition", "partition0");
     record1.put("ts", 0L);
-    record1.put("_hoodie_is_deleted", false);
+    record1.put(defaultDeleteField, false);
+    record1.put(deleteField, true);

Review comment:
       Shouldn't this also be false?

##########
File path: hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
##########
@@ -215,11 +216,20 @@ public static HoodieDateTimeParser createDateTimeParser(TypedProperties props, S
   /**
    * Create a payload class via reflection, passing in an ordering/precombine value.
    */
-  public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record, Comparable orderingVal)
-      throws IOException {
+  public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record,
+                                                  Comparable orderingVal,
+                                                  String deleteField) throws IOException {

Review comment:
       deleteMarkerField

##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestDeltaStreamerWithOverwriteLatestAvroPayload.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.functional;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
+import org.apache.hudi.utilities.sources.ParquetDFSSource;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+public class TestDeltaStreamerWithOverwriteLatestAvroPayload extends UtilitiesTestBase {
+  private static String PARQUET_SOURCE_ROOT;
+  private static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
+
+  @BeforeAll
+  public static void initClass() throws Exception {
+    UtilitiesTestBase.initClass(true);
+    PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
+
+    // prepare the configs.
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", dfs, dfsBasePath + "/base.properties");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/sql-transformer.properties", dfs,
+        dfsBasePath + "/sql-transformer.properties");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath + "/source.avsc");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source-flattened.avsc", dfs, dfsBasePath + "/source-flattened.avsc");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target.avsc", dfs, dfsBasePath + "/target.avsc");
+  }
+
+  @Test
+  public void testOverwriteLatestAvroPayload() throws Exception {
+    // test defaultDeleteField
+    this.testOverwriteLatestAvroPayload(null);
+
+    // test userDefinedDeleteField
+    this.testOverwriteLatestAvroPayload("user_defined_delete_field");
+  }
+
+  public void testOverwriteLatestAvroPayload(String deleteMarkerField) throws Exception {

Review comment:
       private

##########
File path: hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java
##########
@@ -54,13 +57,15 @@ public void testActiveRecords() throws IOException {
     record1.put("id", "1");
     record1.put("partition", "partition0");
     record1.put("ts", 0L);
-    record1.put("_hoodie_is_deleted", false);
+    record1.put(defaultDeleteField, false);
+    record1.put(deleteField, true);
 
     GenericRecord record2 = new GenericData.Record(schema);
     record2.put("id", "2");
     record2.put("partition", "partition1");
     record2.put("ts", 1L);
-    record2.put("_hoodie_is_deleted", false);
+    record2.put(defaultDeleteField, false);
+    record2.put(deleteField, true);

Review comment:
       same here.

##########
File path: hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -251,6 +253,10 @@ public int getMaxConsistencyCheckIntervalMs() {
     return Integer.parseInt(props.getProperty(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP));
   }
 
+  public String getDeleteField() {

Review comment:
       Let's be uniform throughout: getDeleteMarkerField.

##########
File path: hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
##########
@@ -275,8 +285,9 @@ public static HoodieWriteClient createHoodieClient(JavaSparkContext jssc, String
   }
 
   public static HoodieRecord createHoodieRecord(GenericRecord gr, Comparable orderingVal, HoodieKey hKey,
-                                                String payloadClass) throws IOException {
-    HoodieRecordPayload payload = DataSourceUtils.createPayload(payloadClass, gr, orderingVal);
+                                                String payloadClass,
+                                                String deleteField) throws IOException {

Review comment:
       deleteMarkerField

##########
File path: hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java
##########
@@ -80,13 +85,16 @@ public void testDeletedRecord() throws IOException {
     record1.put("id", "1");
     record1.put("partition", "partition0");
     record1.put("ts", 0L);
-    record1.put("_hoodie_is_deleted", false);
+    record1.put(defaultDeleteField, false);

Review comment:
       Can we also fix lines 70 and 71? When creating the OverwriteWithLatestAvroPayload, pass in the delete marker field. Again, can we test both the default and the user-defined field? Since these are active records, the existing assertions should still work.

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -337,9 +337,11 @@ private void refreshTimeline() throws IOException {
     }
 
     JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
+    String deleteField = props.getString(HoodieWriteConfig.DELETE_FIELD_PROP, HoodieWriteConfig.DEFAULT_DELETE_FIELD);

Review comment:
       deleteMarkerField

##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestDeltaStreamerWithOverwriteLatestAvroPayload.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.functional;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
+import org.apache.hudi.utilities.sources.ParquetDFSSource;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+public class TestDeltaStreamerWithOverwriteLatestAvroPayload extends UtilitiesTestBase {
+  private static String PARQUET_SOURCE_ROOT;
+  private static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
+
+  @BeforeAll
+  public static void initClass() throws Exception {
+    UtilitiesTestBase.initClass(true);
+    PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
+
+    // prepare the configs.
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", dfs, dfsBasePath + "/base.properties");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/sql-transformer.properties", dfs,
+        dfsBasePath + "/sql-transformer.properties");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath + "/source.avsc");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source-flattened.avsc", dfs, dfsBasePath + "/source-flattened.avsc");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target.avsc", dfs, dfsBasePath + "/target.avsc");
+  }
+
+  @Test
+  public void testOverwriteLatestAvroPayload() throws Exception {
+    // test defaultDeleteField
+    this.testOverwriteLatestAvroPayload(null);
+
+    // test userDefinedDeleteField
+    this.testOverwriteLatestAvroPayload("user_defined_delete_field");
+  }
+
+  public void testOverwriteLatestAvroPayload(String deleteMarkerField) throws Exception {
+    String path = PARQUET_SOURCE_ROOT + "/1.parquet";
+    List<GenericRecord> records = HoodieTestDataGenerator.genericRecords(5, false, 0);
+    Helpers.saveParquetToDFS(records, new Path(path));
+
+    TypedProperties parquetProps = new TypedProperties();
+    parquetProps.setProperty("include", "base.properties");
+    parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
+    parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
+    parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", PARQUET_SOURCE_ROOT);
+    if (deleteMarkerField != null) {
+      parquetProps.setProperty(HoodieWriteConfig.DELETE_FIELD_PROP, deleteMarkerField);
+    }
+    Helpers.savePropsToDFS(parquetProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_PARQUET);
+
+    String tableBasePath = dfsBasePath + "/test_overwrite_lastest_avro_payload_table";
+
+    HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+        TestHoodieDeltaStreamer.TestHelpers.makeConfig(tableBasePath, HoodieDeltaStreamer.Operation.INSERT, ParquetDFSSource.class.getName(),
+            null, PROPS_FILENAME_TEST_PARQUET, false,
+            false, 100000, false, null, null, "timestamp"), jsc);
+    deltaStreamer.sync();
+    TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5, tableBasePath + "/*/*.parquet", sqlContext);
+
+    String path2 = PARQUET_SOURCE_ROOT + "/2.parquet";
+    List<GenericRecord> records2 = HoodieTestDataGenerator.genericRecords(2, true, 1);
+    Helpers.saveParquetToDFS(records2, new Path(path2));
+    deltaStreamer.sync();
+
+    TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(3, tableBasePath + "/*/*.parquet", sqlContext);

Review comment:
       Is it possible to verify the records for equality?
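The sketch below computes the expected surviving keys, so the test can compare sets instead of counts. It assumes keys follow the `"id_" + i` scheme of the earlier `genericRecords` helper. In the test, that means 5 inserts followed by deletes for the first 2 keys. `ExpectedKeys` is a hypothetical helper.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** Hypothetical helper computing record keys expected in the table. */
final class ExpectedKeys {
  private ExpectedKeys() {}

  /** Keys "id_0" .. "id_{n-1}", assuming the "id_" + i scheme used by genericRecords. */
  static Set<String> keys(int n) {
    return IntStream.range(0, n)
        .mapToObj(i -> "id_" + i)
        .collect(Collectors.toCollection(TreeSet::new));
  }

  /** Keys expected to survive after the first numDeleted inserted keys are deleted. */
  static Set<String> survivors(int inserted, int numDeleted) {
    Set<String> result = keys(inserted);
    result.removeAll(keys(numDeleted));
    return result;
  }
}
```

The actual keys could then be collected from the table, for example from the `_hoodie_record_key` meta column, and checked with `assertEquals` against `survivors(5, 2)`.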

##########
File path: hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java
##########
@@ -98,6 +106,18 @@ public void testDeletedRecord() throws IOException {
 
     assertEquals(payload1.combineAndGetUpdateValue(delRecord1, schema).get(), record1);
     assertFalse(payload2.combineAndGetUpdateValue(record1, schema).isPresent());
+
+    // test userDefinedDeleteField

Review comment:
       We could move this into a private method so that lines 99 to 108 and 110 to 120 can reuse the same code.







[GitHub] [hudi] shenh062326 commented on pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
shenh062326 commented on pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#issuecomment-663915325


   > I don't see any tests being added as part of the patch. Would be nice to have some tests covering the new code that was added at all levels.
   > 
   > * WriteClient
   > * Datasource if there is an existing suite of tests for other write operations
   > * Deltastreamer
   
   Done.


----------------------------------------------------------------

[GitHub] [hudi] nsivabalan commented on pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#issuecomment-658502484


   @xushiyan : nope. As I have mentioned above, we need it in getInsertValue() as well, which is called from a lot of classes. Hence I suggested adding it as part of the constructor. 
   


----------------------------------------------------------------

[GitHub] [hudi] nsivabalan commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r455816566



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
##########
@@ -36,6 +36,9 @@
 public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
     implements HoodieRecordPayload<OverwriteWithLatestAvroPayload> {
 
+  public static final String DEFAULT_DELETE_FIELD = "_hoodie_is_deleted";
+  private String userDefinedDeleteField = DEFAULT_DELETE_FIELD;

Review comment:
       No, I meant it as a field name referring to a column. That's why it is suffixed with "field".




----------------------------------------------------------------

[GitHub] [hudi] nsivabalan commented on pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#issuecomment-668833066


   I see it now. Got it. 


----------------------------------------------------------------

[GitHub] [hudi] nsivabalan commented on pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#issuecomment-668830077


   @vinothchandar : if I am not wrong, we added an additional overloaded constructor to OverwriteWithLatestAvroPayload, so it shouldn't have broken any existing implementations. Only someone who wants to leverage a user-defined column would have to use the new constructor. Anyway, your perf reasoning is convincing, hence I am going ahead with reverting. 
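   
   A minimal sketch of the overloaded-constructor idea, assuming a simplified payload whose record is a `Map<String, Object>` rather than an Avro `GenericRecord`. `SketchPayload` is a hypothetical name, not Hudi's `OverwriteWithLatestAvroPayload`. The existing two-argument constructor keeps its old behavior by delegating with the default field name, and only callers that want a user-defined column use the three-argument overload.
   
   ```java
   import java.util.Map;
   
   // Hypothetical stand-in for a record payload. Records are modeled as
   // Map<String, Object> instead of Avro GenericRecord so only the JDK is needed.
   class SketchPayload {
     static final String DEFAULT_DELETE_FIELD = "_hoodie_is_deleted";
   
     private final Map<String, Object> record;
     private final Comparable<?> orderingVal;
     private final String deleteField;
   
     // Existing constructor: unchanged callers get the default marker column.
     SketchPayload(Map<String, Object> record, Comparable<?> orderingVal) {
       this(record, orderingVal, DEFAULT_DELETE_FIELD);
     }
   
     // New overload for callers that configure their own marker column.
     SketchPayload(Map<String, Object> record, Comparable<?> orderingVal, String deleteField) {
       this.record = record;
       this.orderingVal = orderingVal;
       this.deleteField = deleteField;
     }
   
     // True only when the configured column holds Boolean.TRUE.
     boolean isDeleted() {
       return Boolean.TRUE.equals(record.get(deleteField));
     }
   }
   ```
   
   Note that every instance still stores a `deleteField` reference, whichever constructor is used.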


----------------------------------------------------------------

[GitHub] [hudi] vinothchandar commented on pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#issuecomment-657088170


   @nsivabalan can you please take a pass once CI is passing?


----------------------------------------------------------------

[GitHub] [hudi] shenh062326 commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
shenh062326 commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r455609009



##########
File path: hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
##########
@@ -176,11 +177,21 @@ public static KeyGenerator createKeyGenerator(TypedProperties props) throws IOEx
   /**
    * Create a payload class via reflection, passing in an ordering/precombine value.
    */
-  public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record, Comparable orderingVal)
-      throws IOException {
+  public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record,
+                                                  Comparable orderingVal,
+                                                  String deleteField) throws IOException {
     try {
-      return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass,
-          new Class<?>[] {GenericRecord.class, Comparable.class}, record, orderingVal);
+      HoodieRecordPayload payload = null;
+      if (payloadClass.equals(OverwriteWithLatestAvroPayload.class.getName()) &&
+              !deleteField.isEmpty()) {

Review comment:
       done




----------------------------------------------------------------

[GitHub] [hudi] nsivabalan commented on pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#issuecomment-669156164


   Nope. We need to create a new class called PayloadConfig, which will hold the deleteMarkerField column name. Similar to how we pass in the Schema to OverwriteWithLatestAvroPayload#combineAndGetUpdateValue and OverwriteWithLatestAvroPayload#getInsertValue, we have to pass this PayloadConfig as well. Hope you get the gist. 
   
   @vinothchandar : can you confirm if this is the right approach?
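   
   A minimal sketch of the PayloadConfig idea, assuming a hypothetical `PayloadConfigSketch` holder passed next to the schema on each call instead of being stored in every payload instance. `PayloadConfigSketch`, `ConfiguredPayload`, and this `getInsertValue(PayloadConfigSketch)` signature are illustrative only; the schema argument is omitted and records are modeled as `Map<String, Object>` so the example runs with just the JDK.
   
   ```java
   import java.util.Map;
   import java.util.Optional;
   
   // Hypothetical config holder. Unlike a free-form Map, its contents are fixed by the framework.
   final class PayloadConfigSketch {
     static final String DEFAULT_DELETE_FIELD = "_hoodie_is_deleted";
     private final String deleteMarkerField;
   
     // No explicit setting: use the default marker column.
     PayloadConfigSketch() {
       this(DEFAULT_DELETE_FIELD);
     }
   
     PayloadConfigSketch(String deleteMarkerField) {
       this.deleteMarkerField = deleteMarkerField;
     }
   
     String getDeleteMarkerField() {
       return deleteMarkerField;
     }
   }
   
   // Hypothetical payload that stores only the record; the config is passed per
   // call, the way the schema is passed (the schema argument is omitted here).
   final class ConfiguredPayload {
     private final Map<String, Object> record;
   
     ConfiguredPayload(Map<String, Object> record) {
       this.record = record;
     }
   
     // An empty result means "treat as a delete" under the given config.
     Optional<Map<String, Object>> getInsertValue(PayloadConfigSketch config) {
       if (Boolean.TRUE.equals(record.get(config.getDeleteMarkerField()))) {
         return Optional.empty();
       }
       return Optional.of(record);
     }
   }
   ```
   
   Because the config travels with each call, the payload objects themselves carry no extra field; the cost is a changed method signature on every implementation.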


----------------------------------------------------------------

[GitHub] [hudi] nsivabalan commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r456459962



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
##########
@@ -36,6 +36,9 @@
 public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
     implements HoodieRecordPayload<OverwriteWithLatestAvroPayload> {
 
+  public static final String DEFAULT_DELETE_FIELD = "_hoodie_is_deleted";

Review comment:
       Sorry, I don't understand why we need this. The config value will either refer to "_hoodie_is_deleted" if not set, or to the user-defined prop. So we could initialize isDeletedField = null and set it in the constructor. Correct me if my understanding is wrong. 




----------------------------------------------------------------

[GitHub] [hudi] xushiyan commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
xushiyan commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r453449647



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
##########
@@ -67,7 +74,8 @@ public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload
 
     GenericRecord genericRecord = (GenericRecord) recordOption.get();
     // combining strategy here trivially ignores currentValue on disk and writes this record
-    Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
+    String deleteField = userDefineDeleteField == null ? DEFAULT_DELETE_FIELD : userDefineDeleteField;
+    Object deleteMarker = genericRecord.get(deleteField);

Review comment:
       > do we need to add the new method to the interface HoodieRecordPayload
   
   Hmm, that's tricky. Sorry, I did not realize this is an interface API. It does not look clean to add it to the interface, but keeping it local to the class loses agnosticism when calling `combineAndGetUpdateValue()`. Need more thought on this.




----------------------------------------------------------------

[GitHub] [hudi] xushiyan commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
xushiyan commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r453222674



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
##########
@@ -67,7 +74,8 @@ public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload
 
     GenericRecord genericRecord = (GenericRecord) recordOption.get();
     // combining strategy here trivially ignores currentValue on disk and writes this record
-    Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
+    String deleteField = userDefineDeleteField == null ? DEFAULT_DELETE_FIELD : userDefineDeleteField;
+    Object deleteMarker = genericRecord.get(deleteField);

Review comment:
       To maintain full compatibility and caller convenience, just provide an overloaded version like:
   ```java
   public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
     return combineAndGetUpdateValue(currentValue, schema, Option.empty());
   }
   ```
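   
   A self-contained sketch of the same overloading pattern, assuming a simplified class where `Map<String, Object>` stands in for `IndexedRecord`, `java.util.Optional` stands in for Hudi's `Option`, and the schema argument is omitted. `LatestWinsPayload` is a hypothetical name. The old signature becomes a thin wrapper passing `Optional.empty()`, which the new overload resolves to the default marker.
   
   ```java
   import java.util.Map;
   import java.util.Optional;
   
   // Hypothetical simplified payload; Map<String, Object> stands in for IndexedRecord.
   class LatestWinsPayload {
     static final String DEFAULT_DELETE_FIELD = "_hoodie_is_deleted";
     private final Map<String, Object> incoming;
   
     LatestWinsPayload(Map<String, Object> incoming) {
       this.incoming = incoming;
     }
   
     // Existing signature, kept for compatibility: delegates with no override.
     Optional<Map<String, Object>> combineAndGetUpdateValue(Map<String, Object> currentValue) {
       return combineAndGetUpdateValue(currentValue, Optional.empty());
     }
   
     // New overload: a caller that owns the write config may pass the field name.
     Optional<Map<String, Object>> combineAndGetUpdateValue(Map<String, Object> currentValue,
                                                            Optional<String> deleteField) {
       // Latest wins: currentValue (the on-disk record) is ignored.
       String field = deleteField.orElse(DEFAULT_DELETE_FIELD);
       return Boolean.TRUE.equals(incoming.get(field)) ? Optional.empty() : Optional.of(incoming);
     }
   }
   ```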




----------------------------------------------------------------

[GitHub] [hudi] xushiyan commented on pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
xushiyan commented on pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#issuecomment-658505404


   @shenh062326 @nsivabalan Got it. Yup, passing it through the constructor looks good. Thanks for clarifying.


----------------------------------------------------------------

[GitHub] [hudi] xushiyan commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
xushiyan commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r453221663



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
##########
@@ -67,7 +74,8 @@ public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload
 
     GenericRecord genericRecord = (GenericRecord) recordOption.get();
     // combining strategy here trivially ignores currentValue on disk and writes this record
-    Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
+    String deleteField = userDefineDeleteField == null ? DEFAULT_DELETE_FIELD : userDefineDeleteField;
+    Object deleteMarker = genericRecord.get(deleteField);

Review comment:
       The only caller to this method is `org.apache.hudi.io.HoodieMergeHandle#write`, where you have access to `HoodieWriteConfig config`. 
   
   If you make the `delete.field` configuration accessible from `HoodieWriteConfig` and change this method to
   
   ```java
   combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Option<String> deleteField) {
   }
   ```
   
   then the overall logic will be simplified a lot.




----------------------------------------------------------------

[GitHub] [hudi] shenh062326 commented on pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
shenh062326 commented on pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#issuecomment-679075213


   > @shenh062326 : sorry for late reply. Can you check #1704 patch on how to add new apis. Infact, we need to coordinate both these patches. I mean, land one and rebase other one and re-use the new apis added as part of first patch. @bhasudha : wrt your patch, What is your take on adding a PayloadConfig instead of a Map<>. I understand map is very flexible, but for payload class, hudi controls what goes in.
   
   Sure, I will check it.


----------------------------------------------------------------

[GitHub] [hudi] shenh062326 commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
shenh062326 commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r453293178



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
##########
@@ -67,7 +74,8 @@ public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload
 
     GenericRecord genericRecord = (GenericRecord) recordOption.get();
     // combining strategy here trivially ignores currentValue on disk and writes this record
-    Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
+    String deleteField = userDefineDeleteField == null ? DEFAULT_DELETE_FIELD : userDefineDeleteField;
+    Object deleteMarker = genericRecord.get(deleteField);

Review comment:
       Sounds good. But do we need to add the new method to the HoodieRecordPayload interface? If so, do the classes that already implement it need to implement this method as well?
   
   
   




----------------------------------------------------------------

[GitHub] [hudi] nsivabalan commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r455440158



##########
File path: hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
##########
@@ -176,11 +177,21 @@ public static KeyGenerator createKeyGenerator(TypedProperties props) throws IOEx
   /**
    * Create a payload class via reflection, passing in an ordering/precombine value.
    */
-  public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record, Comparable orderingVal)
-      throws IOException {
+  public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record,
+                                                  Comparable orderingVal,
+                                                  String deleteField) throws IOException {
     try {
-      return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass,
-          new Class<?>[] {GenericRecord.class, Comparable.class}, record, orderingVal);
+      HoodieRecordPayload payload = null;
+      if (payloadClass.equals(OverwriteWithLatestAvroPayload.class.getName()) &&
+              !deleteField.isEmpty()) {

Review comment:
       We can simplify this (I haven't gone through the entire patch yet). If you set the default value of this config param to "_hoodie_is_deleted", then you don't need to check for isEmpty. Also, within OverwriteWithLatestAvroPayload, you could just take in the deleteKeyField as is: either it will point to "_hoodie_is_deleted" if not set, or it will point to the one the user has overridden. 
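   
   A small sketch of that simplification, assuming the delete field is read from a `java.util.Properties` object under a hypothetical key `example.delete.field` (the PR's real key is `HoodieWriteConfig.DELETE_FIELD_PROP`, whose string value is not shown in this thread). `Properties.getProperty(key, default)` never returns null here, so no null or `isEmpty()` branch is needed downstream, unless a user explicitly sets an empty string.
   
   ```java
   import java.util.Map;
   import java.util.Properties;
   
   final class DeleteFieldResolver {
     // Hypothetical property key used only for this sketch.
     static final String DELETE_FIELD_KEY = "example.delete.field";
     static final String DEFAULT_DELETE_FIELD = "_hoodie_is_deleted";
   
     private DeleteFieldResolver() {}
   
     // Returns the user's field if set, otherwise the default.
     static String resolve(Properties props) {
       return props.getProperty(DELETE_FIELD_KEY, DEFAULT_DELETE_FIELD);
     }
   
     // The payload can use the resolved name as is, with no special-casing.
     static boolean isDeleted(Map<String, Object> record, String deleteField) {
       return Boolean.TRUE.equals(record.get(deleteField));
     }
   }
   ```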
   




----------------------------------------------------------------

[GitHub] [hudi] xushiyan commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
xushiyan commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r453455700



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
##########
@@ -67,7 +74,8 @@ public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload
 
     GenericRecord genericRecord = (GenericRecord) recordOption.get();
     // combining strategy here trivially ignores currentValue on disk and writes this record
-    Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
+    String deleteField = userDefineDeleteField == null ? DEFAULT_DELETE_FIELD : userDefineDeleteField;
+    Object deleteMarker = genericRecord.get(deleteField);

Review comment:
       @shenh062326 Maybe we shouldn't do the check in the payload class itself. Maybe `org.apache.hudi.io.HoodieMergeHandle#write` is better for this job. After 
   https://github.com/apache/hudi/blob/2603cfb33e272632d7f36a53e1b13fe86dbb8627/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java#L222-L223
   we can check `combinedAvroRecord` against the delete field defined in the configs and convert it to Option.empty() if appropriate. Since this feature is config-related, whoever owns the configs should do it. Need more input from @nsivabalan.




----------------------------------------------------------------

[GitHub] [hudi] shenh062326 commented on pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
shenh062326 commented on pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#issuecomment-673995094


   @vinothchandar  @nsivabalan  Can you take a look at this PR?


----------------------------------------------------------------

[GitHub] [hudi] shenh062326 commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
shenh062326 commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r455727810



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
##########
@@ -36,6 +36,9 @@
 public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
     implements HoodieRecordPayload<OverwriteWithLatestAvroPayload> {
 
+  public static final String DEFAULT_DELETE_FIELD = "_hoodie_is_deleted";
+  private String userDefinedDeleteField = DEFAULT_DELETE_FIELD;

Review comment:
       If the variable name changes to "isDeletedField" or "isDeletedMarkerField", it implies the variable type is boolean. Maybe we should change the default value of userDefinedDeleteField to null instead.




----------------------------------------------------------------

[GitHub] [hudi] nsivabalan commented on pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#issuecomment-668829234


   @shenh062326 : appreciate your help. We are looking to have a release by this weekend, so I am reverting this patch for now. I will work with you on the right fix for a configurable delete marker. If we can get it in by this weekend, well and good; if not, we can land it later once we cut the 0.6.0 release. 


----------------------------------------------------------------

[GitHub] [hudi] nsivabalan merged pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
nsivabalan merged pull request #1819:
URL: https://github.com/apache/hudi/pull/1819


   


----------------------------------------------------------------

[GitHub] [hudi] nsivabalan commented on pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#issuecomment-678631130


   @shenh062326 : sorry for the late reply. Can you check the https://github.com/apache/hudi/pull/1704 patch on how to add new APIs? In fact, we need to coordinate both these patches: land one, rebase the other, and reuse the new APIs added as part of the first patch. @bhasudha : with respect to your patch, what is your take on adding a PayloadConfig instead of a Map<>? I understand a map is very flexible, but for the payload class, Hudi controls what goes in. 


----------------------------------------------------------------

[GitHub] [hudi] xushiyan commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
xushiyan commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r453455700



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
##########
@@ -67,7 +74,8 @@ public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload
 
     GenericRecord genericRecord = (GenericRecord) recordOption.get();
     // combining strategy here trivially ignores currentValue on disk and writes this record
-    Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
+    String deleteField = userDefineDeleteField == null ? DEFAULT_DELETE_FIELD : userDefineDeleteField;
+    Object deleteMarker = genericRecord.get(deleteField);

Review comment:
       @shenh062326 Maybe we shouldn't do the check in the payload class itself. Maybe `org.apache.hudi.io.HoodieMergeHandle#write` is better for this job. After 
   ```java
   Option<IndexedRecord> combinedAvroRecord =
               hoodieRecord.getData().combineAndGetUpdateValue(oldRecord, useWriterSchema ? writerSchema : originalSchema);
   ```
   we can check it against the delete field defined in the configs and convert it to Option.empty() if appropriate. Since this feature is config-related, whoever owns the configs should do it. Need more input from @nsivabalan.
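   
   The merge-handle alternative can be sketched as a post-processing step on whatever the payload returned. This assumes records modeled as `Map<String, Object>`, `java.util.Optional` in place of Hudi's `Option`, and a hypothetical helper `applyDeleteMarker`; it is not the actual `HoodieMergeHandle#write` code.
   
   ```java
   import java.util.Map;
   import java.util.Optional;
   
   final class MergeDeleteCheck {
     private MergeDeleteCheck() {}
   
     // Given the payload's combined result and the delete field from the write
     // config, turn a record whose marker is TRUE into Optional.empty() so the
     // caller drops it instead of writing it. An empty input stays empty.
     static Optional<Map<String, Object>> applyDeleteMarker(
         Optional<Map<String, Object>> combined, String deleteField) {
       return combined.filter(rec -> !Boolean.TRUE.equals(rec.get(deleteField)));
     }
   }
   ```
   
   One trade-off: this covers only the merge path, so inserts that go through `getInsertValue()` would need the same check in another place, which is the concern nsivabalan raises about `getInsertValue()` being called from many classes.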




----------------------------------------------------------------

[GitHub] [hudi] vinothchandar commented on pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#issuecomment-668831250


   @nsivabalan my concern is more about the overhead of the extra field in the payload serialization. No matter which constructor is called, there is a string field that will be serialized, right?
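   
   A rough way to see this concern, using plain Java serialization as a stand-in (the serializer actually used by Hudi or Spark may differ; Kryo is common in Spark jobs, so absolute numbers will not match): serialize two otherwise identical hypothetical payload classes, one with and one without a `String` field naming the delete column, and compare the byte counts. `PlainPayload`, `FieldPayload`, and `SerializedSize` are illustrative names only.
   
   ```java
   import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   import java.io.ObjectOutputStream;
   import java.io.Serializable;
   import java.io.UncheckedIOException;
   
   // Hypothetical payload without a delete-field name.
   class PlainPayload implements Serializable {
     private static final long serialVersionUID = 1L;
     byte[] recordBytes = new byte[64];
     long orderingVal = 42L;
   }
   
   // The same payload carrying a per-instance delete-field name.
   class FieldPayload implements Serializable {
     private static final long serialVersionUID = 1L;
     byte[] recordBytes = new byte[64];
     long orderingVal = 42L;
     String deleteField = "_hoodie_is_deleted";
   }
   
   final class SerializedSize {
     private SerializedSize() {}
   
     // Bytes written by ObjectOutputStream for one object in a fresh stream.
     static int sizeOf(Serializable obj) {
       ByteArrayOutputStream bytes = new ByteArrayOutputStream();
       try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
         out.writeObject(obj);
       } catch (IOException e) {
         throw new UncheckedIOException(e);
       }
       return bytes.size();
     }
   }
   ```
   
   Within one Java serialization stream, repeated references to the same `String` instance are written as back-references after the first occurrence, so the real per-record cost depends on the serializer and on whether instances share the string; the sketch only shows that the field is not free.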


----------------------------------------------------------------

[GitHub] [hudi] nsivabalan commented on pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#issuecomment-668066597


   @xushiyan : do you plan to review? Or can I go ahead and merge this?


----------------------------------------------------------------

[GitHub] [hudi] xushiyan commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
xushiyan commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r453220218



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
##########
@@ -36,6 +36,9 @@
 public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
     implements HoodieRecordPayload<OverwriteWithLatestAvroPayload> {
 
+  public static final String DEFAULT_DELETE_FIELD = "_hoodie_is_deleted";
+  private String userDefineDeleteField = null;

Review comment:
       typo: it should be `userDefinedXXX`

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##########
@@ -202,6 +202,10 @@ public Operation convert(String value) throws ParameterException {
         + " to break ties between records with same key in input data. Default: 'ts' holding unix timestamp of record")
     public String sourceOrderingField = "ts";
 
+    @Parameter(names = {"--source-delete-field"}, description = "Field within source record to decide"
+            + " is this record is delete record. Default: " + OverwriteWithLatestAvroPayload.DEFAULT_DELETE_FIELD)
+    public String sourceDeleteField = OverwriteWithLatestAvroPayload.DEFAULT_DELETE_FIELD;
+

Review comment:
       Not necessary if the config goes to `HoodieWriteConfig`.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
##########
@@ -67,7 +74,8 @@ public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload
 
     GenericRecord genericRecord = (GenericRecord) recordOption.get();
     // combining strategy here trivially ignores currentValue on disk and writes this record
-    Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
+    String deleteField = userDefineDeleteField == null ? DEFAULT_DELETE_FIELD : userDefineDeleteField;
+    Object deleteMarker = genericRecord.get(deleteField);

Review comment:
       The only caller to this method is `org.apache.hudi.io.HoodieMergeHandle#write`, where you have access to `HoodieWriteConfig config`. 
   
   If you make the `delete.field` configuration accessible from `HoodieWriteConfig` and change this method to `combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Option<String> deleteField)`, the overall logic will be simplified a lot.




----------------------------------------------------------------

[GitHub] [hudi] shenh062326 commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
shenh062326 commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r463935932



##########
File path: hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java
##########
@@ -54,13 +57,15 @@ public void testActiveRecords() throws IOException {
     record1.put("id", "1");
     record1.put("partition", "partition0");
     record1.put("ts", 0L);
-    record1.put("_hoodie_is_deleted", false);
+    record1.put(defaultDeleteField, false);
+    record1.put(deleteField, true);

Review comment:
       In fact, it does not matter whether deleteField is true or false here; only defaultDeleteField determines the result.







[GitHub] [hudi] shenh062326 commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
shenh062326 commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r456771979



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
##########
@@ -36,6 +36,9 @@
 public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
     implements HoodieRecordPayload<OverwriteWithLatestAvroPayload> {
 
+  public static final String DEFAULT_DELETE_FIELD = "_hoodie_is_deleted";

Review comment:
       Remove DEFAULT_DELETE_FIELD.







[GitHub] [hudi] shenh062326 commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
shenh062326 commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r458181162



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
##########
@@ -66,8 +74,9 @@ public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload
     }
 
     GenericRecord genericRecord = (GenericRecord) recordOption.get();
-    // combining strategy here trivially ignores currentValue on disk and writes this record
-    Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
+    // combining strategy here trivially ignores currentValue on disk and writes this record
+    String deleteField = isDeletedField == null ? "_hoodie_is_deleted" : isDeletedField;

Review comment:
       > I don't see any tests being added as part of the patch. Would be nice to have some tests covering the new code that was added at all levels.
   > 
   > * WriteClient
   > * Datasource if there is an existing suite of tests for other write operations
   > * Deltastreamer
   
   Sorry for the late reply. I will add some test cases and address the comments.







[GitHub] [hudi] xushiyan commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
xushiyan commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r453222674



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
##########
@@ -67,7 +74,8 @@ public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload
 
     GenericRecord genericRecord = (GenericRecord) recordOption.get();
     // combining strategy here trivially ignores currentValue on disk and writes this record
-    Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
+    String deleteField = userDefineDeleteField == null ? DEFAULT_DELETE_FIELD : userDefineDeleteField;
+    Object deleteMarker = genericRecord.get(deleteField);

Review comment:
       To maintain full compatibility, just provide an overloaded version like
   ```java
   public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
     return combineAndGetUpdateValue(currentValue, schema, Option.empty());
   }
   ```







[GitHub] [hudi] nsivabalan commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r463979191



##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -267,15 +268,15 @@ private boolean deleteAllInstantsOlderorEqualsInAuxMetaFolder(HoodieInstant thre
     return success;
   }
 
-  public void archive(List<HoodieInstant> instants) throws HoodieCommitException {
+  public void archive(JavaSparkContext jsc, List<HoodieInstant> instants) throws HoodieCommitException {
     try {
       HoodieTimeline commitTimeline = metaClient.getActiveTimeline().getAllCommitsTimeline().filterCompletedInstants();
       Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema();
       LOG.info("Wrapper schema " + wrapperSchema.toString());
       List<IndexedRecord> records = new ArrayList<>();
       for (HoodieInstant hoodieInstant : instants) {
         try {
-          deleteAnyLeftOverMarkerFiles(hoodieInstant);
+          deleteAnyLeftOverMarkerFiles(jsc, hoodieInstant);

Review comment:
       Sorry, why are these changes part of this PR? This PR is meant for the delete marker field. Are these changes related to the user-defined delete marker field?

##########
File path: hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java
##########
@@ -54,13 +57,15 @@ public void testActiveRecords() throws IOException {
     record1.put("id", "1");
     record1.put("partition", "partition0");
     record1.put("ts", 0L);
-    record1.put("_hoodie_is_deleted", false);
+    record1.put(defaultDeleteMarkerField, false);

Review comment:
       Actually, we could test it this way (not sure if you already do):
   set the default marker field to true and the user-defined one to false. If OverwriteWithLatestAvroPayload is instantiated without any marker field, the record should be deleted; if it is instantiated with the user-defined marker field, the record should be considered active. Then test the reverse as well. All tests in this class could be written this way, to ensure that the other column is treated as just another user data column that Hudi ignores.
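The test strategy above can be sketched without Hudi or Avro. This sketch has three assumptions. Records are plain maps. The payload's check is reduced to a static `isDeleted` helper. The column name `user_is_deleted` is a hypothetical user-defined marker. Both marker columns are always present with opposite values, so only the column the payload is configured to read should decide the outcome.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Sketch of the suggested "swap" test using a simplified stand-in for the payload check.
class DeleteMarkerSwapTest {
  static final String DEFAULT_FIELD = "_hoodie_is_deleted";
  static final String USER_FIELD = "user_is_deleted"; // hypothetical user-defined column

  // Stand-in for the payload: is the record deleted according to the configured field?
  static boolean isDeleted(Map<String, Object> record, Optional<String> deleteField) {
    return Boolean.TRUE.equals(record.get(deleteField.orElse(DEFAULT_FIELD)));
  }

  // Builds a record carrying both marker columns.
  static Map<String, Object> newRecord(boolean defaultMarker, boolean userMarker) {
    Map<String, Object> r = new HashMap<>();
    r.put("id", "1");
    r.put(DEFAULT_FIELD, defaultMarker);
    r.put(USER_FIELD, userMarker);
    return r;
  }

  static void check(boolean condition, String message) {
    if (!condition) {
      throw new AssertionError(message);
    }
  }

  public static void main(String[] args) {
    // Default marker true, user marker false.
    Map<String, Object> r1 = newRecord(true, false);
    check(isDeleted(r1, Optional.empty()), "no custom field: default marker should delete");
    check(!isDeleted(r1, Optional.of(USER_FIELD)), "custom field: record should stay active");

    // Reverse: default marker false, user marker true.
    Map<String, Object> r2 = newRecord(false, true);
    check(!isDeleted(r2, Optional.empty()), "no custom field: record should stay active");
    check(isDeleted(r2, Optional.of(USER_FIELD)), "custom field: user marker should delete");
    System.out.println("all cases passed");
  }
}
```

Because each case flips which column is true, a payload that accidentally read both columns, or the wrong one, would fail at least one of the four checks.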








[GitHub] [hudi] xushiyan commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
xushiyan commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r453221663



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
##########
@@ -67,7 +74,8 @@ public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload
 
     GenericRecord genericRecord = (GenericRecord) recordOption.get();
     // combining strategy here trivially ignores currentValue on disk and writes this record
-    Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
+    String deleteField = userDefineDeleteField == null ? DEFAULT_DELETE_FIELD : userDefineDeleteField;
+    Object deleteMarker = genericRecord.get(deleteField);

Review comment:
       The only caller to this method is `org.apache.hudi.io.HoodieMergeHandle#write`, where you have access to `HoodieWriteConfig config`. 
   
   If you make the configuration `delete.field` accessible from `HoodieWriteConfig` then make this method 
   
   ```java
   public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Option<String> deleteField) {
   }
   ```
   
   then the overall logic will be simplified a lot.







[GitHub] [hudi] nsivabalan commented on pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#issuecomment-668534087


   Makes sense, sorry about the oversight.
   I will take up the rework using a PayloadConfig class.





[GitHub] [hudi] nsivabalan commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r455440158



##########
File path: hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
##########
@@ -176,11 +177,21 @@ public static KeyGenerator createKeyGenerator(TypedProperties props) throws IOEx
   /**
    * Create a payload class via reflection, passing in an ordering/precombine value.
    */
-  public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record, Comparable orderingVal)
-      throws IOException {
+  public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record,
+                                                  Comparable orderingVal,
+                                                  String deleteField) throws IOException {
     try {
-      return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass,
-          new Class<?>[] {GenericRecord.class, Comparable.class}, record, orderingVal);
+      HoodieRecordPayload payload = null;
+      if (payloadClass.equals(OverwriteWithLatestAvroPayload.class.getName()) &&
+              !deleteField.isEmpty()) {

Review comment:
       We can simplify this (I haven't gone through the entire patch yet). If you set the default value of this config param to "_hoodie_is_deleted", you don't need the isEmpty check. Also, OverwriteWithLatestAvroPayload could just take in the deleteKeyField as is: it will point to "_hoodie_is_deleted" if the user has not set it, and to the user's field otherwise.
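A minimal sketch of that simplification follows. It relies on assumptions throughout. The property key `hoodie.payload.delete.field` and the nested `Payload` class are hypothetical, and records are plain maps rather than Avro `GenericRecord`. The real `createPayload` also passes an ordering value and loads the payload class by reflection, and both are omitted here. The point is that a default on the config lookup leaves a single construction path, with no `isEmpty()` check and no special case for the payload class.

```java
import java.util.Map;
import java.util.Properties;

// Simplified sketch; the config key and these classes are hypothetical.
class PayloadFactorySketch {
  static final String DELETE_FIELD_KEY = "hoodie.payload.delete.field"; // hypothetical key
  static final String DEFAULT_DELETE_FIELD = "_hoodie_is_deleted";

  // Stand-in payload that takes the delete field as is; it never sees null or "" for an unset key.
  static final class Payload {
    final Map<String, Object> record;
    final String deleteField;

    Payload(Map<String, Object> record, String deleteField) {
      this.record = record;
      this.deleteField = deleteField;
    }

    boolean isDeleted() {
      return Boolean.TRUE.equals(record.get(deleteField));
    }
  }

  // One construction path: the lookup default covers the "not configured" case.
  static Payload createPayload(Map<String, Object> record, Properties props) {
    String deleteField = props.getProperty(DELETE_FIELD_KEY, DEFAULT_DELETE_FIELD);
    return new Payload(record, deleteField);
  }
}
```

Note that `getProperty` only applies the default when the key is absent. A user who explicitly sets the key to an empty string would still get `""`, which a real implementation might want to validate.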
   







[GitHub] [hudi] shenh062326 commented on pull request #1819: [HUDI-1058] Make delete marker configurable

Posted by GitBox <gi...@apache.org>.
shenh062326 commented on pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#issuecomment-668542359


   > makes sense. sorry about the oversight.
   
   I can rework it.
   Let me first confirm how to do it: add a PayloadConfig, and then use PayloadConfig in OverwriteWithLatestAvroPayload to get deleteMarkerField. Is that right?
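One possible shape for the proposed `PayloadConfig`, sketched under stated assumptions. Only the class name comes from the thread. The property key, `fromProperties`, `getDeleteMarkerField`, and the `ConfiguredPayload` stand-in for `OverwriteWithLatestAvroPayload` are all hypothetical, and records are plain maps. The config object owns the default, so the payload reads the field name once and never handles a missing value.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical PayloadConfig: class name from the discussion, everything else illustrative.
final class PayloadConfig {
  static final String DELETE_MARKER_FIELD_PROP = "hoodie.payload.delete.field"; // hypothetical key
  static final String DEFAULT_DELETE_MARKER_FIELD = "_hoodie_is_deleted";

  private final Properties props;

  private PayloadConfig(Properties props) {
    this.props = props;
  }

  // Copy the entries so later changes to the caller's Properties do not leak in.
  static PayloadConfig fromProperties(Properties source) {
    Properties copy = new Properties();
    copy.putAll(source);
    return new PayloadConfig(copy);
  }

  String getDeleteMarkerField() {
    return props.getProperty(DELETE_MARKER_FIELD_PROP, DEFAULT_DELETE_MARKER_FIELD);
  }
}

// Stand-in for OverwriteWithLatestAvroPayload reading the field name from the config.
class ConfiguredPayload {
  private final Map<String, Object> record;
  private final String deleteMarkerField;

  ConfiguredPayload(Map<String, Object> record, PayloadConfig config) {
    this.record = record;
    this.deleteMarkerField = config.getDeleteMarkerField();
  }

  boolean isDeleteRecord() {
    return Boolean.TRUE.equals(record.get(deleteMarkerField));
  }
}
```

Keeping the default inside the config class means every payload that needs the marker field resolves it the same way, instead of each payload repeating a null or empty check.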

