Posted to commits@doris.apache.org by mo...@apache.org on 2021/01/30 12:33:43 UTC

[incubator-doris] branch master updated: [Delete] Support delete with multi partitions (#5252)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new de57667  [Delete] Support delete with multi partitions (#5252)
de57667 is described below

commit de57667d6dc7ddc32fabd0f5c3299623ab9895d1
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Sat Jan 30 20:33:34 2021 +0800

    [Delete] Support delete with multi partitions (#5252)
    
    Support delete statement like:
    1. delete from table partitions(p1, p2) where xxx;  // apply to p1, p2
    2. delete from table where xxx;     // apply to all partitions
    
    Also remove code about the deprecated sync/async delete job.
    
    This CL changes FE meta version to 96
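    
    For illustration, a minimal end-to-end sketch of the new behavior
    (table and column names are hypothetical, not taken from this patch):
    
    SET delete_without_partition = true;            -- allow delete without a partition clause
    DELETE FROM example_tbl WHERE k1 >= 3;          -- applies to all partitions
    DELETE FROM example_tbl PARTITIONS (p1, p2) WHERE k1 >= 3;  -- applies to p1 and p2 only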
---
 docs/en/administrator-guide/variables.md           |   8 +-
 .../sql-statements/Data Manipulation/DELETE.md     |   6 +-
 docs/zh-CN/administrator-guide/variables.md        |   8 +-
 .../sql-statements/Data Manipulation/DELETE.md     |   8 +-
 .../java/org/apache/doris/analysis/DeleteStmt.java |  20 +-
 .../org/apache/doris/analysis/ShowDeleteStmt.java  |   2 +-
 .../java/org/apache/doris/catalog/Catalog.java     |  78 +-
 .../apache/doris/catalog/CatalogRecycleBin.java    |   8 +-
 .../org/apache/doris/common/FeMetaVersion.java     |   4 +-
 .../doris/common/proc/DeleteInfoProcDir.java       |  33 +-
 .../doris/common/proc/DeleteJobProcNode.java       |  56 --
 .../org/apache/doris/common/proc/JobsProcDir.java  |   7 +-
 .../org/apache/doris/journal/JournalEntity.java    |  15 +-
 .../doris/journal/local/LocalJournalCursor.java    |  15 +-
 .../java/org/apache/doris/load/AsyncDeleteJob.java | 276 ------
 .../java/org/apache/doris/load/DeleteHandler.java  | 173 ++--
 .../java/org/apache/doris/load/DeleteInfo.java     | 182 ++--
 .../main/java/org/apache/doris/load/DeleteJob.java |  38 +-
 .../src/main/java/org/apache/doris/load/Load.java  | 981 ++-------------------
 .../java/org/apache/doris/load/LoadChecker.java    |  31 -
 .../main/java/org/apache/doris/load/LoadJob.java   |   3 +-
 .../org/apache/doris/load/TabletDeleteInfo.java    |  11 +-
 .../java/org/apache/doris/master/MasterImpl.java   |  19 +-
 .../java/org/apache/doris/persist/EditLog.java     |  21 -
 .../org/apache/doris/persist/OperationType.java    |   2 +
 .../java/org/apache/doris/qe/ConnectProcessor.java |  54 +-
 .../java/org/apache/doris/qe/MasterOpExecutor.java |  15 +-
 .../java/org/apache/doris/qe/SessionVariable.java  | 232 +++--
 .../java/org/apache/doris/qe/ShowExecutor.java     |   9 +-
 .../main/java/org/apache/doris/qe/VariableMgr.java |  14 +-
 .../org/apache/doris/analysis/DeleteStmtTest.java  |  33 +-
 .../org/apache/doris/load/DeleteHandlerTest.java   |  27 +-
 .../org/apache/doris/load/LoadCheckerTest.java     |  14 +-
 .../org/apache/doris/qe/SessionVariablesTest.java  |  70 ++
 gensrc/thrift/FrontendService.thrift               |  11 +-
 35 files changed, 663 insertions(+), 1821 deletions(-)

diff --git a/docs/en/administrator-guide/variables.md b/docs/en/administrator-guide/variables.md
index 468df82..df14308 100644
--- a/docs/en/administrator-guide/variables.md
+++ b/docs/en/administrator-guide/variables.md
@@ -151,6 +151,12 @@ Note that the comment must start with /*+ and can only follow the SELECT.
 
     Used for compatibility with MySQL clients. No practical effect.
 
+* `delete_without_partition`
+
+    When set to true, the delete command on a partitioned table no longer requires a partition to be specified; the delete operation is automatically applied to all partitions.
+
+    Note, however, that automatically applying the delete to all partitions may trigger a large number of subtasks and cause the command to take a long time. It is not recommended to enable this unless necessary.
+
 * `disable_colocate_join`
 
    Controls whether the [Colocation Join](./colocation-join.md) function is enabled. The default is false, which means that the feature is enabled. True means that the feature is disabled. When this feature is disabled, the query plan will not attempt to perform a Colocation Join.
@@ -370,4 +376,4 @@ Note that the comment must start with /*+ and can only follow the SELECT.
 * `insert_visible_timeout_ms`
 
    When executing an insert statement, Doris waits for the transaction to commit and the data to become visible after the import is completed.
-    This parameter controls the timeout of waiting for the transaction to be visible. The default value is 10000, and the minimum value is 1000.
\ No newline at end of file
+    This parameter controls the timeout of waiting for the transaction to be visible. The default value is 10000, and the minimum value is 1000.
diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/DELETE.md b/docs/en/sql-reference/sql-statements/Data Manipulation/DELETE.md
index bc51c2d..a0ef240 100644
--- a/docs/en/sql-reference/sql-statements/Data Manipulation/DELETE.md	
+++ b/docs/en/sql-reference/sql-statements/Data Manipulation/DELETE.md	
@@ -40,7 +40,7 @@ under the License.
         2) When the selected key column does not exist in a rollup, delete cannot be performed.
         3) The relationship between conditions can only be "and".
         If you want to achieve the "or" relationship, you need to divide the conditions into two DELETE statements.
-        4) If you partition a table for RANGE, you must specify PARTITION. If it is a single partition table, you can not specify it.
+        4) For a RANGE partitioned table, you can specify one or more partitions. If no partition is specified and the session variable delete_without_partition is true, the delete is applied to all partitions. For a single-partition table, the partition does not need to be specified.
     
     Notice:
         This statement may reduce query efficiency for a period of time after execution.
@@ -57,6 +57,10 @@ under the License.
     DELETE FROM my_table PARTITION p1
     WHERE k1 >= 3 AND k2 = "abc";
 
+    2. Delete rows in partitions p1 and p2 of my_table where the k1 column value is greater than or equal to 3 and the k2 column value is "abc"
+    DELETE FROM my_table PARTITIONS (p1, p2)
+    WHERE k1 >= 3 AND k2 = "abc";
+
 ## keyword
     DELETE
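
As a companion to the examples above, a hedged sketch of the session-variable path described in variables.md (my_table and k1 are illustrative):

    SET delete_without_partition = true;
    -- with the variable enabled, a RANGE partitioned table needs no partition
    -- clause; the delete is applied to all partitions
    DELETE FROM my_table WHERE k1 >= 3;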
 
diff --git a/docs/zh-CN/administrator-guide/variables.md b/docs/zh-CN/administrator-guide/variables.md
index 00df741..f6b8fd6 100644
--- a/docs/zh-CN/administrator-guide/variables.md
+++ b/docs/zh-CN/administrator-guide/variables.md
@@ -150,6 +150,12 @@ SELECT /*+ SET_VAR(query_timeout = 1) */ sleep(3);
 * `collation_server`
 
    Used for compatibility with MySQL clients. No practical effect.
+
+* `delete_without_partition`
+
+    When set to true, the delete command on a partitioned table no longer requires a partition to be specified; the delete operation is automatically applied to all partitions.
+
+    Note, however, that automatically applying the delete to all partitions may trigger a large number of subtasks and cause the command to take a long time. It is not recommended to enable this unless necessary.
     
 * `disable_colocate_join`
 
@@ -368,4 +374,4 @@ SELECT /*+ SET_VAR(query_timeout = 1) */ sleep(3);
 
 * `insert_visible_timeout_ms`
 
-    When executing an insert statement, after the load action (query and insert) completes, Doris still waits for the transaction to commit so that the data becomes visible. This parameter controls the timeout of waiting for the data to be visible. The default value is 10000, and the minimum value is 1000.
\ No newline at end of file
+    When executing an insert statement, after the load action (query and insert) completes, Doris still waits for the transaction to commit so that the data becomes visible. This parameter controls the timeout of waiting for the data to be visible. The default value is 10000, and the minimum value is 1000.
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/DELETE.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/DELETE.md
index 7078134..c1dcbeb 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/DELETE.md	
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/DELETE.md	
@@ -30,7 +30,7 @@ under the License.
    This statement conditionally deletes data in the specified partition(s) of a table (base index).
    This operation also deletes the data of the rollup indexes related to this base index.
    Syntax:
-        DELETE FROM table_name [PARTITION partition_name]
+        DELETE FROM table_name [PARTITION partition_name | PARTITIONS (p1, p2)]
         WHERE
         column_name1 op { value | value_list } [ AND column_name2 op { value | value_list } ...];
         
@@ -40,7 +40,7 @@ under the License.
        2) When the selected key column does not exist in a rollup, the delete cannot be performed.
        3) The relationship between conditions can only be "and".
           To achieve an "or" relationship, the conditions must be split into two DELETE statements.
-        4) For a RANGE partitioned table, PARTITION must be specified. For a single-partition table, it can be omitted.
+        4) For a RANGE partitioned table, partitions can be specified. If not specified, and the session variable delete_without_partition is true, the delete is applied to all partitions. For a single-partition table, the partition can be omitted.
            
    Notice:
        This statement may reduce query efficiency for a period of time after execution.
@@ -56,6 +56,10 @@ under the License.
    2. Delete rows in my_table partition p1 where the k1 column value is greater than or equal to 3 and the k2 column value is "abc"
         DELETE FROM my_table PARTITION p1
         WHERE k1 >= 3 AND k2 = "abc";
+
+    3. Delete rows in my_table partitions p1 and p2 where the k1 column value is greater than or equal to 3 and the k2 column value is "abc"
+        DELETE FROM my_table PARTITIONS (p1, p2)
+        WHERE k1 >= 3 AND k2 = "abc";
         
 ## keyword
     DELETE
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DeleteStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DeleteStmt.java
index 6d80c07..8670a23 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DeleteStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DeleteStmt.java
@@ -27,6 +27,9 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
 
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+
 import java.util.LinkedList;
 import java.util.List;
 
@@ -51,17 +54,17 @@ public class DeleteStmt extends DdlStmt {
     public String getDbName() {
         return tbl.getDb();
     }
-    
-    public String getPartitionName() {
-        return partitionNames == null ? null : partitionNames.getPartitionNames().get(0);
+
+    public List<String> getPartitionNames() {
+        return partitionNames == null ? Lists.newArrayList() : partitionNames.getPartitionNames();
     }
-    
+
     public List<Predicate> getDeleteConditions() {
         return deleteConditions;
     }
 
     @Override
-    public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
+    public void analyze(Analyzer analyzer) throws UserException {
         super.analyze(analyzer);
         
         if (tbl == null) {
@@ -72,9 +75,6 @@ public class DeleteStmt extends DdlStmt {
 
         if (partitionNames != null) {
             partitionNames.analyze(analyzer);
-            if (partitionNames.getPartitionNames().size() != 1) {
-                throw new AnalysisException("Do not support deleting multi partitions");
-            }
             if (partitionNames.isTemp()) {
                 throw new AnalysisException("Do not support deleting temp partitions");
             }
@@ -151,7 +151,9 @@ public class DeleteStmt extends DdlStmt {
         StringBuilder sb = new StringBuilder();
         sb.append("DELETE FROM ").append(tbl.toSql());
         if (partitionNames != null) {
-            sb.append(" PARTITION ").append(partitionNames.getPartitionNames().get(0));
+            sb.append(" PARTITION (");
+            sb.append(Joiner.on(", ").join(partitionNames.getPartitionNames()));
+            sb.append(")");
         }
         sb.append(" WHERE ").append(wherePredicate.toSql());
         return sb.toString();
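
For reference, a sketch of what the new toSql() path renders for a multi-partition delete (db1, my_table and k1 are illustrative; exact quoting depends on tbl.toSql() and the predicate's toSql(), and note the rendered keyword is PARTITION even for a partition list, per the Joiner-based code above):

    -- input statement
    DELETE FROM db1.my_table PARTITIONS (p1, p2) WHERE k1 >= 3;
    -- approximate toSql() rendering
    DELETE FROM db1.my_table PARTITION (p1, p2) WHERE k1 >= 3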
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowDeleteStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowDeleteStmt.java
index 679f376..672437e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowDeleteStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowDeleteStmt.java
@@ -58,7 +58,7 @@ public class ShowDeleteStmt extends ShowStmt {
     @Override
     public ShowResultSetMetaData getMetaData() {
         ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
-        for (String title : DeleteInfoProcDir.TITLE_NAMES_FOR_USER) {
+        for (String title : DeleteInfoProcDir.TITLE_NAMES) {
             builder.addColumn(new Column(title, ScalarType.createVarchar(30)));
         }
         return builder.build();
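
With the separate user-facing column list removed, SHOW DELETE now returns the five columns of DeleteInfoProcDir.TITLE_NAMES; a hedged sketch of the output shape (all values are illustrative):

    SHOW DELETE FROM example_db;
    -- TableName | PartitionName | CreateTime          | DeleteCondition | State
    -- my_table  | p1, p2        | 2021-01-30 20:33:34 | k1 >= "3"       | FINISHED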
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index bf6d122..01809b4 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -141,7 +141,6 @@ import org.apache.doris.journal.JournalCursor;
 import org.apache.doris.journal.JournalEntity;
 import org.apache.doris.journal.bdbje.Timestamp;
 import org.apache.doris.load.DeleteHandler;
-import org.apache.doris.load.DeleteInfo;
 import org.apache.doris.load.ExportChecker;
 import org.apache.doris.load.ExportJob;
 import org.apache.doris.load.ExportMgr;
@@ -1652,27 +1651,11 @@ public class Catalog {
         }
 
         // delete jobs
+        // Delete jobs have been moved to DeleteHandler. Here the jobSize is always 0, so there is nothing to do.
         if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_11) {
             jobSize = dis.readInt();
+            Preconditions.checkState(jobSize == 0, jobSize);
             newChecksum ^= jobSize;
-            for (int i = 0; i < jobSize; i++) {
-                long dbId = dis.readLong();
-                newChecksum ^= dbId;
-
-                int deleteCount = dis.readInt();
-                newChecksum ^= deleteCount;
-                for (int j = 0; j < deleteCount; j++) {
-                    DeleteInfo deleteInfo = new DeleteInfo();
-                    deleteInfo.readFields(dis);
-                    long currentTimeMs = System.currentTimeMillis();
-
-                    // Delete the history delete jobs that are older than
-                    // LABEL_KEEP_MAX_MS
-                    if ((currentTimeMs - deleteInfo.getCreateTimeMs()) / 1000 <= Config.label_keep_max_second) {
-                        load.unprotectAddDeleteInfo(deleteInfo);
-                    }
-                }
-            }
         }
 
         // load error hub info
@@ -1684,28 +1667,10 @@ public class Catalog {
 
         if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_45) {
             // 4. load delete jobs
+            // Delete jobs have been moved to DeleteHandler. Here the deleteJobSize is always 0, so there is nothing to do.
             int deleteJobSize = dis.readInt();
+            Preconditions.checkState(deleteJobSize == 0, deleteJobSize);
             newChecksum ^= deleteJobSize;
-            for (int i = 0; i < deleteJobSize; i++) {
-                long dbId = dis.readLong();
-                newChecksum ^= dbId;
-
-                int deleteJobCount = dis.readInt();
-                newChecksum ^= deleteJobCount;
-                for (int j = 0; j < deleteJobCount; j++) {
-                    LoadJob job = new LoadJob();
-                    job.readFields(dis);
-                    long currentTimeMs = System.currentTimeMillis();
-
-                    // Delete the history load jobs that are older than
-                    // LABEL_KEEP_MAX_MS
-                    // This job must be FINISHED or CANCELLED
-                    if ((currentTimeMs - job.getCreateTimeMs()) / 1000 <= Config.label_keep_max_second
-                            || (job.getState() != JobState.FINISHED && job.getState() != JobState.CANCELLED)) {
-                        load.unprotectAddLoadJob(job, true /* replay */);
-                    }
-                }
-            }
         }
 
         LOG.info("finished replay loadJob from image");
@@ -2071,46 +2036,20 @@ public class Catalog {
         }
 
         // 2. save delete jobs
-        Map<Long, List<DeleteInfo>> dbToDeleteInfos = load.getDbToDeleteInfos();
-        jobSize = dbToDeleteInfos.size();
+        // Delete jobs have been moved to DeleteHandler, so here we just write the job size as 0.
+        jobSize = 0;
         checksum ^= jobSize;
         dos.writeInt(jobSize);
-        for (Entry<Long, List<DeleteInfo>> entry : dbToDeleteInfos.entrySet()) {
-            long dbId = entry.getKey();
-            checksum ^= dbId;
-            dos.writeLong(dbId);
-
-            List<DeleteInfo> deleteInfos = entry.getValue();
-            int deletInfoCount = deleteInfos.size();
-            checksum ^= deletInfoCount;
-            dos.writeInt(deletInfoCount);
-            for (DeleteInfo deleteInfo : deleteInfos) {
-                deleteInfo.write(dos);
-            }
-        }
 
         // 3. load error hub info
         LoadErrorHub.Param param = load.getLoadErrorHubInfo();
         param.write(dos);
 
         // 4. save delete load job info
-        Map<Long, List<LoadJob>> dbToDeleteJobs = load.getDbToDeleteJobs();
-        int deleteJobSize = dbToDeleteJobs.size();
+        // Delete jobs have been moved to DeleteHandler, so here we just write the job size as 0.
+        int deleteJobSize = 0;
         checksum ^= deleteJobSize;
         dos.writeInt(deleteJobSize);
-        for (Entry<Long, List<LoadJob>> entry : dbToDeleteJobs.entrySet()) {
-            long dbId = entry.getKey();
-            checksum ^= dbId;
-            dos.writeLong(dbId);
-
-            List<LoadJob> deleteJobs = entry.getValue();
-            int deleteJobCount = deleteJobs.size();
-            checksum ^= deleteJobCount;
-            dos.writeInt(deleteJobCount);
-            for (LoadJob job : deleteJobs) {
-                job.write(dos);
-            }
-        }
 
         return checksum;
     }
@@ -2258,7 +2197,6 @@ public class Catalog {
             @Override
             protected void runAfterCatalogReady() {
                 load.removeOldLoadJobs();
-                load.removeOldDeleteJobs();
                 loadManager.removeOldLoadJob();
                 exportMgr.removeOldExportJobs();
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
index 4f2788c..d6f5691 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
@@ -35,15 +35,15 @@ import org.apache.doris.task.AgentTaskExecutor;
 import org.apache.doris.task.DropReplicaTask;
 import org.apache.doris.thrift.TStorageMedium;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -150,7 +150,6 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable {
 
                 // remove jobs
                 Catalog.getCurrentCatalog().getLoadInstance().removeDbLoadJob(db.getId());
-                Catalog.getCurrentCatalog().getLoadInstance().removeDbDeleteJob(db.getId());
                 Catalog.getCurrentCatalog().getSchemaChangeHandler().removeDbAlterJob(db.getId());
                 Catalog.getCurrentCatalog().getRollupHandler().removeDbAlterJob(db.getId());
 
@@ -188,7 +187,6 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable {
 
         // remove jobs
         Catalog.getCurrentCatalog().getLoadInstance().removeDbLoadJob(dbId);
-        Catalog.getCurrentCatalog().getLoadInstance().removeDbDeleteJob(dbId);
         Catalog.getCurrentCatalog().getSchemaChangeHandler().removeDbAlterJob(dbId);
         Catalog.getCurrentCatalog().getRollupHandler().removeDbAlterJob(dbId);
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
index 3fee46c..ba17874 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -202,6 +202,8 @@ public final class FeMetaVersion {
     public static final int VERSION_94 = 94;
     // serialize resources in restore job
     public static final int VERSION_95 = 95;
+    // support delete without partition
+    public static final int VERSION_96 = 96;
     // note: when increment meta version, should assign the latest version to VERSION_CURRENT
-    public static final int VERSION_CURRENT = VERSION_95;
+    public static final int VERSION_CURRENT = VERSION_96;
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/DeleteInfoProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/DeleteInfoProcDir.java
index bffa6ce..e1aac2a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/DeleteInfoProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/DeleteInfoProcDir.java
@@ -19,22 +19,16 @@ package org.apache.doris.common.proc;
 
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.load.DeleteHandler;
+import org.apache.doris.load.Load;
 
 import com.google.common.collect.ImmutableList;
-import org.apache.doris.load.Load;
 
 import java.util.ArrayList;
 import java.util.List;
 
-public class DeleteInfoProcDir implements ProcDirInterface {
+public class DeleteInfoProcDir implements ProcNodeInterface {
 
     public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
-            .add("jobId").add("TableId").add("TableName").add("PartitionId")
-            .add("PartitionName").add("CreateTime").add("DeleteCondition").add("Version")
-            .add("VersionHash").add("State")
-            .build();
-
-    public static final ImmutableList<String> TITLE_NAMES_FOR_USER = new ImmutableList.Builder<String>()
             .add("TableName").add("PartitionName").add("CreateTime").add("DeleteCondition")
             .add("State")
             .build();
@@ -54,8 +48,7 @@ public class DeleteInfoProcDir implements ProcDirInterface {
         BaseProcResult result = new BaseProcResult();
         result.setNames(TITLE_NAMES);
 
-        List<List<Comparable>> infos = deleteHandler.getDeleteInfosByDb(dbId, false);
-        infos.addAll(load.getDeleteInfosByDb(dbId, false));
+        List<List<Comparable>> infos = deleteHandler.getDeleteInfosByDb(dbId);
         for (List<Comparable> info : infos) {
             List<String> oneInfo = new ArrayList<String>(TITLE_NAMES.size());
             for (Comparable element : info) {
@@ -65,24 +58,4 @@ public class DeleteInfoProcDir implements ProcDirInterface {
         }
         return result;
     }
-
-    @Override
-    public boolean register(String name, ProcNodeInterface node) {
-        // TODO Auto-generated method stub
-        return false;
-    }
-
-    @Override
-    public ProcNodeInterface lookup(String jobIdStr) throws AnalysisException {
-        long jobId = -1L;
-        try {
-            jobId = Long.valueOf(jobIdStr);
-        } catch (NumberFormatException e) {
-            throw new AnalysisException("Invalid job id format: " + jobIdStr);
-        }
-
-        // return new DeleteJobProcNode(load, jobId);
-        return null;
-    }
-
 }
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/DeleteJobProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/DeleteJobProcNode.java
deleted file mode 100644
index aa9521c..0000000
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/DeleteJobProcNode.java
+++ /dev/null
@@ -1,56 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.common.proc;
-
-import org.apache.doris.common.AnalysisException;
-import org.apache.doris.load.Load;
-
-import com.google.common.collect.ImmutableList;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class DeleteJobProcNode implements ProcNodeInterface {
-    public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
-            .add("TabletId")
-            .build();
-
-    private long jobId;
-    private Load load;
-
-    public DeleteJobProcNode(Load load, long jobId) {
-        this.jobId = jobId;
-        this.load = load;
-    }
-
-    @Override
-    public ProcResult fetchResult() throws AnalysisException {
-        BaseProcResult result = new BaseProcResult();
-        result.setNames(TITLE_NAMES);
-
-        List<List<Comparable>> infos = load.getAsyncDeleteJobInfo(jobId);
-        for (List<Comparable> info : infos) {
-            List<String> oneInfo = new ArrayList<String>(TITLE_NAMES.size());
-            for (Comparable element : info) {
-                oneInfo.add(element.toString());
-            }
-            result.addRow(oneInfo);
-        }
-        return result;
-    }
-}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java
index e35e1f2..85e3b08 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java
@@ -109,10 +109,11 @@ public class JobsProcDir implements ProcDirInterface {
                                          cancelledNum.toString(), totalNum.toString()));
 
         // delete
+        // TODO: find it from delete handler
         pendingNum = 0L;
-        runningNum = load.getDeleteJobNumByState(dbId, org.apache.doris.load.LoadJob.JobState.LOADING);
-        finishedNum = load.getDeleteJobNumByState(dbId, org.apache.doris.load.LoadJob.JobState.FINISHED);
-        cancelledNum = load.getDeleteJobNumByState(dbId, org.apache.doris.load.LoadJob.JobState.CANCELLED);
+        runningNum = 0L;
+        finishedNum = 0L;
+        cancelledNum = 0L;
         totalNum = pendingNum + runningNum + finishedNum + cancelledNum;
         result.addRow(Lists.newArrayList(DELETE, pendingNum.toString(), runningNum.toString(), finishedNum.toString(),
                                          cancelledNum.toString(), totalNum.toString()));
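
Until the TODO above is resolved, the delete row under the jobs proc dir reports zeros; a hedged sketch (the database id 10003 is illustrative):

    SHOW PROC '/jobs/10003';
    -- the Delete row now shows 0 for Pending, Running, Finished, Cancelled and Total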
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index 8ecc4da..cec1812 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -36,7 +36,6 @@ import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.SmallFileMgr.SmallFile;
 import org.apache.doris.ha.MasterInfo;
 import org.apache.doris.journal.bdbje.Timestamp;
-import org.apache.doris.load.AsyncDeleteJob;
 import org.apache.doris.load.DeleteInfo;
 import org.apache.doris.load.ExportJob;
 import org.apache.doris.load.LoadErrorHub;
@@ -295,20 +294,8 @@ public class JournalEntity implements Writable {
                 ((ExportJob.StateTransfer) data).readFields(in);
                 isRead = true;
                 break;
-            case OperationType.OP_FINISH_SYNC_DELETE: {
-                data = new DeleteInfo();
-                ((DeleteInfo) data).readFields(in);
-                isRead = true;
-                break;
-            }
             case OperationType.OP_FINISH_DELETE: {
-                data = new DeleteInfo();
-                ((DeleteInfo) data).readFields(in);
-                isRead = true;
-                break;
-            }
-            case OperationType.OP_FINISH_ASYNC_DELETE: {
-                data = AsyncDeleteJob.read(in);
+                data = DeleteInfo.read(in);
                 isRead = true;
                 break;
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournalCursor.java b/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournalCursor.java
index b6741b2..45e4af9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournalCursor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournalCursor.java
@@ -24,7 +24,6 @@ import org.apache.doris.ha.MasterInfo;
 import org.apache.doris.journal.JournalCursor;
 import org.apache.doris.journal.JournalEntity;
 import org.apache.doris.journal.bdbje.Timestamp;
-import org.apache.doris.load.AsyncDeleteJob;
 import org.apache.doris.load.DeleteInfo;
 import org.apache.doris.load.LoadErrorHub;
 import org.apache.doris.load.LoadJob;
@@ -328,23 +327,11 @@ public final class LocalJournalCursor implements JournalCursor {
                 ret.setData(job);
                 break;
             }
-            case OperationType.OP_FINISH_SYNC_DELETE: {
-                DeleteInfo info = new DeleteInfo();
-                info.readFields(in);
-                ret.setData(info);
-                break;
-            }
             case OperationType.OP_FINISH_DELETE: {
-                DeleteInfo info = new DeleteInfo();
-                info.readFields(in);
+                DeleteInfo info = DeleteInfo.read(in);
                 ret.setData(info);
                 break;
             }
-            case OperationType.OP_FINISH_ASYNC_DELETE: {
-                AsyncDeleteJob deleteJob = AsyncDeleteJob.read(in);
-                ret.setData(deleteJob);
-                break;
-            }
             case OperationType.OP_ADD_REPLICA:
             case OperationType.OP_DELETE_REPLICA: {
                 ReplicaPersistInfo info = ReplicaPersistInfo.read(in);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/AsyncDeleteJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/AsyncDeleteJob.java
deleted file mode 100644
index ede4c85..0000000
--- a/fe/fe-core/src/main/java/org/apache/doris/load/AsyncDeleteJob.java
+++ /dev/null
@@ -1,276 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.load;
-
-import org.apache.doris.analysis.BinaryPredicate;
-import org.apache.doris.analysis.BinaryPredicate.Operator;
-import org.apache.doris.analysis.IsNullPredicate;
-import org.apache.doris.analysis.LiteralExpr;
-import org.apache.doris.analysis.Predicate;
-import org.apache.doris.analysis.SlotRef;
-import org.apache.doris.analysis.StringLiteral;
-import org.apache.doris.catalog.Catalog;
-import org.apache.doris.common.io.Text;
-import org.apache.doris.common.io.Writable;
-import org.apache.doris.persist.ReplicaPersistInfo;
-import org.apache.doris.task.AgentTaskQueue;
-import org.apache.doris.task.PushTask;
-import org.apache.doris.thrift.TTaskType;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class AsyncDeleteJob implements Writable {
-
-    public enum DeleteState {
-        QUORUM_FINISHED,
-        FINISHED
-    }
-
-    private volatile DeleteState state;
-
-    private long jobId;
-    private long dbId;
-    private long tableId;
-    private long partitionId;
-    
-    private long transactionId;
-
-    private long partitionVersion;
-    private long partitionVersionHash;
-    private List<Predicate> conditions;
-
-    private Set<Long> tabletIds;
-    private Map<Long, PushTask> sendReplicaIdToPushTask;
-
-    private Map<Long, ReplicaPersistInfo> replicaPersistInfos;
-
-    private AsyncDeleteJob() {
-        // for persist
-        conditions = Lists.newArrayList();
-        tabletIds = Sets.newHashSet();
-        sendReplicaIdToPushTask = Maps.newHashMap();
-        replicaPersistInfos = Maps.newHashMap();
-        transactionId = -1;
-    }
-
-    public AsyncDeleteJob(long dbId, long tableId, long partitionId,
-                          long partitionVersion, long partitionVersionHash,
-                          List<Predicate> conditions) {
-        this.state = DeleteState.QUORUM_FINISHED;
-
-        this.jobId = Catalog.getCurrentCatalog().getNextId();
-        this.dbId = dbId;
-        this.tableId = tableId;
-        this.partitionId = partitionId;
-
-        this.partitionVersion = partitionVersion;
-        this.partitionVersionHash = partitionVersionHash;
-        this.conditions = conditions;
-        this.tabletIds = Sets.newHashSet();
-        this.sendReplicaIdToPushTask = Maps.newHashMap();
-        this.replicaPersistInfos = Maps.newHashMap();
-    }
-
-    public void setState(DeleteState state) {
-        this.state = state;
-    }
-
-    public DeleteState getState() {
-        return this.state;
-    }
-
-    public long getJobId() {
-        return jobId;
-    }
-
-    public long getDbId() {
-        return dbId;
-    }
-
-    public long getTableId() {
-        return tableId;
-    }
-
-    public long getPartitionId() {
-        return partitionId;
-    }
-
-    public Set<Long> getTabletIds() {
-        return tabletIds;
-    }
-
-    public long getPartitionVersion() {
-        return partitionVersion;
-    }
-
-    public long getPartitionVersionHash() {
-        return partitionVersionHash;
-    }
-
-    public List<Predicate> getConditions() {
-        return conditions;
-    }
-
-    public void addTabletId(long tabletId) {
-        this.tabletIds.add(tabletId);
-    }
-
-    public void setIsSend(long replicaId, PushTask task) {
-        sendReplicaIdToPushTask.put(replicaId, task);
-    }
-
-    public boolean hasSend(long replicaId) {
-        return sendReplicaIdToPushTask.containsKey(replicaId);
-    }
-
-    public void clearTasks() {
-        for (PushTask task : sendReplicaIdToPushTask.values()) {
-            AgentTaskQueue.removePushTask(task.getBackendId(), task.getSignature(),
-                                          task.getVersion(), task.getVersionHash(), 
-                                          task.getPushType(), TTaskType.PUSH);
-        }
-    }
-
-    public void addReplicaPersistInfos(ReplicaPersistInfo info) {
-        if (!replicaPersistInfos.containsKey(info.getReplicaId())) {
-            replicaPersistInfos.put(info.getReplicaId(), info);
-        }
-    }
-
-    public Map<Long, ReplicaPersistInfo> getReplicaPersistInfos() {
-        return replicaPersistInfos;
-    }
-
-    public static AsyncDeleteJob read(DataInput in) throws IOException {
-        AsyncDeleteJob job = new AsyncDeleteJob();
-        job.readFields(in);
-        return job;
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-        Text.writeString(out, state.name());
-        out.writeLong(jobId);
-        out.writeLong(dbId);
-        out.writeLong(tableId);
-        out.writeLong(partitionId);
-
-        out.writeLong(partitionVersion);
-        out.writeLong(partitionVersionHash);
-
-        int count = conditions.size();
-        out.writeInt(count);
-        for (Predicate predicate : conditions) {
-            if (predicate instanceof BinaryPredicate) {
-                BinaryPredicate binaryPredicate = (BinaryPredicate) predicate;
-                SlotRef slotRef = (SlotRef) binaryPredicate.getChild(0);
-                String columnName = slotRef.getColumnName();
-                Text.writeString(out, columnName);
-                Text.writeString(out, binaryPredicate.getOp().name());
-                String value = ((LiteralExpr) binaryPredicate.getChild(1)).getStringValue();
-                Text.writeString(out, value);
-            } else if (predicate instanceof IsNullPredicate) {
-                IsNullPredicate isNullPredicate = (IsNullPredicate) predicate;
-                SlotRef slotRef = (SlotRef) isNullPredicate.getChild(0);
-                String columnName = slotRef.getColumnName();
-                Text.writeString(out, columnName);
-                Text.writeString(out, "IS");
-                String value = null;
-                if (isNullPredicate.isNotNull()) {
-                    value = "NOT NULL";
-                } else {
-                    value = "NULL";
-                }
-                Text.writeString(out, value);
-            }
-        }
-
-        count = tabletIds.size();
-        out.writeInt(count);
-        for (Long tabletId : tabletIds) {
-            out.writeLong(tabletId);
-        }
-
-        if (replicaPersistInfos == null) {
-            out.writeBoolean(false);
-        } else {
-            out.writeBoolean(true);
-            count = replicaPersistInfos.size();
-            out.writeInt(count);
-            for (ReplicaPersistInfo info : replicaPersistInfos.values()) {
-                info.write(out);
-            }
-        }
-    }
-
-    public void readFields(DataInput in) throws IOException {
-        state = DeleteState.valueOf(Text.readString(in));
-        jobId = in.readLong();
-        dbId = in.readLong();
-        tableId = in.readLong();
-        partitionId = in.readLong();
-
-        partitionVersion = in.readLong();
-        partitionVersionHash = in.readLong();
-
-        int count = in.readInt();
-        for (int i = 0; i < count; i++) {
-            String key = Text.readString(in);
-            String opStr = Text.readString(in);
-            if (opStr.equals("IS")) {
-                String value = Text.readString(in);
-                IsNullPredicate predicate;
-                if (value.equals("NOT NULL")) {
-                    predicate = new IsNullPredicate(new SlotRef(null, key), true);
-                } else {
-                    predicate = new IsNullPredicate(new SlotRef(null, key), false);
-                }
-                conditions.add(predicate);
-            } else {
-                Operator op = Operator.valueOf(opStr);
-                String value = Text.readString(in);
-                BinaryPredicate predicate = new BinaryPredicate(op, new SlotRef(null, key), new StringLiteral(value));
-                conditions.add(predicate);
-            }
-        }
-
-        count = in.readInt();
-        for (int i = 0; i < count; i++) {
-            long tabletId = in.readLong();
-            tabletIds.add(tabletId);
-        }
-
-        if (in.readBoolean()) {
-            count = in.readInt();
-            replicaPersistInfos = Maps.newHashMap();
-            for (int i = 0; i < count; ++i) {
-                ReplicaPersistInfo info = ReplicaPersistInfo.read(in);
-                replicaPersistInfos.put(info.getReplicaId(), info);
-            }
-        }
-    }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java
index 0cd4733..4542ec2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java
@@ -74,15 +74,15 @@ import org.apache.doris.transaction.TransactionState.TxnCoordinator;
 import org.apache.doris.transaction.TransactionState.TxnSourceType;
 import org.apache.doris.transaction.TransactionStatus;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.gson.annotations.SerializedName;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -94,6 +94,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 public class DeleteHandler implements Writable {
     private static final Logger LOG = LogManager.getLogger(DeleteHandler.class);
@@ -120,7 +121,8 @@ public class DeleteHandler implements Writable {
     public void process(DeleteStmt stmt) throws DdlException, QueryStateException {
         String dbName = stmt.getDbName();
         String tableName = stmt.getTableName();
-        String partitionName = stmt.getPartitionName();
+        List<String> partitionNames = stmt.getPartitionNames();
+        boolean noPartitionSpecified = partitionNames.isEmpty();
         List<Predicate> conditions = stmt.getDeleteConditions();
         Database db = Catalog.getCurrentCatalog().getDb(dbName);
         if (db == null) {
@@ -146,25 +148,37 @@ public class DeleteHandler implements Writable {
                     throw new DdlException("Table's state is not normal: " + tableName);
                 }
 
-                if (partitionName == null) {
+                if (noPartitionSpecified) {
                     if (olapTable.getPartitionInfo().getType() == PartitionType.RANGE) {
-                        throw new DdlException("This is a range partitioned table."
-                                + " You should specify partition in delete stmt");
-                    } else {
+                        if (!ConnectContext.get().getSessionVariable().isDeleteWithoutPartition()) {
+                            throw new DdlException("This is a range partitioned table."
+                                    + " You should specify partition in delete stmt, or set delete_without_partition to true");
+                        } else {
+                            partitionNames.addAll(olapTable.getPartitionNames());
+                        }
+                    } else if (olapTable.getPartitionInfo().getType() == PartitionType.UNPARTITIONED) {
                        // this is an unpartitioned table, use the table name as the partition name
-                        partitionName = olapTable.getName();
+                        partitionNames.add(olapTable.getName());
+                    } else {
+                        throw new DdlException("Unknown partition type: " + olapTable.getPartitionInfo().getType());
                     }
                 }
 
-                Partition partition = olapTable.getPartition(partitionName);
-                if (partition == null) {
-                    throw new DdlException("Partition does not exist. name: " + partitionName);
+                Map<Long, Short> partitionReplicaNum = Maps.newHashMap();
+                List<Partition> partitions = Lists.newArrayList();
+                for (String partName : partitionNames) {
+                    Partition partition = olapTable.getPartition(partName);
+                    if (partition == null) {
+                        throw new DdlException("Partition does not exist. name: " + partName);
+                    }
+                    partitions.add(partition);
+                    partitionReplicaNum.put(partition.getId(), ((OlapTable) table).getPartitionInfo().getReplicationNum(partition.getId()));
                 }
 
                 List<String> deleteConditions = Lists.newArrayList();
 
                 // pre check
-                checkDeleteV2(olapTable, partition, conditions, deleteConditions);
+                checkDeleteV2(olapTable, partitions, conditions, deleteConditions);
 
                 // generate label
                 String label = "delete_" + UUID.randomUUID();
@@ -176,10 +190,10 @@ public class DeleteHandler implements Writable {
                         new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
                         TransactionState.LoadJobSourceType.FRONTEND, jobId, Config.stream_load_default_timeout_second);
 
-                DeleteInfo deleteInfo = new DeleteInfo(db.getId(), olapTable.getId(), tableName,
-                        partition.getId(), partitionName,
-                        -1, 0, deleteConditions);
-                deleteJob = new DeleteJob(jobId, transactionId, label, deleteInfo);
+
+                DeleteInfo deleteInfo = new DeleteInfo(db.getId(), olapTable.getId(), tableName, deleteConditions);
+                deleteInfo.setPartitions(noPartitionSpecified, partitions.stream().map(p -> p.getId()).collect(Collectors.toList()), partitionNames);
+                deleteJob = new DeleteJob(jobId, transactionId, label, partitionReplicaNum, deleteInfo);
                 idToDeleteJob.put(deleteJob.getTransactionId(), deleteJob);
 
                 Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(deleteJob);
@@ -187,46 +201,50 @@ public class DeleteHandler implements Writable {
                 AgentBatchTask batchTask = new AgentBatchTask();
                 // count total replica num
                 int totalReplicaNum = 0;
-                for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
-                    for (Tablet tablet : index.getTablets()) {
-                        totalReplicaNum += tablet.getReplicas().size();
+                for (Partition partition : partitions) {
+                    for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
+                        for (Tablet tablet : index.getTablets()) {
+                            totalReplicaNum += tablet.getReplicas().size();
+                        }
                     }
                 }
                 countDownLatch = new MarkedCountDownLatch<Long, Long>(totalReplicaNum);
 
-                for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
-                    long indexId = index.getId();
-                    int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
-
-                    for (Tablet tablet : index.getTablets()) {
-                        long tabletId = tablet.getId();
-
-                        // set push type
-                        TPushType type = TPushType.DELETE;
-
-                        for (Replica replica : tablet.getReplicas()) {
-                            long replicaId = replica.getId();
-                            long backendId = replica.getBackendId();
-                            countDownLatch.addMark(backendId, tabletId);
-
-                            // create push task for each replica
-                            PushTask pushTask = new PushTask(null,
-                                    replica.getBackendId(), db.getId(), olapTable.getId(),
-                                    partition.getId(), indexId,
-                                    tabletId, replicaId, schemaHash,
-                                    -1, 0, "", -1, 0,
-                                    -1, type, conditions,
-                                    true, TPriority.NORMAL,
-                                    TTaskType.REALTIME_PUSH,
-                                    transactionId,
-                                    Catalog.getCurrentGlobalTransactionMgr().getTransactionIDGenerator().getNextTransactionId());
-                            pushTask.setIsSchemaChanging(false);
-                            pushTask.setCountDownLatch(countDownLatch);
-
-                            if (AgentTaskQueue.addTask(pushTask)) {
-                                batchTask.addTask(pushTask);
-                                deleteJob.addPushTask(pushTask);
-                                deleteJob.addTablet(tabletId);
+                for (Partition partition : partitions) {
+                    for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
+                        long indexId = index.getId();
+                        int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
+
+                        for (Tablet tablet : index.getTablets()) {
+                            long tabletId = tablet.getId();
+
+                            // set push type
+                            TPushType type = TPushType.DELETE;
+
+                            for (Replica replica : tablet.getReplicas()) {
+                                long replicaId = replica.getId();
+                                long backendId = replica.getBackendId();
+                                countDownLatch.addMark(backendId, tabletId);
+
+                                // create push task for each replica
+                                PushTask pushTask = new PushTask(null,
+                                        replica.getBackendId(), db.getId(), olapTable.getId(),
+                                        partition.getId(), indexId,
+                                        tabletId, replicaId, schemaHash,
+                                        -1, 0, "", -1, 0,
+                                        -1, type, conditions,
+                                        true, TPriority.NORMAL,
+                                        TTaskType.REALTIME_PUSH,
+                                        transactionId,
+                                        Catalog.getCurrentGlobalTransactionMgr().getTransactionIDGenerator().getNextTransactionId());
+                                pushTask.setIsSchemaChanging(false);
+                                pushTask.setCountDownLatch(countDownLatch);
+
+                                if (AgentTaskQueue.addTask(pushTask)) {
+                                    batchTask.addTask(pushTask);
+                                    deleteJob.addPushTask(pushTask);
+                                    deleteJob.addTablet(tabletId);
+                                }
                             }
                         }
                     }
@@ -465,14 +483,16 @@ public class DeleteHandler implements Writable {
         return slotRef;
     }
 
-    private void checkDeleteV2(OlapTable table, Partition partition, List<Predicate> conditions, List<String> deleteConditions)
+    private void checkDeleteV2(OlapTable table, List<Partition> partitions, List<Predicate> conditions, List<String> deleteConditions)
             throws DdlException {
 
         // check partition state
-        Partition.PartitionState state = partition.getState();
-        if (state != Partition.PartitionState.NORMAL) {
-            // ErrorReport.reportDdlException(ErrorCode.ERR_BAD_PARTITION_STATE, partition.getName(), state.name());
-            throw new DdlException("Partition[" + partition.getName() + "]' state is not NORMAL: " + state.name());
+        for (Partition partition : partitions) {
+            Partition.PartitionState state = partition.getState();
+            if (state != Partition.PartitionState.NORMAL) {
+                // ErrorReport.reportDdlException(ErrorCode.ERR_BAD_PARTITION_STATE, partition.getName(), state.name());
+                throw new DdlException("Partition[" + partition.getName() + "]' state is not NORMAL: " + state.name());
+            }
         }
 
         // check condition column is key column and condition value
@@ -533,7 +553,11 @@ public class DeleteHandler implements Writable {
             // set schema column name
             slotRef.setCol(column.getName());
         }
+
+        // check materialized index.
+        // only need to check the first partition, because each partition has the same materialized views
         Map<Long, List<Column>> indexIdToSchema = table.getIndexIdToSchema();
+        Partition partition = partitions.get(0);
         for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.VISIBLE)) {
             if (table.getBaseIndexId() == index.getId()) {
                 continue;
@@ -603,7 +627,7 @@ public class DeleteHandler implements Writable {
     }
 
     // show delete stmt
-    public List<List<Comparable>> getDeleteInfosByDb(long dbId, boolean forUser) {
+    public List<List<Comparable>> getDeleteInfosByDb(long dbId) {
         LinkedList<List<Comparable>> infos = new LinkedList<List<Comparable>>();
         Database db = Catalog.getCurrentCatalog().getDb(dbId);
         if (db == null) {
@@ -617,50 +641,29 @@ public class DeleteHandler implements Writable {
         }
 
         for (DeleteInfo deleteInfo : deleteInfos) {
-
             if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), dbName,
                     deleteInfo.getTableName(),
                     PrivPredicate.LOAD)) {
                 continue;
             }
 
-
             List<Comparable> info = Lists.newArrayList();
-            if (!forUser) {
-                info.add(-1L);
-                info.add(deleteInfo.getTableId());
-            }
             info.add(deleteInfo.getTableName());
-            if (!forUser) {
-                info.add(deleteInfo.getPartitionId());
+            if (deleteInfo.isNoPartitionSpecified()) {
+                info.add("*");
+            } else {
+                info.add(Joiner.on(", ").join(deleteInfo.getPartitionNames()));
             }
-            info.add(deleteInfo.getPartitionName());
 
             info.add(TimeUtils.longToTimeString(deleteInfo.getCreateTimeMs()));
             String conds = Joiner.on(", ").join(deleteInfo.getDeleteConditions());
             info.add(conds);
 
-            if (!forUser) {
-                info.add(deleteInfo.getPartitionVersion());
-                info.add(deleteInfo.getPartitionVersionHash());
-            }
-            // for loading state, should not display loading, show deleting instead
-//                if (loadJob.getState() == LoadJob.JobState.LOADING) {
-//                    info.add("DELETING");
-//                } else {
-//                    info.add(loadJob.getState().name());
-//                }
             info.add("FINISHED");
             infos.add(info);
         }
         // sort by createTimeMs
-        int sortIndex;
-        if (!forUser) {
-            sortIndex = 5;
-        } else {
-            sortIndex = 2;
-        }
-        ListComparator<List<Comparable>> comparator = new ListComparator<List<Comparable>>(sortIndex);
+        ListComparator<List<Comparable>> comparator = new ListComparator<List<Comparable>>(2);
         Collections.sort(infos, comparator);
         return infos;
     }
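
Taken together, the DeleteHandler changes above imply the following user-visible flow on a RANGE partitioned table (a sketch; range_tbl is hypothetical and the error text is abridged from the DdlException above):

    DELETE FROM range_tbl WHERE k1 = 1;
    -- ERROR: This is a range partitioned table. You should specify partition
    -- in delete stmt, or set delete_without_partition to true
    SET delete_without_partition = true;
    DELETE FROM range_tbl WHERE k1 = 1;   -- push tasks now fan out to every partition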
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteInfo.java
index 8e866c4..769a00d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteInfo.java
@@ -17,23 +17,25 @@
 
 package org.apache.doris.load;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import com.google.gson.annotations.SerializedName;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
-import org.apache.doris.load.AsyncDeleteJob.DeleteState;
 import org.apache.doris.persist.ReplicaPersistInfo;
+import org.apache.doris.persist.gson.GsonPostProcessable;
+import org.apache.doris.persist.gson.GsonUtils;
 
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
+import com.google.gson.annotations.SerializedName;
 
-public class DeleteInfo implements Writable {
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+public class DeleteInfo implements Writable, GsonPostProcessable {
 
     @SerializedName(value = "dbId")
     private long dbId;
@@ -41,48 +43,36 @@ public class DeleteInfo implements Writable {
     private long tableId;
     @SerializedName(value = "tableName")
     private String tableName;
-    @SerializedName(value = "partitionId")
-    private long partitionId;
-    @SerializedName(value = "partitionName")
-    private String partitionName;
-    @SerializedName(value = "partitionVersion")
-    private long partitionVersion;
-    @SerializedName(value = "partitionVersionHash")
-    private long partitionVersionHash;
-    private List<ReplicaPersistInfo> replicaInfos;
-
     @SerializedName(value = "deleteConditions")
     private List<String> deleteConditions;
     @SerializedName(value = "createTimeMs")
     private long createTimeMs;
-
-    private AsyncDeleteJob asyncDeleteJob;
+    @SerializedName(value = "partitionIds")
+    private List<Long> partitionIds;
+    @SerializedName(value = "partitionNames")
+    private List<String> partitionNames;
+    @SerializedName(value = "noPartitionSpecified")
+    private boolean noPartitionSpecified = false;
+
+    // The following partition id and partition name are deprecated.
+    // They are kept here only for backward compatibility with old metadata.
+    @Deprecated
+    @SerializedName(value = "partitionId")
+    private long partitionId;
+    @Deprecated
+    @SerializedName(value = "partitionName")
+    private String partitionName;
 
     public DeleteInfo() {
-        this.replicaInfos = new ArrayList<ReplicaPersistInfo>();
         this.deleteConditions = Lists.newArrayList();
-        this.asyncDeleteJob = null;
     }
 
-    public DeleteInfo(long dbId, long tableId, String tableName, long partitionId, String partitionName,
-                      long partitionVersion, long partitionVersionHash, List<String> deleteConditions) {
+    public DeleteInfo(long dbId, long tableId, String tableName, List<String> deleteConditions) {
         this.dbId = dbId;
         this.tableId = tableId;
         this.tableName = tableName;
-        this.partitionId = partitionId;
-        this.partitionName = partitionName;
-        this.partitionVersion = partitionVersion;
-        this.partitionVersionHash = partitionVersionHash;
-        this.replicaInfos = new ArrayList<ReplicaPersistInfo>();
         this.deleteConditions = deleteConditions;
-
         this.createTimeMs = System.currentTimeMillis();
-
-        this.asyncDeleteJob = null;
-    }
-
-    public long getJobId() {
-        return this.asyncDeleteJob == null ? -1 : this.asyncDeleteJob.getJobId();
     }
 
     public long getDbId() {
@@ -97,34 +87,6 @@ public class DeleteInfo implements Writable {
         return tableName;
     }
 
-    public long getPartitionId() {
-        return partitionId;
-    }
-
-    public String getPartitionName() {
-        return partitionName;
-    }
-
-    public long getPartitionVersion() {
-        return partitionVersion;
-    }
-
-    public long getPartitionVersionHash() {
-        return partitionVersionHash;
-    }
-
-    public List<ReplicaPersistInfo> getReplicaPersistInfos() {
-        return this.replicaInfos;
-    }
-
-    public void addReplicaPersistInfo(ReplicaPersistInfo info) {
-        this.replicaInfos.add(info);
-    }
-
-    public void setDeleteConditions(List<String> deleteConditions) {
-        this.deleteConditions = deleteConditions;
-    }
-
     public List<String> getDeleteConditions() {
         return deleteConditions;
     }
@@ -133,59 +95,51 @@ public class DeleteInfo implements Writable {
         return createTimeMs;
     }
 
-    public AsyncDeleteJob getAsyncDeleteJob() {
-        return asyncDeleteJob;
+    public boolean isNoPartitionSpecified() {
+        return noPartitionSpecified;
     }
 
-    public void setAsyncDeleteJob(AsyncDeleteJob asyncDeleteJob) {
-        this.asyncDeleteJob = asyncDeleteJob;
+    public void setPartitions(boolean noPartitionSpecified, List<Long> partitionIds, List<String> partitionNames) {
+        this.noPartitionSpecified = noPartitionSpecified;
+        Preconditions.checkState(partitionIds.size() == partitionNames.size());
+        this.partitionIds = partitionIds;
+        this.partitionNames = partitionNames;
     }
 
-    public DeleteState getState() {
-        return asyncDeleteJob == null ? DeleteState.FINISHED : asyncDeleteJob.getState();
-    }
-    
-    public void updatePartitionVersionInfo(long newVersion, long newVersionHash) {
-        this.partitionVersion = newVersion;
-        this.partitionVersionHash = newVersionHash;
+    public List<Long> getPartitionIds() {
+        return partitionIds;
     }
 
-    @Override
-    public void write(DataOutput out) throws IOException {
-        out.writeLong(dbId);
-        out.writeLong(tableId);
-        out.writeLong(partitionId);
-        out.writeLong(partitionVersion);
-        out.writeLong(partitionVersionHash);
-        out.writeInt(replicaInfos.size());
-        for (ReplicaPersistInfo info : replicaInfos) {
-            info.write(out);
-        }
-
-        Text.writeString(out, tableName);
-        Text.writeString(out, partitionName);
-
-        out.writeInt(deleteConditions.size());
-        for (String deleteCond : deleteConditions) {
-            Text.writeString(out, deleteCond);
-        }
-
-        out.writeLong(createTimeMs);
+    public List<String> getPartitionNames() {
+        return partitionNames;
+    }
 
-        if (asyncDeleteJob == null) {
-            out.writeBoolean(false);
+    public static DeleteInfo read(DataInput in) throws IOException {
+        if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_96) {
+            String json = Text.readString(in);
+            return GsonUtils.GSON.fromJson(json, DeleteInfo.class);
         } else {
-            out.writeBoolean(true);
-            asyncDeleteJob.write(out);
+            DeleteInfo deleteInfo = new DeleteInfo();
+            deleteInfo.readFields(in);
+            return deleteInfo;
         }
     }
 
-    public void readFields(DataInput in) throws IOException {
+    @Override
+    public void write(DataOutput out) throws IOException {
+        String json = GsonUtils.GSON.toJson(this);
+        Text.writeString(out, json);
+    }
+
+    private void readFields(DataInput in) throws IOException {
         dbId = in.readLong();
         tableId = in.readLong();
-        partitionId = in.readLong();
-        partitionVersion = in.readLong();
-        partitionVersionHash = in.readLong();
+        long partitionId = in.readLong();
+        long partitionVersion = in.readLong();
+        long partitionVersionHash = in.readLong();
+        this.partitionIds = Lists.newArrayList(partitionId);
+
+        List<ReplicaPersistInfo> replicaInfos = Lists.newArrayList();
         int size = in.readInt();
         for (int i = 0; i < size; i++) {
             ReplicaPersistInfo info = ReplicaPersistInfo.read(in);
@@ -194,7 +148,7 @@ public class DeleteInfo implements Writable {
 
         if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_11) {
             tableName = Text.readString(in);
-            partitionName = Text.readString(in);
+            String partitionName = Text.readString(in);
 
             size = in.readInt();
             for (int i = 0; i < size; i++) {
@@ -203,12 +157,22 @@ public class DeleteInfo implements Writable {
             }
 
             createTimeMs = in.readLong();
+            this.partitionNames = Lists.newArrayList(partitionName);
         }
 
         if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_19) {
-            if (in.readBoolean()) {
-                asyncDeleteJob = AsyncDeleteJob.read(in);
-            }
+            boolean hasAsyncDeleteJob = in.readBoolean();
+            Preconditions.checkState(!hasAsyncDeleteJob, "async delete job is deprecated");
+        }
+    }
+
+    @Override
+    public void gsonPostProcess() throws IOException {
+        // This logic is just for backward compatibility with the old single-partition metadata
+        if (this.partitionId > 0) {
+            Preconditions.checkState(!Strings.isNullOrEmpty(this.partitionName));
+            this.partitionIds = Lists.newArrayList(this.partitionId);
+            this.partitionNames = Lists.newArrayList(this.partitionName);
         }
     }
 }
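
For illustration, a minimal sketch of the versioned serialization pattern
DeleteInfo now follows, using plain Gson in place of Doris's GsonUtils and
GsonPostProcessable (class and method names below are hypothetical, and the
numeric value of VERSION_96 is an assumption). New images are always written as
JSON; on read, the journal version selects JSON or the legacy binary layout, and
a post-processing hook migrates the deprecated single-partition fields into the
new list fields:

    import com.google.gson.Gson;
    import com.google.gson.annotations.SerializedName;

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class DeleteInfoGsonSketch {
        static final int VERSION_96 = 96;   // assumed value of FeMetaVersion.VERSION_96

        @SerializedName("partitionIds")
        List<Long> partitionIds;
        @SerializedName("partitionNames")
        List<String> partitionNames;
        @Deprecated
        @SerializedName("partitionId")
        long partitionId;
        @Deprecated
        @SerializedName("partitionName")
        String partitionName;

        // Counterpart of gsonPostProcess(): old images carry a single partition
        void postProcess() {
            if (partitionId > 0) {
                partitionIds = new ArrayList<>(Arrays.asList(partitionId));
                partitionNames = new ArrayList<>(Arrays.asList(partitionName));
            }
        }

        static DeleteInfoGsonSketch read(String json, int journalVersion) {
            if (journalVersion >= VERSION_96) {
                DeleteInfoGsonSketch info = new Gson().fromJson(json, DeleteInfoGsonSketch.class);
                info.postProcess();
                return info;
            }
            // the legacy pre-VERSION_96 binary layout is elided in this sketch
            throw new UnsupportedOperationException("legacy format not shown");
        }

        public static void main(String[] args) {
            DeleteInfoGsonSketch info = read("{\"partitionId\":10,\"partitionName\":\"p1\"}", 96);
            System.out.println(info.partitionIds + " " + info.partitionNames);
        }
    }
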
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
index 90c946c..c94980b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
@@ -19,7 +19,6 @@ package org.apache.doris.load;
 
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Replica;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
@@ -29,12 +28,12 @@ import org.apache.doris.task.PushTask;
 import org.apache.doris.transaction.AbstractTxnStateChangeCallback;
 import org.apache.doris.transaction.TransactionState;
 
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
 import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
@@ -62,7 +61,10 @@ public class DeleteJob extends AbstractTxnStateChangeCallback {
     private Set<PushTask> pushTasks;
     private DeleteInfo deleteInfo;
 
-    public DeleteJob(long id, long transactionId, String label, DeleteInfo deleteInfo) {
+    private Map<Long, Short> partitionReplicaNum;
+
+    public DeleteJob(long id, long transactionId, String label,
+                     Map<Long, Short> partitionReplicaNum, DeleteInfo deleteInfo) {
         this.id = id;
         this.signature = transactionId;
         this.label = label;
@@ -73,6 +75,7 @@ public class DeleteJob extends AbstractTxnStateChangeCallback {
         tabletDeleteInfoMap = Maps.newConcurrentMap();
         pushTasks = Sets.newHashSet();
         state = DeleteState.UN_QUORUM;
+        this.partitionReplicaNum = partitionReplicaNum;
     }
 
     /**
@@ -84,30 +87,21 @@ public class DeleteJob extends AbstractTxnStateChangeCallback {
     public void checkAndUpdateQuorum() throws MetaNotFoundException {
         long dbId = deleteInfo.getDbId();
         long tableId = deleteInfo.getTableId();
-        long partitionId = deleteInfo.getPartitionId();
         Database db = Catalog.getCurrentCatalog().getDb(dbId);
         if (db == null) {
             throw new MetaNotFoundException("can not find database "+ dbId +" when commit delete");
         }
 
-        short replicaNum = -1;
-        OlapTable table = (OlapTable) db.getTable(tableId);
-        if (table == null) {
-            throw new MetaNotFoundException("can not find table "+ tableId +" when commit delete");
-        }
-        table.readLock();
-        try {
-            replicaNum = table.getPartitionInfo().getReplicationNum(partitionId);
-        } finally {
-            table.readUnlock();
-        }
-
-        short quorumNum = (short) (replicaNum / 2 + 1);
         for (TabletDeleteInfo tDeleteInfo : getTabletDeleteInfo()) {
+            Short replicaNum = partitionReplicaNum.get(tDeleteInfo.getPartitionId());
+            if (replicaNum == null) {
+                // should not happen
+                throw new MetaNotFoundException("Unknown partition " + tDeleteInfo.getPartitionId() + " when committing delete job");
+            }
             if (tDeleteInfo.getFinishedReplicas().size() == replicaNum) {
                 finishedTablets.add(tDeleteInfo.getTabletId());
             }
-            if (tDeleteInfo.getFinishedReplicas().size() >= quorumNum) {
+            if (tDeleteInfo.getFinishedReplicas().size() >= replicaNum / 2 + 1) {
                 quorumTablets.add(tDeleteInfo.getTabletId());
             }
         }
@@ -137,8 +131,8 @@ public class DeleteJob extends AbstractTxnStateChangeCallback {
         return pushTasks.add(pushTask);
     }
 
-    public boolean addFinishedReplica(long tabletId, Replica replica) {
-        tabletDeleteInfoMap.putIfAbsent(tabletId, new TabletDeleteInfo(tabletId));
+    public boolean addFinishedReplica(long partitionId, long tabletId, Replica replica) {
+        tabletDeleteInfoMap.putIfAbsent(tabletId, new TabletDeleteInfo(partitionId, tabletId));
         TabletDeleteInfo tDeleteInfo =  tabletDeleteInfoMap.get(tabletId);
         return tDeleteInfo.addFinishedReplica(replica);
     }
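
For illustration, a minimal sketch of the per-partition quorum rule that
checkAndUpdateQuorum now applies. Because a delete can span partitions with
different replication numbers, each tablet's quorum is derived from its own
partition's replica count (majority = replicaNum / 2 + 1) instead of a single
table-wide value; the method names here are hypothetical:

    import java.util.HashMap;
    import java.util.Map;

    public class DeleteQuorumSketch {
        // partitionId -> replication num, captured when the delete job was created
        static boolean reachedQuorum(Map<Long, Short> partitionReplicaNum,
                                     long partitionId, int finishedReplicas) {
            Short replicaNum = partitionReplicaNum.get(partitionId);
            if (replicaNum == null) {
                // mirrors the MetaNotFoundException above: should not happen
                throw new IllegalStateException("Unknown partition " + partitionId);
            }
            return finishedReplicas >= replicaNum / 2 + 1;   // majority of that partition
        }

        public static void main(String[] args) {
            Map<Long, Short> replicaNum = new HashMap<>();
            replicaNum.put(1L, (short) 3);   // partition 1: 3 replicas -> quorum 2
            replicaNum.put(2L, (short) 1);   // partition 2: 1 replica  -> quorum 1
            System.out.println(reachedQuorum(replicaNum, 1L, 2));   // true
            System.out.println(reachedQuorum(replicaNum, 2L, 0));   // false
        }
    }
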
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
index cdeef41..9c4104f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
@@ -24,7 +24,6 @@ import org.apache.doris.analysis.CancelLoadStmt;
 import org.apache.doris.analysis.CastExpr;
 import org.apache.doris.analysis.ColumnSeparator;
 import org.apache.doris.analysis.DataDescription;
-import org.apache.doris.analysis.DeleteStmt;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.ExprSubstitutionMap;
 import org.apache.doris.analysis.FunctionCallExpr;
@@ -33,11 +32,9 @@ import org.apache.doris.analysis.FunctionParams;
 import org.apache.doris.analysis.ImportColumnDesc;
 import org.apache.doris.analysis.IsNullPredicate;
 import org.apache.doris.analysis.LabelName;
-import org.apache.doris.analysis.LiteralExpr;
 import org.apache.doris.analysis.LoadStmt;
 import org.apache.doris.analysis.NullLiteral;
 import org.apache.doris.analysis.PartitionNames;
-import org.apache.doris.analysis.Predicate;
 import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.StringLiteral;
@@ -55,7 +52,6 @@ import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.OlapTable.OlapTableState;
 import org.apache.doris.catalog.Partition;
-import org.apache.doris.catalog.Partition.PartitionState;
 import org.apache.doris.catalog.PartitionType;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.Replica;
@@ -73,7 +69,6 @@ import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.FeNameFormat;
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.LoadException;
@@ -83,14 +78,12 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.ListComparator;
 import org.apache.doris.common.util.MetaLockUtils;
 import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.load.AsyncDeleteJob.DeleteState;
 import org.apache.doris.load.FailMsg.CancelType;
 import org.apache.doris.load.LoadJob.JobState;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.persist.ReplicaPersistInfo;
 import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.system.Backend;
 import org.apache.doris.task.AgentClient;
 import org.apache.doris.task.AgentTaskQueue;
@@ -100,21 +93,13 @@ import org.apache.doris.thrift.TEtlState;
 import org.apache.doris.thrift.TMiniLoadRequest;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPriority;
-import org.apache.doris.transaction.PartitionCommitInfo;
-import org.apache.doris.transaction.TableCommitInfo;
 import org.apache.doris.transaction.TransactionNotFoundException;
-import org.apache.doris.transaction.TransactionState;
-import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
-import org.apache.doris.transaction.TransactionState.TxnCoordinator;
-import org.apache.doris.transaction.TransactionState.TxnSourceType;
-import org.apache.doris.transaction.TransactionStatus;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
@@ -133,7 +118,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
@@ -160,13 +144,6 @@ public class Load {
     // dbId -> set of (label, timestamp)
     private Map<Long, Map<String, Long>> dbToMiniLabels; // db to mini uncommitted label
 
-
-    private Map<Long, List<DeleteInfo>> dbToDeleteInfos; // db to delete job list
-    private Map<Long, List<LoadJob>> dbToDeleteJobs; // db to delete loadJob list
-
-    private Set<Long> partitionUnderDelete; // save partitions which are running delete jobs
-    private Map<Long, AsyncDeleteJob> idToQuorumFinishedDeleteJob;
-
     private volatile LoadErrorHub.Param loadErrorHubParam = new LoadErrorHub.Param();
 
     // lock for load job
@@ -231,10 +208,6 @@ public class Load {
         idToQuorumFinishedLoadJob = Maps.newLinkedHashMap();
         loadingPartitionIds = Sets.newHashSet();
         dbToMiniLabels = Maps.newHashMap();
-        dbToDeleteInfos = Maps.newHashMap();
-        dbToDeleteJobs = Maps.newHashMap();
-        partitionUnderDelete = Sets.newHashSet();
-        idToQuorumFinishedDeleteJob = Maps.newLinkedHashMap();
         lock = new ReentrantReadWriteLock(true);
     }
 
@@ -1334,56 +1307,44 @@ public class Load {
                     "Number of unfinished load jobs exceed the max number: " + Config.max_unfinished_load_job);
         }
 
-        if (!job.isSyncDeleteJob()) {
-            // check label exist
-            boolean checkMini = true;
-            if (job.getEtlJobType() == EtlJobType.MINI) {
-                // already registered, do not need check
-                checkMini = false;
-            }
+        Preconditions.checkState(!job.isSyncDeleteJob(), "sync delete job is deprecated");
+        // check label exist
+        boolean checkMini = true;
+        if (job.getEtlJobType() == EtlJobType.MINI) {
+            // already registered, do not need check
+            checkMini = false;
+        }
 
-            unprotectIsLabelUsed(dbId, label, -1, checkMini);
+        unprotectIsLabelUsed(dbId, label, -1, checkMini);
 
-            // add job
-            Map<String, List<LoadJob>> labelToLoadJobs = null;
-            if (dbLabelToLoadJobs.containsKey(dbId)) {
-                labelToLoadJobs = dbLabelToLoadJobs.get(dbId);
-            } else {
-                labelToLoadJobs = Maps.newHashMap();
-                dbLabelToLoadJobs.put(dbId, labelToLoadJobs);
-            }
-            List<LoadJob> labelLoadJobs = null;
-            if (labelToLoadJobs.containsKey(label)) {
-                labelLoadJobs = labelToLoadJobs.get(label);
-            } else {
-                labelLoadJobs = Lists.newArrayList();
-                labelToLoadJobs.put(label, labelLoadJobs);
-            }
+        // add job
+        Map<String, List<LoadJob>> labelToLoadJobs = null;
+        if (dbLabelToLoadJobs.containsKey(dbId)) {
+            labelToLoadJobs = dbLabelToLoadJobs.get(dbId);
+        } else {
+            labelToLoadJobs = Maps.newHashMap();
+            dbLabelToLoadJobs.put(dbId, labelToLoadJobs);
+        }
+        List<LoadJob> labelLoadJobs = null;
+        if (labelToLoadJobs.containsKey(label)) {
+            labelLoadJobs = labelToLoadJobs.get(label);
+        } else {
+            labelLoadJobs = Lists.newArrayList();
+            labelToLoadJobs.put(label, labelLoadJobs);
+        }
 
-            List<LoadJob> dbLoadJobs = null;
-            if (dbToLoadJobs.containsKey(dbId)) {
-                dbLoadJobs = dbToLoadJobs.get(dbId);
-            } else {
-                dbLoadJobs = Lists.newArrayList();
-                dbToLoadJobs.put(dbId, dbLoadJobs);
-            }
-            idToLoadJob.put(jobId, job);
-            dbLoadJobs.add(job);
-            labelLoadJobs.add(job);
+        List<LoadJob> dbLoadJobs = null;
+        if (dbToLoadJobs.containsKey(dbId)) {
+            dbLoadJobs = dbToLoadJobs.get(dbId);
         } else {
-            List<LoadJob> dbDeleteJobs = null;
-            if (dbToDeleteJobs.containsKey(dbId)) {
-                dbDeleteJobs = dbToDeleteJobs.get(dbId);
-            } else {
-                dbDeleteJobs = Lists.newArrayList();
-                dbToDeleteJobs.put(dbId, dbDeleteJobs);
-            }
-            idToLoadJob.put(jobId, job);
-            dbDeleteJobs.add(job);
+            dbLoadJobs = Lists.newArrayList();
+            dbToLoadJobs.put(dbId, dbLoadJobs);
         }
+        idToLoadJob.put(jobId, job);
+        dbLoadJobs.add(job);
+        labelLoadJobs.add(job);
 
         // beginTransaction Here
-
         switch (job.getState()) {
             case PENDING:
                 idToPendingLoadJob.put(jobId, job);
@@ -1857,60 +1818,6 @@ public class Load {
         }
     }
 
-    public void removeDeleteJobAndSetState(AsyncDeleteJob job) {
-        job.clearTasks();
-        writeLock();
-        try {
-            idToQuorumFinishedDeleteJob.remove(job.getJobId());
-
-            List<DeleteInfo> deleteInfos = dbToDeleteInfos.get(job.getDbId());
-            Preconditions.checkNotNull(deleteInfos);
-
-            for (DeleteInfo deleteInfo : deleteInfos) {
-                if (deleteInfo.getJobId() == job.getJobId()) {
-                    deleteInfo.getAsyncDeleteJob().setState(DeleteState.FINISHED);
-                    LOG.info("replay set async delete job to finished: {}", job.getJobId());
-                }
-            }
-
-        } finally {
-            writeUnlock();
-        }
-    }
-
-    public List<AsyncDeleteJob> getQuorumFinishedDeleteJobs() {
-        List<AsyncDeleteJob> jobs = Lists.newArrayList();
-        Collection<AsyncDeleteJob> stateJobs = null;
-        readLock();
-        try {
-            stateJobs = idToQuorumFinishedDeleteJob.values();
-            if (stateJobs != null) {
-                jobs.addAll(stateJobs);
-            }
-        } finally {
-            readUnlock();
-        }
-        return jobs;
-    }
-
-    public int getLoadJobNumber() {
-        readLock();
-        try {
-            if (idToLoadJob == null) {
-                return 0;
-            }
-            int loadJobNum = 0;
-            for (LoadJob loadJob : idToLoadJob.values()) {
-                if (!loadJob.isSyncDeleteJob()) {
-                    ++loadJobNum;
-                }
-            }
-            return loadJobNum;
-        } finally {
-            readUnlock();
-        }
-    }
-
     public Map<Long, LoadJob> getIdToLoadJob() {
         return idToLoadJob;
     }
@@ -1919,14 +1826,6 @@ public class Load {
         return dbToLoadJobs;
     }
 
-    public Map<Long, List<LoadJob>> getDbToDeleteJobs() {
-        return dbToDeleteJobs;
-    }
-
-    public Map<Long, List<DeleteInfo>> getDbToDeleteInfos() {
-        return dbToDeleteInfos;
-    }
-
     public List<LoadJob> getLoadJobs(JobState jobState) {
         List<LoadJob> jobs = new ArrayList<LoadJob>();
         Collection<LoadJob> stateJobs = null;
@@ -1986,24 +1885,6 @@ public class Load {
         }
     }
 
-    public AsyncDeleteJob getAsyncDeleteJob(long jobId) {
-        readLock();
-        try {
-            return idToQuorumFinishedDeleteJob.get(jobId);
-        } finally {
-            readUnlock();
-        }
-    }
-
-    public List<AsyncDeleteJob> getCopiedAsyncDeleteJobs() {
-        readLock();
-        try {
-            return Lists.newArrayList(idToQuorumFinishedDeleteJob.values());
-        } finally {
-            readUnlock();
-        }
-    }
-
     public LinkedList<List<Comparable>> getLoadJobInfosByDb(long dbId, String dbName, String labelValue,
                                                             boolean accurateMatch, Set<JobState> states) {
         LinkedList<List<Comparable>> loadJobInfos = new LinkedList<List<Comparable>>();
@@ -2622,78 +2503,56 @@ public class Load {
         }
         idToLoadJob.put(jobId, job);
 
-        if (!job.isSyncDeleteJob()) {
-            // Replace LoadJob in dbToLoadJobs
-            List<LoadJob> jobs = dbToLoadJobs.get(job.getDbId());
-            if (jobs == null) {
-                LOG.warn("Does not find db in dbToLoadJobs. DbId : {}",
-                         job.getDbId());
-                return;
-            }
-            int pos = 0;
-            for (LoadJob oneJob : jobs) {
-                if (oneJob.getId() == jobId) {
-                    break;
-                }
-                pos++;
-            }
-            if (pos == jobs.size()) {
-                LOG.warn("Does not find load job for db. DbId : {}, jobId : {}",
-                         job.getDbId(), jobId);
-                return;
-            }
-            jobs.remove(pos);
-            jobs.add(pos, job);
+        Preconditions.checkState(!job.isSyncDeleteJob(), "sync delete job is deprecated");
 
-            // Replace LoadJob in dbLabelToLoadJobs
-            if (dbLabelToLoadJobs.get(job.getDbId()) == null) {
-                LOG.warn("Does not find db in dbLabelToLoadJobs. DbId : {}",
-                         job.getDbId());
-                return;
-            }
-            jobs = dbLabelToLoadJobs.get(job.getDbId()).get(job.getLabel());
-            if (jobs == null) {
-                LOG.warn("Does not find label for db. label : {}, DbId : {}",
-                         job.getLabel(), job.getDbId());
-                return;
-            }
-            pos = 0;
-            for (LoadJob oneJob : jobs) {
-                if (oneJob.getId() == jobId) {
-                    break;
-                }
-                pos++;
-            }
-            if (pos == jobs.size()) {
-                LOG.warn("Does not find load job for label. label : {}, DbId : {}",
-                         job.getLabel(), job.getDbId());
-                return;
-            }
-            jobs.remove(pos);
-            jobs.add(pos, job);
-        } else {
-            // Replace LoadJob in dbToLoadJobs
-            List<LoadJob> jobs = dbToDeleteJobs.get(job.getDbId());
-            if (jobs == null) {
-                LOG.warn("Does not find db in dbToDeleteJobs. DbId : {}",
-                         job.getDbId());
-                return;
-            }
-            int pos = 0;
-            for (LoadJob oneJob : jobs) {
-                if (oneJob.getId() == jobId) {
-                    break;
-                }
-                pos++;
+        // Replace LoadJob in dbToLoadJobs
+        List<LoadJob> jobs = dbToLoadJobs.get(job.getDbId());
+        if (jobs == null) {
+            LOG.warn("Does not find db in dbToLoadJobs. DbId : {}",
+                    job.getDbId());
+            return;
+        }
+        int pos = 0;
+        for (LoadJob oneJob : jobs) {
+            if (oneJob.getId() == jobId) {
+                break;
             }
-            if (pos == jobs.size()) {
-                LOG.warn("Does not find delete load job for db. DbId : {}, jobId : {}",
-                         job.getDbId(), jobId);
-                return;
+            pos++;
+        }
+        if (pos == jobs.size()) {
+            LOG.warn("Does not find load job for db. DbId : {}, jobId : {}",
+                    job.getDbId(), jobId);
+            return;
+        }
+        jobs.remove(pos);
+        jobs.add(pos, job);
+
+        // Replace LoadJob in dbLabelToLoadJobs
+        if (dbLabelToLoadJobs.get(job.getDbId()) == null) {
+            LOG.warn("Does not find db in dbLabelToLoadJobs. DbId : {}",
+                    job.getDbId());
+            return;
+        }
+        jobs = dbLabelToLoadJobs.get(job.getDbId()).get(job.getLabel());
+        if (jobs == null) {
+            LOG.warn("Does not find label for db. label : {}, DbId : {}",
+                    job.getLabel(), job.getDbId());
+            return;
+        }
+        pos = 0;
+        for (LoadJob oneJob : jobs) {
+            if (oneJob.getId() == jobId) {
+                break;
             }
-            jobs.remove(pos);
-            jobs.add(pos, job);
+            pos++;
         }
+        if (pos == jobs.size()) {
+            LOG.warn("Does not find load job for label. label : {}, DbId : {}",
+                    job.getLabel(), job.getDbId());
+            return;
+        }
+        jobs.remove(pos);
+        jobs.add(pos, job);
     }
 
     // remove all db jobs from dbToLoadJobs and dbLabelToLoadJobs
@@ -2715,9 +2574,6 @@ public class Load {
             if (dbLabelToLoadJobs.containsKey(dbId)) {
                 dbLabelToLoadJobs.remove(dbId);
             }
-            if (dbToDeleteJobs.containsKey(dbId)) {
-                dbToDeleteJobs.remove(dbId);
-            }
         } finally {
             writeUnlock();
         }
@@ -2750,15 +2606,6 @@ public class Load {
                         }
                     }
 
-                    // remove delete job from dbToDeleteJobs
-                    List<LoadJob> deleteJobs = dbToDeleteJobs.get(dbId);
-                    if (deleteJobs != null) {
-                        deleteJobs.remove(job);
-                        if (deleteJobs.size() == 0) {
-                            dbToDeleteJobs.remove(dbId);
-                        }
-                    }
-
                     // Remove job from dbLabelToLoadJobs
                     Map<String, List<LoadJob>> mapLabelToJobs = dbLabelToLoadJobs.get(dbId);
                     if (mapLabelToJobs != null) {
@@ -2919,16 +2766,6 @@ public class Load {
                                 idToLoadingLoadJob.remove(jobId);
                                 job.setProgress(100);
                                 job.setLoadFinishTimeMs(System.currentTimeMillis());
-                                // if this is a sync delete job, then update affected version and version hash
-                                if (job.isSyncDeleteJob()) {
-                                    TransactionState transactionState = Catalog.getCurrentGlobalTransactionMgr()
-                                            .getTransactionState(job.getDbId(), job.getTransactionId());
-                                    DeleteInfo deleteInfo = job.getDeleteInfo();
-                                    TableCommitInfo tableCommitInfo = transactionState.getTableCommitInfo(deleteInfo.getTableId());
-                                    PartitionCommitInfo partitionCommitInfo = tableCommitInfo.getPartitionCommitInfo(deleteInfo.getPartitionId());
-                                    deleteInfo.updatePartitionVersionInfo(partitionCommitInfo.getVersion(),
-                                            partitionCommitInfo.getVersionHash());
-                                }
                             }
                             MetricRepo.COUNTER_LOAD_FINISHED.increase(1L);
                             // job will transfer from LOADING to FINISHED, skip QUORUM_FINISHED
@@ -3222,668 +3059,4 @@ public class Load {
             readUnlock();
         }
     }
-
-    public void unprotectAddDeleteInfo(DeleteInfo deleteInfo) {
-        long dbId = deleteInfo.getDbId();
-        List<DeleteInfo> deleteInfos = dbToDeleteInfos.get(dbId);
-        if (deleteInfos == null) {
-            deleteInfos = Lists.newArrayList();
-            dbToDeleteInfos.put(dbId, deleteInfos);
-        }
-        deleteInfos.add(deleteInfo);
-
-        if (deleteInfo.getAsyncDeleteJob() != null && deleteInfo.getState() == DeleteState.QUORUM_FINISHED) {
-            AsyncDeleteJob asyncDeleteJob = deleteInfo.getAsyncDeleteJob();
-            idToQuorumFinishedDeleteJob.put(asyncDeleteJob.getJobId(), asyncDeleteJob);
-            LOG.info("unprotected add asyncDeleteJob when load image: {}", asyncDeleteJob.getJobId());
-        }
-    }
-
-    public void unprotectDelete(DeleteInfo deleteInfo, Database db) {
-        OlapTable table = (OlapTable) db.getTable(deleteInfo.getTableId());
-        Partition partition = table.getPartition(deleteInfo.getPartitionId());
-        updatePartitionVersion(partition, deleteInfo.getPartitionVersion(), deleteInfo.getPartitionVersionHash(), -1);
-
-        List<ReplicaPersistInfo> replicaInfos = deleteInfo.getReplicaPersistInfos();
-        if (replicaInfos != null) {
-            for (ReplicaPersistInfo info : replicaInfos) {
-                MaterializedIndex index = partition.getIndex(info.getIndexId());
-                Tablet tablet = index.getTablet(info.getTabletId());
-                Replica replica = tablet.getReplicaById(info.getReplicaId());
-                replica.updateVersionInfo(info.getVersion(), info.getVersionHash(),
-                                          info.getDataSize(), info.getRowCount());
-            }
-        }
-
-        // add to deleteInfos
-        if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_11) {
-            long dbId = deleteInfo.getDbId();
-            List<DeleteInfo> deleteInfos = dbToDeleteInfos.get(dbId);
-            if (deleteInfos == null) {
-                deleteInfos = Lists.newArrayList();
-                dbToDeleteInfos.put(dbId, deleteInfos);
-            }
-            deleteInfos.add(deleteInfo);
-        }
-
-        if (deleteInfo.getAsyncDeleteJob() != null) {
-            AsyncDeleteJob asyncDeleteJob = deleteInfo.getAsyncDeleteJob();
-            idToQuorumFinishedDeleteJob.put(asyncDeleteJob.getJobId(), asyncDeleteJob);
-            LOG.info("unprotected add asyncDeleteJob: {}", asyncDeleteJob.getJobId());
-        }
-    }
-
-    public void replayFinishAsyncDeleteJob(AsyncDeleteJob deleteJob, Catalog catalog) {
-        Database db = catalog.getDb(deleteJob.getDbId());
-        OlapTable table = (OlapTable) db.getTable(deleteJob.getTableId());
-        table.writeLock();
-        readLock();
-        try {
-            // Update database information
-            Map<Long, ReplicaPersistInfo> replicaInfos = deleteJob.getReplicaPersistInfos();
-            if (replicaInfos != null) {
-                for (ReplicaPersistInfo info : replicaInfos.values()) {
-                        Partition partition = table.getPartition(info.getPartitionId());
-                        if (partition == null) {
-                            LOG.warn("the partition[{}] is missing", info.getIndexId());
-                            continue;
-                        }
-                        MaterializedIndex index = partition.getIndex(info.getIndexId());
-                        if (index == null) {
-                            LOG.warn("the index[{}] is missing", info.getIndexId());
-                            continue;
-                        }
-                        Tablet tablet = index.getTablet(info.getTabletId());
-                        if (tablet == null) {
-                            LOG.warn("the tablet[{}] is missing", info.getTabletId());
-                            continue;
-                        }
-
-                        Replica replica = tablet.getReplicaById(info.getReplicaId());
-                        if (replica == null) {
-                            LOG.warn("the replica[{}] is missing", info.getReplicaId());
-                            continue;
-                        }
-                        replica.updateVersionInfo(info.getVersion(), info.getVersionHash(),
-                                info.getDataSize(), info.getRowCount());
-                }
-            }
-        } finally {
-            readUnlock();
-            table.writeUnlock();
-        }
-        removeDeleteJobAndSetState(deleteJob);
-        LOG.info("unprotected finish asyncDeleteJob: {}", deleteJob.getJobId());
-    }
-
-    public void replayDelete(DeleteInfo deleteInfo, Catalog catalog) {
-        Database db = catalog.getDb(deleteInfo.getDbId());
-        OlapTable table = (OlapTable) db.getTable(deleteInfo.getTableId());
-        if (table == null) {
-            return;
-        }
-        table.writeLock();
-        writeLock();
-        try {
-            unprotectDelete(deleteInfo, db);
-        } finally {
-            writeUnlock();
-            table.writeUnlock();
-        }
-    }
-
-    private void checkDeleteV2(OlapTable table, Partition partition, List<Predicate> conditions, List<String> deleteConditions, boolean preCheck)
-            throws DdlException {
-
-        // check partition state
-        PartitionState state = partition.getState();
-        if (state != PartitionState.NORMAL) {
-            // ErrorReport.reportDdlException(ErrorCode.ERR_BAD_PARTITION_STATE, partition.getName(), state.name());
-            throw new DdlException("Partition[" + partition.getName() + "]' state is not NORMAL: " + state.name());
-        }
-        // do not need check whether partition has loading job
-
-        // async delete job does not exist any more
-
-        // check condition column is key column and condition value
-        Map<String, Column> nameToColumn = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
-        for (Column column : table.getBaseSchema()) {
-            nameToColumn.put(column.getName(), column);
-        }
-        for (Predicate condition : conditions) {
-            SlotRef slotRef = null;
-            if (condition instanceof BinaryPredicate) {
-                BinaryPredicate binaryPredicate = (BinaryPredicate) condition;
-                slotRef = (SlotRef) binaryPredicate.getChild(0);
-            } else if (condition instanceof IsNullPredicate) {
-                IsNullPredicate isNullPredicate = (IsNullPredicate) condition;
-                slotRef = (SlotRef) isNullPredicate.getChild(0);
-            }
-            String columnName = slotRef.getColumnName();
-            if (!nameToColumn.containsKey(columnName)) {
-                ErrorReport.reportDdlException(ErrorCode.ERR_BAD_FIELD_ERROR, columnName, table.getName());
-            }
-
-            Column column = nameToColumn.get(columnName);
-            if (!column.isKey()) {
-                // ErrorReport.reportDdlException(ErrorCode.ERR_NOT_KEY_COLUMN, columnName);
-                throw new DdlException("Column[" + columnName + "] is not key column");
-            }
-
-            if (condition instanceof BinaryPredicate) {
-                String value = null;
-                try {
-                    BinaryPredicate binaryPredicate = (BinaryPredicate) condition;
-                    value = ((LiteralExpr) binaryPredicate.getChild(1)).getStringValue();
-                    LiteralExpr.create(value, Type.fromPrimitiveType(column.getDataType()));
-                } catch (AnalysisException e) {
-                    // ErrorReport.reportDdlException(ErrorCode.ERR_INVALID_VALUE, value);
-                    throw new DdlException("Invalid column value[" + value + "]");
-                }
-            }
-
-            // set schema column name
-            slotRef.setCol(column.getName());
-        }
-        Map<Long, List<Column>> indexIdToSchema = table.getIndexIdToSchema();
-        for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
-            // check table has condition column
-            Map<String, Column> indexColNameToColumn = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
-            for (Column column : indexIdToSchema.get(index.getId())) {
-                indexColNameToColumn.put(column.getName(), column);
-            }
-            String indexName = table.getIndexNameById(index.getId());
-            for (Predicate condition : conditions) {
-                String columnName = null;
-                if (condition instanceof BinaryPredicate) {
-                    BinaryPredicate binaryPredicate = (BinaryPredicate) condition;
-                    columnName = ((SlotRef) binaryPredicate.getChild(0)).getColumnName();
-                } else if (condition instanceof IsNullPredicate) {
-                    IsNullPredicate isNullPredicate = (IsNullPredicate) condition;
-                    columnName = ((SlotRef) isNullPredicate.getChild(0)).getColumnName();
-                }
-                Column column = indexColNameToColumn.get(columnName);
-                if (column == null) {
-                    ErrorReport.reportDdlException(ErrorCode.ERR_BAD_FIELD_ERROR, columnName, indexName);
-                }
-
-                if (table.getKeysType() == KeysType.DUP_KEYS && !column.isKey()) {
-                    throw new DdlException("Column[" + columnName + "] is not key column in index[" + indexName + "]");
-                }
-            }
-
-            // do not need to check replica version and backend alive
-
-        } // end for indices
-
-        if (deleteConditions == null) {
-            return;
-        }
-
-        // save delete conditions
-        for (Predicate condition : conditions) {
-            if (condition instanceof BinaryPredicate) {
-                BinaryPredicate binaryPredicate = (BinaryPredicate) condition;
-                SlotRef slotRef = (SlotRef) binaryPredicate.getChild(0);
-                String columnName = slotRef.getColumnName();
-                StringBuilder sb = new StringBuilder();
-                sb.append(columnName).append(" ").append(binaryPredicate.getOp().name()).append(" \"")
-                        .append(((LiteralExpr) binaryPredicate.getChild(1)).getStringValue()).append("\"");
-                deleteConditions.add(sb.toString());
-            } else if (condition instanceof IsNullPredicate) {
-                IsNullPredicate isNullPredicate = (IsNullPredicate) condition;
-                SlotRef slotRef = (SlotRef) isNullPredicate.getChild(0);
-                String columnName = slotRef.getColumnName();
-                StringBuilder sb = new StringBuilder();
-                sb.append(columnName);
-                if (isNullPredicate.isNotNull()) {
-                    sb.append(" IS NOT NULL");
-                } else {
-                    sb.append(" IS NULL");
-                }
-                deleteConditions.add(sb.toString());
-            }
-        }
-    }
-
-    private boolean checkAndAddRunningSyncDeleteJob(long partitionId, String partitionName) throws DdlException {
-        // check if there are synchronized delete job under going
-        writeLock();
-        try {
-            checkHasRunningSyncDeleteJob(partitionId, partitionName);
-            return partitionUnderDelete.add(partitionId);
-        } finally {
-            writeUnlock();
-        }
-    }
-
-    private void checkHasRunningSyncDeleteJob(long partitionId, String partitionName) throws DdlException {
-        // check if there are synchronized delete job under going
-        readLock();
-        try {
-            if (partitionUnderDelete.contains(partitionId)) {
-                throw new DdlException("Partition[" + partitionName + "] has running delete job. See 'SHOW DELETE'");
-            }
-        } finally {
-            readUnlock();
-        }
-    }
-
-    private void checkHasRunningAsyncDeleteJob(long partitionId, String partitionName) throws DdlException {
-        readLock();
-        try {
-            for (AsyncDeleteJob job : idToQuorumFinishedDeleteJob.values()) {
-                if (job.getPartitionId() == partitionId) {
-                    throw new DdlException("Partition[" + partitionName + "] has running async delete job. "
-                                                   + "See 'SHOW DELETE'");
-                }
-            }
-            for (long dbId : dbToDeleteJobs.keySet()) {
-                List<LoadJob> loadJobs = dbToDeleteJobs.get(dbId);
-                for (LoadJob loadJob : loadJobs) {
-                    if (loadJob.getDeleteInfo().getPartitionId() == partitionId
-                            && loadJob.getState() == JobState.LOADING) {
-                        throw new DdlException("Partition[" + partitionName + "] has running async delete job. "
-                                                       + "See 'SHOW DELETE'");
-                    }
-                }
-            }
-        } finally {
-            readUnlock();
-        }
-    }
-
-    public void delete(DeleteStmt stmt) throws DdlException {
-        String dbName = stmt.getDbName();
-        String tableName = stmt.getTableName();
-        String partitionName = stmt.getPartitionName();
-        List<Predicate> conditions = stmt.getDeleteConditions();
-        Database db = Catalog.getCurrentCatalog().getDb(dbName);
-        if (db == null) {
-            throw new DdlException("Db does not exist. name: " + dbName);
-        }
-
-        long tableId = -1;
-        long partitionId = -1;
-        LoadJob loadDeleteJob = null;
-        boolean addRunningPartition = false;
-        Table table = db.getTable(tableName);
-
-        if (table == null) {
-            throw new DdlException("Table does not exist. name: " + tableName);
-        }
-
-        if (table.getType() != TableType.OLAP) {
-            throw new DdlException("Not olap type table. type: " + table.getType().name());
-        }
-
-        table.readLock();
-        try {
-            OlapTable olapTable = (OlapTable) table;
-
-            if (olapTable.getState() != OlapTableState.NORMAL) {
-                throw new DdlException("Table's state is not normal: " + tableName);
-            }
-
-            tableId = olapTable.getId();
-            if (partitionName == null) {
-                if (olapTable.getPartitionInfo().getType() == PartitionType.RANGE) {
-                    throw new DdlException("This is a range partitioned table."
-                            + " You should specify partition in delete stmt");
-                } else {
-                    // this is a unpartitioned table, use table name as partition name
-                    partitionName = olapTable.getName();
-                }
-            }
-
-            Partition partition = olapTable.getPartition(partitionName);
-            if (partition == null) {
-                throw new DdlException("Partition does not exist. name: " + partitionName);
-            }
-            partitionId = partition.getId();
-
-            List<String> deleteConditions = Lists.newArrayList();
-            // pre check
-            checkDeleteV2(olapTable, partition, conditions,
-                          deleteConditions, true);
-            addRunningPartition = checkAndAddRunningSyncDeleteJob(partitionId, partitionName);
-            // do not use transaction id generator, or the id maybe duplicated
-            long jobId = Catalog.getCurrentCatalog().getNextId();
-            String jobLabel = "delete_" + UUID.randomUUID();
-            // the version info in delete info will be updated after job finished
-            DeleteInfo deleteInfo = new DeleteInfo(db.getId(), tableId, tableName,
-                                                   partition.getId(), partitionName,
-                                                   -1, 0, deleteConditions);
-            loadDeleteJob = new LoadJob(jobId, db.getId(), tableId,
-                                        partitionId, jobLabel, olapTable.getIndexIdToSchemaHash(), conditions, deleteInfo);
-            Map<Long, TabletLoadInfo> idToTabletLoadInfo = Maps.newHashMap();
-            for (MaterializedIndex materializedIndex : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
-                for (Tablet tablet : materializedIndex.getTablets()) {
-                    long tabletId = tablet.getId();
-                    // tabletLoadInfo is empty, because delete load does not need filepath filesize info
-                    TabletLoadInfo tabletLoadInfo = new TabletLoadInfo("", -1);
-                    idToTabletLoadInfo.put(tabletId, tabletLoadInfo);
-                }
-            }
-            loadDeleteJob.setIdToTabletLoadInfo(idToTabletLoadInfo);
-            loadDeleteJob.setState(JobState.LOADING);
-            long transactionId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(),
-                    Lists.newArrayList(table.getId()), jobLabel,
-                    new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
-                    LoadJobSourceType.FRONTEND,
-                    Config.stream_load_default_timeout_second);
-            loadDeleteJob.setTransactionId(transactionId);
-            // the delete job will be persist in editLog
-            addLoadJob(loadDeleteJob, db);
-        } catch (Throwable t) {
-            LOG.warn("error occurred during prepare delete", t);
-            throw new DdlException(t.getMessage(), t);
-        } finally {
-            if (addRunningPartition) {
-                writeLock();
-                try {
-                    partitionUnderDelete.remove(partitionId);
-                } finally {
-                    writeUnlock();
-                }
-            }
-            table.readUnlock();
-        }
-
-        try {
-            // TODO  wait loadDeleteJob to finished, using while true? or condition wait
-            long startDeleteTime = System.currentTimeMillis();
-            long timeout = loadDeleteJob.getDeleteJobTimeout();
-            while (true) {
-                table.writeLock();
-                try {
-                    if (loadDeleteJob.getState() == JobState.FINISHED
-                            || loadDeleteJob.getState() == JobState.CANCELLED) {
-                        break;
-                    }
-                    if (System.currentTimeMillis() - startDeleteTime > timeout) {
-                        TransactionState transactionState = Catalog.getCurrentGlobalTransactionMgr().getTransactionState(loadDeleteJob.getDbId(),
-                                loadDeleteJob.getTransactionId());
-                        if (transactionState.getTransactionStatus() == TransactionStatus.PREPARE) {
-                            boolean isSuccess = cancelLoadJob(loadDeleteJob, CancelType.TIMEOUT, "load delete job timeout");
-                            if (isSuccess) {
-                                throw new DdlException("timeout when waiting delete");
-                            }
-                        }
-                    }
-                } finally {
-                    table.writeUnlock();
-                }
-                Thread.sleep(1000);
-            }
-        } catch (Exception e) {
-            String failMsg = "delete unknown, " + e.getMessage();
-            LOG.warn(failMsg, e);
-            throw new DdlException(failMsg);
-        } finally {
-            writeLock();
-            try {
-                partitionUnderDelete.remove(partitionId);
-            } finally {
-                writeUnlock();
-            }
-        }
-    }
-
-    public List<List<Comparable>> getAsyncDeleteJobInfo(long jobId) {
-        LinkedList<List<Comparable>> infos = new LinkedList<List<Comparable>>();
-        readLock();
-        try {
-            LoadJob job = null;
-            for (long dbId : dbToDeleteJobs.keySet()) {
-                List<LoadJob> loadJobs = dbToDeleteJobs.get(dbId);
-                for (LoadJob loadJob : loadJobs) {
-                    if (loadJob.getId() == jobId) {
-                        job = loadJob;
-                        break;
-                    }
-                }
-            }
-            if (job == null) {
-                return infos;
-            }
-
-            for (Long tabletId : job.getIdToTabletLoadInfo().keySet()) {
-                List<Comparable> info = Lists.newArrayList();
-                info.add(tabletId);
-                infos.add(info);
-            }
-        } finally {
-            readUnlock();
-        }
-
-        return infos;
-    }
-
-    public long getDeleteJobNumByState(long dbId, JobState state) {
-        readLock();
-        try {
-            List<LoadJob> deleteJobs = dbToDeleteJobs.get(dbId);
-            if (deleteJobs == null) {
-                return 0;
-            } else {
-                int deleteJobNum = 0;
-                for (LoadJob job : deleteJobs) {
-                    if (job.getState() == state) {
-                        ++deleteJobNum;
-                    }
-                }
-                return deleteJobNum;
-            }
-        } finally {
-            readUnlock();
-        }
-    }
-
-    public int getDeleteInfoNum(long dbId) {
-        readLock();
-        try {
-            List<LoadJob> deleteJobs = dbToDeleteJobs.get(dbId);
-            if (deleteJobs == null) {
-                return 0;
-            } else {
-                return deleteJobs.size();
-            }
-        } finally {
-            readUnlock();
-        }
-    }
-
-    public List<List<Comparable>> getDeleteInfosByDb(long dbId, boolean forUser) {
-        LinkedList<List<Comparable>> infos = new LinkedList<List<Comparable>>();
-        Database db = Catalog.getCurrentCatalog().getDb(dbId);
-        if (db == null) {
-            return infos;
-        }
-
-        String dbName = db.getFullName();
-        readLock();
-        try {
-            List<LoadJob> deleteJobs = dbToDeleteJobs.get(dbId);
-            if (deleteJobs == null) {
-                return infos;
-            }
-
-            for (LoadJob loadJob : deleteJobs) {
-
-                DeleteInfo deleteInfo = loadJob.getDeleteInfo();
-
-                if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), dbName,
-                                                                        deleteInfo.getTableName(),
-                                                                        PrivPredicate.LOAD)) {
-                    continue;
-                }
-
-
-                List<Comparable> info = Lists.newArrayList();
-                if (!forUser) {
-                    // do not get job id from delete info, because async delete job == null
-                    // just get it from load job
-                    info.add(loadJob.getId());
-                    info.add(deleteInfo.getTableId());
-                }
-                info.add(deleteInfo.getTableName());
-                if (!forUser) {
-                    info.add(deleteInfo.getPartitionId());
-                }
-                info.add(deleteInfo.getPartitionName());
-
-                info.add(TimeUtils.longToTimeString(deleteInfo.getCreateTimeMs()));
-                String conds = Joiner.on(", ").join(deleteInfo.getDeleteConditions());
-                info.add(conds);
-
-                if (!forUser) {
-                    info.add(deleteInfo.getPartitionVersion());
-                    info.add(deleteInfo.getPartitionVersionHash());
-                }
-                // for loading state, should not display loading, show deleting instead
-                if (loadJob.getState() == JobState.LOADING) {
-                    info.add("DELETING");
-                } else {
-                    info.add(loadJob.getState().name());
-                }
-                infos.add(info);
-            }
-
-        } finally {
-            readUnlock();
-        }
-
-        // sort by createTimeMs
-        int sortIndex;
-        if (!forUser) {
-            sortIndex = 4;
-        } else {
-            sortIndex = 2;
-        }
-        ListComparator<List<Comparable>> comparator = new ListComparator<List<Comparable>>(sortIndex);
-        Collections.sort(infos, comparator);
-        return infos;
-    }
-
-    public void removeOldDeleteJobs() {
-        long currentTimeMs = System.currentTimeMillis();
-
-        writeLock();
-        try {
-            Iterator<Map.Entry<Long, List<DeleteInfo>>> iter1 = dbToDeleteInfos.entrySet().iterator();
-            while (iter1.hasNext()) {
-                Map.Entry<Long, List<DeleteInfo>> entry = iter1.next();
-                Iterator<DeleteInfo> iter2 = entry.getValue().iterator();
-                while (iter2.hasNext()) {
-                    DeleteInfo deleteInfo = iter2.next();
-                    if ((currentTimeMs - deleteInfo.getCreateTimeMs()) / 1000 > Config.label_keep_max_second) {
-                        iter2.remove();
-                    }
-                }
-
-                if (entry.getValue().isEmpty()) {
-                    iter1.remove();
-                }
-            }
-        } finally {
-            writeUnlock();
-        }
-    }
-
-    public void removeDbDeleteJob(long dbId) {
-        writeLock();
-        try {
-            dbToDeleteInfos.remove(dbId);
-        } finally {
-            writeUnlock();
-        }
-    }
-
-    public LoadJob getLastFinishedLoadJob(long dbId) {
-        LoadJob job = null;
-        readLock();
-        try {
-            long maxTime = Long.MIN_VALUE;
-            List<LoadJob> jobs = dbToLoadJobs.get(dbId);
-            if (jobs != null) {
-                for (LoadJob loadJob : jobs) {
-                    if (loadJob.getState() != JobState.QUORUM_FINISHED && loadJob.getState() != JobState.FINISHED) {
-                        continue;
-                    }
-                    if (loadJob.getLoadFinishTimeMs() > maxTime) {
-                        maxTime = loadJob.getLoadFinishTimeMs();
-                        job = loadJob;
-                    }
-                }
-            }
-        } finally {
-            readUnlock();
-        }
-
-        return job;
-    }
-
-    public DeleteInfo getLastFinishedDeleteInfo(long dbId) {
-        DeleteInfo deleteInfo = null;
-        readLock();
-        try {
-            long maxTime = Long.MIN_VALUE;
-            List<LoadJob> deleteJobs = dbToDeleteJobs.get(dbId);
-            if (deleteJobs != null) {
-                for (LoadJob loadJob : deleteJobs) {
-                    if (loadJob.getDeleteInfo().getCreateTimeMs() > maxTime
-                            && loadJob.getState() == JobState.FINISHED) {
-                        maxTime = loadJob.getDeleteInfo().getCreateTimeMs();
-                        deleteInfo = loadJob.getDeleteInfo();
-                    }
-                }
-            }
-        } finally {
-            readUnlock();
-        }
-        return deleteInfo;
-    }
-
-    public Integer getLoadJobNumByTypeAndState(EtlJobType type, JobState state) {
-        int num = 0;
-        readLock();
-        try {
-            Map<Long, LoadJob> jobMap = null;
-            if (state == null || state == JobState.CANCELLED || state == JobState.FINISHED) {
-                jobMap = idToLoadJob;
-            } else {
-                switch (state) {
-                    case PENDING:
-                        jobMap = idToPendingLoadJob;
-                        break;
-                    case ETL:
-                        jobMap = idToEtlLoadJob;
-                        break;
-                    case LOADING:
-                        jobMap = idToLoadingLoadJob;
-                        break;
-                    case QUORUM_FINISHED:
-                        jobMap = idToQuorumFinishedLoadJob;
-                        break;
-                    default:
-                        break;
-                }
-            }
-            Preconditions.checkNotNull(jobMap);
-
-            for (LoadJob job : jobMap.values()) {
-                if (job.getEtlJobType() == type) {
-                    if (state != null && job.getState() != state) {
-                        continue;
-                    }
-                    ++num;
-                }
-            }
-
-        } finally {
-            readUnlock();
-        }
-        return num;
-    }
 }
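
The removeOldDeleteJobs removed above implemented a plain age-based retention sweep: any DeleteInfo older than Config.label_keep_max_second was dropped, and databases whose list became empty were pruned from the map. A minimal standalone sketch of that pattern, using an illustrative map of create timestamps instead of the real DeleteInfo lists (RetentionSweep is not a Doris class):

    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;

    class RetentionSweep {
        // Drop per-db timestamps older than maxAgeSeconds, then prune empty lists.
        static void sweep(Map<Long, List<Long>> createTimeMsByDb, long maxAgeSeconds) {
            long nowMs = System.currentTimeMillis();
            Iterator<Map.Entry<Long, List<Long>>> dbIter = createTimeMsByDb.entrySet().iterator();
            while (dbIter.hasNext()) {
                Map.Entry<Long, List<Long>> entry = dbIter.next();
                entry.getValue().removeIf(t -> (nowMs - t) / 1000 > maxAgeSeconds);
                if (entry.getValue().isEmpty()) {
                    dbIter.remove();
                }
            }
        }
    }
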
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java b/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java
index b789ceb..2cbcd10 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java
@@ -32,7 +32,6 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.common.util.MetaLockUtils;
-import org.apache.doris.load.AsyncDeleteJob.DeleteState;
 import org.apache.doris.load.FailMsg.CancelType;
 import org.apache.doris.load.LoadJob.JobState;
 import org.apache.doris.task.AgentBatchTask;
@@ -546,18 +545,6 @@ public class LoadChecker extends MasterDaemon {
                 LOG.warn("run quorum job error", e);
             }
         }
-
-        // handle async delete job
-        List<AsyncDeleteJob> quorumFinishedDeleteJobs =
-                Catalog.getCurrentCatalog().getLoadInstance().getQuorumFinishedDeleteJobs();
-        for (AsyncDeleteJob job : quorumFinishedDeleteJobs) {
-            try {
-                LOG.info("run quorum finished delete job. job: {}", job.getJobId());
-                runOneQuorumFinishedDeleteJob(job);
-            } catch (Exception e) {
-                LOG.warn("run quorum delete job error", e);
-            }
-        }
     }
     
     private void runOneQuorumFinishedJob(LoadJob job) {
@@ -574,24 +561,6 @@ public class LoadChecker extends MasterDaemon {
             load.clearJob(job, JobState.QUORUM_FINISHED);
         }
     }
-    
-    private void runOneQuorumFinishedDeleteJob(AsyncDeleteJob job) {
-        Load load = Catalog.getCurrentCatalog().getLoadInstance();
-        long dbId = job.getDbId();
-        Database db = Catalog.getCurrentCatalog().getDb(dbId);
-        if (db == null) {
-            load.removeDeleteJobAndSetState(job);
-            return;
-        }
-
-        // if the delete job is quorum finished, just set it to finished
-        job.clearTasks();
-        job.setState(DeleteState.FINISHED);
-        // log
-        Catalog.getCurrentCatalog().getEditLog().logFinishAsyncDelete(job);
-        load.removeDeleteJobAndSetState(job);
-        LOG.info("delete job {} finished", job.getJobId());
-    }
 
     public static boolean checkTimeout(LoadJob job) {
         int timeoutSecond = job.getTimeoutSecond();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java
index 5d0f26b..89a81dc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java
@@ -982,8 +982,7 @@ public class LoadJob implements Writable {
                 }
             }
             if (in.readBoolean()) {
-                this.deleteInfo = new DeleteInfo();
-                this.deleteInfo.readFields(in);
+                this.deleteInfo = DeleteInfo.read(in);
             }
         }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/TabletDeleteInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/TabletDeleteInfo.java
index 018ac8d..951f025 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/TabletDeleteInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/TabletDeleteInfo.java
@@ -17,20 +17,27 @@
 
 package org.apache.doris.load;
 
-import com.google.common.collect.Sets;
 import org.apache.doris.catalog.Replica;
 
+import com.google.common.collect.Sets;
+
 import java.util.Set;
 
 public class TabletDeleteInfo {
+    private long partitionId;
     private long tabletId;
     private Set<Replica> finishedReplicas;
 
-    public TabletDeleteInfo(long tabletId) {
+    public TabletDeleteInfo(long partitionId, long tabletId) {
+        this.partitionId = partitionId;
         this.tabletId = tabletId;
         this.finishedReplicas = Sets.newConcurrentHashSet();
     }
 
+    public long getPartitionId() {
+        return partitionId;
+    }
+
     public long getTabletId() {
         return tabletId;
     }
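
With the partition id now carried on each TabletDeleteInfo, finished replicas can be attributed to the partition they belong to when the delete job commits. A hedged usage fragment based only on the constructor and getters visible in this commit; the numeric ids are invented, and the snippet assumes the FE classes on the classpath:

    // Hypothetical ids, for illustration only.
    TabletDeleteInfo tdi = new TabletDeleteInfo(10001L /* partitionId */, 20001L /* tabletId */);
    // finishedReplicas is a concurrent set, so backends may report in parallel.
    tdi.getFinishedReplicas().add(new Replica(30001L, 1001L, 0, Replica.ReplicaState.NORMAL));
    long partitionId = tdi.getPartitionId(); // group tablets per partition on commit
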
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
index 728089d..3a8d131 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -35,7 +35,6 @@ import org.apache.doris.catalog.Tablet;
 import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.catalog.TabletMeta;
 import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.load.AsyncDeleteJob;
 import org.apache.doris.load.DeleteJob;
 import org.apache.doris.load.LoadJob;
 import org.apache.doris.load.loadv2.SparkLoadJob;
@@ -394,7 +393,7 @@ public class MasterImpl {
                     Replica replica = findRelatedReplica(olapTable, partition,
                             backendId, tabletId, tabletMeta.getIndexId());
                     if (replica != null) {
-                        deleteJob.addFinishedReplica(pushTabletId, replica);
+                        deleteJob.addFinishedReplica(partitionId, pushTabletId, replica);
                         pushTask.countDownLatch(backendId, pushTabletId);
                     }
                 }
@@ -603,22 +602,6 @@ public class MasterImpl {
                     throw new MetaNotFoundException("delete task is not match. [" + pushTask.getVersion() + "-"
                             + request.getRequestVersion() + "]");
                 }
-
-                if (pushTask.isSyncDelete()) {
-                    pushTask.countDownLatch(backendId, signature);
-                } else {
-                    long asyncDeleteJobId = pushTask.getAsyncDeleteJobId();
-                    Preconditions.checkState(asyncDeleteJobId != -1);
-                    AsyncDeleteJob job = Catalog.getCurrentCatalog().getLoadInstance().getAsyncDeleteJob(asyncDeleteJobId);
-                    if (job == null) {
-                        throw new MetaNotFoundException("cannot find async delete job, job[" + asyncDeleteJobId + "]");
-                    }
-
-                    Preconditions.checkState(!infos.isEmpty());
-                    for (ReplicaPersistInfo info : infos) {
-                        job.addReplicaPersistInfos(info);
-                    }
-                }
             }
 
             AgentTaskQueue.removePushTask(backendId, signature, finishVersion, finishVersionHash,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index b31f99b..5f7e194 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -45,7 +45,6 @@ import org.apache.doris.journal.JournalCursor;
 import org.apache.doris.journal.JournalEntity;
 import org.apache.doris.journal.bdbje.BDBJEJournal;
 import org.apache.doris.journal.bdbje.Timestamp;
-import org.apache.doris.load.AsyncDeleteJob;
 import org.apache.doris.load.DeleteHandler;
 import org.apache.doris.load.DeleteInfo;
 import org.apache.doris.load.ExportJob;
@@ -393,24 +392,12 @@ public class EditLog {
                     ExportMgr exportMgr = catalog.getExportMgr();
                     exportMgr.replayUpdateJobState(op.getJobId(), op.getState());
                     break;
-                case OperationType.OP_FINISH_SYNC_DELETE: {
-                    DeleteInfo info = (DeleteInfo) journal.getData();
-                    Load load = catalog.getLoadInstance();
-                    load.replayDelete(info, catalog);
-                    break;
-                }
                 case OperationType.OP_FINISH_DELETE: {
                     DeleteInfo info = (DeleteInfo) journal.getData();
                     DeleteHandler deleteHandler = catalog.getDeleteHandler();
                     deleteHandler.replayDelete(info, catalog);
                     break;
                 }
-                case OperationType.OP_FINISH_ASYNC_DELETE: {
-                    AsyncDeleteJob deleteJob = (AsyncDeleteJob) journal.getData();
-                    Load load = catalog.getLoadInstance();
-                    load.replayFinishAsyncDeleteJob(deleteJob, catalog);
-                    break;
-                }
                 case OperationType.OP_ADD_REPLICA: {
                     ReplicaPersistInfo info = (ReplicaPersistInfo) journal.getData();
                     catalog.replayAddReplica(info);
@@ -1059,18 +1046,10 @@ public class EditLog {
         logEdit(OperationType.OP_REMOVE_FRONTEND, fe);
     }
 
-    public void logFinishSyncDelete(DeleteInfo info) {
-        logEdit(OperationType.OP_FINISH_SYNC_DELETE, info);
-    }
-
     public void logFinishDelete(DeleteInfo info) {
         logEdit(OperationType.OP_FINISH_DELETE, info);
     }
 
-    public void logFinishAsyncDelete(AsyncDeleteJob job) {
-        logEdit(OperationType.OP_FINISH_ASYNC_DELETE, job);
-    }
-
     public void logAddReplica(ReplicaPersistInfo info) {
         logEdit(OperationType.OP_ADD_REPLICA, info);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index 53bb587..08baea3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -75,10 +75,12 @@ public class OperationType {
     public static final short OP_EXPORT_CREATE = 36;
     public static final short OP_EXPORT_UPDATE_STATE = 37;
 
+    @Deprecated
     public static final short OP_FINISH_SYNC_DELETE = 40;
     public static final short OP_FINISH_DELETE = 41;
     public static final short OP_ADD_REPLICA = 42;
     public static final short OP_DELETE_REPLICA = 43;
+    @Deprecated
     public static final short OP_FINISH_ASYNC_DELETE = 44;
     public static final short OP_UPDATE_REPLICA = 45;
     public static final short OP_BACKEND_TABLETS_INFO = 46;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index f0b9b8f..2934923 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -48,14 +48,13 @@ import org.apache.doris.proto.PQueryStatistics;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.thrift.TMasterOpRequest;
 import org.apache.doris.thrift.TMasterOpResult;
-import org.apache.doris.thrift.TQueryOptions;
 import org.apache.doris.thrift.TUniqueId;
 
-import com.google.common.base.Strings;
-
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import com.google.common.base.Strings;
+
 import java.io.IOException;
 import java.io.StringReader;
 import java.io.UnsupportedEncodingException;
@@ -409,47 +408,42 @@ public class ConnectProcessor {
         if (request.isSetUserIp()) {
             ctx.setRemoteIP(request.getUserIp());
         }
-        if (request.isSetTimeZone()) {
-            ctx.getSessionVariable().setTimeZone(request.getTimeZone());
-        }
         if (request.isSetStmtId()) {
             ctx.setForwardedStmtId(request.getStmtId());
         }
-        if (request.isSetSqlMode()) {
-            ctx.getSessionVariable().setSqlMode(request.sqlMode);
-        }
-        if (request.isSetEnableStrictMode()) {
-            ctx.getSessionVariable().setEnableInsertStrict(request.enableStrictMode);
-        }
         if (request.isSetCurrentUserIdent()) {
             UserIdentity currentUserIdentity = UserIdentity.fromThrift(request.getCurrentUserIdent());
             ctx.setCurrentUserIdentity(currentUserIdentity);
         }
 
-        if (request.isSetInsertVisibleTimeoutMs()) {
-            ctx.getSessionVariable().setInsertVisibleTimeoutMs(request.getInsertVisibleTimeoutMs());
-        }
-
-        if (request.isSetQueryOptions()) {
-            TQueryOptions queryOptions = request.getQueryOptions();
-            if (queryOptions.isSetMemLimit()) {
-                ctx.getSessionVariable().setMaxExecMemByte(queryOptions.getMemLimit());
+        if (request.isSetSessionVariables()) {
+            ctx.getSessionVariable().setForwardedSessionVariables(request.getSessionVariables());
+        } else {
+            // For compatibility, all following variables are moved to SessionVariables.
+            // These legacy fields should be removed in the future.
+            if (request.isSetTimeZone()) {
+                ctx.getSessionVariable().setTimeZone(request.getTimeZone());
             }
-            if (queryOptions.isSetQueryTimeout()) {
-                ctx.getSessionVariable().setQueryTimeoutS(queryOptions.getQueryTimeout());
+            if (request.isSetSqlMode()) {
+                ctx.getSessionVariable().setSqlMode(request.sqlMode);
             }
-            if (queryOptions.isSetLoadMemLimit()) {
-                ctx.getSessionVariable().setLoadMemLimit(queryOptions.getLoadMemLimit());
+            if (request.isSetEnableStrictMode()) {
+                ctx.getSessionVariable().setEnableInsertStrict(request.enableStrictMode);
             }
-            if (queryOptions.isSetMaxScanKeyNum()) {
-                ctx.getSessionVariable().setMaxScanKeyNum(queryOptions.getMaxScanKeyNum());
+            if (request.isSetCurrentUserIdent()) {
+                UserIdentity currentUserIdentity = UserIdentity.fromThrift(request.getCurrentUserIdent());
+                ctx.setCurrentUserIdentity(currentUserIdentity);
             }
-            if (queryOptions.isSetMaxPushdownConditionsPerColumn()) {
-                ctx.getSessionVariable().setMaxPushdownConditionsPerColumn(
-                        queryOptions.getMaxPushdownConditionsPerColumn());
+            if (request.isSetInsertVisibleTimeoutMs()) {
+                ctx.getSessionVariable().setInsertVisibleTimeoutMs(request.getInsertVisibleTimeoutMs());
             }
+        }
+
+        if (request.isSetQueryOptions()) {
+            ctx.getSessionVariable().setForwardedSessionVariables(request.getQueryOptions());
         } else {
-            // for compatibility, all following variables are moved to TQueryOptions.
+            // For compatibility, all following variables are moved to TQueryOptions.
+            // Should move in future.
             if (request.isSetExecMemLimit()) {
                 ctx.getSessionVariable().setMaxExecMemByte(request.getExecMemLimit());
             }
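
The rewritten block above follows a common rolling-upgrade pattern: prefer the aggregated field that new frontends populate, and fall back to the deprecated per-field values when the request came from an old frontend. A self-contained sketch of that shape; ForwardedRequest and its fields are stand-ins, not the real thrift types:

    import java.util.HashMap;
    import java.util.Map;

    class CompatExample {
        // Minimal stand-in for TMasterOpRequest.
        static class ForwardedRequest {
            Map<String, String> sessionVariables; // null when sent by an old FE
            String timeZone;                      // legacy single field

            boolean isSetSessionVariables() { return sessionVariables != null; }
            boolean isSetTimeZone() { return timeZone != null; }
        }

        static void apply(ForwardedRequest request, Map<String, String> session) {
            if (request.isSetSessionVariables()) {
                session.putAll(request.sessionVariables); // new path: one map
            } else if (request.isSetTimeZone()) {
                session.put("time_zone", request.timeZone); // legacy fallback
            }
        }

        public static void main(String[] args) {
            ForwardedRequest legacy = new ForwardedRequest();
            legacy.timeZone = "UTC";
            Map<String, String> session = new HashMap<>();
            apply(legacy, session);
            System.out.println(session); // {time_zone=UTC}
        }
    }
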
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
index fa62728..36b86f2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
@@ -23,7 +23,6 @@ import org.apache.doris.thrift.FrontendService;
 import org.apache.doris.thrift.TMasterOpRequest;
 import org.apache.doris.thrift.TMasterOpResult;
 import org.apache.doris.thrift.TNetworkAddress;
-import org.apache.doris.thrift.TQueryOptions;
 import org.apache.doris.thrift.TUniqueId;
 
 import org.apache.logging.log4j.LogManager;
@@ -86,20 +85,16 @@ public class MasterOpExecutor {
         params.setStmtIdx(originStmt.idx);
         params.setUser(ctx.getQualifiedUser());
         params.setDb(ctx.getDatabase());
-        params.setSqlMode(ctx.getSessionVariable().getSqlMode());
         params.setResourceInfo(ctx.toResourceCtx());
         params.setUserIp(ctx.getRemoteIP());
-        params.setTimeZone(ctx.getSessionVariable().getTimeZone());
         params.setStmtId(ctx.getStmtId());
-        params.setEnableStrictMode(ctx.getSessionVariable().getEnableInsertStrict());
         params.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift());
-        params.setInsertVisibleTimeoutMs(ctx.getSessionVariable().getInsertVisibleTimeoutMs());
 
-        TQueryOptions queryOptions = new TQueryOptions();
-        queryOptions.setMemLimit(ctx.getSessionVariable().getMaxExecMemByte());
-        queryOptions.setQueryTimeout(ctx.getSessionVariable().getQueryTimeoutS());
-        queryOptions.setLoadMemLimit(ctx.getSessionVariable().getLoadMemLimit());
-        params.setQueryOptions(queryOptions);
+        // query options
+        params.setQueryOptions(ctx.getSessionVariable().getQueryOptionVariables());
+        // session variables
+        params.setSessionVariables(ctx.getSessionVariable().getForwardVariables());
+
         if (null != ctx.queryId()) {
             params.setQueryId(ctx.queryId());
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 885c543..3573b7e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -34,11 +34,13 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
 import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
 
 // System variable
 public class SessionVariable implements Serializable, Writable {
-    
     static final Logger LOG = LogManager.getLogger(StmtExecutor.class);
+
     public static final String EXEC_MEM_LIMIT = "exec_mem_limit";
     public static final String QUERY_TIMEOUT = "query_timeout";
     public static final String IS_REPORT_SUCCESS = "is_report_success";
@@ -67,7 +69,7 @@ public class SessionVariable implements Serializable, Writable {
     public static final String NET_BUFFER_LENGTH = "net_buffer_length";
     public static final String CODEGEN_LEVEL = "codegen_level";
     // mem limit can't smaller than bufferpool's default page size
-    public static final int MIN_EXEC_MEM_LIMIT = 2097152;   
+    public static final int MIN_EXEC_MEM_LIMIT = 2097152;
     public static final String BATCH_SIZE = "batch_size";
     public static final String DISABLE_STREAMING_PREAGGREGATIONS = "disable_streaming_preaggregations";
     public static final String DISABLE_COLOCATE_JOIN = "disable_colocate_join";
@@ -89,7 +91,7 @@ public class SessionVariable implements Serializable, Writable {
     public static final String PARALLEL_EXCHANGE_INSTANCE_NUM = "parallel_exchange_instance_num";
     public static final String SHOW_HIDDEN_COLUMNS = "show_hidden_columns";
     /*
-     * configure the mem limit of load process on BE. 
+     * configure the mem limit of load process on BE.
      * Previously users used exec_mem_limit to set memory limits.
      * To maintain compatibility, the default value of load_mem_limit is 0,
      * which means that the load memory limit is still using exec_mem_limit.
@@ -114,11 +116,15 @@ public class SessionVariable implements Serializable, Writable {
 
     // max ms to wait transaction publish finish when exec insert stmt.
     public static final String INSERT_VISIBLE_TIMEOUT_MS = "insert_visible_timeout_ms";
+
+    public static final String DELETE_WITHOUT_PARTITION = "delete_without_partition";
+
+
     public static final long DEFAULT_INSERT_VISIBLE_TIMEOUT_MS = 10_000;
     public static final long MIN_INSERT_VISIBLE_TIMEOUT_MS = 1000; // If user set a very small value, use this value instead.
 
-    @VariableMgr.VarAttr(name = INSERT_VISIBLE_TIMEOUT_MS)
-    private long insertVisibleTimeoutMs = DEFAULT_INSERT_VISIBLE_TIMEOUT_MS;
+    @VariableMgr.VarAttr(name = INSERT_VISIBLE_TIMEOUT_MS, needForward = true)
+    public long insertVisibleTimeoutMs = DEFAULT_INSERT_VISIBLE_TIMEOUT_MS;
 
     // max memory used on every backend.
     @VariableMgr.VarAttr(name = EXEC_MEM_LIMIT)
@@ -129,164 +135,167 @@ public class SessionVariable implements Serializable, Writable {
 
     // query timeout in second.
     @VariableMgr.VarAttr(name = QUERY_TIMEOUT)
-    private int queryTimeoutS = 300;
+    public int queryTimeoutS = 300;
 
     // if true, need report to coordinator when plan fragment execute successfully.
-    @VariableMgr.VarAttr(name = IS_REPORT_SUCCESS)
-    private boolean isReportSucc = false;
+    @VariableMgr.VarAttr(name = IS_REPORT_SUCCESS, needForward = true)
+    public boolean isReportSucc = false;
 
     // Set sqlMode to empty string
-    @VariableMgr.VarAttr(name = SQL_MODE)
-    private long sqlMode = 0L;
+    @VariableMgr.VarAttr(name = SQL_MODE, needForward = true)
+    public long sqlMode = 0L;
 
     @VariableMgr.VarAttr(name = RESOURCE_VARIABLE)
-    private String resourceGroup = "normal";
+    public String resourceGroup = "normal";
 
     // this is used to make mysql client happy
     @VariableMgr.VarAttr(name = AUTO_COMMIT)
-    private boolean autoCommit = true;
+    public boolean autoCommit = true;
 
     // this is used to make c3p0 library happy
     @VariableMgr.VarAttr(name = TX_ISOLATION)
-    private String txIsolation = "REPEATABLE-READ";
+    public String txIsolation = "REPEATABLE-READ";
 
     // this is used to make c3p0 library happy
     @VariableMgr.VarAttr(name = CHARACTER_SET_CLIENT)
-    private String charsetClient = "utf8";
+    public String charsetClient = "utf8";
     @VariableMgr.VarAttr(name = CHARACTER_SET_CONNNECTION)
-    private String charsetConnection = "utf8";
+    public String charsetConnection = "utf8";
     @VariableMgr.VarAttr(name = CHARACTER_SET_RESULTS)
-    private String charsetResults = "utf8";
+    public String charsetResults = "utf8";
     @VariableMgr.VarAttr(name = CHARACTER_SET_SERVER)
-    private String charsetServer = "utf8";
+    public String charsetServer = "utf8";
     @VariableMgr.VarAttr(name = COLLATION_CONNECTION)
-    private String collationConnection = "utf8_general_ci";
+    public String collationConnection = "utf8_general_ci";
     @VariableMgr.VarAttr(name = COLLATION_DATABASE)
-    private String collationDatabase = "utf8_general_ci";
+    public String collationDatabase = "utf8_general_ci";
 
     @VariableMgr.VarAttr(name = COLLATION_SERVER)
-    private String collationServer = "utf8_general_ci";
+    public String collationServer = "utf8_general_ci";
 
     // this is used to make c3p0 library happy
     @VariableMgr.VarAttr(name = SQL_AUTO_IS_NULL)
-    private boolean sqlAutoIsNull = false;
+    public boolean sqlAutoIsNull = false;
 
     @VariableMgr.VarAttr(name = SQL_SELECT_LIMIT)
-    private long sqlSelectLimit = 9223372036854775807L;
+    public long sqlSelectLimit = 9223372036854775807L;
 
     // this is used to make c3p0 library happy
     @VariableMgr.VarAttr(name = MAX_ALLOWED_PACKET)
-    private int maxAllowedPacket = 1048576;
+    public int maxAllowedPacket = 1048576;
 
     @VariableMgr.VarAttr(name = AUTO_INCREMENT_INCREMENT)
-    private int autoIncrementIncrement = 1;
+    public int autoIncrementIncrement = 1;
 
     // this is used to make c3p0 library happy
     @VariableMgr.VarAttr(name = QUERY_CACHE_TYPE)
-    private int queryCacheType = 0;
+    public int queryCacheType = 0;
 
     // The number of seconds the server waits for activity on an interactive connection before closing it
     @VariableMgr.VarAttr(name = INTERACTIVE_TIMTOUT)
-    private int interactiveTimeout = 3600;
+    public int interactiveTimeout = 3600;
 
     // The number of seconds the server waits for activity on a noninteractive connection before closing it.
     @VariableMgr.VarAttr(name = WAIT_TIMEOUT)
-    private int waitTimeout = 28800;
+    public int waitTimeout = 28800;
 
     // The number of seconds to wait for a block to be written to a connection before aborting the write
     @VariableMgr.VarAttr(name = NET_WRITE_TIMEOUT)
-    private int netWriteTimeout = 60;
+    public int netWriteTimeout = 60;
 
     // The number of seconds to wait for more data from a connection before aborting the read
     @VariableMgr.VarAttr(name = NET_READ_TIMEOUT)
-    private int netReadTimeout = 60;
+    public int netReadTimeout = 60;
 
     // The current time zone
-    @VariableMgr.VarAttr(name = TIME_ZONE)
-    private String timeZone = TimeUtils.getSystemTimeZone().getID();
+    @VariableMgr.VarAttr(name = TIME_ZONE, needForward = true)
+    public String timeZone = TimeUtils.getSystemTimeZone().getID();
 
     @VariableMgr.VarAttr(name = PARALLEL_EXCHANGE_INSTANCE_NUM)
-    private int exchangeInstanceParallel = -1;
+    public int exchangeInstanceParallel = -1;
 
     @VariableMgr.VarAttr(name = SQL_SAFE_UPDATES)
-    private int sqlSafeUpdates = 0;
+    public int sqlSafeUpdates = 0;
 
     // read-only
     @VariableMgr.VarAttr(name = NET_BUFFER_LENGTH, flag = VariableMgr.READ_ONLY)
-    private int netBufferLength = 16384;
+    public int netBufferLength = 16384;
 
     // if true, need report to coordinator when plan fragment execute successfully.
     @VariableMgr.VarAttr(name = CODEGEN_LEVEL)
-    private int codegenLevel = 0;
+    public int codegenLevel = 0;
 
     @VariableMgr.VarAttr(name = BATCH_SIZE)
-    private int batchSize = 1024;
+    public int batchSize = 1024;
 
     @VariableMgr.VarAttr(name = DISABLE_STREAMING_PREAGGREGATIONS)
-    private boolean disableStreamPreaggregations = false;
+    public boolean disableStreamPreaggregations = false;
 
     @VariableMgr.VarAttr(name = DISABLE_COLOCATE_JOIN)
-    private boolean disableColocateJoin = false;
+    public boolean disableColocateJoin = false;
 
     @VariableMgr.VarAttr(name = ENABLE_BUCKET_SHUFFLE_JOIN)
-    private boolean enableBucketShuffleJoin = false;
+    public boolean enableBucketShuffleJoin = false;
 
     @VariableMgr.VarAttr(name = PREFER_JOIN_METHOD)
-    private String preferJoinMethod = "broadcast";
+    public String preferJoinMethod = "broadcast";
 
     /*
      * the parallel exec instance num for one Fragment in one BE
      * 1 means disable this feature
      */
     @VariableMgr.VarAttr(name = PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM)
-    private int parallelExecInstanceNum = 1;
+    public int parallelExecInstanceNum = 1;
 
-    @VariableMgr.VarAttr(name = ENABLE_INSERT_STRICT)
-    private boolean enableInsertStrict = false;
+    @VariableMgr.VarAttr(name = ENABLE_INSERT_STRICT, needForward = true)
+    public boolean enableInsertStrict = false;
 
     @VariableMgr.VarAttr(name = ENABLE_ODBC_TRANSCATION)
-    private boolean enableOdbcTransaction = false;
+    public boolean enableOdbcTransaction = false;
 
     @VariableMgr.VarAttr(name = ENABLE_SQL_CACHE)
-    private boolean enableSqlCache = false;
+    public boolean enableSqlCache = false;
 
     @VariableMgr.VarAttr(name = ENABLE_PARTITION_CACHE)
-    private boolean enablePartitionCache = false;
+    public boolean enablePartitionCache = false;
 
     @VariableMgr.VarAttr(name = FORWARD_TO_MASTER)
-    private boolean forwardToMaster = false;
+    public boolean forwardToMaster = false;
 
     @VariableMgr.VarAttr(name = LOAD_MEM_LIMIT)
-    private long loadMemLimit = 0L;
+    public long loadMemLimit = 0L;
 
     @VariableMgr.VarAttr(name = USE_V2_ROLLUP)
-    private boolean useV2Rollup = false;
+    public boolean useV2Rollup = false;
 
     // TODO(ml): remove it after test
     @VariableMgr.VarAttr(name = TEST_MATERIALIZED_VIEW)
-    private boolean testMaterializedView = false;
+    public boolean testMaterializedView = false;
 
     @VariableMgr.VarAttr(name = REWRITE_COUNT_DISTINCT_TO_BITMAP_HLL)
-    private boolean rewriteCountDistinct = true;
+    public boolean rewriteCountDistinct = true;
 
     // compatible with some mysql client connect, say DataGrip of JetBrains
     @VariableMgr.VarAttr(name = EVENT_SCHEDULER)
-    private String eventScheduler = "OFF";
+    public String eventScheduler = "OFF";
     @VariableMgr.VarAttr(name = STORAGE_ENGINE)
-    private String storageEngine = "olap";
+    public String storageEngine = "olap";
     @VariableMgr.VarAttr(name = DIV_PRECISION_INCREMENT)
-    private int divPrecisionIncrement = 4;
+    public int divPrecisionIncrement = 4;
 
     // -1 means unset, BE will use its config value
     @VariableMgr.VarAttr(name = MAX_SCAN_KEY_NUM)
-    private int maxScanKeyNum = -1;
+    public int maxScanKeyNum = -1;
     @VariableMgr.VarAttr(name = MAX_PUSHDOWN_CONDITIONS_PER_COLUMN)
-    private int maxPushdownConditionsPerColumn = -1;
+    public int maxPushdownConditionsPerColumn = -1;
     @VariableMgr.VarAttr(name = SHOW_HIDDEN_COLUMNS, flag = VariableMgr.SESSION_ONLY)
-    private boolean showHiddenColumns = false;
+    public boolean showHiddenColumns = false;
 
     @VariableMgr.VarAttr(name = ALLOW_PARTITION_COLUMN_NULLABLE)
-    private boolean allowPartitionColumnNullable = true;
+    public boolean allowPartitionColumnNullable = true;
+
+    @VariableMgr.VarAttr(name = DELETE_WITHOUT_PARTITION, needForward = true)
+    public boolean deleteWithoutPartition = false;
 
     public long getMaxExecMemByte() {
         return maxExecMemByte;
@@ -444,9 +453,13 @@ public class SessionVariable implements Serializable, Writable {
         return enableOdbcTransaction;
     }
 
-    public String getPreferJoinMethod() {return preferJoinMethod; }
+    public String getPreferJoinMethod() {
+        return preferJoinMethod;
+    }
 
-    public void setPreferJoinMethod(String preferJoinMethod) {this.preferJoinMethod = preferJoinMethod; }
+    public void setPreferJoinMethod(String preferJoinMethod) {
+        this.preferJoinMethod = preferJoinMethod;
+    }
 
     public int getParallelExecInstanceNum() {
         return parallelExecInstanceNum;
@@ -456,7 +469,9 @@ public class SessionVariable implements Serializable, Writable {
         return exchangeInstanceParallel;
     }
 
-    public boolean getEnableInsertStrict() { return enableInsertStrict; }
+    public boolean getEnableInsertStrict() {
+        return enableInsertStrict;
+    }
 
     public void setEnableInsertStrict(boolean enableInsertStrict) {
         this.enableInsertStrict = enableInsertStrict;
@@ -483,7 +498,9 @@ public class SessionVariable implements Serializable, Writable {
         return forwardToMaster;
     }
 
-    public boolean isUseV2Rollup() { return useV2Rollup; }
+    public boolean isUseV2Rollup() {
+        return useV2Rollup;
+    }
 
     // for unit test
     public void setUseV2Rollup(boolean useV2Rollup) {
@@ -550,7 +567,9 @@ public class SessionVariable implements Serializable, Writable {
         this.showHiddenColumns = showHiddenColumns;
     }
 
-    public boolean isAllowPartitionColumnNullable() { return allowPartitionColumnNullable; }
+    public boolean isAllowPartitionColumnNullable() {
+        return allowPartitionColumnNullable;
+    }
 
     public long getInsertVisibleTimeoutMs() {
         if (insertVisibleTimeoutMs < MIN_INSERT_VISIBLE_TIMEOUT_MS) {
@@ -568,6 +587,10 @@ public class SessionVariable implements Serializable, Writable {
         }
     }
 
+    public boolean isDeleteWithoutPartition() {
+        return deleteWithoutPartition;
+    }
+
     // Serialize to thrift object
     // used for rest api
     public TQueryOptions toThrift() {
@@ -734,4 +757,87 @@ public class SessionVariable implements Serializable, Writable {
             throw new IOException("failed to read session variable: " + e.getMessage());
         }
     }
+
+    // Get all variables that need to be forwarded along with the statement
+    public Map<String, String> getForwardVariables() {
+        HashMap<String, String> map = new HashMap<String, String>();
+        try {
+            Field[] fields = SessionVariable.class.getDeclaredFields();
+            for (Field f : fields) {
+                VarAttr varAttr = f.getAnnotation(VarAttr.class);
+                if (varAttr == null || !varAttr.needForward()) {
+                    continue;
+                }
+                map.put(varAttr.name(), String.valueOf(f.get(this)));
+            }
+        } catch (IllegalAccessException e) {
+            LOG.error("failed to get forward variables", e);
+        }
+        return map;
+    }
+
+    public void setForwardedSessionVariables(Map<String, String> variables) {
+        try {
+            Field[] fields = SessionVariable.class.getFields();
+            for (Field f : fields) {
+                VarAttr varAttr = f.getAnnotation(VarAttr.class);
+                if (varAttr == null || !varAttr.needForward()) {
+                    continue;
+                }
+                String val = variables.get(varAttr.name());
+                if (val == null) {
+                    continue;
+                }
+
+                LOG.debug("set forward variable: {} = {}", varAttr.name(), val);
+
+                // set config field
+                switch (f.getType().getSimpleName()) {
+                    case "short":
+                        f.setShort(this, Short.parseShort(val));
+                        break;
+                    case "int":
+                        f.setInt(this, Integer.parseInt(val));
+                        break;
+                    case "long":
+                        f.setLong(this, Long.parseLong(val));
+                        break;
+                    case "double":
+                        f.setDouble(this, Double.parseDouble(val));
+                        break;
+                    case "boolean":
+                        f.setBoolean(this, Boolean.parseBoolean(val));
+                        break;
+                    case "String":
+                        f.set(this, val);
+                        break;
+                    default:
+                        throw new IllegalArgumentException("Unknown field type: " + f.getType().getSimpleName());
+                }
+            }
+        } catch (IllegalAccessException e) {
+            LOG.error("failed to set forward variables", e);
+        }
+    }
+
+    // Get all variables which need to be set in TQueryOptions
+    public TQueryOptions getQueryOptionVariables() {
+        TQueryOptions queryOptions = new TQueryOptions();
+        queryOptions.setMemLimit(maxExecMemByte);
+        queryOptions.setQueryTimeout(queryTimeoutS);
+        queryOptions.setLoadMemLimit(loadMemLimit);
+        return queryOptions;
+    }
+
+    public void setForwardedSessionVariables(TQueryOptions queryOptions) {
+        if (queryOptions.isSetMemLimit()) {
+            setMaxExecMemByte(queryOptions.getMemLimit());
+        }
+        if (queryOptions.isSetQueryTimeout()) {
+            setQueryTimeoutS(queryOptions.getQueryTimeout());
+        }
+        if (queryOptions.isSetLoadMemLimit()) {
+            setLoadMemLimit(queryOptions.getLoadMemLimit());
+        }
+    }
 }
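
Taken together, getForwardVariables and setForwardedSessionVariables form a round trip: the forwarding frontend snapshots every @VarAttr(needForward = true) field into a map, and the master applies that map onto the session that executes the forwarded statement. A minimal usage sketch, assuming the FE classes on the classpath:

    // Forwarding side: snapshot for TMasterOpRequest.session_variables.
    SessionVariable observerVars = new SessionVariable();
    Map<String, String> forwarded = observerVars.getForwardVariables();

    // Master side: apply the snapshot before executing the forwarded statement.
    SessionVariable masterVars = new SessionVariable();
    masterVars.setForwardedSessionVariables(forwarded);
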
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index 9062a92..7cc08bd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -127,15 +127,15 @@ import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.transaction.GlobalTransactionMgr;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import java.io.BufferedReader;
 import java.io.InputStream;
 import java.io.InputStreamReader;
@@ -1072,8 +1072,7 @@ public class ShowExecutor {
 
         DeleteHandler deleteHandler = catalog.getDeleteHandler();
         Load load = catalog.getLoadInstance();
-        List<List<Comparable>> deleteInfos = deleteHandler.getDeleteInfosByDb(dbId, true);
-        deleteInfos.addAll(load.getDeleteInfosByDb(dbId, true));
+        List<List<Comparable>> deleteInfos = deleteHandler.getDeleteInfosByDb(dbId);
         List<List<String>> rows = Lists.newArrayList();
         for (List<Comparable> deleteInfo : deleteInfos) {
             List<String> oneInfo = new ArrayList<String>(deleteInfo.size());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
index e2f3477..000ee1d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
@@ -31,15 +31,15 @@ import org.apache.doris.common.PatternMatcher;
 import org.apache.doris.persist.EditLog;
 import org.apache.doris.persist.GlobalVarPersistInfo;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSortedMap;
-import com.google.common.collect.Lists;
-
 import org.apache.commons.lang.SerializationUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.json.JSONObject;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.Lists;
+
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -510,6 +510,12 @@ public class VariableMgr {
         // TODO(zhaochun): min and max is not used.
         String minValue() default "0";
         String maxValue() default "0";
+
+        // Set to true if the variable needs to be forwarded along with the forwarded statement.
+        boolean needForward() default false;
+
+        // Set to true if the variable needs to be set in TQueryOptions.
+        boolean isQueryOption() default false;
     }
 
     private static class VarContext {
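
With the needForward attribute above, exposing another variable to forwarding is a one-line annotation change in SessionVariable, mirroring DELETE_WITHOUT_PARTITION earlier in this commit. A sketch with hypothetical names (my_variable is not part of this commit):

    public static final String MY_VARIABLE = "my_variable";

    // needForward = true makes the value ride along in session_variables.
    @VariableMgr.VarAttr(name = MY_VARIABLE, needForward = true)
    public boolean myVariable = false;
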
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/DeleteStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/DeleteStmtTest.java
index e89bfcf..9530ca6 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/DeleteStmtTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/DeleteStmtTest.java
@@ -24,16 +24,16 @@ import org.apache.doris.mysql.privilege.MockedAuth;
 import org.apache.doris.mysql.privilege.PaloAuth;
 import org.apache.doris.qe.ConnectContext;
 
-import com.google.common.collect.Lists;
-
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import mockit.Mocked;
+import com.google.common.collect.Lists;
 
 import java.util.List;
 
+import mockit.Mocked;
+
 public class DeleteStmtTest {
 
     Analyzer analyzer;
@@ -59,9 +59,13 @@ public class DeleteStmtTest {
 
         Assert.assertEquals("testDb", deleteStmt.getDbName());
         Assert.assertEquals("testTbl", deleteStmt.getTableName());
-        Assert.assertEquals("partition", deleteStmt.getPartitionName());
-        Assert.assertEquals("DELETE FROM `testDb`.`testTbl` PARTITION partition WHERE `k1` = 'abc'",
+        Assert.assertEquals(Lists.newArrayList("partition"), deleteStmt.getPartitionNames());
+        Assert.assertEquals("DELETE FROM `testDb`.`testTbl` PARTITION (partition) WHERE `k1` = 'abc'",
                             deleteStmt.toSql());
+
+        deleteStmt = new DeleteStmt(new TableName("testDb", "testTbl"), null, wherePredicate);
+        Assert.assertEquals("DELETE FROM `testDb`.`testTbl` WHERE `k1` = 'abc'",
+                deleteStmt.toSql());
     }
 
     @Test
@@ -175,6 +179,25 @@ public class DeleteStmtTest {
         } catch (UserException e) {
             Assert.fail();
         }
+
+        // multi partition
+        deleteStmt = new DeleteStmt(new TableName("testDb", "testTbl"),
+                new PartitionNames(false, Lists.newArrayList("partition1", "partition2")), compoundPredicate);
+        try {
+            deleteStmt.analyze(analyzer);
+            Assert.assertEquals(Lists.newArrayList("partition1", "partition2"), deleteStmt.getPartitionNames());
+        } catch (UserException e) {
+            Assert.fail();
+        }
+
+        // no partition
+        deleteStmt = new DeleteStmt(new TableName("testDb", "testTbl"), null, compoundPredicate);
+        try {
+            deleteStmt.analyze(analyzer);
+            Assert.assertEquals(Lists.newArrayList(), deleteStmt.getPartitionNames());
+        } catch (UserException e) {
+            Assert.fail();
+        }
     }
 
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
index 6b1d5e4..8ee75c3 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
@@ -17,12 +17,6 @@
 
 package org.apache.doris.load;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import mockit.Expectations;
-import mockit.Mock;
-import mockit.MockUp;
-import mockit.Mocked;
 import org.apache.doris.analysis.AccessTestUtil;
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.BinaryPredicate;
@@ -44,6 +38,7 @@ import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.MarkedCountDownLatch;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.load.DeleteJob.DeleteState;
 import org.apache.doris.mysql.privilege.PaloAuth;
 import org.apache.doris.persist.EditLog;
 import org.apache.doris.qe.ConnectContext;
@@ -52,22 +47,30 @@ import org.apache.doris.task.AgentBatchTask;
 import org.apache.doris.task.AgentTask;
 import org.apache.doris.task.AgentTaskExecutor;
 import org.apache.doris.task.AgentTaskQueue;
-import org.apache.doris.load.DeleteJob.DeleteState;
 import org.apache.doris.transaction.GlobalTransactionMgr;
 import org.apache.doris.transaction.TabletCommitInfo;
 import org.apache.doris.transaction.TransactionState;
 import org.apache.doris.transaction.TransactionStatus;
 import org.apache.doris.transaction.TxnCommitAttachment;
+
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import mockit.Expectations;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+
 public class DeleteHandlerTest {
 
     private DeleteHandler deleteHandler;
@@ -225,7 +228,7 @@ public class DeleteHandlerTest {
         Set<Replica> finishedReplica = Sets.newHashSet();
         finishedReplica.add(new Replica(REPLICA_ID_1, BACKEND_ID_1, 0, Replica.ReplicaState.NORMAL));
         finishedReplica.add(new Replica(REPLICA_ID_2, BACKEND_ID_2, 0, Replica.ReplicaState.NORMAL));
-        TabletDeleteInfo tabletDeleteInfo = new TabletDeleteInfo(TABLET_ID);
+        TabletDeleteInfo tabletDeleteInfo = new TabletDeleteInfo(PARTITION_ID, TABLET_ID);
         tabletDeleteInfo.getFinishedReplicas().addAll(finishedReplica);
 
         new MockUp<DeleteJob>() {
@@ -274,7 +277,7 @@ public class DeleteHandlerTest {
         finishedReplica.add(new Replica(REPLICA_ID_1, BACKEND_ID_1, 0, Replica.ReplicaState.NORMAL));
         finishedReplica.add(new Replica(REPLICA_ID_2, BACKEND_ID_2, 0, Replica.ReplicaState.NORMAL));
         finishedReplica.add(new Replica(REPLICA_ID_3, BACKEND_ID_3, 0, Replica.ReplicaState.NORMAL));
-        TabletDeleteInfo tabletDeleteInfo = new TabletDeleteInfo(TABLET_ID);
+        TabletDeleteInfo tabletDeleteInfo = new TabletDeleteInfo(PARTITION_ID, TABLET_ID);
         tabletDeleteInfo.getFinishedReplicas().addAll(finishedReplica);
 
         new MockUp<DeleteJob>() {
@@ -324,7 +327,7 @@ public class DeleteHandlerTest {
         finishedReplica.add(new Replica(REPLICA_ID_1, BACKEND_ID_1, 0, Replica.ReplicaState.NORMAL));
         finishedReplica.add(new Replica(REPLICA_ID_2, BACKEND_ID_2, 0, Replica.ReplicaState.NORMAL));
         finishedReplica.add(new Replica(REPLICA_ID_3, BACKEND_ID_3, 0, Replica.ReplicaState.NORMAL));
-        TabletDeleteInfo tabletDeleteInfo = new TabletDeleteInfo(TABLET_ID);
+        TabletDeleteInfo tabletDeleteInfo = new TabletDeleteInfo(PARTITION_ID, TABLET_ID);
         tabletDeleteInfo.getFinishedReplicas().addAll(finishedReplica);
 
         new MockUp<DeleteJob>() {
@@ -386,7 +389,7 @@ public class DeleteHandlerTest {
         finishedReplica.add(new Replica(REPLICA_ID_1, BACKEND_ID_1, 0, Replica.ReplicaState.NORMAL));
         finishedReplica.add(new Replica(REPLICA_ID_2, BACKEND_ID_2, 0, Replica.ReplicaState.NORMAL));
         finishedReplica.add(new Replica(REPLICA_ID_3, BACKEND_ID_3, 0, Replica.ReplicaState.NORMAL));
-        TabletDeleteInfo tabletDeleteInfo = new TabletDeleteInfo(TABLET_ID);
+        TabletDeleteInfo tabletDeleteInfo = new TabletDeleteInfo(PARTITION_ID, TABLET_ID);
         tabletDeleteInfo.getFinishedReplicas().addAll(finishedReplica);
 
         new MockUp<DeleteJob>() {
@@ -443,7 +446,7 @@ public class DeleteHandlerTest {
         finishedReplica.add(new Replica(REPLICA_ID_1, BACKEND_ID_1, 0, Replica.ReplicaState.NORMAL));
         finishedReplica.add(new Replica(REPLICA_ID_2, BACKEND_ID_2, 0, Replica.ReplicaState.NORMAL));
         finishedReplica.add(new Replica(REPLICA_ID_3, BACKEND_ID_3, 0, Replica.ReplicaState.NORMAL));
-        TabletDeleteInfo tabletDeleteInfo = new TabletDeleteInfo(TABLET_ID);
+        TabletDeleteInfo tabletDeleteInfo = new TabletDeleteInfo(PARTITION_ID, TABLET_ID);
         tabletDeleteInfo.getFinishedReplicas().addAll(finishedReplica);
 
         new MockUp<DeleteJob>() {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/LoadCheckerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/LoadCheckerTest.java
index 12fb22b..440ac0b 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/LoadCheckerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/LoadCheckerTest.java
@@ -17,8 +17,6 @@
 
 package org.apache.doris.load;
 
-import mockit.Expectations;
-import mockit.Mocked;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.MaterializedIndex;
@@ -36,13 +34,13 @@ import org.apache.doris.task.AgentTaskQueue;
 import org.apache.doris.task.MasterTask;
 import org.apache.doris.task.MasterTaskExecutor;
 
-import com.google.common.collect.Lists;
-
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
+
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
@@ -50,6 +48,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import mockit.Expectations;
+import mockit.Mocked;
+
 public class LoadCheckerTest {
     private long dbId;
     private long tableId;
@@ -334,7 +335,6 @@ public class LoadCheckerTest {
     @Test
     public void testRunQuorumFinishedJobs() throws Exception {
         List<LoadJob> etlJobs = new ArrayList<LoadJob>();
-        List<AsyncDeleteJob> deleteJobs = Lists.newArrayList();
         LoadJob job = new LoadJob(label);
         job.setState(JobState.QUORUM_FINISHED);
         job.setDbId(dbId);
@@ -374,10 +374,6 @@ public class LoadCheckerTest {
                 minTimes = 0;
                 result = etlJobs;
 
-                load.getQuorumFinishedDeleteJobs();
-                minTimes = 0;
-                result = deleteJobs;
-
                 load.updateLoadJobState(job, JobState.FINISHED);
                 minTimes = 0;
                 result = true;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java
new file mode 100644
index 0000000..3313630
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.thrift.TQueryOptions;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+
+public class SessionVariablesTest {
+
+    private static SessionVariable sessionVariable;
+    private static int numOfForwardVars;
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        sessionVariable = new SessionVariable();
+
+        Field[] fields = SessionVariable.class.getFields();
+        for (Field f : fields) {
+            VariableMgr.VarAttr varAttr = f.getAnnotation(VariableMgr.VarAttr.class);
+            if (varAttr == null || !varAttr.needForward()) {
+                continue;
+            }
+            numOfForwardVars++;
+        }
+    }
+
+    @Test
+    public void testForwardSessionVariables() {
+        Map<String, String> vars = sessionVariable.getForwardVariables();
+        Assert.assertTrue(numOfForwardVars >= 6);
+        Assert.assertEquals(numOfForwardVars, vars.size());
+
+        vars.put(SessionVariable.IS_REPORT_SUCCESS, "true");
+        sessionVariable.setForwardedSessionVariables(vars);
+        Assert.assertEquals(true, sessionVariable.isReportSucc);
+    }
+
+    @Test
+    public void testForwardQueryOptions() {
+        TQueryOptions queryOptions = sessionVariable.getQueryOptionVariables();
+        Assert.assertTrue(queryOptions.isSetMemLimit());
+        Assert.assertTrue(queryOptions.isSetLoadMemLimit());
+        Assert.assertTrue(queryOptions.isSetQueryTimeout());
+
+        queryOptions.setQueryTimeout(123);
+        sessionVariable.setForwardedSessionVariables(queryOptions);
+        Assert.assertEquals(123, sessionVariable.getQueryTimeoutS());
+    }
+}
\ No newline at end of file
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index df3ddbe..4e6cfd1 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -433,17 +433,18 @@ struct TMasterOpRequest {
     6: optional i64 execMemLimit // deprecated, move into query_options
     7: optional i32 queryTimeout // deprecated, move into query_options
     8: optional string user_ip
-    9: optional string time_zone
+    9: optional string time_zone // deprecated, move into session_variables
     10: optional i64 stmt_id
-    11: optional i64 sqlMode
+    11: optional i64 sqlMode // deprecated, move into session_variables
     12: optional i64 loadMemLimit // deprecated, move into query_options
-    13: optional bool enableStrictMode
+    13: optional bool enableStrictMode // deprecated, move into session_variables
     // this can replace the "user" field
     14: optional Types.TUserIdentity current_user_ident
     15: optional i32 stmtIdx  // the idx of the sql in multi statements
     16: optional PaloInternalService.TQueryOptions query_options
     17: optional Types.TUniqueId query_id // when this is a query, we translate this query id to master
-    18: optional i64 insert_visible_timeout_ms
+    18: optional i64 insert_visible_timeout_ms // deprecated, move into session_variables
+    19: optional map<string, string> session_variables
 }
 
 struct TColumnDefinition {
@@ -715,4 +716,4 @@ service FrontendService {
     Status.TStatus snapshotLoaderReport(1: TSnapshotLoaderReportRequest request)
 
     TFrontendPingFrontendResult ping(1: TFrontendPingFrontendRequest request)
-}
\ No newline at end of file
+}

