Posted to commits@doris.apache.org by mo...@apache.org on 2020/04/16 02:33:14 UTC

[incubator-doris] branch master updated: [Optimize][Delete] Simplify the delete process to make it fast (#3191)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b29cb9d  [Optimize][Delete] Simplify the delete process to make it fast (#3191)
b29cb9d is described below

commit b29cb9dbb3d32cf58a20036049420095011597e0
Author: xy720 <22...@users.noreply.github.com>
AuthorDate: Thu Apr 16 10:32:44 2020 +0800

    [Optimize][Delete] Simplify the delete process to make it fast (#3191)
    
    Our current DELETE strategy reuses the LoadChecker framework.
    LoadChecker runs jobs through their stages by polling them every 5 seconds.
    
    A load job has four stages, Pending/ETL/Loading/Quorum_finish,
    each handled by its own LoadChecker. For example, when a load job is submitted,
    it is initialized to the Pending state and waits to be run by the Pending LoadChecker.
    After the pending job has run, its stage changes to ETL, and it then waits to be
    run by the next LoadChecker (ETL). Because the LoadChecker polling interval is 5s,
    a job may wait up to 20s over its life cycle in the worst case.
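    
    For illustration (this is not LoadChecker code; the 5s interval and the four
    stage names come from the description above), the worst-case arithmetic is:
    
        public class PollingWorstCase {
            public static void main(String[] args) {
                int pollIntervalSeconds = 5;  // LoadChecker polling interval
                String[] stages = { "PENDING", "ETL", "LOADING", "QUORUM_FINISH" };
                // worst case: the job arrives just after each stage's checker
                // polled, so it waits one full interval at every stage
                int worstCaseWaitSeconds = pollIntervalSeconds * stages.length;
                // prints "worst-case scheduling wait: 20s"
                System.out.println("worst-case scheduling wait: " + worstCaseWaitSeconds + "s");
            }
        }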
    
    DELETE jobs, in particular, do not need to wait for polling: they can call the
    pushTask() function directly to perform the delete. In this commit, I add a
    delete handler to process delete tasks concurrently.
    
    All delete tasks are now pushed to BE immediately, with no need to wait for a
    LoadChecker. A delete job used to start in the LOADING state and therefore pass
    through 2 LoadCheckers, so up to 10s is saved (5s per LoadChecker). The delete
    process is now synchronous, and users get a response only after the delete
    finishes or is canceled (see the sketch at the end of this message).
    
    If a delete runs longer than a certain period of time, it is cancelled with a
    timeout exception.
    
    NOTICE: this CL upgrades the FE meta version to 82
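    
    The shape of the new synchronous path, reduced to plain java.util.concurrent
    types (a simplified sketch: Doris itself uses MarkedCountDownLatch and
    AgentTaskExecutor, and the class and variable names below are illustrative,
    not the actual API):
    
        import java.util.concurrent.CountDownLatch;
        import java.util.concurrent.ExecutorService;
        import java.util.concurrent.Executors;
        import java.util.concurrent.TimeUnit;
    
        public class SyncDeleteSketch {
            public static void main(String[] args) throws InterruptedException {
                int totalReplicaNum = 3;  // one push task per replica in the partition
                CountDownLatch latch = new CountDownLatch(totalReplicaNum);
                ExecutorService backends = Executors.newFixedThreadPool(totalReplicaNum);
                for (int i = 0; i < totalReplicaNum; i++) {
                    backends.submit(() -> {
                        // ... a BE applies the delete condition to its replica ...
                        latch.countDown();  // the replica reports back to the FE
                    });
                }
                long timeoutMs = 10_000;
                // the user's session blocks here instead of being polled by a checker
                boolean ok = latch.await(timeoutMs, TimeUnit.MILLISECONDS);
                System.out.println(ok
                        ? "all replicas finished: commit and publish the txn"
                        : "timeout: check quorum, otherwise cancel with a timeout error");
                backends.shutdown();
            }
        }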
---
 .../java/org/apache/doris/catalog/Catalog.java     |  21 +
 .../main/java/org/apache/doris/common/Config.java  |   4 +
 .../org/apache/doris/common/FeMetaVersion.java     |   4 +-
 .../doris/common/proc/DeleteInfoProcDir.java       |  13 +-
 .../org/apache/doris/common/proc/JobsProcDir.java  |   2 +-
 .../org/apache/doris/journal/JournalEntity.java    |   6 +
 .../doris/journal/local/LocalJournalCursor.java    |   6 +
 .../java/org/apache/doris/load/DeleteHandler.java  | 630 +++++++++++++++++++++
 .../java/org/apache/doris/load/DeleteInfo.java     |  10 +
 fe/src/main/java/org/apache/doris/load/Load.java   |   2 +-
 .../org/apache/doris/load/TabletDeleteInfo.java    |  46 ++
 .../java/org/apache/doris/master/MasterImpl.java   |  43 +-
 .../java/org/apache/doris/persist/EditLog.java     |  11 +
 .../org/apache/doris/persist/OperationType.java    |   1 +
 .../java/org/apache/doris/qe/ConnectContext.java   |   4 +
 .../main/java/org/apache/doris/qe/DdlExecutor.java |   4 +-
 .../main/java/org/apache/doris/qe/QueryState.java  |   4 +
 .../org/apache/doris/qe/QueryStateException.java   |  52 ++
 .../java/org/apache/doris/qe/ShowExecutor.java     |   5 +-
 .../java/org/apache/doris/qe/StmtExecutor.java     |   2 +
 .../main/java/org/apache/doris/task/DeleteJob.java | 188 ++++++
 .../main/java/org/apache/doris/task/PushTask.java  |   4 +
 .../org/apache/doris/load/DeleteHandlerTest.java   | 451 +++++++++++++++
 23 files changed, 1490 insertions(+), 23 deletions(-)

diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
index a280e65..490c202 100644
--- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -136,6 +136,7 @@ import org.apache.doris.http.meta.MetaBaseAction;
 import org.apache.doris.journal.JournalCursor;
 import org.apache.doris.journal.JournalEntity;
 import org.apache.doris.journal.bdbje.Timestamp;
+import org.apache.doris.load.DeleteHandler;
 import org.apache.doris.load.DeleteInfo;
 import org.apache.doris.load.ExportChecker;
 import org.apache.doris.load.ExportJob;
@@ -289,6 +290,7 @@ public class Catalog {
     private ConsistencyChecker consistencyChecker;
     private BackupHandler backupHandler;
     private PublishVersionDaemon publishVersionDaemon;
+    private DeleteHandler deleteHandler;
 
     private MasterDaemon labelCleaner; // To clean old LabelInfo, ExportJobInfos
     private MasterDaemon txnCleaner; // To clean aborted or timeout txns
@@ -459,6 +461,7 @@ public class Catalog {
         this.backupHandler = new BackupHandler(this);
         this.metaDir = Config.meta_dir;
         this.publishVersionDaemon = new PublishVersionDaemon();
+        this.deleteHandler = new DeleteHandler();
 
         this.replayedJournalId = new AtomicLong(0L);
         this.isElectable = false;
@@ -1435,6 +1438,7 @@ public class Catalog {
             checksum = loadLoadJobsV2(dis, checksum);
             checksum = loadSmallFiles(dis, checksum);
             checksum = loadPlugins(dis, checksum);
+            checksum = loadDeleteHandler(dis, checksum);
 
             long remoteChecksum = dis.readLong();
             Preconditions.checkState(remoteChecksum == checksum, remoteChecksum + " vs. " + checksum);
@@ -1765,6 +1769,18 @@ public class Catalog {
         return checksum;
     }
 
+    public long loadDeleteHandler(DataInputStream dis, long checksum) throws IOException {
+        if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_82) {
+            this.deleteHandler = DeleteHandler.read(dis);
+        }
+        return checksum;
+    }
+
+    public long saveDeleteHandler(DataOutputStream dos, long checksum) throws IOException {
+        getDeleteHandler().write(dos);
+        return checksum;
+    }
+
     public long loadPaloAuth(DataInputStream dis, long checksum) throws IOException {
         if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_43) {
             // CAN NOT use PaloAuth.read(), cause this auth instance is already passed to DomainResolver
@@ -1869,6 +1885,7 @@ public class Catalog {
             checksum = saveLoadJobsV2(dos, checksum);
             checksum = saveSmallFiles(dos, checksum);
             checksum = savePlugins(dos, checksum);
+            checksum = saveDeleteHandler(dos, checksum);
             dos.writeLong(checksum);
         }
 
@@ -4589,6 +4606,10 @@ public class Catalog {
         return this.backupHandler;
     }
 
+    public DeleteHandler getDeleteHandler() {
+        return this.deleteHandler;
+    }
+
     public Load getLoadInstance() {
         return this.load;
     }
diff --git a/fe/src/main/java/org/apache/doris/common/Config.java b/fe/src/main/java/org/apache/doris/common/Config.java
index e865399..913cae8 100644
--- a/fe/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/src/main/java/org/apache/doris/common/Config.java
@@ -398,6 +398,10 @@ public class Config extends ConfigBase {
      */
     @ConfField public static int load_etl_thread_num_normal_priority = 10;
     /*
+     * Concurrency of delete jobs.
+     */
+    @ConfField public static int delete_thread_num = 10;
+    /*
      * Not available.
      */
     @ConfField(mutable = true, masterOnly = true)
diff --git a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
index 4e3a0f6..e3672e3 100644
--- a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
+++ b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -173,6 +173,8 @@ public final class FeMetaVersion {
     public static final int VERSION_80 = 80;
     // replica quota support
     public static final int VERSION_81 = 81;
+    // optimize delete job
+    public static final int VERSION_82 = 82;
     // note: when increment meta version, should assign the latest version to VERSION_CURRENT
-    public static final int VERSION_CURRENT = VERSION_81;
+    public static final int VERSION_CURRENT = VERSION_82;
 }
diff --git a/fe/src/main/java/org/apache/doris/common/proc/DeleteInfoProcDir.java b/fe/src/main/java/org/apache/doris/common/proc/DeleteInfoProcDir.java
index 7979bb0..bffa6ce 100644
--- a/fe/src/main/java/org/apache/doris/common/proc/DeleteInfoProcDir.java
+++ b/fe/src/main/java/org/apache/doris/common/proc/DeleteInfoProcDir.java
@@ -18,9 +18,10 @@
 package org.apache.doris.common.proc;
 
 import org.apache.doris.common.AnalysisException;
-import org.apache.doris.load.Load;
+import org.apache.doris.load.DeleteHandler;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.doris.load.Load;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -39,10 +40,12 @@ public class DeleteInfoProcDir implements ProcDirInterface {
             .build();
 
     private Load load;
+    private DeleteHandler deleteHandler;
     private long dbId;
 
-    public DeleteInfoProcDir(Load load, long dbId) {
+    public DeleteInfoProcDir(DeleteHandler deleteHandler, Load load, long dbId) {
         this.load = load;
+        this.deleteHandler = deleteHandler;
         this.dbId = dbId;
     }
 
@@ -51,7 +54,8 @@ public class DeleteInfoProcDir implements ProcDirInterface {
         BaseProcResult result = new BaseProcResult();
         result.setNames(TITLE_NAMES);
 
-        List<List<Comparable>> infos = load.getDeleteInfosByDb(dbId, false);
+        List<List<Comparable>> infos = deleteHandler.getDeleteInfosByDb(dbId, false);
+        infos.addAll(load.getDeleteInfosByDb(dbId, false));
         for (List<Comparable> info : infos) {
             List<String> oneInfo = new ArrayList<String>(TITLE_NAMES.size());
             for (Comparable element : info) {
@@ -77,7 +81,8 @@ public class DeleteInfoProcDir implements ProcDirInterface {
             throw new AnalysisException("Invalid job id format: " + jobIdStr);
         }
 
-        return new DeleteJobProcNode(load, jobId);
+        // return new DeleteJobProcNode(load, jobId);
+        return null;
     }
 
 }
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/doris/common/proc/JobsProcDir.java b/fe/src/main/java/org/apache/doris/common/proc/JobsProcDir.java
index 0565621..35af1d3 100644
--- a/fe/src/main/java/org/apache/doris/common/proc/JobsProcDir.java
+++ b/fe/src/main/java/org/apache/doris/common/proc/JobsProcDir.java
@@ -70,7 +70,7 @@ public class JobsProcDir implements ProcDirInterface {
         if (jobTypeName.equals(LOAD)) {
             return new LoadProcDir(catalog.getLoadInstance(), db);
         } else if (jobTypeName.equals(DELETE)) {
-            return new DeleteInfoProcDir(catalog.getLoadInstance(), db.getId());
+            return new DeleteInfoProcDir(catalog.getDeleteHandler(), catalog.getLoadInstance(), db.getId());
         } else if (jobTypeName.equals(ROLLUP)) {
             return new RollupProcDir(catalog.getRollupHandler(), db);
         } else if (jobTypeName.equals(SCHEMA_CHANGE)) {
diff --git a/fe/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/src/main/java/org/apache/doris/journal/JournalEntity.java
index ece84aa..7d535bb 100644
--- a/fe/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -282,6 +282,12 @@ public class JournalEntity implements Writable {
                 isRead = true;
                 break;
             }
+            case OperationType.OP_FINISH_DELETE: {
+                data = new DeleteInfo();
+                ((DeleteInfo) data).readFields(in);
+                isRead = true;
+                break;
+            }
             case OperationType.OP_FINISH_ASYNC_DELETE: {
                 data = AsyncDeleteJob.read(in);
                 isRead = true;
diff --git a/fe/src/main/java/org/apache/doris/journal/local/LocalJournalCursor.java b/fe/src/main/java/org/apache/doris/journal/local/LocalJournalCursor.java
index 4d7c403..57f1bf2 100644
--- a/fe/src/main/java/org/apache/doris/journal/local/LocalJournalCursor.java
+++ b/fe/src/main/java/org/apache/doris/journal/local/LocalJournalCursor.java
@@ -322,6 +322,12 @@ public final class LocalJournalCursor implements JournalCursor {
                 ret.setData(info);
                 break;
             }
+            case OperationType.OP_FINISH_DELETE: {
+                DeleteInfo info = new DeleteInfo();
+                info.readFields(in);
+                ret.setData(info);
+                break;
+            }
             case OperationType.OP_FINISH_ASYNC_DELETE: {
                 AsyncDeleteJob deleteJob = AsyncDeleteJob.read(in);
                 ret.setData(deleteJob);
diff --git a/fe/src/main/java/org/apache/doris/load/DeleteHandler.java b/fe/src/main/java/org/apache/doris/load/DeleteHandler.java
new file mode 100644
index 0000000..cd85682
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/DeleteHandler.java
@@ -0,0 +1,630 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.gson.annotations.SerializedName;
+import org.apache.doris.analysis.BinaryPredicate;
+import org.apache.doris.analysis.DeleteStmt;
+import org.apache.doris.analysis.IsNullPredicate;
+import org.apache.doris.analysis.LiteralExpr;
+import org.apache.doris.analysis.Predicate;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.KeysType;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.PartitionType;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.TabletInvertedIndex;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.MarkedCountDownLatch;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.util.ListComparator;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryStateException;
+import org.apache.doris.qe.QueryState.MysqlStateType;
+import org.apache.doris.service.FrontendOptions;
+import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTaskExecutor;
+import org.apache.doris.task.AgentTaskQueue;
+import org.apache.doris.task.DeleteJob;
+import org.apache.doris.task.DeleteJob.DeleteState;
+import org.apache.doris.task.PushTask;
+import org.apache.doris.thrift.TPriority;
+import org.apache.doris.thrift.TPushType;
+import org.apache.doris.thrift.TTaskType;
+import org.apache.doris.transaction.GlobalTransactionMgr;
+import org.apache.doris.transaction.TabletCommitInfo;
+import org.apache.doris.transaction.TransactionState;
+import org.apache.doris.transaction.TransactionStatus;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+public class DeleteHandler implements Writable {
+    private static final Logger LOG = LogManager.getLogger(DeleteHandler.class);
+
+    // TransactionId -> DeleteJob
+    private Map<Long, DeleteJob> idToDeleteJob;
+
+    // Db -> DeleteInfo list
+    @SerializedName(value = "dbToDeleteInfos")
+    private Map<Long, List<DeleteInfo>> dbToDeleteInfos;
+
+    public DeleteHandler() {
+        idToDeleteJob = Maps.newConcurrentMap();
+        dbToDeleteInfos = Maps.newConcurrentMap();
+    }
+
+    private enum CancelType {
+        METADATA_MISSING,
+        TIMEOUT,
+        COMMIT_FAIL,
+        UNKNOWN
+    }
+
+    public void process(DeleteStmt stmt) throws DdlException, QueryStateException {
+        String dbName = stmt.getDbName();
+        String tableName = stmt.getTableName();
+        String partitionName = stmt.getPartitionName();
+        List<Predicate> conditions = stmt.getDeleteConditions();
+        Database db = Catalog.getInstance().getDb(dbName);
+        if (db == null) {
+            throw new DdlException("Db does not exist. name: " + dbName);
+        }
+
+        DeleteJob deleteJob = null;
+        try {
+            MarkedCountDownLatch<Long, Long> countDownLatch;
+            long transactionId = -1;
+            db.readLock();
+            try {
+                Table table = db.getTable(tableName);
+                if (table == null) {
+                    throw new DdlException("Table does not exist. name: " + tableName);
+                }
+
+                if (table.getType() != Table.TableType.OLAP) {
+                    throw new DdlException("Not olap type table. type: " + table.getType().name());
+                }
+                OlapTable olapTable = (OlapTable) table;
+
+                if (olapTable.getState() != OlapTable.OlapTableState.NORMAL) {
+                    throw new DdlException("Table's state is not normal: " + tableName);
+                }
+
+                if (partitionName == null) {
+                    if (olapTable.getPartitionInfo().getType() == PartitionType.RANGE) {
+                        throw new DdlException("This is a range partitioned table."
+                                + " You should specify partition in delete stmt");
+                    } else {
+                        // this is an unpartitioned table, use the table name as the partition name
+                        partitionName = olapTable.getName();
+                    }
+                }
+
+                Partition partition = olapTable.getPartition(partitionName);
+                if (partition == null) {
+                    throw new DdlException("Partition does not exist. name: " + partitionName);
+                }
+
+                List<String> deleteConditions = Lists.newArrayList();
+
+                // pre check
+                checkDeleteV2(olapTable, partition, conditions, deleteConditions, true);
+
+                // generate label
+                String label = "delete_" + UUID.randomUUID();
+                //generate jobId
+                long jobId = Catalog.getCurrentCatalog().getNextId();
+                // begin txn here and generate txn id
+                transactionId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(),
+                        Lists.newArrayList(table.getId()), label, null, "FE: " + FrontendOptions.getLocalHostAddress(),
+                        TransactionState.LoadJobSourceType.FRONTEND, jobId, Config.stream_load_default_timeout_second);
+
+                DeleteInfo deleteInfo = new DeleteInfo(db.getId(), olapTable.getId(), tableName,
+                        partition.getId(), partitionName,
+                        -1, 0, deleteConditions);
+                deleteJob = new DeleteJob(jobId, transactionId, label, deleteInfo);
+                idToDeleteJob.put(deleteJob.getTransactionId(), deleteJob);
+
+                Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(deleteJob);
+                // tasks will be sent to BE
+                AgentBatchTask batchTask = new AgentBatchTask();
+                // count total replica num
+                int totalReplicaNum = 0;
+                for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
+                    for (Tablet tablet : index.getTablets()) {
+                        totalReplicaNum += tablet.getReplicas().size();
+                    }
+                }
+                countDownLatch = new MarkedCountDownLatch<Long, Long>(totalReplicaNum);
+
+                for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
+                    long indexId = index.getId();
+                    int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
+
+                    for (Tablet tablet : index.getTablets()) {
+                        long tabletId = tablet.getId();
+
+                        // set push type
+                        TPushType type = TPushType.DELETE;
+
+                        for (Replica replica : tablet.getReplicas()) {
+                            long replicaId = replica.getId();
+                            long backendId = replica.getBackendId();
+                            countDownLatch.addMark(backendId, tabletId);
+
+                            // create push task for each replica
+                            PushTask pushTask = new PushTask(null,
+                                    replica.getBackendId(), db.getId(), olapTable.getId(),
+                                    partition.getId(), indexId,
+                                    tabletId, replicaId, schemaHash,
+                                    -1, 0, "", -1, 0,
+                                    -1, type, conditions,
+                                    true, TPriority.NORMAL,
+                                    TTaskType.REALTIME_PUSH,
+                                    transactionId,
+                                    Catalog.getCurrentGlobalTransactionMgr().getTransactionIDGenerator().getNextTransactionId());
+                            pushTask.setIsSchemaChanging(false);
+                            pushTask.setCountDownLatch(countDownLatch);
+
+                            if (AgentTaskQueue.addTask(pushTask)) {
+                                batchTask.addTask(pushTask);
+                                deleteJob.addPushTask(pushTask);
+                                deleteJob.addTablet(tabletId);
+                            }
+                        }
+                    }
+                }
+
+                // submit push tasks
+                if (batchTask.getTaskNum() > 0) {
+                    AgentTaskExecutor.submit(batchTask);
+                }
+
+            } catch (Throwable t) {
+                LOG.warn("error occurred during delete process", t);
+                // if transaction has been begun, need to abort it
+                if (Catalog.getCurrentGlobalTransactionMgr().getTransactionState(transactionId) != null) {
+                    cancelJob(deleteJob, CancelType.UNKNOWN, t.getMessage());
+                }
+                throw new DdlException(t.getMessage(), t);
+            } finally {
+                db.readUnlock();
+            }
+
+            long timeoutMs = deleteJob.getTimeoutMs();
+            LOG.info("waiting delete Job finish, signature: {}, timeout: {}", transactionId, timeoutMs);
+            boolean ok = false;
+            try {
+                ok = countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                LOG.warn("InterruptedException: ", e);
+                ok = false;
+            }
+
+            if (!ok) {
+                try {
+                    deleteJob.checkAndUpdateQuorum();
+                } catch (MetaNotFoundException e) {
+                    cancelJob(deleteJob, CancelType.METADATA_MISSING, e.getMessage());
+                    throw new DdlException(e.getMessage(), e);
+                }
+                DeleteState state = deleteJob.getState();
+                switch (state) {
+                    case UN_QUORUM:
+                        List<Entry<Long, Long>> unfinishedMarks = countDownLatch.getLeftMarks();
+                        // only show at most 5 results
+                        List<Entry<Long, Long>> subList = unfinishedMarks.subList(0, Math.min(unfinishedMarks.size(), 5));
+                        String errMsg = "Unfinished replicas: " + Joiner.on(", ").join(subList);
+                        LOG.warn("delete job timeout: transactionId {}, {}", transactionId, errMsg);
+                        cancelJob(deleteJob, CancelType.TIMEOUT, "delete job timeout");
+                        throw new DdlException("failed to delete replicas from job: " + transactionId + ", " + errMsg);
+                    case QUORUM_FINISHED:
+                    case FINISHED:
+                        try {
+                            long nowQuorumTimeMs = System.currentTimeMillis();
+                            long endQuorumTimeoutMs = nowQuorumTimeMs + timeoutMs / 2;
+                            // if the job's state is QUORUM_FINISHED, wait for a period of time and then commit it.
+                            while (deleteJob.getState() == DeleteState.QUORUM_FINISHED && endQuorumTimeoutMs > nowQuorumTimeMs) {
+                                deleteJob.checkAndUpdateQuorum();
+                                Thread.sleep(1000);
+                                nowQuorumTimeMs = System.currentTimeMillis();
+                            }
+                        } catch (MetaNotFoundException e) {
+                            cancelJob(deleteJob, CancelType.METADATA_MISSING, e.getMessage());
+                            throw new DdlException(e.getMessage(), e);
+                        } catch (InterruptedException e) {
+                            cancelJob(deleteJob, CancelType.UNKNOWN, e.getMessage());
+                            throw new DdlException(e.getMessage(), e);
+                        }
+                        commitJob(deleteJob, db, timeoutMs);
+                        break;
+                    default:
+                        Preconditions.checkState(false, "wrong delete job state: " + state.name());
+                        break;
+                }
+            } else {
+                commitJob(deleteJob, db, timeoutMs);
+            }
+        } finally {
+            if (!FeConstants.runningUnitTest) {
+                clearJob(deleteJob);
+            }
+        }
+    }
+
+    private void commitJob(DeleteJob job, Database db, long timeoutMs) throws DdlException, QueryStateException {
+        TransactionStatus status = null;
+        try {
+            unprotectedCommitJob(job, db, timeoutMs);
+            status = Catalog.getCurrentGlobalTransactionMgr().
+                    getTransactionState(job.getTransactionId()).getTransactionStatus();
+        } catch (UserException e) {
+            if (cancelJob(job, CancelType.COMMIT_FAIL, e.getMessage())) {
+                throw new DdlException(e.getMessage(), e);
+            }
+        }
+
+        StringBuilder sb = new StringBuilder();
+        sb.append("{'label':'").append(job.getLabel()).append("', 'status':'").append(status.name());
+        sb.append("', 'txnId':'").append(job.getTransactionId()).append("'");
+
+        switch (status) {
+            case COMMITTED: {
+                // Although the publish is unfinished, we should tell the user that the commit has already succeeded.
+                String errMsg = "delete job is committed but may be taking effect later";
+                sb.append(", 'err':'").append(errMsg).append("'");
+                sb.append("}");
+                throw new QueryStateException(MysqlStateType.OK, sb.toString());
+            }
+            case VISIBLE: {
+                sb.append("}");
+                throw new QueryStateException(MysqlStateType.OK, sb.toString());
+            }
+            default:
+                Preconditions.checkState(false, "wrong transaction status: " + status.name());
+                break;
+        }
+    }
+
+    /**
+     * Unprotected commit of the delete job.
+     * Returns true when the commit and publish both succeed,
+     * and false when the commit succeeds but the publish is unfinished.
+     * A UserException is thrown if both the commit and the publish fail.
+     * @param job
+     * @param db
+     * @param timeoutMs
+     * @return
+     * @throws UserException
+     */
+    private boolean unprotectedCommitJob(DeleteJob job, Database db, long timeoutMs) throws UserException {
+        long transactionId = job.getTransactionId();
+        GlobalTransactionMgr globalTransactionMgr = Catalog.getCurrentGlobalTransactionMgr();
+        List<TabletCommitInfo> tabletCommitInfos = new ArrayList<TabletCommitInfo>();
+        TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
+        for (TabletDeleteInfo tDeleteInfo : job.getTabletDeleteInfo()) {
+            for (Replica replica : tDeleteInfo.getFinishedReplicas()) {
+                // the inverted index may contain rolling-up replicas
+                Long tabletId = invertedIndex.getTabletIdByReplica(replica.getId());
+                if (tabletId == null) {
+                    LOG.warn("could not find tablet id for replica {}, the tablet maybe dropped", replica);
+                    continue;
+                }
+                tabletCommitInfos.add(new TabletCommitInfo(tabletId, replica.getBackendId()));
+            }
+        }
+        return globalTransactionMgr.commitAndPublishTransaction(db, transactionId, tabletCommitInfos, timeoutMs);
+    }
+
+    /**
+     * This method should always be called at the end of the delete process to clean up the job.
+     * It is best placed in a finally block.
+     * @param job
+     */
+    private void clearJob(DeleteJob job) {
+        if (job != null) {
+            long signature = job.getTransactionId();
+            if (idToDeleteJob.containsKey(signature)) {
+                idToDeleteJob.remove(signature);
+            }
+            for (PushTask pushTask : job.getPushTasks()) {
+                AgentTaskQueue.removePushTask(pushTask.getBackendId(), pushTask.getSignature(),
+                        pushTask.getVersion(), pushTask.getVersionHash(),
+                        pushTask.getPushType(), pushTask.getTaskType());
+            }
+            Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(job.getId());
+        }
+    }
+
+    public void recordFinishedJob(DeleteJob job) {
+        if (job != null) {
+            long dbId = job.getDeleteInfo().getDbId();
+            LOG.info("record finished deleteJob, transactionId {}, dbId {}",
+                    job.getTransactionId(), dbId);
+            dbToDeleteInfos.putIfAbsent(dbId, Lists.newArrayList());
+            List<DeleteInfo> deleteInfoList = dbToDeleteInfos.get(dbId);
+            synchronized (deleteInfoList) {
+                deleteInfoList.add(job.getDeleteInfo());
+            }
+        }
+    }
+
+    /**
+     * Aborts the delete job.
+     * Returns true when the abort succeeds,
+     * or when some unknown error happens (which is simply ignored).
+     * Returns false when the job has already been committed.
+     * @param job
+     * @param cancelType
+     * @param reason
+     * @return
+     */
+    public boolean cancelJob(DeleteJob job, CancelType cancelType, String reason) {
+        LOG.info("start to cancel delete job, transactionId: {}, cancelType: {}", job.getTransactionId(), cancelType.name());
+        GlobalTransactionMgr globalTransactionMgr = Catalog.getCurrentGlobalTransactionMgr();
+        try {
+            if (job != null) {
+                globalTransactionMgr.abortTransaction(job.getTransactionId(), reason);
+            }
+        } catch (Exception e) {
+            TransactionState state = globalTransactionMgr.getTransactionState(job.getTransactionId());
+            if (state == null) {
+                LOG.warn("cancel delete job failed because txn not found, transactionId: {}", job.getTransactionId());
+            } else if (state.getTransactionStatus() == TransactionStatus.COMMITTED || state.getTransactionStatus() == TransactionStatus.VISIBLE) {
+                LOG.warn("cancel delete job {} failed because it has been committed, transactionId: {}", job.getTransactionId());
+                return false;
+            } else {
+                LOG.warn("errors while abort transaction", e);
+            }
+        }
+        return true;
+    }
+
+    public DeleteJob getDeleteJob(long transactionId) {
+        return idToDeleteJob.get(transactionId);
+    }
+
+    private void checkDeleteV2(OlapTable table, Partition partition, List<Predicate> conditions, List<String> deleteConditions, boolean preCheck)
+            throws DdlException {
+
+        // check partition state
+        Partition.PartitionState state = partition.getState();
+        if (state != Partition.PartitionState.NORMAL) {
+            // ErrorReport.reportDdlException(ErrorCode.ERR_BAD_PARTITION_STATE, partition.getName(), state.name());
+            throw new DdlException("Partition[" + partition.getName() + "]' state is not NORMAL: " + state.name());
+        }
+
+        // check condition column is key column and condition value
+        Map<String, Column> nameToColumn = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+        for (Column column : table.getBaseSchema()) {
+            nameToColumn.put(column.getName(), column);
+        }
+        for (Predicate condition : conditions) {
+            SlotRef slotRef = null;
+            if (condition instanceof BinaryPredicate) {
+                BinaryPredicate binaryPredicate = (BinaryPredicate) condition;
+                slotRef = (SlotRef) binaryPredicate.getChild(0);
+            } else if (condition instanceof IsNullPredicate) {
+                IsNullPredicate isNullPredicate = (IsNullPredicate) condition;
+                slotRef = (SlotRef) isNullPredicate.getChild(0);
+            }
+            String columnName = slotRef.getColumnName();
+            if (!nameToColumn.containsKey(columnName)) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_BAD_FIELD_ERROR, columnName, table.getName());
+            }
+
+            Column column = nameToColumn.get(columnName);
+            if (!column.isKey()) {
+                // ErrorReport.reportDdlException(ErrorCode.ERR_NOT_KEY_COLUMN, columnName);
+                throw new DdlException("Column[" + columnName + "] is not key column");
+            }
+
+            if (condition instanceof BinaryPredicate) {
+                String value = null;
+                try {
+                    BinaryPredicate binaryPredicate = (BinaryPredicate) condition;
+                    value = ((LiteralExpr) binaryPredicate.getChild(1)).getStringValue();
+                    LiteralExpr.create(value, Type.fromPrimitiveType(column.getDataType()));
+                } catch (AnalysisException e) {
+                    // ErrorReport.reportDdlException(ErrorCode.ERR_INVALID_VALUE, value);
+                    throw new DdlException("Invalid column value[" + value + "]");
+                }
+            }
+
+            // set schema column name
+            slotRef.setCol(column.getName());
+        }
+        Map<Long, List<Column>> indexIdToSchema = table.getIndexIdToSchema();
+        for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.VISIBLE)) {
+            // check table has condition column
+            Map<String, Column> indexColNameToColumn = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+            for (Column column : indexIdToSchema.get(index.getId())) {
+                indexColNameToColumn.put(column.getName(), column);
+            }
+            String indexName = table.getIndexNameById(index.getId());
+            for (Predicate condition : conditions) {
+                String columnName = null;
+                if (condition instanceof BinaryPredicate) {
+                    BinaryPredicate binaryPredicate = (BinaryPredicate) condition;
+                    columnName = ((SlotRef) binaryPredicate.getChild(0)).getColumnName();
+                } else if (condition instanceof IsNullPredicate) {
+                    IsNullPredicate isNullPredicate = (IsNullPredicate) condition;
+                    columnName = ((SlotRef) isNullPredicate.getChild(0)).getColumnName();
+                }
+                Column column = indexColNameToColumn.get(columnName);
+                if (column == null) {
+                    ErrorReport.reportDdlException(ErrorCode.ERR_BAD_FIELD_ERROR, columnName, indexName);
+                }
+
+                if (table.getKeysType() == KeysType.DUP_KEYS && !column.isKey()) {
+                    throw new DdlException("Column[" + columnName + "] is not key column in index[" + indexName + "]");
+                }
+            }
+        }
+
+        if (deleteConditions == null) {
+            return;
+        }
+
+        // save delete conditions
+        for (Predicate condition : conditions) {
+            if (condition instanceof BinaryPredicate) {
+                BinaryPredicate binaryPredicate = (BinaryPredicate) condition;
+                SlotRef slotRef = (SlotRef) binaryPredicate.getChild(0);
+                String columnName = slotRef.getColumnName();
+                StringBuilder sb = new StringBuilder();
+                sb.append(columnName).append(" ").append(binaryPredicate.getOp().name()).append(" \"")
+                        .append(((LiteralExpr) binaryPredicate.getChild(1)).getStringValue()).append("\"");
+                deleteConditions.add(sb.toString());
+            } else if (condition instanceof IsNullPredicate) {
+                IsNullPredicate isNullPredicate = (IsNullPredicate) condition;
+                SlotRef slotRef = (SlotRef) isNullPredicate.getChild(0);
+                String columnName = slotRef.getColumnName();
+                StringBuilder sb = new StringBuilder();
+                sb.append(columnName);
+                if (isNullPredicate.isNotNull()) {
+                    sb.append(" IS NOT NULL");
+                } else {
+                    sb.append(" IS NULL");
+                }
+                deleteConditions.add(sb.toString());
+            }
+        }
+    }
+
+    // show delete stmt
+    public List<List<Comparable>> getDeleteInfosByDb(long dbId, boolean forUser) {
+        LinkedList<List<Comparable>> infos = new LinkedList<List<Comparable>>();
+        Database db = Catalog.getInstance().getDb(dbId);
+        if (db == null) {
+            return infos;
+        }
+
+        String dbName = db.getFullName();
+        List<DeleteInfo> deleteInfos = dbToDeleteInfos.get(dbId);
+        if (deleteInfos == null) {
+            return infos;
+        }
+
+        for (DeleteInfo deleteInfo : deleteInfos) {
+
+            if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), dbName,
+                    deleteInfo.getTableName(),
+                    PrivPredicate.LOAD)) {
+                continue;
+            }
+
+
+            List<Comparable> info = Lists.newArrayList();
+            if (!forUser) {
+                info.add(-1L);
+                info.add(deleteInfo.getTableId());
+            }
+            info.add(deleteInfo.getTableName());
+            if (!forUser) {
+                info.add(deleteInfo.getPartitionId());
+            }
+            info.add(deleteInfo.getPartitionName());
+
+            info.add(TimeUtils.longToTimeString(deleteInfo.getCreateTimeMs()));
+            String conds = Joiner.on(", ").join(deleteInfo.getDeleteConditions());
+            info.add(conds);
+
+            if (!forUser) {
+                info.add(deleteInfo.getPartitionVersion());
+                info.add(deleteInfo.getPartitionVersionHash());
+            }
+            // for loading state, should not display loading, show deleting instead
+//                if (loadJob.getState() == LoadJob.JobState.LOADING) {
+//                    info.add("DELETING");
+//                } else {
+//                    info.add(loadJob.getState().name());
+//                }
+            info.add("FINISHED");
+            infos.add(info);
+        }
+        // sort by createTimeMs
+        int sortIndex;
+        if (!forUser) {
+            sortIndex = 5;
+        } else {
+            sortIndex = 2;
+        }
+        ListComparator<List<Comparable>> comparator = new ListComparator<List<Comparable>>(sortIndex);
+        Collections.sort(infos, comparator);
+        return infos;
+    }
+
+    public void replayDelete(DeleteInfo deleteInfo, Catalog catalog) {
+        // add to deleteInfos
+        long dbId = deleteInfo.getDbId();
+        LOG.info("replay delete, dbId {}", dbId);
+        dbToDeleteInfos.putIfAbsent(dbId, Lists.newArrayList());
+        List<DeleteInfo> deleteInfoList = dbToDeleteInfos.get(dbId);
+        synchronized (deleteInfoList) {
+            deleteInfoList.add(deleteInfo);
+        }
+    }
+
+    // for the delete handler, we only persist deletes that have already finished.
+    @Override
+    public void write(DataOutput out) throws IOException {
+        Text.writeString(out, GsonUtils.GSON.toJson(this));
+    }
+
+    public static DeleteHandler read(DataInput in) throws IOException {
+        String json = Text.readString(in);
+        return GsonUtils.GSON.fromJson(json, DeleteHandler.class);
+    }
+}
diff --git a/fe/src/main/java/org/apache/doris/load/DeleteInfo.java b/fe/src/main/java/org/apache/doris/load/DeleteInfo.java
index 360976f..8e866c4 100644
--- a/fe/src/main/java/org/apache/doris/load/DeleteInfo.java
+++ b/fe/src/main/java/org/apache/doris/load/DeleteInfo.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import com.google.gson.annotations.SerializedName;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.io.Text;
@@ -34,16 +35,25 @@ import com.google.common.collect.Lists;
 
 public class DeleteInfo implements Writable {
 
+    @SerializedName(value = "dbId")
     private long dbId;
+    @SerializedName(value = "tableId")
     private long tableId;
+    @SerializedName(value = "tableName")
     private String tableName;
+    @SerializedName(value = "partitionId")
     private long partitionId;
+    @SerializedName(value = "partitionName")
     private String partitionName;
+    @SerializedName(value = "partitionVersion")
     private long partitionVersion;
+    @SerializedName(value = "partitionVersionHash")
     private long partitionVersionHash;
     private List<ReplicaPersistInfo> replicaInfos;
 
+    @SerializedName(value = "deleteConditions")
     private List<String> deleteConditions;
+    @SerializedName(value = "createTimeMs")
     private long createTimeMs;
 
     private AsyncDeleteJob asyncDeleteJob;
diff --git a/fe/src/main/java/org/apache/doris/load/Load.java b/fe/src/main/java/org/apache/doris/load/Load.java
index a7ad4cb..d2a88ee 100644
--- a/fe/src/main/java/org/apache/doris/load/Load.java
+++ b/fe/src/main/java/org/apache/doris/load/Load.java
@@ -3095,7 +3095,7 @@ public class Load {
         PartitionState state = partition.getState();
         if (state != PartitionState.NORMAL) {
             // ErrorReport.reportDdlException(ErrorCode.ERR_BAD_PARTITION_STATE, partition.getName(), state.name());
-            throw new DdlException("Partition[" + partition.getName() + "]' state is not NORNAL: " + state.name());
+            throw new DdlException("Partition[" + partition.getName() + "]' state is not NORMAL: " + state.name());
         }
         // do not need check whether partition has loading job
 
diff --git a/fe/src/main/java/org/apache/doris/load/TabletDeleteInfo.java b/fe/src/main/java/org/apache/doris/load/TabletDeleteInfo.java
new file mode 100644
index 0000000..018ac8d
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/TabletDeleteInfo.java
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+import com.google.common.collect.Sets;
+import org.apache.doris.catalog.Replica;
+
+import java.util.Set;
+
+public class TabletDeleteInfo {
+    private long tabletId;
+    private Set<Replica> finishedReplicas;
+
+    public TabletDeleteInfo(long tabletId) {
+        this.tabletId = tabletId;
+        this.finishedReplicas = Sets.newConcurrentHashSet();
+    }
+
+    public long getTabletId() {
+        return tabletId;
+    }
+
+    public Set<Replica> getFinishedReplicas() {
+        return finishedReplicas;
+    }
+
+    public boolean addFinishedReplica(Replica replica) {
+        finishedReplicas.add(replica);
+        return true;
+    }
+}
diff --git a/fe/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/src/main/java/org/apache/doris/master/MasterImpl.java
index b6497bd..bd7b8ea 100644
--- a/fe/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -47,6 +47,7 @@ import org.apache.doris.task.ClearAlterTask;
 import org.apache.doris.task.CloneTask;
 import org.apache.doris.task.CreateReplicaTask;
 import org.apache.doris.task.CreateRollupTask;
+import org.apache.doris.task.DeleteJob;
 import org.apache.doris.task.DirMoveTask;
 import org.apache.doris.task.DownloadTask;
 import org.apache.doris.task.PublishVersionTask;
@@ -288,6 +289,7 @@ public class MasterImpl {
         long dbId = pushTask.getDbId();
         long backendId = pushTask.getBackendId();
         long signature = task.getSignature();
+        long transactionId = ((PushTask) task).getTransactionId();
         Database db = Catalog.getInstance().getDb(dbId);
         if (db == null) {
             AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature);
@@ -345,19 +347,34 @@ public class MasterImpl {
                                                                        task.getDbId());
             // handle load job
             // TODO yiguolei: why delete should check request version and task version?
-            long loadJobId = pushTask.getLoadJobId();
-            LoadJob job = Catalog.getInstance().getLoadInstance().getLoadJob(loadJobId);
-            if (job == null) {
-                throw new MetaNotFoundException("cannot find load job, job[" + loadJobId + "]");
-            }
-            for (TTabletInfo tTabletInfo : finishTabletInfos) {
-                checkReplica(olapTable, partition, backendId, pushIndexId, pushTabletId,
-                        tTabletInfo, pushState);
-                Replica replica = findRelatedReplica(olapTable, partition,
-                                                            backendId, tTabletInfo);
-                // if the replica is under schema change, could not find the replica with aim schema hash
-                if (replica != null) {
-                    job.addFinishedReplica(replica);
+            if (pushTask.getPushType() == TPushType.LOAD || pushTask.getPushType() == TPushType.LOAD_DELETE) {
+                long loadJobId = pushTask.getLoadJobId();
+                LoadJob job = Catalog.getInstance().getLoadInstance().getLoadJob(loadJobId);
+                if (job == null) {
+                    throw new MetaNotFoundException("cannot find load job, job[" + loadJobId + "]");
+                }
+                for (TTabletInfo tTabletInfo : finishTabletInfos) {
+                    checkReplica(olapTable, partition, backendId, pushIndexId, pushTabletId,
+                            tTabletInfo, pushState);
+                    Replica replica = findRelatedReplica(olapTable, partition,
+                            backendId, tTabletInfo);
+                    // if the replica is under schema change, could not find the replica with aim schema hash
+                    if (replica != null) {
+                        job.addFinishedReplica(replica);
+                    }
+                }
+            } else if (pushTask.getPushType() == TPushType.DELETE) {
+                DeleteJob deleteJob = Catalog.getInstance().getDeleteHandler().getDeleteJob(transactionId);
+                if (deleteJob == null) {
+                    throw new MetaNotFoundException("cannot find delete job, job[" + transactionId + "]");
+                }
+                for (TTabletInfo tTabletInfo : finishTabletInfos) {
+                    Replica replica = findRelatedReplica(olapTable, partition,
+                            backendId, tTabletInfo);
+                    if (replica != null) {
+                        deleteJob.addFinishedReplica(pushTabletId, replica);
+                        pushTask.countDownLatch(backendId, pushTabletId);
+                    }
                 }
             }
             
diff --git a/fe/src/main/java/org/apache/doris/persist/EditLog.java b/fe/src/main/java/org/apache/doris/persist/EditLog.java
index df1b4c9..134899a 100644
--- a/fe/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/src/main/java/org/apache/doris/persist/EditLog.java
@@ -45,6 +45,7 @@ import org.apache.doris.journal.JournalEntity;
 import org.apache.doris.journal.bdbje.BDBJEJournal;
 import org.apache.doris.journal.bdbje.Timestamp;
 import org.apache.doris.load.AsyncDeleteJob;
+import org.apache.doris.load.DeleteHandler;
 import org.apache.doris.load.DeleteInfo;
 import org.apache.doris.load.ExportJob;
 import org.apache.doris.load.ExportMgr;
@@ -382,6 +383,12 @@ public class EditLog {
                     load.replayDelete(info, catalog);
                     break;
                 }
+                case OperationType.OP_FINISH_DELETE: {
+                    DeleteInfo info = (DeleteInfo) journal.getData();
+                    DeleteHandler deleteHandler = catalog.getDeleteHandler();
+                    deleteHandler.replayDelete(info, catalog);
+                    break;
+                }
                 case OperationType.OP_FINISH_ASYNC_DELETE: {
                     AsyncDeleteJob deleteJob = (AsyncDeleteJob) journal.getData();
                     Load load = catalog.getLoadInstance();
@@ -1001,6 +1008,10 @@ public class EditLog {
         logEdit(OperationType.OP_FINISH_SYNC_DELETE, info);
     }
 
+    public void logFinishDelete(DeleteInfo info) {
+        logEdit(OperationType.OP_FINISH_DELETE, info);
+    }
+
     public void logFinishAsyncDelete(AsyncDeleteJob job) {
         logEdit(OperationType.OP_FINISH_ASYNC_DELETE, job);
     }
diff --git a/fe/src/main/java/org/apache/doris/persist/OperationType.java b/fe/src/main/java/org/apache/doris/persist/OperationType.java
index 65e4ab1..fd139cf 100644
--- a/fe/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/src/main/java/org/apache/doris/persist/OperationType.java
@@ -74,6 +74,7 @@ public class OperationType {
     public static final short OP_EXPORT_UPDATE_STATE = 37;
 
     public static final short OP_FINISH_SYNC_DELETE = 40;
+    public static final short OP_FINISH_DELETE = 41;
     public static final short OP_ADD_REPLICA = 42;
     public static final short OP_DELETE_REPLICA = 43;
     public static final short OP_FINISH_ASYNC_DELETE = 44;
diff --git a/fe/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/src/main/java/org/apache/doris/qe/ConnectContext.java
index ef672d7..6094818 100644
--- a/fe/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -263,6 +263,10 @@ public class ConnectContext {
         return state;
     }
 
+    public void setState(QueryState state) {
+        this.state = state;
+    }
+
     public MysqlCapability getCapability() {
         return capability;
     }
diff --git a/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java
index deb277f..c511a99 100644
--- a/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -81,7 +81,7 @@ import org.apache.doris.load.Load;
 
 public class DdlExecutor {
     public static void execute(Catalog catalog, DdlStmt ddlStmt, OriginStatement origStmt)
-            throws DdlException, Exception {
+            throws DdlException, QueryStateException, Exception {
         if (ddlStmt instanceof CreateClusterStmt) {
             CreateClusterStmt stmt = (CreateClusterStmt) ddlStmt;
             catalog.createCluster(stmt);
@@ -148,7 +148,7 @@ public class DdlExecutor {
         } else if (ddlStmt instanceof StopRoutineLoadStmt) {
             catalog.getRoutineLoadManager().stopRoutineLoadJob((StopRoutineLoadStmt) ddlStmt);
         }  else if (ddlStmt instanceof DeleteStmt) {
-            catalog.getLoadInstance().delete((DeleteStmt) ddlStmt);
+            catalog.getDeleteHandler().process((DeleteStmt) ddlStmt);
         } else if (ddlStmt instanceof CreateUserStmt) {
             CreateUserStmt stmt = (CreateUserStmt) ddlStmt;
             catalog.getAuth().createUser(stmt);
diff --git a/fe/src/main/java/org/apache/doris/qe/QueryState.java b/fe/src/main/java/org/apache/doris/qe/QueryState.java
index b9fde1f..ecaeb3f 100644
--- a/fe/src/main/java/org/apache/doris/qe/QueryState.java
+++ b/fe/src/main/java/org/apache/doris/qe/QueryState.java
@@ -88,6 +88,10 @@ public class QueryState {
         this.errorMessage = msg;
     }
 
+    public void setMsg(String msg) {
+        this.errorMessage = msg;
+    }
+
     public void setErrType(ErrType errType) {
         this.errType = errType;
     }
diff --git a/fe/src/main/java/org/apache/doris/qe/QueryStateException.java b/fe/src/main/java/org/apache/doris/qe/QueryStateException.java
new file mode 100644
index 0000000..0496c05
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/qe/QueryStateException.java
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import com.google.common.base.Strings;
+import org.apache.doris.common.UserException;
+import org.apache.doris.qe.QueryState.MysqlStateType;
+
+public class QueryStateException extends UserException {
+    private QueryState queryState;
+
+    public QueryStateException(MysqlStateType stateType, String msg) {
+        super(Strings.nullToEmpty(msg));
+        createQueryState(stateType, msg);
+    }
+
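+    // build the QueryState carried by this exception; StmtExecutor installs it on the ConnectContext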
+    public void createQueryState(MysqlStateType stateType, String msg) {
+        this.queryState = new QueryState();
+        switch (stateType) {
+            case OK:
+                queryState.setOk(0L, 0, msg);
+                break;
+            case ERR:
+                queryState.setError(msg);
+                break;
+            case EOF:
+                queryState.setEof();
+                queryState.setMsg(msg);
+                break;
+            case NOOP:
+                break;
+        }
+    }
+
+    public QueryState getQueryState() {
+        return queryState;
+    }
+}
diff --git a/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java
index 5be3f7e..3104cac 100644
--- a/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -108,6 +108,7 @@ import org.apache.doris.common.util.ListComparator;
 import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
 import org.apache.doris.common.util.OrderByPair;
+import org.apache.doris.load.DeleteHandler;
 import org.apache.doris.load.ExportJob;
 import org.apache.doris.load.ExportMgr;
 import org.apache.doris.load.Load;
@@ -1051,8 +1052,10 @@ public class ShowExecutor {
         }
         long dbId = db.getId();
 
+        DeleteHandler deleteHandler = catalog.getDeleteHandler();
         Load load = catalog.getLoadInstance();
-        List<List<Comparable>> deleteInfos = load.getDeleteInfosByDb(dbId, true);
+        List<List<Comparable>> deleteInfos = deleteHandler.getDeleteInfosByDb(dbId, true);
+        deleteInfos.addAll(load.getDeleteInfosByDb(dbId, true));
         List<List<String>> rows = Lists.newArrayList();
         for (List<Comparable> deleteInfo : deleteInfos) {
             List<String> oneInfo = new ArrayList<String>(deleteInfo.size());
diff --git a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 04a17e0..109b737 100644
--- a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -867,6 +867,8 @@ public class StmtExecutor {
         try {
             DdlExecutor.execute(context.getCatalog(), (DdlStmt) parsedStmt, originStmt);
             context.getState().setOk();
+        } catch (QueryStateException e) {
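+            // the statement reported its final state (e.g. a synchronous DELETE); pass it to the client as-is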
+            context.setState(e.getQueryState());
         } catch (UserException e) {
             // Return message to info client what happened.
             context.getState().setError(e.getMessage());
diff --git a/fe/src/main/java/org/apache/doris/task/DeleteJob.java b/fe/src/main/java/org/apache/doris/task/DeleteJob.java
new file mode 100644
index 0000000..3615920
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/task/DeleteJob.java
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.task;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.load.DeleteInfo;
+import org.apache.doris.load.TabletDeleteInfo;
+import org.apache.doris.transaction.AbstractTxnStateChangeCallback;
+import org.apache.doris.transaction.TransactionState;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+public class DeleteJob extends AbstractTxnStateChangeCallback {
+    private static final Logger LOG = LogManager.getLogger(DeleteJob.class);
+
+    public enum DeleteState {
+        UN_QUORUM,
+        QUORUM_FINISHED,
+        FINISHED
+    }
+
+    private DeleteState state;
+
+    // job id (also the txn state-change listener id; passed to beginTransaction() as the callback id)
+    private long id;
+    // transaction id.
+    private long signature;
+    private String label;
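+    // all tablets touched by this delete, plus the subsets that reached quorum or fully finished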
+    private Set<Long> totalTablets;
+    private Set<Long> quorumTablets;
+    private Set<Long> finishedTablets;
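+    // per-tablet finished replicas; kept in a concurrent map since replica reports can arrive from several threads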
+    private Map<Long, TabletDeleteInfo> tabletDeleteInfoMap;
+    private Set<PushTask> pushTasks;
+    private DeleteInfo deleteInfo;
+
+    public DeleteJob(long id, long transactionId, String label, DeleteInfo deleteInfo) {
+        this.id = id;
+        this.signature = transactionId;
+        this.label = label;
+        this.deleteInfo = deleteInfo;
+        totalTablets = Sets.newHashSet();
+        finishedTablets = Sets.newHashSet();
+        quorumTablets = Sets.newHashSet();
+        tabletDeleteInfoMap = Maps.newConcurrentMap();
+        pushTasks = Sets.newHashSet();
+        state = DeleteState.UN_QUORUM;
+    }
+
+    /**
+     * Check and update this job's state to QUORUM_FINISHED or FINISHED when the conditions are met.
+     * The meaning of each state:
+     * QUORUM_FINISHED: for every tablet, more than half of its replicas have finished
+     * FINISHED: all replicas of every tablet in this job have finished
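+     * For example, with replicaNum = 3 the quorum is 3 / 2 + 1 = 2, so a tablet counts as
+     * quorum-finished once any 2 of its 3 replicas report success.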
+     */
+    public void checkAndUpdateQuorum() throws MetaNotFoundException {
+        long dbId = deleteInfo.getDbId();
+        long tableId = deleteInfo.getTableId();
+        long partitionId = deleteInfo.getPartitionId();
+        Database db = Catalog.getInstance().getDb(dbId);
+        if (db == null) {
+            throw new MetaNotFoundException("can not find database "+ dbId +" when commit delete");
+        }
+
+        short replicaNum = -1;
+        db.readLock();
+        try {
+            OlapTable table = (OlapTable) db.getTable(tableId);
+            if (table == null) {
+                throw new MetaNotFoundException("can not find table "+ tableId +" when commit delete");
+            }
+            replicaNum = table.getPartitionInfo().getReplicationNum(partitionId);
+        } finally {
+            db.readUnlock();
+        }
+
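+        // majority quorum, e.g. 2 of 3 replicas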
+        short quorumNum = (short) (replicaNum / 2 + 1);
+        for (TabletDeleteInfo tDeleteInfo : getTabletDeleteInfo()) {
+            if (tDeleteInfo.getFinishedReplicas().size() == replicaNum) {
+                finishedTablets.add(tDeleteInfo.getTabletId());
+            }
+            if (tDeleteInfo.getFinishedReplicas().size() >= quorumNum) {
+                quorumTablets.add(tDeleteInfo.getTabletId());
+            }
+        }
+        LOG.info("check delete job quorum, transaction id: {}, total tablets: {}, quorum tablets: {},",
+                signature, totalTablets.size(), quorumTablets.size());
+
+        if (finishedTablets.containsAll(totalTablets)) {
+            setState(DeleteState.FINISHED);
+        } else if (quorumTablets.containsAll(totalTablets)) {
+            setState(DeleteState.QUORUM_FINISHED);
+        }
+    }
+
+    public void setState(DeleteState state) {
+        this.state = state;
+    }
+
+    public DeleteState getState() {
+        return this.state;
+    }
+
+    public boolean addTablet(long tabletId) {
+        return totalTablets.add(tabletId);
+    }
+
+    public boolean addPushTask(PushTask pushTask) {
+        return pushTasks.add(pushTask);
+    }
+
+    public boolean addFinishedReplica(long tabletId, Replica replica) {
+        TabletDeleteInfo tDeleteInfo = tabletDeleteInfoMap.computeIfAbsent(tabletId, k -> new TabletDeleteInfo(k));
+        return tDeleteInfo.addFinishedReplica(replica);
+    }
+
+    public DeleteInfo getDeleteInfo() {
+        return deleteInfo;
+    }
+
+    public String getLabel() {
+        return this.label;
+    }
+
+    public Set<PushTask> getPushTasks() {
+        return pushTasks;
+    }
+
+    @Override
+    public long getId() {
+        return this.id;
+    }
+
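+    // invoked when the delete transaction becomes visible: mark the job FINISHED and persist it to the edit log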
+    @Override
+    public void afterVisible(TransactionState txnState, boolean txnOperated) {
+        if (!txnOperated) {
+            return;
+        }
+        executeFinish();
+        Catalog.getCurrentCatalog().getEditLog().logFinishDelete(deleteInfo);
+    }
+
+    public void executeFinish() {
+        setState(DeleteState.FINISHED);
+        Catalog.getCurrentCatalog().getDeleteHandler().recordFinishedJob(this);
+        Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(getId());
+    }
+
+    public long getTransactionId() {
+        return this.signature;
+    }
+
+    public Collection<TabletDeleteInfo> getTabletDeleteInfo() {
+        return tabletDeleteInfoMap.values();
+    }
+
+    public long getTimeoutMs() {
+        // timeout scales with the number of tablets: at least 30 seconds,
+        // at most Config.load_straggler_wait_second (5 min by default)
+        long timeout = Math.max(totalTablets.size() * Config.tablet_delete_timeout_second * 1000L, 30000L);
+        return Math.min(timeout, Config.load_straggler_wait_second * 1000L);
+    }
+}
diff --git a/fe/src/main/java/org/apache/doris/task/PushTask.java b/fe/src/main/java/org/apache/doris/task/PushTask.java
index 147fcbc..10998fc 100644
--- a/fe/src/main/java/org/apache/doris/task/PushTask.java
+++ b/fe/src/main/java/org/apache/doris/task/PushTask.java
@@ -209,6 +209,10 @@ public class PushTask extends AgentTask {
     public long getAsyncDeleteJobId() {
         return asyncDeleteJobId;
     }
+
+    public long getTransactionId() {
+        return transactionId;
+    }
     
     public void setIsSchemaChanging(boolean isSchemaChanging) {
         this.isSchemaChanging = isSchemaChanging;
diff --git a/fe/src/test/java/org/apache/doris/load/DeleteHandlerTest.java b/fe/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
new file mode 100644
index 0000000..6c57650
--- /dev/null
+++ b/fe/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
@@ -0,0 +1,451 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import mockit.Expectations;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+import org.apache.doris.analysis.AccessTestUtil;
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.BinaryPredicate;
+import org.apache.doris.analysis.DeleteStmt;
+import org.apache.doris.analysis.IntLiteral;
+import org.apache.doris.analysis.PartitionNames;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TableName;
+import org.apache.doris.backup.CatalogMocker;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.TabletInvertedIndex;
+import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.MarkedCountDownLatch;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.mysql.privilege.PaloAuth;
+import org.apache.doris.persist.EditLog;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryStateException;
+import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTask;
+import org.apache.doris.task.AgentTaskExecutor;
+import org.apache.doris.task.AgentTaskQueue;
+import org.apache.doris.task.DeleteJob;
+import org.apache.doris.task.DeleteJob.DeleteState;
+import org.apache.doris.transaction.GlobalTransactionMgr;
+import org.apache.doris.transaction.TabletCommitInfo;
+import org.apache.doris.transaction.TransactionState;
+import org.apache.doris.transaction.TransactionStatus;
+import org.apache.doris.transaction.TxnCommitAttachment;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+public class DeleteHandlerTest {
+
+    private DeleteHandler deleteHandler;
+
+    private static final long BACKEND_ID_1 = 10000L;
+    private static final long BACKEND_ID_2 = 10001L;
+    private static final long BACKEND_ID_3 = 10002L;
+    private static final long REPLICA_ID_1 = 70000L;
+    private static final long REPLICA_ID_2 = 70001L;
+    private static final long REPLICA_ID_3 = 70002L;
+    private static final long TABLET_ID = 60000L;
+    private static final long PARTITION_ID = 40000L;
+    private static final long TBL_ID = 30000L;
+    private static final long DB_ID = 20000L;
+
+    @Mocked
+    private Catalog catalog;
+    @Mocked
+    private EditLog editLog;
+    @Mocked
+    private AgentTaskQueue agentTaskQueue;
+    @Mocked
+    private AgentTaskExecutor executor;
+
+    private Database db;
+    private PaloAuth auth;
+
+    Analyzer analyzer;
+
+    private GlobalTransactionMgr globalTransactionMgr;
+    private TabletInvertedIndex invertedIndex = new TabletInvertedIndex();
+    private ConnectContext connectContext = new ConnectContext();
+
+    @Before
+    public void setUp() {
+        FeConstants.runningUnitTest = true;
+
+        globalTransactionMgr = new GlobalTransactionMgr(catalog);
+        globalTransactionMgr.setEditLog(editLog);
+        deleteHandler = new DeleteHandler();
+        auth = AccessTestUtil.fetchAdminAccess();
+        analyzer = AccessTestUtil.fetchAdminAnalyzer(false);
+        try {
+            db = CatalogMocker.mockDb();
+        } catch (AnalysisException e) {
+            e.printStackTrace();
+            Assert.fail();
+        }
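+        // a single tablet with three NORMAL replicas, one per backend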
+        TabletMeta tabletMeta = new TabletMeta(DB_ID, TBL_ID, PARTITION_ID, TBL_ID, 0, null);
+        invertedIndex.addTablet(TABLET_ID, tabletMeta);
+        invertedIndex.addReplica(TABLET_ID, new Replica(REPLICA_ID_1, BACKEND_ID_1, 0, Replica.ReplicaState.NORMAL));
+        invertedIndex.addReplica(TABLET_ID, new Replica(REPLICA_ID_2, BACKEND_ID_2, 0, Replica.ReplicaState.NORMAL));
+        invertedIndex.addReplica(TABLET_ID, new Replica(REPLICA_ID_3, BACKEND_ID_3, 0, Replica.ReplicaState.NORMAL));
+
+        new MockUp<EditLog>() {
+            @Mock
+            public void logSaveTransactionId(long transactionId) {
+            }
+            @Mock
+            public void logInsertTransactionState(TransactionState transactionState) {
+            }
+        };
+
+        new Expectations() {
+            {
+                catalog.getDb(anyString);
+                minTimes = 0;
+                result = db;
+
+                catalog.getDb(anyLong);
+                minTimes = 0;
+                result = db;
+
+                catalog.getEditLog();
+                minTimes = 0;
+                result = editLog;
+
+                catalog.getAuth();
+                minTimes = 0;
+                result = auth;
+
+                catalog.getNextId();
+                minTimes = 0;
+                result = 10L;
+
+                catalog.getTabletInvertedIndex();
+                minTimes = 0;
+                result = invertedIndex;
+            }
+        };
+
+        new Expectations() {
+            {
+                Catalog.getCurrentCatalog();
+                minTimes = 0;
+                result = catalog;
+
+                Catalog.getCurrentInvertedIndex();
+                minTimes = 0;
+                result = invertedIndex;
+
+                Catalog.getCurrentGlobalTransactionMgr();
+                minTimes = 0;
+                result = globalTransactionMgr;
+
+                AgentTaskExecutor.submit((AgentBatchTask) any);
+                minTimes = 0;
+
+                AgentTaskQueue.addTask((AgentTask) any);
+                minTimes = 0;
+                result = true;
+            }
+        };
+    }
+
+    @Test(expected = DdlException.class)
+    public void testUnQuorumTimeout() throws DdlException, QueryStateException {
+        BinaryPredicate binaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.GT, new SlotRef(null, "k1"),
+                new IntLiteral(3));
+
+        DeleteStmt deleteStmt = new DeleteStmt(new TableName("test_db", "test_tbl"),
+                new PartitionNames(false, Lists.newArrayList("test_tbl")), binaryPredicate);
+        try {
+            deleteStmt.analyze(analyzer);
+        } catch (UserException e) {
+            Assert.fail();
+        }
+        deleteHandler.process(deleteStmt);
+        Assert.fail();
+    }
+
+    @Test
+    public void testQuorumTimeout() throws DdlException, QueryStateException {
+        BinaryPredicate binaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.GT, new SlotRef(null, "k1"),
+                new IntLiteral(3));
+
+        DeleteStmt deleteStmt = new DeleteStmt(new TableName("test_db", "test_tbl"),
+                new PartitionNames(false, Lists.newArrayList("test_tbl")), binaryPredicate);
+
+        Set<Replica> finishedReplica = Sets.newHashSet();
+        finishedReplica.add(new Replica(REPLICA_ID_1, BACKEND_ID_1, 0, Replica.ReplicaState.NORMAL));
+        finishedReplica.add(new Replica(REPLICA_ID_2, BACKEND_ID_2, 0, Replica.ReplicaState.NORMAL));
+        TabletDeleteInfo tabletDeleteInfo = new TabletDeleteInfo(TABLET_ID);
+        tabletDeleteInfo.getFinishedReplicas().addAll(finishedReplica);
+
+        new MockUp<DeleteJob>() {
+            @Mock
+            public Collection<TabletDeleteInfo> getTabletDeleteInfo() {
+                return Lists.newArrayList(tabletDeleteInfo);
+            }
+        };
+
+        new MockUp<GlobalTransactionMgr>() {
+            @Mock
+            public TransactionState getTransactionState(long transactionId) {
+                TransactionState transactionState = new TransactionState();
+                transactionState.setTransactionStatus(TransactionStatus.VISIBLE);
+                return transactionState;
+            }
+        };
+
+        try {
+            deleteStmt.analyze(analyzer);
+        } catch (UserException e) {
+            Assert.fail();
+        }
+        try {
+            deleteHandler.process(deleteStmt);
+        } catch (QueryStateException e) {
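+            // expected: the handler reports the final delete state through QueryStateException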
+        }
+
+        Map<Long, DeleteJob> idToDeleteJob = Deencapsulation.getField(deleteHandler, "idToDeleteJob");
+        Collection<DeleteJob> jobs = idToDeleteJob.values();
+        Assert.assertEquals(1, jobs.size());
+        for (DeleteJob job : jobs) {
+            Assert.assertEquals(DeleteState.QUORUM_FINISHED, job.getState());
+        }
+    }
+
+    @Test
+    public void testNormalTimeout() throws DdlException, QueryStateException {
+        BinaryPredicate binaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.GT, new SlotRef(null, "k1"),
+                new IntLiteral(3));
+
+        DeleteStmt deleteStmt = new DeleteStmt(new TableName("test_db", "test_tbl"),
+                new PartitionNames(false, Lists.newArrayList("test_tbl")), binaryPredicate);
+
+        Set<Replica> finishedReplica = Sets.newHashSet();
+        finishedReplica.add(new Replica(REPLICA_ID_1, BACKEND_ID_1, 0, Replica.ReplicaState.NORMAL));
+        finishedReplica.add(new Replica(REPLICA_ID_2, BACKEND_ID_2, 0, Replica.ReplicaState.NORMAL));
+        finishedReplica.add(new Replica(REPLICA_ID_3, BACKEND_ID_3, 0, Replica.ReplicaState.NORMAL));
+        TabletDeleteInfo tabletDeleteInfo = new TabletDeleteInfo(TABLET_ID);
+        tabletDeleteInfo.getFinishedReplicas().addAll(finishedReplica);
+
+        new MockUp<DeleteJob>() {
+            @Mock
+            public Collection<TabletDeleteInfo> getTabletDeleteInfo() {
+                return Lists.newArrayList(tabletDeleteInfo);
+            }
+        };
+
+        new MockUp<GlobalTransactionMgr>() {
+            @Mock
+            public TransactionState getTransactionState(long transactionId) {
+                TransactionState transactionState = new TransactionState();
+                transactionState.setTransactionStatus(TransactionStatus.VISIBLE);
+                return transactionState;
+            }
+        };
+
+        try {
+            deleteStmt.analyze(analyzer);
+        } catch (UserException e) {
+            Assert.fail();
+        }
+
+        try {
+            deleteHandler.process(deleteStmt);
+        } catch (QueryStateException e) {
+        }
+
+        Map<Long, DeleteJob> idToDeleteJob = Deencapsulation.getField(deleteHandler, "idToDeleteJob");
+        Collection<DeleteJob> jobs = idToDeleteJob.values();
+        Assert.assertEquals(1, jobs.size());
+        for (DeleteJob job : jobs) {
+            Assert.assertEquals(DeleteState.FINISHED, job.getState());
+        }
+    }
+
+    @Test(expected = DdlException.class)
+    public void testCommitFail(@Mocked MarkedCountDownLatch countDownLatch) throws DdlException, QueryStateException {
+        BinaryPredicate binaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.GT, new SlotRef(null, "k1"),
+                new IntLiteral(3));
+
+        DeleteStmt deleteStmt = new DeleteStmt(new TableName("test_db", "test_tbl"),
+                new PartitionNames(false, Lists.newArrayList("test_tbl")), binaryPredicate);
+
+        Set<Replica> finishedReplica = Sets.newHashSet();
+        finishedReplica.add(new Replica(REPLICA_ID_1, BACKEND_ID_1, 0, Replica.ReplicaState.NORMAL));
+        finishedReplica.add(new Replica(REPLICA_ID_2, BACKEND_ID_2, 0, Replica.ReplicaState.NORMAL));
+        finishedReplica.add(new Replica(REPLICA_ID_3, BACKEND_ID_3, 0, Replica.ReplicaState.NORMAL));
+        TabletDeleteInfo tabletDeleteInfo = new TabletDeleteInfo(TABLET_ID);
+        tabletDeleteInfo.getFinishedReplicas().addAll(finishedReplica);
+
+        new MockUp<DeleteJob>() {
+            @Mock
+            public Collection<TabletDeleteInfo> getTabletDeleteInfo() {
+                return Lists.newArrayList(tabletDeleteInfo);
+            }
+        };
+
+        new Expectations() {
+            {
+                try {
+                    countDownLatch.await(anyLong, (TimeUnit) any);
+                } catch (InterruptedException e) {
+                }
+                result = false;
+            }
+        };
+
+        new Expectations(globalTransactionMgr) {
+            {
+                try {
+                    globalTransactionMgr.commitTransaction(anyLong, anyLong, (List<TabletCommitInfo>) any, (TxnCommitAttachment) any);
+                } catch (UserException e) {
+                }
+                result = new UserException("commit fail");
+            }
+        };
+
+        try {
+            deleteStmt.analyze(analyzer);
+        } catch (UserException e) {
+            Assert.fail();
+        }
+        try {
+            deleteHandler.process(deleteStmt);
+        } catch (DdlException e) {
+            Map<Long, DeleteJob> idToDeleteJob = Deencapsulation.getField(deleteHandler, "idToDeleteJob");
+            Collection<DeleteJob> jobs = idToDeleteJob.values();
+            Assert.assertEquals(1, jobs.size());
+            for (DeleteJob job : jobs) {
+                Assert.assertEquals(DeleteState.FINISHED, job.getState());
+            }
+            throw e;
+        } catch (QueryStateException e) {
+        }
+        Assert.fail();
+    }
+
+    @Test
+    public void testPublishFail(@Mocked MarkedCountDownLatch countDownLatch, @Mocked AgentTaskExecutor taskExecutor) throws DdlException, QueryStateException {
+        BinaryPredicate binaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.GT, new SlotRef(null, "k1"),
+                new IntLiteral(3));
+
+        DeleteStmt deleteStmt = new DeleteStmt(new TableName("test_db", "test_tbl"),
+                new PartitionNames(false, Lists.newArrayList("test_tbl")), binaryPredicate);
+
+        Set<Replica> finishedReplica = Sets.newHashSet();
+        finishedReplica.add(new Replica(REPLICA_ID_1, BACKEND_ID_1, 0, Replica.ReplicaState.NORMAL));
+        finishedReplica.add(new Replica(REPLICA_ID_2, BACKEND_ID_2, 0, Replica.ReplicaState.NORMAL));
+        finishedReplica.add(new Replica(REPLICA_ID_3, BACKEND_ID_3, 0, Replica.ReplicaState.NORMAL));
+        TabletDeleteInfo tabletDeleteInfo = new TabletDeleteInfo(TABLET_ID);
+        tabletDeleteInfo.getFinishedReplicas().addAll(finishedReplica);
+
+        new MockUp<DeleteJob>() {
+            @Mock
+            public Collection<TabletDeleteInfo> getTabletDeleteInfo() {
+                return Lists.newArrayList(tabletDeleteInfo);
+            }
+        };
+
+        new Expectations() {
+            {
+                try {
+                    countDownLatch.await(anyLong, (TimeUnit) any);
+                } catch (InterruptedException e) {
+                }
+                result = false;
+            }
+        };
+
+        new Expectations() {
+            {
+                AgentTaskExecutor.submit((AgentBatchTask) any);
+                minTimes = 0;
+            }
+        };
+
+        try {
+            deleteStmt.analyze(analyzer);
+        } catch (UserException e) {
+            Assert.fail();
+        }
+        try {
+            deleteHandler.process(deleteStmt);
+        } catch (QueryStateException e) {
+        }
+
+        Map<Long, DeleteJob> idToDeleteJob = Deencapsulation.getField(deleteHandler, "idToDeleteJob");
+        Collection<DeleteJob> jobs = idToDeleteJob.values();
+        Assert.assertEquals(1, jobs.size());
+        for (DeleteJob job : jobs) {
+            Assert.assertEquals(DeleteState.FINISHED, job.getState());
+        }
+    }
+
+    @Test
+    public void testNormal(@Mocked MarkedCountDownLatch countDownLatch) throws DdlException, QueryStateException {
+        BinaryPredicate binaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.GT, new SlotRef(null, "k1"),
+                new IntLiteral(3));
+
+        DeleteStmt deleteStmt = new DeleteStmt(new TableName("test_db", "test_tbl"),
+                new PartitionNames(false, Lists.newArrayList("test_tbl")), binaryPredicate);
+
+        Set<Replica> finishedReplica = Sets.newHashSet();
+        finishedReplica.add(new Replica(REPLICA_ID_1, BACKEND_ID_1, 0, Replica.ReplicaState.NORMAL));
+        finishedReplica.add(new Replica(REPLICA_ID_2, BACKEND_ID_2, 0, Replica.ReplicaState.NORMAL));
+        finishedReplica.add(new Replica(REPLICA_ID_3, BACKEND_ID_3, 0, Replica.ReplicaState.NORMAL));
+        TabletDeleteInfo tabletDeleteInfo = new TabletDeleteInfo(TABLET_ID);
+        tabletDeleteInfo.getFinishedReplicas().addAll(finishedReplica);
+
+        new MockUp<DeleteJob>() {
+            @Mock
+            public Collection<TabletDeleteInfo> getTabletDeleteInfo() {
+                return Lists.newArrayList(tabletDeleteInfo);
+            }
+        };
+
+        new Expectations() {
+            {
+                try {
+                    countDownLatch.await(anyLong, (TimeUnit) any);
+                } catch (InterruptedException e) {
+                }
+                result = false;
+            }
+        };
+
+        try {
+            deleteStmt.analyze(analyzer);
+        } catch (UserException e) {
+            Assert.fail();
+        }
+        try {
+            deleteHandler.process(deleteStmt);
+        } catch (QueryStateException e) {
+        }
+
+        Map<Long, DeleteJob> idToDeleteJob = Deencapsulation.getField(deleteHandler, "idToDeleteJob");
+        Collection<DeleteJob> jobs = idToDeleteJob.values();
+        Assert.assertEquals(1, jobs.size());
+        for (DeleteJob job : jobs) {
+            Assert.assertEquals(DeleteState.FINISHED, job.getState());
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org