You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain text version.
Posted to commits@hudi.apache.org by vi...@apache.org on 2020/08/04 22:20:54 UTC

[hudi] branch master updated: [REVERT] "[HUDI-1058] Make delete marker configurable (#1819)" (#1914)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ab11ba4  [REVERT] "[HUDI-1058] Make delete marker configurable (#1819)" (#1914)
ab11ba4 is described below

commit ab11ba43e1a5496cf85a7a772929bb90fcbf07d3
Author: Sivabalan Narayanan <si...@uber.com>
AuthorDate: Tue Aug 4 18:20:38 2020 -0400

    [REVERT] "[HUDI-1058] Make delete marker configurable (#1819)" (#1914)
    
    This reverts commit 433d7d2c9886fed161557efe88b62ebdce0fe5df.
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  9 --
 .../model/OverwriteWithLatestAvroPayload.java      | 10 +--
 .../model/TestOverwriteWithLatestAvroPayload.java  | 67 +++++----------
 .../common/testutils/HoodieTestDataGenerator.java  | 27 ++----
 .../main/java/org/apache/hudi/DataSourceUtils.java | 23 ++---
 .../SparkParquetBootstrapDataProvider.java         |  2 +-
 .../scala/org/apache/hudi/DataSourceOptions.scala  |  7 --
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  5 +-
 .../functional/HoodieSparkSqlWriterSuite.scala     | 46 ----------
 .../hudi/utilities/deltastreamer/DeltaSync.java    |  6 +-
 ...eltaStreamerWithOverwriteLatestAvroPayload.java | 97 ----------------------
 .../resources/delta-streamer-config/source.avsc    |  4 -
 .../sql-transformer.properties                     |  2 +-
 .../resources/delta-streamer-config/target.avsc    |  4 -
 14 files changed, 43 insertions(+), 266 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index affe553..80bc17e 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -94,9 +94,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   public static final String BULKINSERT_SORT_MODE = "hoodie.bulkinsert.sort.mode";
   public static final String DEFAULT_BULKINSERT_SORT_MODE = BulkInsertSortMode.GLOBAL_SORT
       .toString();
-  public static final String DELETE_MARKER_FIELD_PROP = "hoodie.write.delete.marker.field";
-  public static final String DEFAULT_DELETE_MARKER_FIELD = "_hoodie_is_deleted";
-
 
   public static final String EMBEDDED_TIMELINE_SERVER_ENABLED = "hoodie.embed.timeline.server";
   public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED = "true";
@@ -277,10 +274,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return BulkInsertSortMode.valueOf(sortMode.toUpperCase());
   }
 
-  public String getDeleteMarkerField() {
-    return props.getProperty(DELETE_MARKER_FIELD_PROP);
-  }
-
   /**
    * compaction properties.
    */
@@ -964,8 +957,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
       setDefaultOnCondition(props, !props.containsKey(AVRO_SCHEMA_VALIDATE), AVRO_SCHEMA_VALIDATE, DEFAULT_AVRO_SCHEMA_VALIDATE);
       setDefaultOnCondition(props, !props.containsKey(BULKINSERT_SORT_MODE),
           BULKINSERT_SORT_MODE, DEFAULT_BULKINSERT_SORT_MODE);
-      setDefaultOnCondition(props, !props.containsKey(DELETE_MARKER_FIELD_PROP),
-          DELETE_MARKER_FIELD_PROP, DEFAULT_DELETE_MARKER_FIELD);
 
       // Make sure the props is propagated
       setDefaultOnCondition(props, !isIndexConfigSet, HoodieIndexConfig.newBuilder().fromProperties(props).build());
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
index 0e4b18a..d8dffdf 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
@@ -36,8 +36,6 @@ import java.io.IOException;
 public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
     implements HoodieRecordPayload<OverwriteWithLatestAvroPayload> {
 
-  private String deleteMarkerField = "_hoodie_is_deleted";
-
   /**
    *
    */
@@ -49,12 +47,6 @@ public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
     this(record.isPresent() ? record.get() : null, (record1) -> 0); // natural order
   }
 
-  public OverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal,
-                                        String deleteMarkerField) {
-    this(record, orderingVal);
-    this.deleteMarkerField = deleteMarkerField;
-  }
-
   @Override
   public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload another) {
     // pick the payload with greatest ordering value
@@ -88,7 +80,7 @@ public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
    * @returns {@code true} if record represents a delete record. {@code false} otherwise.
    */
   private boolean isDeleteRecord(GenericRecord genericRecord) {
-    Object deleteMarker = genericRecord.get(deleteMarkerField);
+    Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
     return (deleteMarker instanceof Boolean && (boolean) deleteMarker);
   }
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java
index e212367..7c5951a 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java
@@ -37,8 +37,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 public class TestOverwriteWithLatestAvroPayload {
 
   private Schema schema;
-  String defaultDeleteMarkerField = "_hoodie_is_deleted";
-  String deleteMarkerField = "delete_marker_field";
 
   @BeforeEach
   public void setUp() throws Exception {
@@ -46,56 +44,26 @@ public class TestOverwriteWithLatestAvroPayload {
         new Schema.Field("id", Schema.create(Schema.Type.STRING), "", null),
         new Schema.Field("partition", Schema.create(Schema.Type.STRING), "", null),
         new Schema.Field("ts", Schema.create(Schema.Type.LONG), "", null),
-        new Schema.Field(defaultDeleteMarkerField, Schema.create(Type.BOOLEAN), "", false),
-        new Schema.Field(deleteMarkerField, Schema.create(Type.BOOLEAN), "", false)
+        new Schema.Field("_hoodie_is_deleted", Schema.create(Type.BOOLEAN), "", false)
     ));
   }
 
   @Test
-  public void testOverwriteWithLatestAvroPayload() throws IOException {
+  public void testActiveRecords() throws IOException {
     GenericRecord record1 = new GenericData.Record(schema);
     record1.put("id", "1");
     record1.put("partition", "partition0");
     record1.put("ts", 0L);
-    record1.put(defaultDeleteMarkerField, false);
-    record1.put(deleteMarkerField, false);
+    record1.put("_hoodie_is_deleted", false);
 
-    // test1: set default marker field value to true and user defined to false
     GenericRecord record2 = new GenericData.Record(schema);
     record2.put("id", "2");
     record2.put("partition", "partition1");
     record2.put("ts", 1L);
-    record2.put(defaultDeleteMarkerField, true);
-    record2.put(deleteMarkerField, false);
-
-    // set to user defined marker field with false, the record should be considered active.
-    assertActiveRecord(record1, record2, deleteMarkerField);
-
-    // set to default marker field with true, the record should be considered delete.
-    assertDeletedRecord(record1, record2, defaultDeleteMarkerField);
-
-    // test2: set default marker field value to false and user defined to true
-    GenericRecord record3 = new GenericData.Record(schema);
-    record3.put("id", "2");
-    record3.put("partition", "partition1");
-    record3.put("ts", 1L);
-    record3.put(defaultDeleteMarkerField, false);
-    record3.put(deleteMarkerField, true);
-
-    // set to user defined marker field with true, the record should be considered delete.
-    assertDeletedRecord(record1, record3, deleteMarkerField);
-
-    // set to default marker field with false, the record should be considered active.
-    assertActiveRecord(record1, record3, defaultDeleteMarkerField);
-  }
-
-  private void assertActiveRecord(GenericRecord record1,
-                                  GenericRecord record2, String field) throws IOException {
-    OverwriteWithLatestAvroPayload payload1 = new OverwriteWithLatestAvroPayload(
-        record1, 1, field);
-    OverwriteWithLatestAvroPayload payload2 = new OverwriteWithLatestAvroPayload(
-        record2, 2, field);
+    record2.put("_hoodie_is_deleted", false);
 
+    OverwriteWithLatestAvroPayload payload1 = new OverwriteWithLatestAvroPayload(record1, 1);
+    OverwriteWithLatestAvroPayload payload2 = new OverwriteWithLatestAvroPayload(record2, 2);
     assertEquals(payload1.preCombine(payload2), payload2);
     assertEquals(payload2.preCombine(payload1), payload2);
 
@@ -106,12 +74,22 @@ public class TestOverwriteWithLatestAvroPayload {
     assertEquals(payload2.combineAndGetUpdateValue(record1, schema).get(), record2);
   }
 
-  private void assertDeletedRecord(GenericRecord record1,
-                                   GenericRecord delRecord1, String field) throws IOException {
-    OverwriteWithLatestAvroPayload payload1 = new OverwriteWithLatestAvroPayload(
-        record1, 1, field);
-    OverwriteWithLatestAvroPayload payload2 = new OverwriteWithLatestAvroPayload(
-        delRecord1, 2, field);
+  @Test
+  public void testDeletedRecord() throws IOException {
+    GenericRecord record1 = new GenericData.Record(schema);
+    record1.put("id", "1");
+    record1.put("partition", "partition0");
+    record1.put("ts", 0L);
+    record1.put("_hoodie_is_deleted", false);
+
+    GenericRecord delRecord1 = new GenericData.Record(schema);
+    delRecord1.put("id", "2");
+    delRecord1.put("partition", "partition1");
+    delRecord1.put("ts", 1L);
+    delRecord1.put("_hoodie_is_deleted", true);
+
+    OverwriteWithLatestAvroPayload payload1 = new OverwriteWithLatestAvroPayload(record1, 1);
+    OverwriteWithLatestAvroPayload payload2 = new OverwriteWithLatestAvroPayload(delRecord1, 2);
     assertEquals(payload1.preCombine(payload2), payload2);
     assertEquals(payload2.preCombine(payload1), payload2);
 
@@ -121,4 +99,5 @@ public class TestOverwriteWithLatestAvroPayload {
     assertEquals(payload1.combineAndGetUpdateValue(delRecord1, schema).get(), record1);
     assertFalse(payload2.combineAndGetUpdateValue(record1, schema).isPresent());
   }
+
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 6c56ff1..1ead5ff 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -106,7 +106,6 @@ public class HoodieTestDataGenerator {
       + "{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},"
       + "{\"name\": \"weight\", \"type\": \"float\"},"
       + "{\"name\": \"nation\", \"type\": \"bytes\"},"
-      + "{\"name\": \"user_defined_delete_marker_field\", \"type\": \"boolean\", \"default\": false},"
       + "{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},"
       + "{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},"
       + "{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},";
@@ -124,7 +123,7 @@ public class HoodieTestDataGenerator {
       + "{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";
 
   public static final String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString();
-  public static final String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,int,bigint,float,binary,boolean,int,bigint,decimal(10,6),"
+  public static final String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,int,bigint,float,binary,int,bigint,decimal(10,6),"
       + "map<string,string>,struct<amount:double,currency:string>,array<struct<amount:double,currency:string>>,boolean";
 
 
@@ -180,18 +179,6 @@ public class HoodieTestDataGenerator {
     return null;
   }
 
-  public static List<GenericRecord> generateGenericRecords(int n, boolean isDeleteRecord, int instantTime) {
-    return IntStream.range(0, n).boxed().map(i -> {
-      String partitionPath = DEFAULT_FIRST_PARTITION_PATH;
-      HoodieKey key = new HoodieKey("id_" + i, partitionPath);
-      HoodieTestDataGenerator.KeyPartition kp = new HoodieTestDataGenerator.KeyPartition();
-      kp.key = key;
-      kp.partitionPath = partitionPath;
-      return HoodieTestDataGenerator.generateGenericRecord(
-              key.getRecordKey(), "rider-" + instantTime, "driver-" + instantTime, instantTime, isDeleteRecord, false);
-    }).collect(Collectors.toList());
-  }
-
   /**
    * Generates a new avro record of the above nested schema format,
    * retaining the key if optionally provided.
@@ -278,11 +265,11 @@ public class HoodieTestDataGenerator {
       rec.put("weight", RAND.nextFloat());
       byte[] bytes = "Canada".getBytes();
       rec.put("nation", ByteBuffer.wrap(bytes));
-      rec.put("user_defined_delete_marker_field", isDeleteRecord);
       long currentTimeMillis = System.currentTimeMillis();
       Date date = new Date(currentTimeMillis);
       rec.put("current_date", (int) date.toLocalDate().toEpochDay());
       rec.put("current_ts", currentTimeMillis);
+
       BigDecimal bigDecimal = new BigDecimal(String.format("%5f", RAND.nextFloat()));
       Schema decimalSchema = AVRO_SCHEMA.getField("height").schema();
       Conversions.DecimalConversion decimalConversions = new Conversions.DecimalConversion();
@@ -305,7 +292,11 @@ public class HoodieTestDataGenerator {
       rec.put("tip_history", tipHistoryArray);
     }
 
-    rec.put("_hoodie_is_deleted", isDeleteRecord);
+    if (isDeleteRecord) {
+      rec.put("_hoodie_is_deleted", true);
+    } else {
+      rec.put("_hoodie_is_deleted", false);
+    }
     return rec;
   }
 
@@ -769,8 +760,8 @@ public class HoodieTestDataGenerator {
 
   public static class KeyPartition implements Serializable {
 
-    public HoodieKey key;
-    public String partitionPath;
+    HoodieKey key;
+    String partitionPath;
   }
 
   public void close() {
diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
index 16d6f8d..3345204 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -27,7 +27,6 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
@@ -208,20 +207,11 @@ public class DataSourceUtils {
   /**
    * Create a payload class via reflection, passing in an ordering/precombine value.
    */
-  public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record,
-                                                  Comparable orderingVal,
-                                                  String deleteMarkerField) throws IOException {
+  public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record, Comparable orderingVal)
+      throws IOException {
     try {
-      HoodieRecordPayload payload = null;
-      if (payloadClass.equals(OverwriteWithLatestAvroPayload.class.getName())) {
-        payload = (OverwriteWithLatestAvroPayload) ReflectionUtils.loadClass(payloadClass,
-                new Class<?>[]{GenericRecord.class, Comparable.class, String.class},
-                record, orderingVal, deleteMarkerField);
-      } else {
-        payload = (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass,
-                new Class<?>[]{GenericRecord.class, Comparable.class}, record, orderingVal);
-      }
-      return payload;
+      return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass,
+          new Class<?>[] {GenericRecord.class, Comparable.class}, record, orderingVal);
     } catch (Throwable e) {
       throw new IOException("Could not create payload for class: " + payloadClass, e);
     }
@@ -277,9 +267,8 @@ public class DataSourceUtils {
   }
 
   public static HoodieRecord createHoodieRecord(GenericRecord gr, Comparable orderingVal, HoodieKey hKey,
-                                                String payloadClass,
-                                                String deleteMarkerField) throws IOException {
-    HoodieRecordPayload payload = DataSourceUtils.createPayload(payloadClass, gr, orderingVal, deleteMarkerField);
+                                                String payloadClass) throws IOException {
+    HoodieRecordPayload payload = DataSourceUtils.createPayload(payloadClass, gr, orderingVal);
     return new HoodieRecord<>(hKey, payload);
   }
 
diff --git a/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java b/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java
index 0d5756f..32e5230 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java
@@ -70,7 +70,7 @@ public class SparkParquetBootstrapDataProvider extends FullRecordBootstrapDataPr
             gr, props.getString("hoodie.datasource.write.precombine.field"), false);
         try {
           return DataSourceUtils.createHoodieRecord(gr, orderingVal, keyGenerator.getKey(gr),
-              props.getString("hoodie.datasource.write.payload.class"), "_hoodie_is_deleted");
+              props.getString("hoodie.datasource.write.payload.class"));
         } catch (IOException ioe) {
           throw new HoodieIOException(ioe.getMessage(), ioe);
         }
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index e529a04..8a8f87f 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -185,13 +185,6 @@ object DataSourceWriteOptions {
   val DEFAULT_PAYLOAD_OPT_VAL = classOf[OverwriteWithLatestAvroPayload].getName
 
   /**
-   * Field used in OverwriteWithLatestAvroPayload combineAndGetUpdateValue, When two records have the same
-   * key value, we will check if the new record is deleted by the delete field.
-   */
-  val DELETE_FIELD_OPT_KEY = "hoodie.datasource.write.delete.field"
-  val DEFAULT_DELETE_FIELD_OPT_VAL = "_hoodie_is_deleted"
-
-  /**
     * Record key field. Value to be used as the `recordKey` component of `HoodieKey`. Actual value
     * will be obtained by invoking .toString() on the field value. Nested fields can be specified using
     * the dot notation eg: `a.b.c`
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 574c907..05ef863 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -111,9 +111,7 @@ private[hudi] object HoodieSparkSqlWriter {
         val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, parameters(PRECOMBINE_FIELD_OPT_KEY), false)
             .asInstanceOf[Comparable[_]]
         DataSourceUtils.createHoodieRecord(gr,
-          orderingVal, keyGenerator.getKey(gr),
-          parameters(PAYLOAD_CLASS_OPT_KEY),
-          parameters(DELETE_FIELD_OPT_KEY))
+          orderingVal, keyGenerator.getKey(gr), parameters(PAYLOAD_CLASS_OPT_KEY))
       }).toJavaRDD()
 
       // Handle various save modes
@@ -206,7 +204,6 @@ private[hudi] object HoodieSparkSqlWriter {
       TABLE_TYPE_OPT_KEY -> DEFAULT_TABLE_TYPE_OPT_VAL,
       PRECOMBINE_FIELD_OPT_KEY -> DEFAULT_PRECOMBINE_FIELD_OPT_VAL,
       PAYLOAD_CLASS_OPT_KEY -> DEFAULT_PAYLOAD_OPT_VAL,
-      DELETE_FIELD_OPT_KEY -> DEFAULT_DELETE_FIELD_OPT_VAL,
       RECORDKEY_FIELD_OPT_KEY -> DEFAULT_RECORDKEY_FIELD_OPT_VAL,
       PARTITIONPATH_FIELD_OPT_KEY -> DEFAULT_PARTITIONPATH_FIELD_OPT_VAL,
       KEYGENERATOR_CLASS_OPT_KEY -> DEFAULT_KEYGENERATOR_CLASS_OPT_VAL,
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
index 29dac2b..7f26481 100644
--- a/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
+++ b/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
@@ -100,52 +100,6 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
     }
   }
 
-  test("test OverwriteWithLatestAvroPayload with user defined delete field") {
-    val session = SparkSession.builder()
-      .appName("test_append_mode")
-      .master("local[2]")
-      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-      .getOrCreate()
-    val path = java.nio.file.Files.createTempDirectory("hoodie_test_path1")
-
-    try {
-      val sqlContext = session.sqlContext
-      val hoodieFooTableName = "hoodie_foo_tbl"
-
-      val keyField = "id"
-      val deleteMarkerField = "delete_field"
-
-      //create a new table
-      val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
-        HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
-        "hoodie.insert.shuffle.parallelism" -> "2",
-        "hoodie.upsert.shuffle.parallelism" -> "2",
-        DELETE_FIELD_OPT_KEY -> deleteMarkerField,
-        RECORDKEY_FIELD_OPT_KEY -> keyField)
-      val fooTableParams = HoodieSparkSqlWriter.parametersWithWriteDefaults(fooTableModifier)
-
-      val id1 = UUID.randomUUID().toString
-      val dataFrame = session.createDataFrame(Seq(
-        (id1, 1, false)
-      )) toDF(keyField, "ts", deleteMarkerField)
-
-      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, dataFrame)
-      val recordCount1 = sqlContext.read.format("org.apache.hudi").load(path.toString + "/*/*.parquet").count
-      assert(recordCount1 == 1, "result should be 1, but get " + recordCount1)
-
-      val dataFrame2 = session.createDataFrame(Seq(
-        (id1, 2, true)
-      )) toDF(keyField, "ts", deleteMarkerField)
-      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, dataFrame2)
-
-      val recordCount2 = sqlContext.read.format("org.apache.hudi").load(path.toString + "/*/*.parquet").count()
-      assert(recordCount2 == 0, "result should be 0, but get " + recordCount2)
-    } finally {
-      session.stop()
-      FileUtils.deleteDirectory(path.toFile)
-    }
-  }
-
   case class Test(uuid: String, ts: Long)
 
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index ecca0d1..f0ecba0 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -20,7 +20,6 @@ package org.apache.hudi.utilities.deltastreamer;
 
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.DataSourceUtils;
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.TypedProperties;
@@ -340,12 +339,9 @@ public class DeltaSync implements Serializable {
     }
 
     JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
-    String deleteMarkerField = props.getString(HoodieWriteConfig.DELETE_MARKER_FIELD_PROP,
-        HoodieWriteConfig.DEFAULT_DELETE_MARKER_FIELD);
     JavaRDD<HoodieRecord> records = avroRDD.map(gr -> {
       HoodieRecordPayload payload = DataSourceUtils.createPayload(cfg.payloadClassName, gr,
-          (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false),
-          deleteMarkerField);
+          (Comparable) DataSourceUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false));
       return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
     });
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestDeltaStreamerWithOverwriteLatestAvroPayload.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestDeltaStreamerWithOverwriteLatestAvroPayload.java
deleted file mode 100644
index f98eb79..0000000
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestDeltaStreamerWithOverwriteLatestAvroPayload.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.utilities.functional;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.fs.Path;
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
-import org.apache.hudi.utilities.sources.ParquetDFSSource;
-import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
-import org.apache.spark.sql.Row;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-
-import java.util.List;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-public class TestDeltaStreamerWithOverwriteLatestAvroPayload extends UtilitiesTestBase {
-  private static String PARQUET_SOURCE_ROOT;
-  private static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
-
-  @BeforeAll
-  public static void initClass() throws Exception {
-    UtilitiesTestBase.initClass(true);
-    PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
-
-    // prepare the configs.
-    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", dfs, dfsBasePath + "/base.properties");
-    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/sql-transformer.properties", dfs,
-        dfsBasePath + "/sql-transformer.properties");
-    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath + "/source.avsc");
-    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source-flattened.avsc", dfs, dfsBasePath + "/source-flattened.avsc");
-    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target.avsc", dfs, dfsBasePath + "/target.avsc");
-  }
-
-  @Test
-  public void testOverwriteLatestAvroPayload() throws Exception {
-    // test defaultDeleteMarkerField
-    this.testOverwriteLatestAvroPayload(null);
-
-    // test userDefinedDeleteMarkerField
-    this.testOverwriteLatestAvroPayload("user_defined_delete_marker_field");
-  }
-
-  private void testOverwriteLatestAvroPayload(String deleteMarkerField) throws Exception {
-    String path = PARQUET_SOURCE_ROOT + "/1.parquet";
-    List<GenericRecord> records = HoodieTestDataGenerator.generateGenericRecords(5, false, 0);
-    Helpers.saveParquetToDFS(records, new Path(path));
-
-    TypedProperties parquetProps = new TypedProperties();
-    parquetProps.setProperty("include", "base.properties");
-    parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
-    parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
-    parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", PARQUET_SOURCE_ROOT);
-    if (deleteMarkerField != null) {
-      parquetProps.setProperty(HoodieWriteConfig.DELETE_MARKER_FIELD_PROP, deleteMarkerField);
-    }
-    Helpers.savePropsToDFS(parquetProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_PARQUET);
-
-    String tableBasePath = dfsBasePath + "/test_overwrite_lastest_avro_payload_table";
-
-    HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
-        TestHoodieDeltaStreamer.TestHelpers.makeConfig(tableBasePath, HoodieDeltaStreamer.Operation.INSERT, ParquetDFSSource.class.getName(),
-            null, PROPS_FILENAME_TEST_PARQUET, false,
-            false, 100000, false, null, null, "timestamp"), jsc);
-    deltaStreamer.sync();
-    TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5, tableBasePath + "/*/*.parquet", sqlContext);
-
-    String path2 = PARQUET_SOURCE_ROOT + "/2.parquet";
-    List<GenericRecord> records2 = HoodieTestDataGenerator.generateGenericRecords(4, true, 1);
-    Helpers.saveParquetToDFS(records2, new Path(path2));
-    deltaStreamer.sync();
-
-    List<Row> rows = sqlContext.read().format("org.apache.hudi").load(tableBasePath + "/*/*.parquet").collectAsList();
-    assertEquals(1, rows.size());
-    assertEquals(records.get(4).get("_row_key"), rows.get(0).getString(2));
-  }
-}
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc
index 7ae44b7..e912573 100644
--- a/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc
@@ -56,10 +56,6 @@
     "name" : "nation",
     "type" : "bytes"
   },{
-    "name" : "user_defined_delete_marker_field",
-    "type" : "boolean",
-    "default" : false
-  },{
     "name" : "current_date",
     "type" : {
       "type" : "int",
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties b/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties
index 5eaa0a7..dc735e8 100644
--- a/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties
@@ -16,4 +16,4 @@
 # limitations under the License.
 ###
 include=base.properties
-hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a._row_key, a.rider, a.driver, a.begin_lat, a.begin_lon, a.end_lat, a.end_lon, a.distance_in_meters, a.seconds_since_epoch, a.weight, a.nation, a.user_defined_delete_marker_field, a.current_date, a.current_ts, a.height, a.city_to_state, a.fare, a.tip_history, a.`_hoodie_is_deleted`, CAST(1.0 AS DOUBLE) AS haversine_distance FROM <SRC> a
+hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a._row_key, a.rider, a.driver, a.begin_lat, a.begin_lon, a.end_lat, a.end_lon, a.distance_in_meters, a.seconds_since_epoch, a.weight, a.nation, a.current_date, a.current_ts, a.height, a.city_to_state, a.fare, a.tip_history, a.`_hoodie_is_deleted`, CAST(1.0 AS DOUBLE) AS haversine_distance FROM <SRC> a
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc
index 815e328..a6234f4 100644
--- a/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc
@@ -56,10 +56,6 @@
     "name" : "nation",
     "type" : "bytes"
   },{
-    "name" : "user_defined_delete_marker_field",
-    "type" : "boolean",
-    "default" : false
-  },{
     "name" : "current_date",
     "type" : {
       "type" : "int",