Posted to commits@hudi.apache.org by da...@apache.org on 2021/08/10 12:23:36 UTC

[hudi] branch master updated: [HUDI-1771] Propagate CDC format for hoodie (#3285)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 21db6d7  [HUDI-1771] Propagate CDC format for hoodie (#3285)
21db6d7 is described below

commit 21db6d7a84d4a83ec98c110e92ff9c92d05dd530
Author: swuferhong <33...@qq.com>
AuthorDate: Tue Aug 10 20:23:23 2021 +0800

    [HUDI-1771] Propagate CDC format for hoodie (#3285)
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  16 +++
 .../org/apache/hudi/io/HoodieAppendHandle.java     |  15 ++-
 .../org/apache/hudi/io/HoodieCreateHandle.java     |   4 +
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |   4 +
 .../java/org/apache/hudi/io/HoodieWriteHandle.java |   4 +-
 .../apache/hudi/client/model/HoodieRowData.java    |  54 +++-----
 .../java/org/apache/hudi/io/FlinkAppendHandle.java |   3 +
 .../io/storage/row/HoodieRowDataCreateHandle.java  |   4 +-
 .../hudi/table/HoodieFlinkCopyOnWriteTable.java    |   2 +-
 .../hudi/table/action/commit/FlinkWriteHelper.java |  19 +--
 .../HoodieFlinkMergeOnReadTableCompactor.java      |   4 +-
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java |  48 +++++++-
 .../apache/hudi/common/model/HoodieOperation.java  | 125 +++++++++++++++++++
 .../org/apache/hudi/common/model/HoodieRecord.java |  23 ++++
 .../hudi/common/table/TableSchemaResolver.java     |  12 +-
 .../table/log/AbstractHoodieLogRecordScanner.java  |  20 ++-
 .../common/table/log/HoodieFileSliceReader.java    |   4 +-
 .../table/log/HoodieMergedLogRecordScanner.java    |  26 ++--
 .../table/log/HoodieUnMergedLogRecordScanner.java  |  13 +-
 .../apache/hudi/common/util/SpillableMapUtils.java |  16 ++-
 .../hudi/metadata/HoodieBackedTableMetadata.java   |  10 +-
 .../HoodieMetadataMergedLogRecordScanner.java      |   2 +-
 .../apache/hudi/configuration/FlinkOptions.java    |  11 ++
 .../org/apache/hudi/sink/StreamWriteFunction.java  |  12 +-
 .../hudi/sink/bootstrap/BootstrapFunction.java     |   2 +-
 .../hudi/sink/bulk/BulkInsertWriterHelper.java     |  11 +-
 .../sink/compact/CompactionPlanSourceFunction.java |  20 +--
 .../hudi/sink/compact/HoodieFlinkCompactor.java    |   3 +-
 .../sink/transform/RowDataToHoodieFunction.java    |  10 +-
 .../apache/hudi/sink/utils/HiveSyncContext.java    |   1 +
 .../apache/hudi/sink/utils/PayloadCreation.java    |   5 +-
 .../org/apache/hudi/table/HoodieTableSink.java     |  13 +-
 .../org/apache/hudi/table/HoodieTableSource.java   |  10 +-
 .../org/apache/hudi/table/format/FormatUtils.java  | 131 +++++++++++++++++++-
 .../table/format/mor/MergeOnReadInputFormat.java   | 105 ++++++++++++++--
 .../table/format/mor/MergeOnReadTableState.java    |   8 ++
 .../java/org/apache/hudi/util/ChangelogModes.java  |  46 +++++++
 .../java/org/apache/hudi/util/StreamerUtil.java    |   1 +
 .../org/apache/hudi/sink/StreamWriteITCase.java    |  12 +-
 .../org/apache/hudi/sink/TestWriteCopyOnWrite.java |  47 ++++---
 .../apache/hudi/sink/bulk/TestRowDataKeyGen.java   |   4 +-
 .../apache/hudi/table/HoodieDataSourceITCase.java  |  29 +++--
 .../apache/hudi/table/format/TestInputFormat.java  | 122 ++++++++++++++++--
 .../test/java/org/apache/hudi/utils/TestData.java  |  85 ++++++++++++-
 .../org/apache/hudi/utils/TestHoodieRowData.java   | 136 +++++++++++++++++++++
 .../utils/factory/CollectSinkTableFactory.java     |   1 +
 .../java/org/apache/hudi/dla/HoodieDLAClient.java  |   2 +-
 .../java/org/apache/hudi/hive/HiveSyncConfig.java  |   5 +
 .../org/apache/hudi/hive/HoodieHiveClient.java     |   2 +-
 .../hudi/sync/common/AbstractSyncHoodieClient.java |  18 ++-
 50 files changed, 1081 insertions(+), 199 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 7e9c0cc..9616b55 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -373,6 +373,13 @@ public class HoodieWriteConfig extends HoodieConfig {
        .withDocumentation("Whether to allow generation of empty commits, even if no data was written in the commit. "
           + "It's useful in cases where extra metadata needs to be published regardless e.g tracking source offsets when ingesting data");
 
+  public static final ConfigProperty<Boolean> ALLOW_OPERATION_METADATA_FIELD = ConfigProperty
+      .key("hoodie.allow.operation.metadata.field")
+      .defaultValue(false)
+      .sinceVersion("0.9")
+      .withDocumentation("Whether to include '_hoodie_operation' in the metadata fields. "
+          + "Once enabled, all the changes of a record are persisted to the delta log directly without merge");
+
   private ConsistencyGuardConfig consistencyGuardConfig;
 
   // Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled
@@ -1309,6 +1316,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getBooleanOrDefault(ALLOW_EMPTY_COMMIT);
   }
 
+  public boolean allowOperationMetadataField() {
+    return getBooleanOrDefault(ALLOW_OPERATION_METADATA_FIELD);
+  }
+
   public static class Builder {
 
     protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
@@ -1615,6 +1626,11 @@ public class HoodieWriteConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withAllowOperationMetadataField(boolean allowOperationMetadataField) {
+      writeConfig.setValue(ALLOW_OPERATION_METADATA_FIELD, Boolean.toString(allowOperationMetadataField));
+      return this;
+    }
+
     public Builder withProperties(Properties properties) {
       this.writeConfig.getProps().putAll(properties);
       return this;
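
For reference, a minimal sketch of wiring the new flag through the write config builder; the path and schema values below are placeholders, not part of this commit:

    import org.apache.hudi.config.HoodieWriteConfig;

    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie_table")              // placeholder base path
        .withSchema(avroSchemaString)               // placeholder: an Avro schema string you already have
        .withAllowOperationMetadataField(true)      // persist '_hoodie_operation' as a meta field
        .build();

With the flag left at its default of false, the schema is unchanged and existing tables are unaffected.
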
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index fe45d4b..1315c99 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieDeltaWriteStat;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -197,20 +198,26 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload, I, K, O> extends
       // Pass the isUpdateRecord to the props for HoodieRecordPayload to judge
       // Whether it is a update or insert record.
       boolean isUpdateRecord = isUpdateRecord(hoodieRecord);
+      // If the format cannot record the operation field, nullify the DELETE payload manually.
+      boolean nullifyPayload = HoodieOperation.isDelete(hoodieRecord.getOperation()) && !config.allowOperationMetadataField();
       recordProperties.put(HoodiePayloadProps.PAYLOAD_IS_UPDATE_RECORD_FOR_MOR, String.valueOf(isUpdateRecord));
-      Option<IndexedRecord> avroRecord = hoodieRecord.getData().getInsertValue(tableSchema, recordProperties);
+      Option<IndexedRecord> avroRecord = nullifyPayload ? Option.empty() : hoodieRecord.getData().getInsertValue(tableSchema, recordProperties);
       if (avroRecord.isPresent()) {
         if (avroRecord.get().equals(IGNORE_RECORD)) {
           return avroRecord;
         }
         // Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
-        avroRecord = Option.of(rewriteRecord((GenericRecord) avroRecord.get()));
+        GenericRecord rewriteRecord = rewriteRecord((GenericRecord) avroRecord.get());
+        avroRecord = Option.of(rewriteRecord);
         String seqId =
             HoodieRecord.generateSequenceId(instantTime, getPartitionId(), RECORD_COUNTER.getAndIncrement());
         if (config.populateMetaFields()) {
-          HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord.get(), hoodieRecord.getRecordKey(),
+          HoodieAvroUtils.addHoodieKeyToRecord(rewriteRecord, hoodieRecord.getRecordKey(),
               hoodieRecord.getPartitionPath(), fileId);
-          HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord.get(), instantTime, seqId);
+          HoodieAvroUtils.addCommitMetadataToRecord(rewriteRecord, instantTime, seqId);
+        }
+        if (config.allowOperationMetadataField()) {
+          HoodieAvroUtils.addOperationToRecord(rewriteRecord, hoodieRecord.getOperation());
         }
         if (isUpdateRecord(hoodieRecord)) {
           updatedRecordsWritten++;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index eef5b3d..01ad453 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -22,6 +22,7 @@ import org.apache.avro.Schema;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -127,6 +128,9 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload, I, K, O> extends
   @Override
   public void write(HoodieRecord record, Option<IndexedRecord> avroRecord) {
     Option recordMetadata = record.getData().getMetadata();
+    if (HoodieOperation.isDelete(record.getOperation())) {
+      avroRecord = Option.empty();
+    }
     try {
       if (avroRecord.isPresent()) {
         if (avroRecord.get().equals(IGNORE_RECORD)) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 347f8cc..3e20141 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -22,6 +22,7 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -264,6 +265,9 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
       writeStatus.markFailure(hoodieRecord, failureEx, recordMetadata);
       return false;
     }
+    if (HoodieOperation.isDelete(hoodieRecord.getOperation())) {
+      indexedRecord = Option.empty();
+    }
     try {
       if (indexedRecord.isPresent()) {
         // Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 0337b0e..306021b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -112,9 +112,9 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O>
     this.partitionPath = partitionPath;
     this.fileId = fileId;
     this.tableSchema = overriddenSchema.orElseGet(() -> getSpecifiedTableSchema(config));
-    this.tableSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(tableSchema);
+    this.tableSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(tableSchema, config.allowOperationMetadataField());
     this.writeSchema = overriddenSchema.orElseGet(() -> getWriteSchema(config));
-    this.writeSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(writeSchema);
+    this.writeSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(writeSchema, config.allowOperationMetadataField());
     this.timer = new HoodieTimer().startTimer();
     this.writeStatus = (WriteStatus) ReflectionUtils.loadClass(config.getWriteStatusClassName(),
         !hoodieTable.getIndex().isImplicitWithStorage(), config.getWriteStatusFailureFraction());
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieRowData.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieRowData.java
index 86acc1c..dfc425c 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieRowData.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieRowData.java
@@ -18,7 +18,7 @@
 
 package org.apache.hudi.client.model;
 
-import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieOperation;
 
 import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.DecimalData;
@@ -35,12 +35,7 @@ import org.apache.flink.types.RowKind;
  * copy rather than fetching from {@link RowData}.
  */
 public class HoodieRowData implements RowData {
-
-  private final String commitTime;
-  private final String commitSeqNumber;
-  private final String recordKey;
-  private final String partitionPath;
-  private final String fileName;
+  private final String[] metaColumns;
   private final RowData row;
   private final int metaColumnsNum;
 
@@ -49,14 +44,19 @@ public class HoodieRowData implements RowData {
                        String recordKey,
                        String partitionPath,
                        String fileName,
-                       RowData row) {
-    this.commitTime = commitTime;
-    this.commitSeqNumber = commitSeqNumber;
-    this.recordKey = recordKey;
-    this.partitionPath = partitionPath;
-    this.fileName = fileName;
+                       RowData row,
+                       boolean withOperation) {
+    this.metaColumnsNum = withOperation ? 6 : 5;
+    this.metaColumns = new String[metaColumnsNum];
+    metaColumns[0] = commitTime;
+    metaColumns[1] = commitSeqNumber;
+    metaColumns[2] = recordKey;
+    metaColumns[3] = partitionPath;
+    metaColumns[4] = fileName;
+    if (withOperation) {
+      metaColumns[5] = HoodieOperation.fromValue(row.getRowKind().toByteValue()).getName();
+    }
     this.row = row;
-    this.metaColumnsNum = HoodieRecord.HOODIE_META_COLUMNS.size();
   }
 
   @Override
@@ -74,28 +74,6 @@ public class HoodieRowData implements RowData {
     this.row.setRowKind(kind);
   }
 
-  private String getMetaColumnVal(int ordinal) {
-    switch (ordinal) {
-      case 0: {
-        return commitTime;
-      }
-      case 1: {
-        return commitSeqNumber;
-      }
-      case 2: {
-        return recordKey;
-      }
-      case 3: {
-        return partitionPath;
-      }
-      case 4: {
-        return fileName;
-      }
-      default:
-        throw new IllegalArgumentException("Not expected");
-    }
-  }
-
   @Override
   public boolean isNullAt(int ordinal) {
     if (ordinal < metaColumnsNum) {
@@ -181,4 +159,8 @@ public class HoodieRowData implements RowData {
   public MapData getMap(int ordinal) {
     return row.getMap(ordinal - metaColumnsNum);
   }
+
+  private String getMetaColumnVal(int ordinal) {
+    return this.metaColumns[ordinal];
+  }
 }
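
A quick sketch of the reworked constructor with the operation column enabled; the meta column values are illustrative and the Flink table runtime is assumed to be on the classpath:

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.types.RowKind;
    import org.apache.hudi.client.model.HoodieRowData;

    GenericRowData row = GenericRowData.ofKind(RowKind.DELETE, StringData.fromString("id1"));
    HoodieRowData hoodieRow = new HoodieRowData("20210810122323", "20210810122323_0_1",
        "id1", "par1", "file-0001.parquet", row, true);
    // With withOperation = true the sixth meta column is derived from the row kind:
    // RowKind.DELETE maps to HoodieOperation.DELETE, i.e. the string "D".
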
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
index 987f335..81bd598 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
@@ -79,6 +79,9 @@ public class FlinkAppendHandle<T extends HoodieRecordPayload, I, K, O>
 
   @Override
   protected boolean isUpdateRecord(HoodieRecord<T> hoodieRecord) {
+    // Do not use the HoodieRecord operation because the hoodie writer has its own
+    // INSERT/MERGE bucket for 'UPSERT' semantics. E.g., a hoodie record with a fresh new key
+    // and operation HoodieOperation.DELETE would be put into either an INSERT bucket or an UPDATE bucket.
     return hoodieRecord.getCurrentLocation() != null
         && hoodieRecord.getCurrentLocation().getInstantTime().equals("U");
   }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
index 76fad8b..238367b 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
@@ -117,7 +117,7 @@ public class HoodieRowDataCreateHandle implements Serializable {
     try {
       String seqId = HoodieRecord.generateSequenceId(instantTime, taskPartitionId, SEQGEN.getAndIncrement());
       HoodieRowData rowData = new HoodieRowData(instantTime, seqId, recordKey, partitionPath, path.getName(),
-          record);
+          record, writeConfig.allowOperationMetadataField());
       try {
         fileWriter.writeRow(recordKey, rowData);
         writeStatus.markSuccess(recordKey);
@@ -131,7 +131,7 @@ public class HoodieRowDataCreateHandle implements Serializable {
   }
 
   /**
-   * @returns {@code true} if this handle can take in more writes. else {@code false}.
+   * Returns {@code true} if this handle can take in more writes, else {@code false}.
    */
   public boolean canWrite() {
     return fileWriter.canWrite();
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 9776870..8a9b4bf 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -354,7 +354,7 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends
           dataFileToBeMerged, taskContextSupplier, Option.empty());
     } else {
       return new HoodieMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
-          dataFileToBeMerged,taskContextSupplier, Option.empty());
+          dataFileToBeMerged, taskContextSupplier, Option.empty());
     }
   }
 
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
index 5238123..5cb1b80 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table.action.commit;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.collection.Pair;
@@ -45,7 +46,7 @@ import java.util.stream.Collectors;
  * <p>Computing the records batch locations all at a time is a pressure to the engine,
  * we should avoid that in streaming system.
  */
-public class FlinkWriteHelper<T extends HoodieRecordPayload,R> extends AbstractWriteHelper<T, List<HoodieRecord<T>>,
+public class FlinkWriteHelper<T extends HoodieRecordPayload, R> extends AbstractWriteHelper<T, List<HoodieRecord<T>>,
     List<HoodieKey>, List<WriteStatus>, R> {
 
   private FlinkWriteHelper() {
@@ -80,8 +81,8 @@ public class FlinkWriteHelper<T extends HoodieRecordPayload,R> extends AbstractW
 
   @Override
   public List<HoodieRecord<T>> deduplicateRecords(List<HoodieRecord<T>> records,
-                                                     HoodieIndex<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> index,
-                                                     int parallelism) {
+                                                  HoodieIndex<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> index,
+                                                  int parallelism) {
     Map<Object, List<Pair<Object, HoodieRecord<T>>>> keyedRecords = records.stream().map(record -> {
       // If index used is global, then records are expected to differ in their partitionPath
       final Object key = record.getKey().getRecordKey();
@@ -89,13 +90,17 @@ public class FlinkWriteHelper<T extends HoodieRecordPayload,R> extends AbstractW
     }).collect(Collectors.groupingBy(Pair::getLeft));
 
     return keyedRecords.values().stream().map(x -> x.stream().map(Pair::getRight).reduce((rec1, rec2) -> {
-      @SuppressWarnings("unchecked")
-      T reducedData = (T) rec1.getData().preCombine(rec2.getData());
+      final T data1 = rec1.getData();
+      final T data2 = rec2.getData();
+
+      @SuppressWarnings("unchecked") final T reducedData = (T) data2.preCombine(data1);
       // we cannot allow the user to change the key or partitionPath, since that will affect
       // everything
       // so pick it from one of the records.
-      HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey();
-      HoodieRecord<T> hoodieRecord = new HoodieRecord<>(reducedKey, reducedData);
+      boolean choosePrev = data1.equals(reducedData);
+      HoodieKey reducedKey = choosePrev ? rec1.getKey() : rec2.getKey();
+      HoodieOperation operation = choosePrev ? rec1.getOperation() : rec2.getOperation();
+      HoodieRecord<T> hoodieRecord = new HoodieRecord<>(reducedKey, reducedData, operation);
       // reuse the location from the first record.
       hoodieRecord.setCurrentLocation(rec1.getCurrentLocation());
       return hoodieRecord;
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java
index f08c8b5..1f4a524 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java
@@ -86,7 +86,7 @@ public class HoodieFlinkMergeOnReadTableCompactor<T extends HoodieRecordPayload>
 
   @Override
   public List<WriteStatus> compact(HoodieEngineContext context, HoodieCompactionPlan compactionPlan,
-                                      HoodieTable hoodieTable, HoodieWriteConfig config, String compactionInstantTime) throws IOException {
+                                   HoodieTable hoodieTable, HoodieWriteConfig config, String compactionInstantTime) throws IOException {
     throw new UnsupportedOperationException("HoodieFlinkMergeOnReadTableCompactor does not support compact directly, "
         + "the function works as a separate pipeline");
   }
@@ -98,7 +98,7 @@ public class HoodieFlinkMergeOnReadTableCompactor<T extends HoodieRecordPayload>
                                    String instantTime) throws IOException {
     FileSystem fs = metaClient.getFs();
 
-    Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
+    Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
     LOG.info("Compacting base " + operation.getDataFileName() + " with delta files " + operation.getDeltaFileNames()
         + " for commit " + instantTime);
     // TODO - FIX THIS
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 695f97e..239eed5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -19,7 +19,10 @@
 package org.apache.hudi.avro;
 
 import org.apache.avro.specific.SpecificRecordBase;
+
+import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
@@ -151,7 +154,8 @@ public class HoodieAvroUtils {
         || HoodieRecord.COMMIT_SEQNO_METADATA_FIELD.equals(fieldName)
         || HoodieRecord.RECORD_KEY_METADATA_FIELD.equals(fieldName)
         || HoodieRecord.PARTITION_PATH_METADATA_FIELD.equals(fieldName)
-        || HoodieRecord.FILENAME_METADATA_FIELD.equals(fieldName);
+        || HoodieRecord.FILENAME_METADATA_FIELD.equals(fieldName)
+        || HoodieRecord.OPERATION_METADATA_FIELD.equals(fieldName);
   }
 
   public static Schema createHoodieWriteSchema(Schema originalSchema) {
@@ -164,8 +168,20 @@ public class HoodieAvroUtils {
 
   /**
    * Adds the Hoodie metadata fields to the given schema.
+   *
+   * @param schema The schema
    */
   public static Schema addMetadataFields(Schema schema) {
+    return addMetadataFields(schema, false);
+  }
+
+  /**
+   * Adds the Hoodie metadata fields to the given schema.
+   *
+   * @param schema The schema
+   * @param withOperationField Whether to include the '_hoodie_operation' field
+   */
+  public static Schema addMetadataFields(Schema schema, boolean withOperationField) {
     List<Schema.Field> parentFields = new ArrayList<>();
 
     Schema.Field commitTimeField =
@@ -184,6 +200,13 @@ public class HoodieAvroUtils {
     parentFields.add(recordKeyField);
     parentFields.add(partitionPathField);
     parentFields.add(fileNameField);
+
+    if (withOperationField) {
+      final Schema.Field operationField =
+          new Schema.Field(HoodieRecord.OPERATION_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE);
+      parentFields.add(operationField);
+    }
+
     for (Schema.Field field : schema.getFields()) {
       if (!isMetadataField(field.name())) {
         Schema.Field newField = new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal());
@@ -202,7 +225,7 @@ public class HoodieAvroUtils {
   public static Schema removeMetadataFields(Schema schema) {
     List<Schema.Field> filteredFields = schema.getFields()
                                               .stream()
-                                              .filter(field -> !HoodieRecord.HOODIE_META_COLUMNS.contains(field.name()))
+                                              .filter(field -> !HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(field.name()))
                                               .map(field -> new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()))
                                               .collect(Collectors.toList());
     Schema filteredSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), false);
@@ -268,6 +291,11 @@ public class HoodieAvroUtils {
     return record;
   }
 
+  public static GenericRecord addOperationToRecord(GenericRecord record, HoodieOperation operation) {
+    record.put(HoodieRecord.OPERATION_METADATA_FIELD, operation.getName());
+    return record;
+  }
+
   /**
    * Add null fields to passed in schema. Caller is responsible for ensuring there is no duplicates. As different query
    * engines have varying constraints regarding treating the case-sensitivity of fields, its best to let caller
@@ -454,6 +482,22 @@ public class HoodieAvroUtils {
   }
 
   /**
+   * Returns the string value of the given record {@code rec} and field {@code fieldName}.
+   * The field and value both could be missing.
+   *
+   * @param rec The record
+   * @param fieldName The field name
+   *
+   * @return the string form of the field
+   * or empty if the schema does not contain the field name or the value is null
+   */
+  public static Option<String> getNullableValAsString(GenericRecord rec, String fieldName) {
+    Schema.Field field = rec.getSchema().getField(fieldName);
+    String fieldVal = field == null ? null : StringUtils.objToString(rec.get(field.pos()));
+    return Option.ofNullable(fieldVal);
+  }
+
+  /**
    * This method converts values for fields with certain Avro/Parquet data types that require special handling.
    *
    * @param fieldSchema avro field schema
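
To show how the operation-aware helpers fit together, a small round-trip sketch; the demo schema below is made up for illustration:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hudi.avro.HoodieAvroUtils;
    import org.apache.hudi.common.model.HoodieOperation;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.util.Option;

    Schema baseSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Demo\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");
    Schema withMeta = HoodieAvroUtils.addMetadataFields(baseSchema, true); // adds '_hoodie_operation'
    GenericRecord rec = new GenericData.Record(withMeta);
    rec.put("id", "id1");
    HoodieAvroUtils.addOperationToRecord(rec, HoodieOperation.DELETE);
    Option<String> op = HoodieAvroUtils.getNullableValAsString(rec, HoodieRecord.OPERATION_METADATA_FIELD); // Option.of("D")
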
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieOperation.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieOperation.java
new file mode 100644
index 0000000..0da40eb
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieOperation.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.util.Option;
+
+/**
+ * Represents the changes that a row can describe in a changelog.
+ */
+public enum HoodieOperation {
+  /**
+   * Insert operation.
+   */
+  INSERT("I", (byte) 0),
+  /**
+   * Update operation with previous record content,
+   * should be used together with {@link #UPDATE_AFTER} for modeling an update operation.
+   */
+  UPDATE_BEFORE("-U", (byte) 1),
+  /**
+   * Update operation with new record content.
+   */
+  UPDATE_AFTER("U", (byte) 2),
+  /**
+   * Delete operation.
+   */
+  DELETE("D", (byte) 3);
+
+  private final String name;
+  private final byte value;
+
+  HoodieOperation(String name, byte value) {
+    this.name = name;
+    this.value = value;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public byte getValue() {
+    return value;
+  }
+
+  public static HoodieOperation fromValue(byte value) {
+    switch (value) {
+      case 0:
+        return INSERT;
+      case 1:
+        return UPDATE_BEFORE;
+      case 2:
+        return UPDATE_AFTER;
+      case 3:
+        return DELETE;
+      default:
+        throw new AssertionError();
+    }
+  }
+
+  public static HoodieOperation fromName(Option<String> nameOpt) {
+    if (!nameOpt.isPresent()) {
+      return null;
+    }
+    return fromName(nameOpt.get());
+  }
+
+  public static HoodieOperation fromName(String name) {
+    switch (name) {
+      case "I":
+        return INSERT;
+      case "-U":
+        return UPDATE_BEFORE;
+      case "U":
+        return UPDATE_AFTER;
+      case "D":
+        return DELETE;
+      default:
+        throw new AssertionError();
+    }
+  }
+
+  /**
+   * Returns whether the operation is INSERT.
+   */
+  public static boolean isInsert(HoodieOperation operation) {
+    return operation == INSERT;
+  }
+
+  /**
+   * Returns whether the operation is UPDATE_BEFORE.
+   */
+  public static boolean isUpdateBefore(HoodieOperation operation) {
+    return operation == UPDATE_BEFORE;
+  }
+
+  /**
+   * Returns whether the operation is UPDATE_AFTER.
+   */
+  public static boolean isUpdateAfter(HoodieOperation operation) {
+    return operation == UPDATE_AFTER;
+  }
+
+  /**
+   * Returns whether the operation is DELETE.
+   */
+  public static boolean isDelete(HoodieOperation operation) {
+    return operation == DELETE;
+  }
+}
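
The byte values are meant to line up with Flink's RowKind encoding (0/1/2/3), which HoodieRowData relies on via fromValue(row.getRowKind().toByteValue()); a tiny sanity sketch:

    import org.apache.flink.types.RowKind;
    import org.apache.hudi.common.model.HoodieOperation;

    HoodieOperation op = HoodieOperation.fromValue(RowKind.DELETE.toByteValue()); // DELETE
    String name = op.getName();                                                   // "D"
    HoodieOperation sameOp = HoodieOperation.fromName(name);                      // DELETE again
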
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index 6484c5c..1742778 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -39,11 +39,19 @@ public class HoodieRecord<T extends HoodieRecordPayload> implements Serializable
   public static final String RECORD_KEY_METADATA_FIELD = "_hoodie_record_key";
   public static final String PARTITION_PATH_METADATA_FIELD = "_hoodie_partition_path";
   public static final String FILENAME_METADATA_FIELD = "_hoodie_file_name";
+  public static final String OPERATION_METADATA_FIELD = "_hoodie_operation";
 
   public static final List<String> HOODIE_META_COLUMNS =
       CollectionUtils.createImmutableList(COMMIT_TIME_METADATA_FIELD, COMMIT_SEQNO_METADATA_FIELD,
           RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD, FILENAME_METADATA_FIELD);
 
+  // Temporary list to support the '_hoodie_operation' field; once we solve
+  // the compatibility problem, it can be removed.
+  public static final List<String> HOODIE_META_COLUMNS_WITH_OPERATION =
+      CollectionUtils.createImmutableList(COMMIT_TIME_METADATA_FIELD, COMMIT_SEQNO_METADATA_FIELD,
+          RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD, FILENAME_METADATA_FIELD,
+          OPERATION_METADATA_FIELD);
+
   public static final Map<String, Integer> HOODIE_META_COLUMNS_NAME_TO_POS =
       IntStream.range(0, HOODIE_META_COLUMNS.size()).mapToObj(idx -> Pair.of(HOODIE_META_COLUMNS.get(idx), idx))
           .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
@@ -73,12 +81,22 @@ public class HoodieRecord<T extends HoodieRecordPayload> implements Serializable
    */
   private boolean sealed;
 
+  /**
+   * The CDC operation.
+   */
+  private HoodieOperation operation;
+
   public HoodieRecord(HoodieKey key, T data) {
+    this(key, data, null);
+  }
+
+  public HoodieRecord(HoodieKey key, T data, HoodieOperation operation) {
     this.key = key;
     this.data = data;
     this.currentLocation = null;
     this.newLocation = null;
     this.sealed = false;
+    this.operation = operation;
   }
 
   public HoodieRecord(HoodieRecord<T> record) {
@@ -86,6 +104,7 @@ public class HoodieRecord<T extends HoodieRecordPayload> implements Serializable
     this.currentLocation = record.currentLocation;
     this.newLocation = record.newLocation;
     this.sealed = record.sealed;
+    this.operation = record.operation;
   }
 
   public HoodieRecord() {
@@ -95,6 +114,10 @@ public class HoodieRecord<T extends HoodieRecordPayload> implements Serializable
     return key;
   }
 
+  public HoodieOperation getOperation() {
+    return operation;
+  }
+
   public T getData() {
     if (data == null) {
       throw new IllegalStateException("Payload already deflated for record.");
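
A sketch of the extended constructor; the key, payload and operation values below are placeholders:

    import org.apache.hudi.common.model.HoodieKey;
    import org.apache.hudi.common.model.HoodieOperation;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
    import org.apache.hudi.common.util.Option;

    HoodieKey key = new HoodieKey("uuid-001", "par1");
    OverwriteWithLatestAvroPayload payload = new OverwriteWithLatestAvroPayload(Option.empty());
    HoodieRecord<OverwriteWithLatestAvroPayload> record =
        new HoodieRecord<>(key, payload, HoodieOperation.UPDATE_AFTER);
    // The two-argument constructor still delegates with a null operation,
    // so existing call sites keep compiling unchanged.
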
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 20e5a82..8d16e91 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -55,10 +55,16 @@ import org.apache.parquet.schema.MessageType;
 public class TableSchemaResolver {
 
   private static final Logger LOG = LogManager.getLogger(TableSchemaResolver.class);
-  private HoodieTableMetaClient metaClient;
+  private final HoodieTableMetaClient metaClient;
+  private final boolean withOperationField;
 
   public TableSchemaResolver(HoodieTableMetaClient metaClient) {
+    this(metaClient, false);
+  }
+
+  public TableSchemaResolver(HoodieTableMetaClient metaClient, boolean withOperationField) {
     this.metaClient = metaClient;
+    this.withOperationField = withOperationField;
   }
 
   /**
@@ -170,7 +176,7 @@ public class TableSchemaResolver {
     Option<Schema> schemaFromTableConfig = metaClient.getTableConfig().getTableCreateSchema();
     if (schemaFromTableConfig.isPresent()) {
       if (includeMetadataFields) {
-        return HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get());
+        return HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), withOperationField);
       } else {
         return schemaFromTableConfig.get();
       }
@@ -256,7 +262,7 @@ public class TableSchemaResolver {
 
       Schema schema = new Schema.Parser().parse(existingSchemaStr);
       if (includeMetadataFields) {
-        schema = HoodieAvroUtils.addMetadataFields(schema);
+        schema = HoodieAvroUtils.addMetadataFields(schema, withOperationField);
       }
       return Option.of(schema);
     } catch (Exception e) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
index 95ed80a..79d5821 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
@@ -96,6 +96,8 @@ public abstract class AbstractHoodieLogRecordScanner {
   private final int bufferSize;
   // optional instant range for incremental block filtering
   private final Option<InstantRange> instantRange;
+  // Read the operation metadata field from the avro record
+  private final boolean withOperationField;
   // FileSystem
   private final FileSystem fs;
   // Total log files read - for metrics
@@ -114,7 +116,8 @@ public abstract class AbstractHoodieLogRecordScanner {
   private float progress = 0.0f;
 
   protected AbstractHoodieLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
-      String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, int bufferSize, Option<InstantRange> instantRange) {
+                                           String latestInstantTime, boolean readBlocksLazily, boolean reverseReader,
+                                           int bufferSize, Option<InstantRange> instantRange, boolean withOperationField) {
     this.readerSchema = readerSchema;
     this.latestInstantTime = latestInstantTime;
     this.hoodieTableMetaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).build();
@@ -131,6 +134,7 @@ public abstract class AbstractHoodieLogRecordScanner {
     this.fs = fs;
     this.bufferSize = bufferSize;
     this.instantRange = instantRange;
+    this.withOperationField = withOperationField;
   }
 
   /**
@@ -294,7 +298,7 @@ public abstract class AbstractHoodieLogRecordScanner {
   private boolean isNewInstantBlock(HoodieLogBlock logBlock) {
     return currentInstantLogBlocks.size() > 0 && currentInstantLogBlocks.peek().getBlockType() != CORRUPT_BLOCK
         && !logBlock.getLogBlockHeader().get(INSTANT_TIME)
-            .contentEquals(currentInstantLogBlocks.peek().getLogBlockHeader().get(INSTANT_TIME));
+        .contentEquals(currentInstantLogBlocks.peek().getLogBlockHeader().get(INSTANT_TIME));
   }
 
   /**
@@ -312,9 +316,9 @@ public abstract class AbstractHoodieLogRecordScanner {
 
   protected HoodieRecord<?> createHoodieRecord(IndexedRecord rec) {
     if (!simpleKeyGenFields.isPresent()) {
-      return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, this.payloadClassFQN);
+      return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, this.payloadClassFQN, this.withOperationField);
     } else {
-      return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, this.payloadClassFQN, this.simpleKeyGenFields.get());
+      return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, this.payloadClassFQN, this.simpleKeyGenFields.get(), this.withOperationField);
     }
   }
 
@@ -392,6 +396,10 @@ public abstract class AbstractHoodieLogRecordScanner {
     return totalCorruptBlocks.get();
   }
 
+  public boolean isWithOperationField() {
+    return withOperationField;
+  }
+
   /**
    * Builder used to build {@code AbstractHoodieLogRecordScanner}.
    */
@@ -417,6 +425,10 @@ public abstract class AbstractHoodieLogRecordScanner {
       throw new UnsupportedOperationException();
     }
 
+    public Builder withOperationField(boolean withOperationField) {
+      throw new UnsupportedOperationException();
+    }
+
     public abstract AbstractHoodieLogRecordScanner build();
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java
index 76d1cc3..d840565 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java
@@ -45,8 +45,8 @@ public class HoodieFileSliceReader<T extends HoodieRecordPayload> implements Ite
     while (baseIterator.hasNext()) {
       GenericRecord record = (GenericRecord) baseIterator.next();
       HoodieRecord<? extends HoodieRecordPayload> hoodieRecord = simpleKeyGenFieldsOpt.isPresent()
-          ? SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass, simpleKeyGenFieldsOpt.get())
-          : SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass);
+          ? SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass, simpleKeyGenFieldsOpt.get(), scanner.isWithOperationField())
+          : SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass, scanner.isWithOperationField());
       scanner.processNextRecord(hoodieRecord);
     }
     return new HoodieFileSliceReader(scanner.iterator());
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index 4d51fbb..bc08fe7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -43,7 +43,7 @@ import java.util.Map;
 /**
  * Scans through all the blocks in a list of HoodieLogFile and builds up a compacted/merged list of records which will
  * be used as a lookup table when merging the base columnar file with the redo log file.
- *
+ * <p>
  * NOTE: If readBlockLazily is turned on, does not merge, instead keeps reading log blocks and merges everything at once
  * This is an optimization to avoid seek() back and forth to read new block (forward seek()) and lazily read content of
  * seen block (reverse and forward seek()) during merge | | Read Block 1 Metadata | | Read Block 1 Data | | | Read Block
@@ -72,11 +72,12 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
 
   @SuppressWarnings("unchecked")
   protected HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
-                                      String latestInstantTime, Long maxMemorySizeInBytes, boolean readBlocksLazily,
-                                      boolean reverseReader, int bufferSize, String spillableMapBasePath,
-                                      Option<InstantRange> instantRange, boolean autoScan,
-                                         ExternalSpillableMap.DiskMapType diskMapType, boolean isBitCaskDiskMapCompressionEnabled) {
-    super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange);
+                                         String latestInstantTime, Long maxMemorySizeInBytes, boolean readBlocksLazily,
+                                         boolean reverseReader, int bufferSize, String spillableMapBasePath,
+                                         Option<InstantRange> instantRange, boolean autoScan,
+                                         ExternalSpillableMap.DiskMapType diskMapType, boolean isBitCaskDiskMapCompressionEnabled,
+                                         boolean withOperationField) {
+    super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange, withOperationField);
     try {
       // Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
       this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
@@ -132,8 +133,10 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
     if (records.containsKey(key)) {
       // Merge and store the merged record. The HoodieRecordPayload implementation is free to decide what should be
       // done when a delete (empty payload) is encountered before or after an insert/update.
+
+      // Always use the natural order now.
       HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(records.get(key).getData());
-      records.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue));
+      records.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue, hoodieRecord.getOperation()));
     } else {
       // Put the record as is
       records.put(key, hoodieRecord);
@@ -177,6 +180,8 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
     private Option<InstantRange> instantRange = Option.empty();
     // auto scan default true
     private boolean autoScan = true;
+    // operation field default false
+    private boolean withOperationField = false;
 
     public Builder withFileSystem(FileSystem fs) {
       this.fs = fs;
@@ -248,12 +253,17 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
       return this;
     }
 
+    public Builder withOperationField(boolean withOperationField) {
+      this.withOperationField = withOperationField;
+      return this;
+    }
+
     @Override
     public HoodieMergedLogRecordScanner build() {
       return new HoodieMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema,
           latestInstantTime, maxMemorySizeInBytes, readBlocksLazily, reverseReader,
           bufferSize, spillableMapBasePath, instantRange, autoScan,
-          diskMapType, isBitCaskDiskMapCompressionEnabled);
+          diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField);
     }
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
index dd0edd9..8b26f72 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
@@ -36,8 +36,9 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScann
   private final LogRecordScannerCallback callback;
 
   private HoodieUnMergedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
-      String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, int bufferSize, LogRecordScannerCallback callback) {
-    super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, Option.empty());
+                                         String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, int bufferSize,
+                                         LogRecordScannerCallback callback, Option<InstantRange> instantRange) {
+    super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange, false);
     this.callback = callback;
   }
 
@@ -80,6 +81,7 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScann
     private boolean readBlocksLazily;
     private boolean reverseReader;
     private int bufferSize;
+    private Option<InstantRange> instantRange = Option.empty();
     // specific configurations
     private LogRecordScannerCallback callback;
 
@@ -123,6 +125,11 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScann
       return this;
     }
 
+    public Builder withInstantRange(Option<InstantRange> instantRange) {
+      this.instantRange = instantRange;
+      return this;
+    }
+
     public Builder withLogRecordScannerCallback(LogRecordScannerCallback callback) {
       this.callback = callback;
       return this;
@@ -131,7 +138,7 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScann
     @Override
     public HoodieUnMergedLogRecordScanner build() {
       return new HoodieUnMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema,
-          latestInstantTime, readBlocksLazily, reverseReader, bufferSize, callback);
+          latestInstantTime, readBlocksLazily, reverseReader, bufferSize, callback, instantRange);
     }
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java
index 43f8ba5..0a716e0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java
@@ -20,6 +20,7 @@ package org.apache.hudi.common.util;
 
 import org.apache.hudi.common.fs.SizeAwareDataOutputStream;
 import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.collection.BitCaskDiskMap.FileEntry;
@@ -32,6 +33,8 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.util.zip.CRC32;
 
+import static org.apache.hudi.avro.HoodieAvroUtils.getNullableValAsString;
+
 /**
  * A utility class supports spillable map.
  */
@@ -110,18 +113,23 @@ public class SpillableMapUtils {
   /**
    * Utility method to convert bytes to HoodieRecord using schema and payload class.
    */
-  public static <R> R convertToHoodieRecordPayload(GenericRecord rec, String payloadClazz) {
-    return convertToHoodieRecordPayload(rec, payloadClazz, Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD));
+  public static <R> R convertToHoodieRecordPayload(GenericRecord rec, String payloadClazz, boolean withOperationField) {
+    return convertToHoodieRecordPayload(rec, payloadClazz, Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD), withOperationField);
   }
 
   /**
    * Utility method to convert bytes to HoodieRecord using schema and payload class.
    */
-  public static <R> R convertToHoodieRecordPayload(GenericRecord rec, String payloadClazz, Pair<String, String> recordKeyPartitionPathPair) {
+  public static <R> R convertToHoodieRecordPayload(GenericRecord rec, String payloadClazz,
+                                                   Pair<String, String> recordKeyPartitionPathPair,
+                                                   boolean withOperationField) {
     String recKey = rec.get(recordKeyPartitionPathPair.getLeft()).toString();
     String partitionPath = rec.get(recordKeyPartitionPathPair.getRight()).toString();
+    HoodieOperation operation = withOperationField
+        ? HoodieOperation.fromName(getNullableValAsString(rec, HoodieRecord.OPERATION_METADATA_FIELD)) : null;
     HoodieRecord<? extends HoodieRecordPayload> hoodieRecord = new HoodieRecord<>(new HoodieKey(recKey, partitionPath),
-        ReflectionUtils.loadPayload(payloadClazz, new Object[] {Option.of(rec)}, Option.class));
+        ReflectionUtils.loadPayload(payloadClazz, new Object[] {Option.of(rec)}, Option.class), operation);
+
     return (R) hoodieRecord;
   }
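
A sketch of how the new flag is consumed when rebuilding records from Avro; 'avroWithMeta' is an assumed GenericRecord that already carries the hoodie meta fields:

    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.util.SpillableMapUtils;

    HoodieRecord<?> restored = SpillableMapUtils.convertToHoodieRecordPayload(
        avroWithMeta, // assumed: record read back with the meta fields, including '_hoodie_operation'
        "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload",
        true);        // also restore the operation into HoodieRecord#getOperation()
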
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 8995ab4..962b8ca 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -132,10 +132,12 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
         HoodieTimer readTimer = new HoodieTimer().startTimer();
         Option<GenericRecord> baseRecord = baseFileReader.getRecordByKey(key);
         if (baseRecord.isPresent()) {
-          hoodieRecord = tableConfig.populateMetaFields() ? SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(),
-              tableConfig.getPayloadClass()) : SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(),
-              tableConfig.getPayloadClass(), Pair.of(tableConfig.getRecordKeyFieldProp(),
-              tableConfig.getPartitionFieldProp()));
+          hoodieRecord = tableConfig.populateMetaFields()
+              ? SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(), tableConfig.getPayloadClass(), false)
+              : SpillableMapUtils.convertToHoodieRecordPayload(
+                  baseRecord.get(),
+                  tableConfig.getPayloadClass(),
+                  Pair.of(tableConfig.getRecordKeyFieldProp(), tableConfig.getPartitionFieldProp()), false);
           metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, readTimer.endTimer()));
         }
       }
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java
index 51b0315..a3c3e08 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java
@@ -45,7 +45,7 @@ public class HoodieMetadataMergedLogRecordScanner extends HoodieMergedLogRecordS
                                               String spillableMapBasePath, Set<String> mergeKeyFilter,
                                                ExternalSpillableMap.DiskMapType diskMapType, boolean isBitCaskDiskMapCompressionEnabled) {
     super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, maxMemorySizeInBytes, false, false, bufferSize,
-        spillableMapBasePath, Option.empty(), false, diskMapType, isBitCaskDiskMapCompressionEnabled);
+        spillableMapBasePath, Option.empty(), false, diskMapType, isBitCaskDiskMapCompressionEnabled, false);
     this.mergeKeyFilter = mergeKeyFilter;
 
     performScan();
diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index c0d721a..21fdb84 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -76,6 +76,17 @@ public class FlinkOptions extends HoodieConfig {
       .withDescription("The default partition name in case the dynamic partition"
           + " column value is null/empty string");
 
+  public static final ConfigOption<Boolean> CHANGELOG_ENABLED = ConfigOptions
+      .key("changelog.enabled")
+      .booleanType()
+      .defaultValue(false)
+      .withDescription("Whether to keep all the intermediate changes, "
+          + "we try to keep all the changes of a record when enabled:\n"
+          + "1). The sink accept the UPDATE_BEFORE message;\n"
+          + "2). The source try to emit every changes of a record.\n"
+          + "The semantics is best effort because the compaction job would finally merge all changes of a record into one.\n"
+          + " default false to have UPSERT semantics");
+
   // ------------------------------------------------------------------------
   //  Metadata table Options
   // ------------------------------------------------------------------------
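For context (not part of the patch), a minimal sketch of how a Flink job could switch the new option on
programmatically; the table path below is a placeholder and the rest of the pipeline setup is omitted:

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;

    public class ChangelogEnabledExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setString(FlinkOptions.PATH, "/tmp/hudi_t1");     // hypothetical table path
        conf.setBoolean(FlinkOptions.CHANGELOG_ENABLED, true); // keep the intermediate changes
        // with the flag on, the writer persists a per-record operation flag and the
        // streaming reader can emit +I/-U/+U/-D rows instead of upsert-only rows
      }
    }
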
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 375265a..71b20ba 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -21,6 +21,7 @@ package org.apache.hudi.sink;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -360,23 +361,26 @@ public class StreamWriteFunction<K, I, O>
     private final String key; // record key
     private final String instant; // 'U' or 'I'
     private final HoodieRecordPayload<?> data; // record payload
+    private final HoodieOperation operation; // operation
 
-    private DataItem(String key, String instant, HoodieRecordPayload<?> data) {
+    private DataItem(String key, String instant, HoodieRecordPayload<?> data, HoodieOperation operation) {
       this.key = key;
       this.instant = instant;
       this.data = data;
+      this.operation = operation;
     }
 
     public static DataItem fromHoodieRecord(HoodieRecord<?> record) {
       return new DataItem(
           record.getRecordKey(),
           record.getCurrentLocation().getInstantTime(),
-          record.getData());
+          record.getData(),
+          record.getOperation());
     }
 
     public HoodieRecord<?> toHoodieRecord(String partitionPath) {
       HoodieKey hoodieKey = new HoodieKey(this.key, partitionPath);
-      HoodieRecord<?> record = new HoodieRecord<>(hoodieKey, data);
+      HoodieRecord<?> record = new HoodieRecord<>(hoodieKey, data, operation);
       HoodieRecordLocation loc = new HoodieRecordLocation(instant, null);
       record.setCurrentLocation(loc);
       return record;
@@ -417,7 +421,7 @@ public class StreamWriteFunction<K, I, O>
     public void preWrite(List<HoodieRecord> records) {
       // rewrite the first record with expected fileID
       HoodieRecord<?> first = records.get(0);
-      HoodieRecord<?> record = new HoodieRecord<>(first.getKey(), first.getData());
+      HoodieRecord<?> record = new HoodieRecord<>(first.getKey(), first.getData(), first.getOperation());
       HoodieRecordLocation newLoc = new HoodieRecordLocation(first.getCurrentLocation().getInstantTime(), fileID);
       record.setCurrentLocation(newLoc);
 
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
index bcf6c3a..4923362 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
@@ -203,7 +203,7 @@ public class BootstrapFunction<I, O extends HoodieRecord>
                 .filter(logFile -> isValidFile(logFile.getFileStatus()))
                 .map(logFile -> logFile.getPath().toString())
                 .collect(toList());
-        HoodieMergedLogRecordScanner scanner = FormatUtils.scanLog(logPaths, schema, latestCommitTime.get().getTimestamp(),
+        HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(logPaths, schema, latestCommitTime.get().getTimestamp(),
             writeConfig, hadoopConf);
 
         try {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
index 0e3ecb1..fbe7678 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
@@ -71,7 +71,7 @@ public class BulkInsertWriterHelper {
     this.taskPartitionId = taskPartitionId;
     this.taskId = taskId;
     this.taskEpochId = taskEpochId;
-    this.rowType = addMetadataFields(rowType); // patch up with metadata fields
+    this.rowType = addMetadataFields(rowType, writeConfig.allowOperationMetadataField()); // patch up with metadata fields
     this.arePartitionRecordsSorted = conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_BY_PARTITION);
     this.fileIdPrefix = UUID.randomUUID().toString();
     this.keyGen = RowDataKeyGen.instance(conf, rowType);
@@ -141,7 +141,7 @@ public class BulkInsertWriterHelper {
   /**
    * Adds the Hoodie metadata fields to the given row type.
    */
-  private static RowType addMetadataFields(RowType rowType) {
+  private static RowType addMetadataFields(RowType rowType, boolean withOperationField) {
     List<RowType.RowField> mergedFields = new ArrayList<>();
 
     LogicalType metadataFieldType = DataTypes.STRING().getLogicalType();
@@ -161,6 +161,13 @@ public class BulkInsertWriterHelper {
     mergedFields.add(recordKeyField);
     mergedFields.add(partitionPathField);
     mergedFields.add(fileNameField);
+
+    if (withOperationField) {
+      RowType.RowField operationField =
+          new RowType.RowField(HoodieRecord.OPERATION_METADATA_FIELD, metadataFieldType, "operation");
+      mergedFields.add(operationField);
+    }
+
     mergedFields.addAll(rowType.getFields());
 
     return new RowType(false, mergedFields);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java
index ed916c8..fe55089 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java
@@ -20,8 +20,6 @@ package org.apache.hudi.sink.compact;
 
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.common.model.CompactionOperation;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.table.HoodieFlinkTable;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.configuration.Configuration;
@@ -57,26 +55,14 @@ public class CompactionPlanSourceFunction extends AbstractRichFunction implement
   /**
    * Compaction instant time.
    */
-  private String compactionInstantTime;
-
-  /**
-   * Hoodie flink table.
-   */
-  private HoodieFlinkTable<?> table;
+  private final String compactionInstantTime;
 
   /**
    * The compaction plan.
    */
-  private HoodieCompactionPlan compactionPlan;
-
-  /**
-   * Hoodie instant.
-   */
-  private HoodieInstant instant;
+  private final HoodieCompactionPlan compactionPlan;
 
-  public CompactionPlanSourceFunction(HoodieFlinkTable<?> table, HoodieInstant instant, HoodieCompactionPlan compactionPlan, String compactionInstantTime) {
-    this.table = table;
-    this.instant = instant;
+  public CompactionPlanSourceFunction(HoodieCompactionPlan compactionPlan, String compactionInstantTime) {
     this.compactionPlan = compactionPlan;
     this.compactionInstantTime = compactionInstantTime;
   }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
index 84b6813..18d49f1 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
@@ -133,9 +133,8 @@ public class HoodieFlinkCompactor {
 
     // Mark instant as compaction inflight
     table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
-    table.getMetaClient().reloadActiveTimeline();
 
-    env.addSource(new CompactionPlanSourceFunction(table, instant, compactionPlan, compactionInstantTime))
+    env.addSource(new CompactionPlanSourceFunction(compactionPlan, compactionInstantTime))
         .name("compaction_source")
         .uid("uid_compaction_source")
         .rebalance()
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java
index f575713..b600a5d 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.sink.transform;
 
 import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -34,7 +35,6 @@ import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.RowKind;
 
 import java.io.IOException;
 
@@ -108,9 +108,9 @@ public class RowDataToHoodieFunction<I extends RowData, O extends HoodieRecord>
   private HoodieRecord toHoodieRecord(I record) throws Exception {
     GenericRecord gr = (GenericRecord) this.converter.convert(this.avroSchema, record);
     final HoodieKey hoodieKey = keyGenerator.getKey(gr);
-    // nullify the payload insert data to mark the record as a DELETE
-    final boolean isDelete = record.getRowKind() == RowKind.DELETE;
-    HoodieRecordPayload payload = payloadCreation.createPayload(gr, isDelete);
-    return new HoodieRecord<>(hoodieKey, payload);
+
+    HoodieRecordPayload payload = payloadCreation.createPayload(gr);
+    HoodieOperation operation = HoodieOperation.fromValue(record.getRowKind().toByteValue());
+    return new HoodieRecord<>(hoodieKey, payload, operation);
   }
 }
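As an aside (not part of the patch), a minimal sketch of the new write-side mapping: the Flink RowKind is
carried over as a HoodieOperation instead of nullifying the payload for deletes. The key, partition and the
empty payload below are placeholders; a real job passes the converted Avro record into the payload:

    import org.apache.flink.types.RowKind;
    import org.apache.hudi.common.model.HoodieKey;
    import org.apache.hudi.common.model.HoodieOperation;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
    import org.apache.hudi.common.util.Option;

    public class OperationMappingExample {
      public static void main(String[] args) {
        // a DELETE row keeps its change flag as an operation on the record
        HoodieOperation operation = HoodieOperation.fromValue(RowKind.DELETE.toByteValue());
        HoodieRecord<?> record = new HoodieRecord<>(
            new HoodieKey("id1", "par1"),                       // hypothetical key and partition
            new OverwriteWithLatestAvroPayload(Option.empty()), // payload content elided for brevity
            operation);
        System.out.println(record.getOperation());
      }
    }
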
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java
index 12eb039..192feb5 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java
@@ -86,6 +86,7 @@ public class HiveSyncContext {
     hiveSyncConfig.decodePartition = conf.getBoolean(FlinkOptions.URL_ENCODE_PARTITIONING);
     hiveSyncConfig.skipROSuffix = conf.getBoolean(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX);
     hiveSyncConfig.assumeDatePartitioning = conf.getBoolean(FlinkOptions.HIVE_SYNC_ASSUME_DATE_PARTITION);
+    hiveSyncConfig.withOperationField = conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED);
     return hiveSyncConfig;
   }
 }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java
index 831da25..20070b3 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java
@@ -71,13 +71,12 @@ public class PayloadCreation implements Serializable {
     return new PayloadCreation(shouldCombine, constructor, preCombineField);
   }
 
-  public HoodieRecordPayload<?> createPayload(GenericRecord record, boolean isDelete) throws Exception {
+  public HoodieRecordPayload<?> createPayload(GenericRecord record) throws Exception {
     if (shouldCombine) {
       ValidationUtils.checkState(preCombineField != null);
       Comparable<?> orderingVal = (Comparable<?>) HoodieAvroUtils.getNestedFieldVal(record,
           preCombineField, false);
-      return (HoodieRecordPayload<?>) constructor.newInstance(
-          isDelete ? null : record, orderingVal);
+      return (HoodieRecordPayload<?>) constructor.newInstance(record, orderingVal);
     } else {
       return (HoodieRecordPayload<?>) this.constructor.newInstance(Option.of(record));
     }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index 49cd9e7..8a16c6d 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -34,6 +34,7 @@ import org.apache.hudi.sink.compact.CompactionPlanEvent;
 import org.apache.hudi.sink.compact.CompactionPlanOperator;
 import org.apache.hudi.sink.partitioner.BucketAssignFunction;
 import org.apache.hudi.sink.partitioner.BucketAssignOperator;
+import org.apache.hudi.util.ChangelogModes;
 import org.apache.hudi.sink.transform.RowDataToHoodieFunctions;
 import org.apache.hudi.table.format.FilePathUtils;
 import org.apache.hudi.util.StreamerUtil;
@@ -52,7 +53,6 @@ import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode$;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.RowKind;
 
 import java.util.Map;
 
@@ -185,12 +185,11 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
 
   @Override
   public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
-    // ignore RowKind.UPDATE_BEFORE
-    return ChangelogMode.newBuilder()
-        .addContainedKind(RowKind.DELETE)
-        .addContainedKind(RowKind.INSERT)
-        .addContainedKind(RowKind.UPDATE_AFTER)
-        .build();
+    if (conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)) {
+      return ChangelogModes.FULL;
+    } else {
+      return ChangelogModes.UPSERT;
+    }
   }
 
   @Override
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index af2327d..51d3c8d 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -37,6 +37,7 @@ import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
 import org.apache.hudi.table.format.mor.MergeOnReadTableState;
 import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.ChangelogModes;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.avro.Schema;
@@ -196,7 +197,12 @@ public class HoodieTableSource implements
 
   @Override
   public ChangelogMode getChangelogMode() {
-    return ChangelogMode.insertOnly();
+    return conf.getBoolean(FlinkOptions.READ_AS_STREAMING)
+        && conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)
+        ? ChangelogModes.FULL
+        // when all the changes are merged (changelog disabled) or the table is read as batch,
+        // use INSERT mode.
+        : ChangelogMode.insertOnly();
   }
 
   @Override
@@ -309,7 +315,7 @@ public class HoodieTableSource implements
       return new CollectionInputFormat<>(Collections.emptyList(), null);
     }
 
-    TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
+    TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient, conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED));
     final Schema tableAvroSchema;
     try {
       tableAvroSchema = schemaUtil.getTableAvroSchema();
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index cbf1ea7..14f7eb3 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -19,19 +19,32 @@
 package org.apache.hudi.table.format;
 
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
+import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
+import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer;
+import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
@@ -45,6 +58,39 @@ public class FormatUtils {
   private FormatUtils() {
   }
 
+  /**
+   * Sets the row kind of the row data {@code rowData} based on the resolved operation field.
+   */
+  public static void setRowKind(RowData rowData, IndexedRecord record, int index) {
+    if (index == -1) {
+      return;
+    }
+    rowData.setRowKind(getRowKind(record, index));
+  }
+
+  /**
+   * Returns the RowKind of the given record, never null.
+   * Returns RowKind.INSERT when the given field value is not found.
+   */
+  private static RowKind getRowKind(IndexedRecord record, int index) {
+    Object val = record.get(index);
+    if (val == null) {
+      return RowKind.INSERT;
+    }
+    final HoodieOperation operation = HoodieOperation.fromName(val.toString());
+    if (HoodieOperation.isInsert(operation)) {
+      return RowKind.INSERT;
+    } else if (HoodieOperation.isUpdateBefore(operation)) {
+      return RowKind.UPDATE_BEFORE;
+    } else if (HoodieOperation.isUpdateAfter(operation)) {
+      return RowKind.UPDATE_AFTER;
+    } else if (HoodieOperation.isDelete(operation)) {
+      return RowKind.DELETE;
+    } else {
+      throw new AssertionError();
+    }
+  }
+
   public static GenericRecord buildAvroRecordBySchema(
       IndexedRecord record,
       Schema requiredSchema,
@@ -57,10 +103,11 @@ public class FormatUtils {
     return recordBuilder.build();
   }
 
-  public static HoodieMergedLogRecordScanner scanLog(
+  public static HoodieMergedLogRecordScanner logScanner(
       MergeOnReadInputSplit split,
       Schema logSchema,
-      Configuration config) {
+      Configuration config,
+      boolean withOperationField) {
     FileSystem fs = FSUtils.getFs(split.getTablePath(), config);
     return HoodieMergedLogRecordScanner.newBuilder()
         .withFileSystem(fs)
@@ -81,10 +128,88 @@ public class FormatUtils {
             config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
                 HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
         .withInstantRange(split.getInstantRange())
+        .withOperationField(withOperationField)
+        .build();
+  }
+
+  private static HoodieUnMergedLogRecordScanner unMergedLogScanner(
+      MergeOnReadInputSplit split,
+      Schema logSchema,
+      Configuration config,
+      HoodieUnMergedLogRecordScanner.LogRecordScannerCallback callback) {
+    FileSystem fs = FSUtils.getFs(split.getTablePath(), config);
+    return HoodieUnMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(split.getTablePath())
+        .withLogFilePaths(split.getLogPaths().get())
+        .withReaderSchema(logSchema)
+        .withLatestInstantTime(split.getLatestCommit())
+        .withReadBlocksLazily(
+            string2Boolean(
+                config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
+                    HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
+        .withReverseReader(false)
+        .withBufferSize(
+            config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
+                HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
+        .withInstantRange(split.getInstantRange())
+        .withLogRecordScannerCallback(callback)
         .build();
   }
 
-  public static HoodieMergedLogRecordScanner scanLog(
+  /**
+   * Utility to read and buffer the records in the unMerged log record scanner.
+   */
+  public static class BoundedMemoryRecords {
+    // Unmerged log record scanner
+    private final HoodieUnMergedLogRecordScanner scanner;
+
+    // Executor that runs the record producers in parallel
+    private final BoundedInMemoryExecutor<HoodieRecord<?>, HoodieRecord<?>, ?> executor;
+
+    // Iterator for the buffer consumer
+    private final Iterator<HoodieRecord<?>> iterator;
+
+    public BoundedMemoryRecords(
+        MergeOnReadInputSplit split,
+        Schema logSchema,
+        Configuration hadoopConf) {
+      this.executor = new BoundedInMemoryExecutor<>(
+          HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(new JobConf(hadoopConf)),
+          getParallelProducers(),
+          Option.empty(),
+          x -> x,
+          new DefaultSizeEstimator<>());
+      // Consumer of this record reader
+      this.iterator = this.executor.getQueue().iterator();
+      this.scanner = FormatUtils.unMergedLogScanner(split, logSchema, hadoopConf,
+          record -> executor.getQueue().insertRecord(record));
+      // Start reading and buffering
+      this.executor.startProducers();
+    }
+
+    public Iterator<HoodieRecord<?>> getRecordsIterator() {
+      return this.iterator;
+    }
+
+    /**
+     * Sets up the log record scanning as the single producer that writes to the central buffer.
+     */
+    private List<BoundedInMemoryQueueProducer<HoodieRecord<?>>> getParallelProducers() {
+      List<BoundedInMemoryQueueProducer<HoodieRecord<?>>> producers = new ArrayList<>();
+      producers.add(new FunctionBasedQueueProducer<>(buffer -> {
+        scanner.scan();
+        return null;
+      }));
+      return producers;
+    }
+
+    public void close() {
+      this.executor.shutdownNow();
+    }
+  }
+
+  public static HoodieMergedLogRecordScanner logScanner(
       List<String> logPaths,
       Schema logSchema,
       String latestInstantTime,
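For reference (outside the patch), a condensed sketch of the read-side mapping that FormatUtils#getRowKind
applies when the operation metadata field is present; the operation value is round-tripped through a RowKind
byte here only to keep the example self-contained:

    import org.apache.flink.types.RowKind;
    import org.apache.hudi.common.model.HoodieOperation;

    public class RowKindResolutionExample {
      // condensed from the private FormatUtils#getRowKind: resolve a stored operation back to a RowKind
      static RowKind toRowKind(HoodieOperation operation) {
        if (HoodieOperation.isInsert(operation)) {
          return RowKind.INSERT;
        } else if (HoodieOperation.isUpdateBefore(operation)) {
          return RowKind.UPDATE_BEFORE;
        } else if (HoodieOperation.isUpdateAfter(operation)) {
          return RowKind.UPDATE_AFTER;
        } else {
          return RowKind.DELETE;
        }
      }

      public static void main(String[] args) {
        HoodieOperation op = HoodieOperation.fromValue(RowKind.UPDATE_BEFORE.toByteValue());
        System.out.println(toRowKind(op)); // UPDATE_BEFORE
      }
    }
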
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 4d68242..d0e1c33d 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table.format.mor;
 
+import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.table.log.InstantRange;
@@ -176,7 +177,11 @@ public class MergeOnReadInputFormat
       }
     } else if (!split.getBasePath().isPresent()) {
       // log files only
-      this.iterator = new LogFileOnlyIterator(getLogFileIterator(split));
+      if (conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)) {
+        this.iterator = new LogFileOnlyIterator(getUnMergedLogFileIterator(split));
+      } else {
+        this.iterator = new LogFileOnlyIterator(getLogFileIterator(split));
+      }
     } else if (split.getMergeType().equals(FlinkOptions.REALTIME_SKIP_MERGE)) {
       this.iterator = new SkipMergeIterator(
           getRequiredSchemaReader(split.getBasePath().get()),
@@ -190,6 +195,9 @@ public class MergeOnReadInputFormat
           new Schema.Parser().parse(this.tableState.getAvroSchema()),
           new Schema.Parser().parse(this.tableState.getRequiredAvroSchema()),
           this.requiredPos,
+          this.emitDelete,
+          this.conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED),
+          this.tableState.getOperationPos(),
           getFullSchemaReader(split.getBasePath().get()));
     } else {
       throw new HoodieException("Unable to select an Iterator to read the Hoodie MOR File Split for "
@@ -298,7 +306,7 @@ public class MergeOnReadInputFormat
     final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema);
     final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter =
         AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
-    final HoodieMergedLogRecordScanner scanner = FormatUtils.scanLog(split, tableSchema, hadoopConf);
+    final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split, tableSchema, hadoopConf, conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED));
     final Iterator<String> logRecordsKeyIterator = scanner.getRecords().keySet().iterator();
     final int[] pkOffset = tableState.getPkOffsetsInRequired();
     // flag saying whether the pk semantics has been dropped by user specified
@@ -335,18 +343,20 @@ public class MergeOnReadInputFormat
               }
               delete.setRowKind(RowKind.DELETE);
 
-              this.currentRecord =  delete;
+              this.currentRecord = delete;
               return true;
             }
             // skipping if the condition is unsatisfied
             // continue;
           } else {
+            final IndexedRecord avroRecord = curAvroRecord.get();
             GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
-                curAvroRecord.get(),
+                avroRecord,
                 requiredSchema,
                 requiredPos,
                 recordBuilder);
             currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord);
+            FormatUtils.setRowKind(currentRecord, avroRecord, tableState.getOperationPos());
             return true;
           }
         }
@@ -365,6 +375,55 @@ public class MergeOnReadInputFormat
     };
   }
 
+  private ClosableIterator<RowData> getUnMergedLogFileIterator(MergeOnReadInputSplit split) {
+    final Schema tableSchema = new Schema.Parser().parse(tableState.getAvroSchema());
+    final Schema requiredSchema = new Schema.Parser().parse(tableState.getRequiredAvroSchema());
+    final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema);
+    final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter =
+        AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
+    final FormatUtils.BoundedMemoryRecords records = new FormatUtils.BoundedMemoryRecords(split, tableSchema, hadoopConf);
+    final Iterator<HoodieRecord<?>> recordsIterator = records.getRecordsIterator();
+
+    return new ClosableIterator<RowData>() {
+      private RowData currentRecord;
+
+      @Override
+      public boolean hasNext() {
+        while (recordsIterator.hasNext()) {
+          Option<IndexedRecord> curAvroRecord = null;
+          final HoodieRecord<?> hoodieRecord = recordsIterator.next();
+          try {
+            curAvroRecord = hoodieRecord.getData().getInsertValue(tableSchema);
+          } catch (IOException e) {
+            throw new HoodieException("Get avro insert value error for key: " + hoodieRecord.getRecordKey(), e);
+          }
+          if (curAvroRecord.isPresent()) {
+            final IndexedRecord avroRecord = curAvroRecord.get();
+            GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
+                avroRecord,
+                requiredSchema,
+                requiredPos,
+                recordBuilder);
+            currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord);
+            FormatUtils.setRowKind(currentRecord, avroRecord, tableState.getOperationPos());
+            return true;
+          }
+        }
+        return false;
+      }
+
+      @Override
+      public RowData next() {
+        return currentRecord;
+      }
+
+      @Override
+      public void close() {
+        records.close();
+      }
+    };
+  }
+
   // -------------------------------------------------------------------------
   //  Inner Class
   // -------------------------------------------------------------------------
@@ -544,6 +603,8 @@ public class MergeOnReadInputFormat
     private final Schema tableSchema;
     private final Schema requiredSchema;
     private final int[] requiredPos;
+    private final boolean emitDelete;
+    private final int operationPos;
     private final RowDataToAvroConverters.RowDataToAvroConverter rowDataToAvroConverter;
     private final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter;
     private final GenericRecordBuilder recordBuilder;
@@ -557,7 +618,7 @@ public class MergeOnReadInputFormat
     // refactor it out once FLINK-22370 is resolved.
     private boolean readLogs = false;
 
-    private Set<String> keyToSkip = new HashSet<>();
+    private final Set<String> keyToSkip = new HashSet<>();
 
     private RowData currentRecord;
 
@@ -569,13 +630,18 @@ public class MergeOnReadInputFormat
         Schema tableSchema,
         Schema requiredSchema,
         int[] requiredPos,
+        boolean emitDelete,
+        boolean withOperationField,
+        int operationPos,
         ParquetColumnarRowSplitReader reader) { // the reader should be with full schema
       this.tableSchema = tableSchema;
       this.reader = reader;
-      this.scanner = FormatUtils.scanLog(split, tableSchema, hadoopConf);
+      this.scanner = FormatUtils.logScanner(split, tableSchema, hadoopConf, withOperationField);
       this.logKeysIterator = scanner.getRecords().keySet().iterator();
       this.requiredSchema = requiredSchema;
       this.requiredPos = requiredPos;
+      this.emitDelete = emitDelete;
+      this.operationPos = operationPos;
       this.recordBuilder = new GenericRecordBuilder(requiredSchema);
       this.rowDataToAvroConverter = RowDataToAvroConverters.createConverter(tableRowType);
       this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(requiredRowType);
@@ -602,12 +668,13 @@ public class MergeOnReadInputFormat
             // deleted
             continue;
           } else {
-            GenericRecord record = buildAvroRecordBySchema(
+            GenericRecord avroRecord = buildAvroRecordBySchema(
                 mergedAvroRecord.get(),
                 requiredSchema,
                 requiredPos,
                 recordBuilder);
-            this.currentRecord = (RowData) avroToRowDataConverter.convert(record);
+            this.currentRecord = (RowData) avroToRowDataConverter.convert(avroRecord);
+            FormatUtils.setRowKind(this.currentRecord, mergedAvroRecord.get(), this.operationPos);
             return false;
           }
         }
@@ -620,16 +687,16 @@ public class MergeOnReadInputFormat
       while (logKeysIterator.hasNext()) {
         final String curKey = logKeysIterator.next();
         if (!keyToSkip.contains(curKey)) {
-          Option<IndexedRecord> insertAvroRecord =
-              scanner.getRecords().get(curKey).getData().getInsertValue(tableSchema);
+          Option<IndexedRecord> insertAvroRecord = getInsertValue(curKey);
           if (insertAvroRecord.isPresent()) {
             // the record is a DELETE if insertAvroRecord not present, skipping
-            GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
+            GenericRecord avroRecord = buildAvroRecordBySchema(
                 insertAvroRecord.get(),
                 requiredSchema,
                 requiredPos,
                 recordBuilder);
-            this.currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord);
+            this.currentRecord = (RowData) avroToRowDataConverter.convert(avroRecord);
+            FormatUtils.setRowKind(this.currentRecord, insertAvroRecord.get(), this.operationPos);
             return false;
           }
         }
@@ -637,6 +704,14 @@ public class MergeOnReadInputFormat
       return true;
     }
 
+    private Option<IndexedRecord> getInsertValue(String curKey) throws IOException {
+      final HoodieRecord<?> record = scanner.getRecords().get(curKey);
+      if (!emitDelete && HoodieOperation.isDelete(record.getOperation())) {
+        return Option.empty();
+      }
+      return record.getData().getInsertValue(tableSchema);
+    }
+
     @Override
     public RowData nextRecord() {
       return currentRecord;
@@ -655,8 +730,12 @@ public class MergeOnReadInputFormat
     private Option<IndexedRecord> mergeRowWithLog(
         RowData curRow,
         String curKey) throws IOException {
+      final HoodieRecord<?> record = scanner.getRecords().get(curKey);
+      if (!emitDelete && HoodieOperation.isDelete(record.getOperation())) {
+        return Option.empty();
+      }
       GenericRecord historyAvroRecord = (GenericRecord) rowDataToAvroConverter.convert(tableSchema, curRow);
-      return scanner.getRecords().get(curKey).getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema);
+      return record.getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema);
     }
   }
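As a usage note (not part of the patch), DELETE records only surface from the merged iterator when the format
is explicitly asked to emit them, which the new tests below exercise through the isEmitDelete switch; a minimal
sketch, assuming the input format comes from HoodieTableSource#getInputFormat:

    import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;

    public class EmitDeleteExample {
      // flip the switch so deletes come out as RowKind.DELETE rows instead of being skipped
      static void emitDeletes(MergeOnReadInputFormat inputFormat) {
        inputFormat.isEmitDelete(true);
      }
    }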
 
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
index 9a32af6..0a63c91 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.table.format.mor;
 
+import org.apache.hudi.common.model.HoodieRecord;
+
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -38,6 +40,7 @@ public class MergeOnReadTableState implements Serializable {
   private final String requiredAvroSchema;
   private final List<MergeOnReadInputSplit> inputSplits;
   private final String[] pkFields;
+  private final int operationPos;
 
   public MergeOnReadTableState(
       RowType rowType,
@@ -52,6 +55,7 @@ public class MergeOnReadTableState implements Serializable {
     this.requiredAvroSchema = requiredAvroSchema;
     this.inputSplits = inputSplits;
     this.pkFields = pkFields;
+    this.operationPos = rowType.getFieldIndex(HoodieRecord.OPERATION_METADATA_FIELD);
   }
 
   public RowType getRowType() {
@@ -74,6 +78,10 @@ public class MergeOnReadTableState implements Serializable {
     return inputSplits;
   }
 
+  public int getOperationPos() {
+    return operationPos;
+  }
+
   public int[] getRequiredPositions() {
     final List<String> fieldNames = rowType.getFieldNames();
     return requiredRowType.getFieldNames().stream()
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/ChangelogModes.java b/hudi-flink/src/main/java/org/apache/hudi/util/ChangelogModes.java
new file mode 100644
index 0000000..164815b
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/ChangelogModes.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.types.RowKind;
+
+/**
+ * Utilities for all kinds of common {@link org.apache.flink.table.connector.ChangelogMode}s.
+ */
+public class ChangelogModes {
+  public static final ChangelogMode FULL = ChangelogMode.newBuilder()
+      .addContainedKind(RowKind.INSERT)
+      .addContainedKind(RowKind.UPDATE_BEFORE)
+      .addContainedKind(RowKind.UPDATE_AFTER)
+      .addContainedKind(RowKind.DELETE)
+      .build();
+
+  /**
+   * Changelog mode that ignores UPDATE_BEFORE, i.e. UPSERT semantics.
+   */
+  public static final ChangelogMode UPSERT = ChangelogMode.newBuilder()
+      .addContainedKind(RowKind.INSERT)
+      .addContainedKind(RowKind.UPDATE_AFTER)
+      .addContainedKind(RowKind.DELETE)
+      .build();
+
+  private ChangelogModes() {
+  }
+}
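For illustration only, the two constants map onto the sink behavior described earlier; a sketch of selecting
the mode the same way HoodieTableSink#getChangelogMode now does:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.connector.ChangelogMode;
    import org.apache.hudi.configuration.FlinkOptions;
    import org.apache.hudi.util.ChangelogModes;

    public class ChangelogModeSelection {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setBoolean(FlinkOptions.CHANGELOG_ENABLED, true);
        // FULL accepts UPDATE_BEFORE rows, UPSERT ignores them
        ChangelogMode mode = conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)
            ? ChangelogModes.FULL
            : ChangelogModes.UPSERT;
        System.out.println(mode);
      }
    }
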
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 86de90e..06305e2 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -186,6 +186,7 @@ public class StreamerUtil {
                 .build())
             .withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton
             .withAutoCommit(false)
+            .withAllowOperationMetadataField(conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED))
             .withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf)));
 
     builder = builder.withSchema(getSourceSchema(conf).toString());
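For completeness (outside the patch), the builder flag wired above shown on its own; the base path is a
placeholder and the schema and index settings that a real writer needs are omitted:

    import org.apache.hudi.config.HoodieWriteConfig;

    public class OperationMetadataFieldExample {
      public static void main(String[] args) {
        HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
            .withPath("/tmp/hudi_t1")                // hypothetical base path
            .withAllowOperationMetadataField(true)   // persist the operation flag with each record
            .build();
        System.out.println(writeConfig.allowOperationMetadataField());
      }
    }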
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
index 6d802fe..ade43c5 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
@@ -38,9 +38,9 @@ import org.apache.hudi.sink.compact.CompactionPlanSourceFunction;
 import org.apache.hudi.sink.compact.FlinkCompactionConfig;
 import org.apache.hudi.sink.partitioner.BucketAssignFunction;
 import org.apache.hudi.sink.partitioner.BucketAssignOperator;
+import org.apache.hudi.sink.transform.ChainedTransformer;
 import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
 import org.apache.hudi.sink.transform.Transformer;
-import org.apache.hudi.sink.transform.ChainedTransformer;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.CompactionUtil;
@@ -85,6 +85,8 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
 /**
  * Integration test for Flink Hoodie stream sink.
  */
@@ -200,7 +202,9 @@ public class StreamWriteITCase extends TestLogger {
     // To compute the compaction instant time and do compaction.
     String compactionInstantTime = CompactionUtil.getCompactionInstantTime(metaClient);
     HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null);
-    writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
+    boolean scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
+
+    assertTrue(scheduled, "The compaction plan should be scheduled");
 
     HoodieFlinkTable<?> table = writeClient.getHoodieTable();
     // generate compaction plan
@@ -209,8 +213,10 @@ public class StreamWriteITCase extends TestLogger {
         table.getMetaClient(), compactionInstantTime);
 
     HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
+    // Mark instant as compaction inflight
+    table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
 
-    env.addSource(new CompactionPlanSourceFunction(table, instant, compactionPlan, compactionInstantTime))
+    env.addSource(new CompactionPlanSourceFunction(compactionPlan, compactionInstantTime))
         .name("compaction_source")
         .uid("uid_compaction_source")
         .rebalance()
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index d5639c5..8e7f809 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -399,33 +399,29 @@ public class TestWriteCopyOnWrite {
     // the coordinator checkpoint commits the inflight instant.
     checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
 
-    Map<String, String> expected = new HashMap<>();
-    // id3, id5 were deleted and id9 is ignored
-    expected.put("par1", "[id1,par1,id1,Danny,24,1,par1, id2,par1,id2,Stephen,34,2,par1]");
-    expected.put("par2", "[id4,par2,id4,Fabian,31,4,par2]");
-    expected.put("par3", "[id6,par3,id6,Emma,20,6,par3]");
-    expected.put("par4", "[id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]");
+    Map<String, String> expected = getUpsertWithDeleteExpected();
     checkWrittenData(tempFile, expected);
   }
 
   @Test
   public void testInsertWithMiniBatches() throws Exception {
     // reset the config option
-    conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0006); // 630 bytes batch size
+    conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0008); // 839 bytes batch size
     funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
 
     // open the function and ingest data
     funcWrapper.openFunction();
-    // Each record is 208 bytes. so 4 records expect to trigger a mini-batch write
+    // record (operation: 'I') is 304 bytes and record (operation: 'U') is 352 bytes.
+    // so 3 records expect to trigger a mini-batch write
     for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
       funcWrapper.invoke(rowData);
     }
 
     Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
     assertThat("Should have 1 data bucket", dataBuffer.size(), is(1));
-    assertThat("2 records expect to flush out as a mini-batch",
+    assertThat("3 records expect to flush out as a mini-batch",
         dataBuffer.values().stream().findFirst().map(List::size).orElse(-1),
-        is(2));
+        is(3));
 
     // this triggers the data write and event send
     funcWrapper.checkpointFunction(1);
@@ -472,22 +468,23 @@ public class TestWriteCopyOnWrite {
   @Test
   public void testInsertWithDeduplication() throws Exception {
     // reset the config option
-    conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0006); // 630 bytes batch size
+    conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0008); // 839 bytes batch size
     conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, true);
     funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
 
     // open the function and ingest data
     funcWrapper.openFunction();
-    // Each record is 208 bytes. so 4 records expect to trigger a mini-batch write
+    // record (operation: 'I') is 304 bytes and record (operation: 'U') is 352 bytes.
+    // so 3 records expect to trigger a mini-batch write
     for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
       funcWrapper.invoke(rowData);
     }
 
     Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
     assertThat("Should have 1 data bucket", dataBuffer.size(), is(1));
-    assertThat("2 records expect to flush out as a mini-batch",
+    assertThat("3 records expect to flush out as a mini-batch",
         dataBuffer.values().stream().findFirst().map(List::size).orElse(-1),
-        is(2));
+        is(3));
 
     // this triggers the data write and event send
     funcWrapper.checkpointFunction(1);
@@ -612,12 +609,13 @@ public class TestWriteCopyOnWrite {
   @Test
   public void testInsertWithSmallBufferSize() throws Exception {
     // reset the config option
-    conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0006); // 630 bytes buffer size
+    conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0008); // 839 bytes buffer size
     funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
 
     // open the function and ingest data
     funcWrapper.openFunction();
-    // each record is 208 bytes. so 4 records expect to trigger buffer flush:
+    // record (operation: 'I') is 304 bytes and record (operation: 'U') is 352 bytes.
+    // so 3 records are expected to trigger a buffer flush:
     // flush the max size bucket once at a time.
     for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
       funcWrapper.invoke(rowData);
@@ -625,9 +623,9 @@ public class TestWriteCopyOnWrite {
 
     Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
     assertThat("Should have 1 data bucket", dataBuffer.size(), is(1));
-    assertThat("2 records expect to flush out as a mini-batch",
+    assertThat("3 records expect to flush out as a mini-batch",
         dataBuffer.values().stream().findFirst().map(List::size).orElse(-1),
-        is(2));
+        is(3));
 
     // this triggers the data write and event send
     funcWrapper.checkpointFunction(1);
@@ -676,8 +674,17 @@ public class TestWriteCopyOnWrite {
     // the last 2 lines are merged
     expected.put("par1", "["
         + "id1,par1,id1,Danny,23,1,par1, "
-        + "id1,par1,id1,Danny,23,1,par1, "
-        + "id1,par1,id1,Danny,23,1,par1]");
+        + "id1,par1,id1,Danny,23,1,par1" + "]");
+    return expected;
+  }
+
+  protected Map<String, String> getUpsertWithDeleteExpected() {
+    Map<String, String> expected = new HashMap<>();
+    // id3, id5 were deleted and id9 is ignored
+    expected.put("par1", "[id1,par1,id1,Danny,24,1,par1, id2,par1,id2,Stephen,34,2,par1]");
+    expected.put("par2", "[id4,par2,id4,Fabian,31,4,par2]");
+    expected.put("par3", "[id6,par3,id6,Emma,20,6,par3]");
+    expected.put("par4", "[id7,par4,id7,Bob,44,7,par4, id8,par4,id8,Han,56,8,par4]");
     return expected;
   }
 
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java b/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java
index 0d445d6..32ee725 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java
@@ -47,7 +47,7 @@ public class TestRowDataKeyGen {
     assertThat(keyGen1.getPartitionPath(rowData1), is("par1"));
 
     // null record key and partition path
-    final RowData rowData2 = insertRow(null, StringData.fromString("Danny"), 23,
+    final RowData rowData2 = insertRow(TestConfigurations.ROW_TYPE, null, StringData.fromString("Danny"), 23,
         TimestampData.fromEpochMillis(1), null);
     assertThrows(HoodieKeyException.class, () -> keyGen1.getRecordKey(rowData2));
     assertThat(keyGen1.getPartitionPath(rowData2), is("default"));
@@ -77,7 +77,7 @@ public class TestRowDataKeyGen {
     assertThat(keyGen1.getPartitionPath(rowData1), is("par1/1970-01-01T00:00:00.001"));
 
     // null record key and partition path
-    final RowData rowData2 = insertRow(null, null, 23, null, null);
+    final RowData rowData2 = insertRow(TestConfigurations.ROW_TYPE, null, null, 23, null, null);
     assertThrows(HoodieKeyException.class, () -> keyGen1.getRecordKey(rowData2));
     assertThat(keyGen1.getPartitionPath(rowData2), is("default/default"));
     // empty record key and partition path
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index 957b9b8..f8dc018 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -261,6 +261,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
     conf.setString(FlinkOptions.TABLE_NAME, "t1");
     conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+    conf.setBoolean(FlinkOptions.CHANGELOG_ENABLED, true);
 
     // write one commit
     TestData.writeData(TestData.DATA_SET_INSERT, conf);
@@ -276,17 +277,20 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
     options.put(FlinkOptions.READ_STREAMING_CHECK_INTERVAL.key(), "2");
     options.put(FlinkOptions.READ_STREAMING_START_COMMIT.key(), latestCommit);
+    options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
     String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
     streamTableEnv.executeSql(hoodieTableDDL);
 
-    List<Row> result = execSelectSql(streamTableEnv, "select * from t1", 10);
-    final String expected = "["
-        + "id1,Danny,24,1970-01-01T00:00:00.001,par1, "
-        + "id2,Stephen,34,1970-01-01T00:00:00.002,par1, "
-        + "id3,null,null,null,null, "
-        + "id5,null,null,null,null, "
-        + "id9,null,null,null,null]";
-    assertRowsEquals(result, expected);
+    final String sinkDDL = "create table sink(\n"
+        + "  name varchar(20),\n"
+        + "  age_sum int\n"
+        + ") with (\n"
+        + "  'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "'"
+        + ")";
+    List<Row> result = execSelectSql(streamTableEnv,
+        "select name, sum(age) from t1 group by name", sinkDDL, 10);
+    final String expected = "[+I(Danny,24), +I(Stephen,34)]";
+    assertRowsEquals(result, expected, true);
   }
 
   @ParameterizedTest
@@ -724,6 +728,11 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     } else {
       sinkDDL = TestConfigurations.getCollectSinkDDL("sink");
     }
+    return execSelectSql(tEnv, select, sinkDDL, timeout);
+  }
+
+  private List<Row> execSelectSql(TableEnvironment tEnv, String select, String sinkDDL, long timeout)
+          throws InterruptedException {
     tEnv.executeSql(sinkDDL);
     TableResult tableResult = tEnv.executeSql("insert into sink " + select);
     // wait for the timeout then cancels the job
@@ -731,7 +740,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     tableResult.getJobClient().ifPresent(JobClient::cancel);
     tEnv.executeSql("DROP TABLE IF EXISTS sink");
     return CollectSinkTableFactory.RESULT.values().stream()
-        .flatMap(Collection::stream)
-        .collect(Collectors.toList());
+            .flatMap(Collection::stream)
+            .collect(Collectors.toList());
   }
 }
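And for SQL users (not part of the patch), the option surfaces as a regular table property; a minimal DDL
sketch with placeholder path and columns, modeled on how the ITCase builds its test tables:

    public class ChangelogTableDdlExample {
      public static void main(String[] args) {
        String ddl = "create table t1(\n"
            + "  uuid varchar(20),\n"
            + "  name varchar(10),\n"
            + "  age int,\n"
            + "  ts timestamp(3),\n"
            + "  `partition` varchar(20),\n"
            + "  PRIMARY KEY(uuid) NOT ENFORCED\n"
            + ") PARTITIONED BY (`partition`) with (\n"
            + "  'connector' = 'hudi',\n"
            + "  'path' = '/tmp/hudi_t1',\n"         // hypothetical table path
            + "  'table.type' = 'MERGE_ON_READ',\n"
            + "  'changelog.enabled' = 'true'\n"
            + ")";
        System.out.println(ddl);
      }
    }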
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index c8885b0..5535945 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table.format;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.table.HoodieTableSource;
+import org.apache.hudi.table.format.cow.CopyOnWriteInputFormat;
 import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
@@ -60,9 +61,14 @@ public class TestInputFormat {
   File tempFile;
 
   void beforeEach(HoodieTableType tableType) throws IOException {
+    beforeEach(tableType, Collections.emptyMap());
+  }
+
+  void beforeEach(HoodieTableType tableType, Map<String, String> options) throws IOException {
     conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
     conf.setString(FlinkOptions.TABLE_TYPE, tableType.name());
     conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); // close the async compaction
+    options.forEach((key, value) -> conf.setString(key, value));
 
     StreamerUtil.initTableIfNotExists(conf);
     this.tableSource = new HoodieTableSource(
@@ -163,8 +169,62 @@ public class TestInputFormat {
   }
 
   @Test
-  void testReadWithDeletes() throws Exception {
-    beforeEach(HoodieTableType.MERGE_ON_READ);
+  void testReadBaseAndLogFilesWithDeletes() throws Exception {
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
+    beforeEach(HoodieTableType.MERGE_ON_READ, options);
+
+    // write base first with compaction.
+    conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
+    conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+    // write another commit using logs and read again.
+    conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+    TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf);
+
+    InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
+    assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
+
+    // when isEmitDelete is false.
+    List<RowData> result1 = readData(inputFormat);
+
+    final String actual1 = TestData.rowDataToString(result1, true);
+    final String expected1 = "["
+        + "+I(id1,Danny,24,1970-01-01T00:00:00.001,par1), "
+        + "+I(id2,Stephen,34,1970-01-01T00:00:00.002,par1), "
+        + "+I(id4,Fabian,31,1970-01-01T00:00:00.004,par2), "
+        + "+I(id6,Emma,20,1970-01-01T00:00:00.006,par3), "
+        + "+I(id7,Bob,44,1970-01-01T00:00:00.007,par4), "
+        + "+I(id8,Han,56,1970-01-01T00:00:00.008,par4)]";
+    assertThat(actual1, is(expected1));
+
+    // refresh the input format and set isEmitDelete to true.
+    this.tableSource.reset();
+    inputFormat = this.tableSource.getInputFormat();
+    ((MergeOnReadInputFormat) inputFormat).isEmitDelete(true);
+
+    List<RowData> result2 = readData(inputFormat);
+
+    final String actual2 = TestData.rowDataToString(result2, true);
+    final String expected2 = "["
+        + "+I(id1,Danny,24,1970-01-01T00:00:00.001,par1), "
+        + "+I(id2,Stephen,34,1970-01-01T00:00:00.002,par1), "
+        + "-D(id3,Julian,53,1970-01-01T00:00:00.003,par2), "
+        + "+I(id4,Fabian,31,1970-01-01T00:00:00.004,par2), "
+        + "-D(id5,Sophia,18,1970-01-01T00:00:00.005,par3), "
+        + "+I(id6,Emma,20,1970-01-01T00:00:00.006,par3), "
+        + "+I(id7,Bob,44,1970-01-01T00:00:00.007,par4), "
+        + "+I(id8,Han,56,1970-01-01T00:00:00.008,par4), "
+        + "-D(id9,Jane,19,1970-01-01T00:00:00.006,par3)]";
+    assertThat(actual2, is(expected2));
+  }
+
+  @Test
+  void testReadWithDeletesMOR() throws Exception {
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
+    beforeEach(HoodieTableType.MERGE_ON_READ, options);
 
     // write another commit to read again
     TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf);
@@ -175,13 +235,32 @@ public class TestInputFormat {
 
     List<RowData> result = readData(inputFormat);
 
-    final String actual = TestData.rowDataToString(result);
+    final String actual = TestData.rowDataToString(result, true);
     final String expected = "["
-        + "id1,Danny,24,1970-01-01T00:00:00.001,par1, "
-        + "id2,Stephen,34,1970-01-01T00:00:00.002,par1, "
-        + "id3,null,null,null,null, "
-        + "id5,null,null,null,null, "
-        + "id9,null,null,null,null]";
+        + "+I(id1,Danny,24,1970-01-01T00:00:00.001,par1), "
+        + "+I(id2,Stephen,34,1970-01-01T00:00:00.002,par1), "
+        + "-D(id3,Julian,53,1970-01-01T00:00:00.003,par2), "
+        + "-D(id5,Sophia,18,1970-01-01T00:00:00.005,par3), "
+        + "-D(id9,Jane,19,1970-01-01T00:00:00.006,par3)]";
+    assertThat(actual, is(expected));
+  }
+
+  @Test
+  void testReadWithDeletesCOW() throws Exception {
+    beforeEach(HoodieTableType.COPY_ON_WRITE);
+
+    // write another commit to read again
+    TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf);
+
+    InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
+    assertThat(inputFormat, instanceOf(CopyOnWriteInputFormat.class));
+
+    List<RowData> result = readData(inputFormat);
+
+    final String actual = TestData.rowDataToString(result, true);
+    final String expected = "["
+        + "+I(id1,Danny,24,1970-01-01T00:00:00.001,par1), "
+        + "+I(id2,Stephen,34,1970-01-01T00:00:00.002,par1)]";
     assertThat(actual, is(expected));
   }
 
@@ -205,6 +284,33 @@ public class TestInputFormat {
     assertThat(actual, is(expected));
   }
 
+  @Test
+  void testReadChangesUnMergedMOR() throws Exception {
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
+    beforeEach(HoodieTableType.MERGE_ON_READ, options);
+
+    // write another commit to read again
+    TestData.writeData(TestData.DATA_SET_INSERT_UPDATE_DELETE, conf);
+
+    InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
+    assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
+
+    List<RowData> result = readData(inputFormat);
+
+    final String actual = TestData.rowDataToString(result, true);
+    final String expected = "["
+        + "+I(id1,Danny,19,1970-01-01T00:00:00.001,par1), "
+        + "-U(id1,Danny,19,1970-01-01T00:00:00.001,par1), "
+        + "+U(id1,Danny,20,1970-01-01T00:00:00.002,par1), "
+        + "-U(id1,Danny,20,1970-01-01T00:00:00.002,par1), "
+        + "+U(id1,Danny,21,1970-01-01T00:00:00.003,par1), "
+        + "-U(id1,Danny,21,1970-01-01T00:00:00.003,par1), "
+        + "+U(id1,Danny,22,1970-01-01T00:00:00.004,par1), "
+        + "-D(id1,Danny,22,1970-01-01T00:00:00.005,par1)]";
+    assertThat(actual, is(expected));
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
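
    The new tests above drive the changelog behaviour purely through the Flink write
    configuration. A minimal sketch of that pattern, assuming the resulting Configuration is
    handed to the Hudi sink the same way TestData.writeData does it (the class name below is
    just for the sketch):

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;

    public class ChangelogWriteConf {
      public static Configuration changelogConf() {
        Configuration conf = new Configuration();
        // Keep the per-record change flags (+I/-U/+U/-D) in the MOR log files
        // so the read path can emit them instead of merging them away.
        conf.setBoolean(FlinkOptions.CHANGELOG_ENABLED, true);
        // Leave the changes in the logs; compaction would fold them into base files.
        conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
        return conf;
      }
    }
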
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index f5ac9c5..d3e32e6 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -234,15 +234,53 @@ public class TestData {
           TimestampData.fromEpochMillis(6), StringData.fromString("par3"))
   );
 
+  public static List<RowData> DATA_SET_INSERT_UPDATE_DELETE = Arrays.asList(
+      // INSERT
+      insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 19,
+          TimestampData.fromEpochMillis(1), StringData.fromString("par1")),
+      // UPDATE
+      updateBeforeRow(StringData.fromString("id1"), StringData.fromString("Danny"), 19,
+          TimestampData.fromEpochMillis(1), StringData.fromString("par1")),
+      updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 20,
+          TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
+      updateBeforeRow(StringData.fromString("id1"), StringData.fromString("Danny"), 20,
+          TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
+      updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 21,
+          TimestampData.fromEpochMillis(3), StringData.fromString("par1")),
+      updateBeforeRow(StringData.fromString("id1"), StringData.fromString("Danny"), 21,
+          TimestampData.fromEpochMillis(3), StringData.fromString("par1")),
+      updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 22,
+          TimestampData.fromEpochMillis(4), StringData.fromString("par1")),
+      // DELETE
+      deleteRow(StringData.fromString("id1"), StringData.fromString("Danny"), 22,
+          TimestampData.fromEpochMillis(5), StringData.fromString("par1"))
+  );
+
   /**
    * Returns string format of a list of RowData.
    */
   public static String rowDataToString(List<RowData> rows) {
+    return rowDataToString(rows, false);
+  }
+
+  /**
+   * Returns string format of a list of RowData.
+   *
+   * @param withChangeFlag whether to print the change flag
+   */
+  public static String rowDataToString(List<RowData> rows, boolean withChangeFlag) {
     DataStructureConverter<Object, Object> converter =
         DataStructureConverters.getConverter(TestConfigurations.ROW_DATA_TYPE);
     return rows.stream()
-        .map(row -> converter.toExternal(row).toString())
-        .sorted(Comparator.naturalOrder())
+        .sorted(Comparator.comparing(o -> toStringSafely(o.getString(0))))
+        .map(row -> {
+          final String rowStr = converter.toExternal(row).toString();
+          if (withChangeFlag) {
+            return row.getRowKind().shortString() + "(" + rowStr + ")";
+          } else {
+            return rowStr;
+          }
+        })
         .collect(Collectors.toList()).toString();
   }
 
@@ -287,7 +325,30 @@ public class TestData {
    * @param expected Expected string of the sorted rows
    */
   public static void assertRowsEquals(List<Row> rows, String expected) {
-    assertRowsEquals(rows, expected, 0);
+    assertRowsEquals(rows, expected, false);
+  }
+
+  /**
+   * Sorts the {@code rows} by the field at index 0 and asserts
+   * that the result equals the expected string {@code expected}.
+   *
+   * @param rows           Actual result rows
+   * @param expected       Expected string of the sorted rows
+   * @param withChangeFlag Whether compares with change flags
+   */
+  public static void assertRowsEquals(List<Row> rows, String expected, boolean withChangeFlag) {
+    String rowsString = rows.stream()
+        .sorted(Comparator.comparing(o -> toStringSafely(o.getField(0))))
+        .map(row -> {
+          final String rowStr = row.toString();
+          if (withChangeFlag) {
+            return row.getKind().shortString() + "(" + rowStr + ")";
+          } else {
+            return rowStr;
+          }
+        })
+        .collect(Collectors.toList()).toString();
+    assertThat(rowsString, is(expected));
   }
 
   /**
@@ -573,7 +634,11 @@ public class TestData {
   }
 
   public static BinaryRowData insertRow(Object... fields) {
-    LogicalType[] types = TestConfigurations.ROW_TYPE.getFields().stream().map(RowType.RowField::getType)
+    return insertRow(TestConfigurations.ROW_TYPE, fields);
+  }
+
+  public static BinaryRowData insertRow(RowType rowType, Object... fields) {
+    LogicalType[] types = rowType.getFields().stream().map(RowType.RowField::getType)
         .toArray(LogicalType[]::new);
     assertEquals(
         "Filed count inconsistent with type information",
@@ -599,4 +664,16 @@ public class TestData {
     rowData.setRowKind(RowKind.DELETE);
     return rowData;
   }
+
+  private static BinaryRowData updateBeforeRow(Object... fields) {
+    BinaryRowData rowData = insertRow(fields);
+    rowData.setRowKind(RowKind.UPDATE_BEFORE);
+    return rowData;
+  }
+
+  private static BinaryRowData updateAfterRow(Object... fields) {
+    BinaryRowData rowData = insertRow(fields);
+    rowData.setRowKind(RowKind.UPDATE_AFTER);
+    return rowData;
+  }
 }
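
    The withChangeFlag variant of rowDataToString above simply prefixes each converted row with
    the short string of its RowKind. A small usage sketch, assuming it runs inside TestData so
    the private row helpers, the Hamcrest matchers and the schema from TestConfigurations are
    in scope:

    List<RowData> rows = Arrays.asList(
        insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
            TimestampData.fromEpochMillis(1), StringData.fromString("par1")),
        deleteRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
            TimestampData.fromEpochMillis(2), StringData.fromString("par1")));
    // RowKind.INSERT.shortString() renders as "+I", RowKind.DELETE as "-D".
    assertThat(rowDataToString(rows, true),
        is("[+I(id1,Danny,23,1970-01-01T00:00:00.001,par1), "
            + "-D(id1,Danny,23,1970-01-01T00:00:00.002,par1)]"));
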
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestHoodieRowData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestHoodieRowData.java
new file mode 100644
index 0000000..7729042
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestHoodieRowData.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utils;
+
+import org.apache.hudi.client.model.HoodieRowData;
+import org.apache.hudi.common.model.HoodieRecord;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Unit tests for {@link HoodieRowData}.
+ */
+public class TestHoodieRowData {
+  private final int metaColumnsNum = HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.size();
+  private static final Random RANDOM = new Random();
+  private static final int INTEGER_INDEX = 0;
+  private static final int STRING_INDEX = 1;
+  private static final int BOOLEAN_INDEX = 2;
+  private static final int SHORT_INDEX = 3;
+  private static final int BYTE_INDEX = 4;
+  private static final int LONG_INDEX = 5;
+  private static final int FLOAT_INDEX = 6;
+  private static final int DOUBLE_INDEX = 7;
+  private static final int DECIMAL_INDEX = 8;
+  private static final int BINARY_INDEX = 9;
+  private static final int ROW_INDEX = 10;
+
+  private static final DataType BASIC_DATA_TYPE = DataTypes.ROW(
+      DataTypes.FIELD("integer", DataTypes.INT()),
+      DataTypes.FIELD("string", DataTypes.STRING()),
+      DataTypes.FIELD("boolean", DataTypes.BOOLEAN()),
+      DataTypes.FIELD("short", DataTypes.SMALLINT()),
+      DataTypes.FIELD("byte", DataTypes.TINYINT()),
+      DataTypes.FIELD("long", DataTypes.BIGINT()),
+      DataTypes.FIELD("float", DataTypes.FLOAT()),
+      DataTypes.FIELD("double", DataTypes.DOUBLE()),
+      DataTypes.FIELD("decimal", DataTypes.DECIMAL(10, 4)),
+      DataTypes.FIELD("binary", DataTypes.BYTES()),
+      DataTypes.FIELD("row", DataTypes.ROW()))
+      .notNull();
+  private static final RowType ROW_TYPE = (RowType) BASIC_DATA_TYPE.getLogicalType();
+
+  @Test
+  public void testGet() {
+    Object[] values = getRandomValue(true);
+    RowData rowData = TestData.insertRow(ROW_TYPE, values);
+
+    HoodieRowData hoodieRowData = new HoodieRowData("commitTime", "commitSeqNo", "recordKey", "partitionPath", "fileName",
+        rowData, true);
+    assertValues(hoodieRowData, "commitTime", "commitSeqNo", "recordKey", "partitionPath",
+        "fileName", values);
+  }
+
+  /**
+   * Fetches a random Object[] of values for testing.
+   *
+   * @param haveRowType true if a nested row needs to be added as one of the elements in the Object[]
+   * @return the random Object[] thus generated
+   */
+  private Object[] getRandomValue(boolean haveRowType) {
+    Object[] values = new Object[11];
+    values[INTEGER_INDEX] = RANDOM.nextInt();
+    values[STRING_INDEX] = StringData.fromString(UUID.randomUUID().toString());
+    values[BOOLEAN_INDEX] = RANDOM.nextBoolean();
+    values[SHORT_INDEX] = (short) RANDOM.nextInt(2);
+    byte[] bytes = new byte[1];
+    RANDOM.nextBytes(bytes);
+    values[BYTE_INDEX] = bytes[0];
+    values[LONG_INDEX] = RANDOM.nextLong();
+    values[FLOAT_INDEX] = RANDOM.nextFloat();
+    values[DOUBLE_INDEX] = RANDOM.nextDouble();
+    values[DECIMAL_INDEX] = DecimalData.fromBigDecimal(new BigDecimal("1005.12313"), 10, 4);
+    bytes = new byte[20];
+    RANDOM.nextBytes(bytes);
+    values[BINARY_INDEX] = bytes;
+    if (haveRowType) {
+      Object[] rowField = getRandomValue(false);
+      values[ROW_INDEX] = TestData.insertRow(ROW_TYPE, rowField);
+    }
+    return values;
+  }
+
+  private void assertValues(HoodieRowData hoodieRowData, String commitTime, String commitSeqNo, String recordKey, String partitionPath,
+                            String filename, Object[] values) {
+    assertEquals(commitTime, hoodieRowData.getString(0).toString());
+    assertEquals(commitSeqNo, hoodieRowData.getString(1).toString());
+    assertEquals(recordKey, hoodieRowData.getString(2).toString());
+    assertEquals(partitionPath, hoodieRowData.getString(3).toString());
+    assertEquals(filename, hoodieRowData.getString(4).toString());
+    assertEquals("I", hoodieRowData.getString(5).toString());
+    // row data.
+    assertEquals(values[INTEGER_INDEX], hoodieRowData.getInt(INTEGER_INDEX + metaColumnsNum));
+    assertEquals(values[STRING_INDEX], hoodieRowData.getString(STRING_INDEX + metaColumnsNum));
+    assertEquals(values[BOOLEAN_INDEX], hoodieRowData.getBoolean(BOOLEAN_INDEX + metaColumnsNum));
+    assertEquals(values[SHORT_INDEX], hoodieRowData.getShort(SHORT_INDEX + metaColumnsNum));
+    assertEquals(values[BYTE_INDEX], hoodieRowData.getByte(BYTE_INDEX + metaColumnsNum));
+    assertEquals(values[LONG_INDEX], hoodieRowData.getLong(LONG_INDEX + metaColumnsNum));
+    assertEquals(values[FLOAT_INDEX], hoodieRowData.getFloat(FLOAT_INDEX + metaColumnsNum));
+    assertEquals(values[DOUBLE_INDEX], hoodieRowData.getDouble(DOUBLE_INDEX + metaColumnsNum));
+    assertEquals(values[DECIMAL_INDEX], hoodieRowData.getDecimal(DECIMAL_INDEX + metaColumnsNum, 10, 4));
+    byte[] expectedBinary = (byte[]) values[BINARY_INDEX];
+    byte[] binary = hoodieRowData.getBinary(BINARY_INDEX + metaColumnsNum);
+    for (int i = 0; i < expectedBinary.length; i++) {
+      assertEquals(expectedBinary[i], binary[i]);
+    }
+    assertEquals(values[ROW_INDEX], hoodieRowData.getRow(ROW_INDEX + metaColumnsNum, values.length));
+  }
+}
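
    One detail worth spelling out from the assertions above: HoodieRowData lays out the Hudi
    meta columns first (commit time, sequence number, record key, partition path, file name
    and, with this patch, the operation), so every access to the wrapped data is shifted by
    HOODIE_META_COLUMNS_WITH_OPERATION.size(). A hypothetical snippet in the same test class,
    where the trailing constructor flag is assumed to be the with-operation switch:

    int offset = HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.size();
    Object[] values = getRandomValue(true);
    HoodieRowData row = new HoodieRowData("20210810122336", "20210810122336_0_1",
        "id1", "par1", "file-0001.parquet", TestData.insertRow(ROW_TYPE, values), true);
    assertEquals("id1", row.getString(2).toString());                        // _hoodie_record_key
    assertEquals(values[INTEGER_INDEX], row.getInt(INTEGER_INDEX + offset)); // first wrapped field
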
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java b/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java
index 6dc9325..33e9d37 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java
@@ -150,6 +150,7 @@ public class CollectSinkTableFactory implements DynamicTableSinkFactory {
     public void invoke(RowData value, SinkFunction.Context context) {
       Row row = (Row) converter.toExternal(value);
       assert row != null;
+      row.setKind(value.getRowKind());
       RESULT.get(taskID).add(row);
     }
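
    The single added line above is what lets the new assertions see delete and update flags at
    all: without copying the RowKind onto the external Row, the collected records would keep
    Row's default INSERT kind. A tiny illustration using plain Flink types (Row, RowKind) and
    the JUnit assertEquals already imported in the test utilities:

    Row row = Row.of("id1", "Danny", 24);
    row.setKind(RowKind.DELETE);                     // mirror the RowData's change flag
    assertEquals("-D", row.getKind().shortString()); // what the expected strings in the tests encode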
 
diff --git a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java
index cd84c3b..6af0119 100644
--- a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java
+++ b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java
@@ -71,7 +71,7 @@ public class HoodieDLAClient extends AbstractSyncHoodieClient {
 
   public HoodieDLAClient(DLASyncConfig syncConfig, FileSystem fs) {
     super(syncConfig.basePath, syncConfig.assumeDatePartitioning, syncConfig.useFileListingFromMetadata,
-        syncConfig.verifyMetadataFileListing, fs);
+        syncConfig.verifyMetadataFileListing, false, fs);
     this.dlaConfig = syncConfig;
     try {
       this.partitionValueExtractor =
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
index 560563c..d9d833d 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
@@ -120,6 +120,9 @@ public class HiveSyncConfig implements Serializable {
   @Parameter(names = {"--spark-schema-length-threshold"}, description = "The maximum length allowed in a single cell when storing additional schema information in Hive's metastore.")
   public int sparkSchemaLengthThreshold = 4000;
 
+  @Parameter(names = {"--with-operation-field"}, description = "Whether to include the '_hoodie_operation' field in the metadata fields")
+  public Boolean withOperationField = false;
+
   // enhance the similar function in child class
   public static HiveSyncConfig copy(HiveSyncConfig cfg) {
     HiveSyncConfig newConfig = new HiveSyncConfig();
@@ -143,6 +146,7 @@ public class HiveSyncConfig implements Serializable {
     newConfig.batchSyncNum = cfg.batchSyncNum;
     newConfig.syncAsSparkDataSourceTable = cfg.syncAsSparkDataSourceTable;
     newConfig.sparkSchemaLengthThreshold = cfg.sparkSchemaLengthThreshold;
+    newConfig.withOperationField = cfg.withOperationField;
     return newConfig;
   }
 
@@ -174,6 +178,7 @@ public class HiveSyncConfig implements Serializable {
       + ", createManagedTable=" + createManagedTable
       + ", syncAsSparkDataSourceTable=" + syncAsSparkDataSourceTable
       + ", sparkSchemaLengthThreshold=" + sparkSchemaLengthThreshold
+      + ", withOperationField=" + withOperationField
       + '}';
   }
 }
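
    For users driving Hive sync from code rather than the CLI, a rough sketch of the new knob,
    assuming an already populated HiveSyncConfig (baseCfg) plus an existing HiveConf and
    FileSystem, all hypothetical here:

    HiveSyncConfig cfg = HiveSyncConfig.copy(baseCfg);
    // Also exposed on the command line as --with-operation-field; when set, the sync client
    // resolves the table schema including the _hoodie_operation meta column.
    cfg.withOperationField = true;
    HoodieHiveClient client = new HoodieHiveClient(cfg, hiveConf, fs);
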
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index 4a22bb8..13e48f5 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -62,7 +62,7 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
   private final HiveSyncConfig syncConfig;
 
   public HoodieHiveClient(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) {
-    super(cfg.basePath, cfg.assumeDatePartitioning, cfg.useFileListingFromMetadata, cfg.verifyMetadataFileListing, fs);
+    super(cfg.basePath, cfg.assumeDatePartitioning, cfg.useFileListingFromMetadata, cfg.verifyMetadataFileListing, cfg.withOperationField, fs);
     this.syncConfig = cfg;
 
     // Support JDBC, HiveQL and metastore based implementations for backwards compatibility. Future users should
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
index 1107d74..11ff745 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
@@ -51,19 +51,21 @@ public abstract class AbstractSyncHoodieClient {
   protected final HoodieTableMetaClient metaClient;
   protected final HoodieTableType tableType;
   protected final FileSystem fs;
-  private String basePath;
-  private boolean assumeDatePartitioning;
-  private boolean useFileListingFromMetadata;
-  private boolean verifyMetadataFileListing;
+  private final String basePath;
+  private final boolean assumeDatePartitioning;
+  private final boolean useFileListingFromMetadata;
+  private final boolean verifyMetadataFileListing;
+  private final boolean withOperationField;
 
   public AbstractSyncHoodieClient(String basePath, boolean assumeDatePartitioning, boolean useFileListingFromMetadata,
-                                  boolean verifyMetadataFileListing, FileSystem fs) {
+                                  boolean verifyMetadataFileListing, boolean withOperationField, FileSystem fs) {
     this.metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
     this.tableType = metaClient.getTableType();
     this.basePath = basePath;
     this.assumeDatePartitioning = assumeDatePartitioning;
     this.useFileListingFromMetadata = useFileListingFromMetadata;
     this.verifyMetadataFileListing = verifyMetadataFileListing;
+    this.withOperationField = withOperationField;
     this.fs = fs;
   }
 
@@ -139,7 +141,11 @@ public abstract class AbstractSyncHoodieClient {
    */
   public MessageType getDataSchema() {
     try {
-      return new TableSchemaResolver(metaClient).getTableParquetSchema();
+      if (withOperationField) {
+        return new TableSchemaResolver(metaClient, true).getTableParquetSchema();
+      } else {
+        return new TableSchemaResolver(metaClient).getTableParquetSchema();
+      }
     } catch (Exception e) {
       throw new HoodieSyncException("Failed to read data schema", e);
     }