You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/06/23 16:04:45 UTC

[doris] branch master updated: [refactor]Remove load_delete job (#10353)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b8d2c96842 [refactor]Remove load_delete job (#10353)
b8d2c96842 is described below

commit b8d2c96842b1e58631b63cb40ebd9f864b612614
Author: yiguolei <67...@qq.com>
AuthorDate: Fri Jun 24 00:04:38 2022 +0800

    [refactor]Remove load_delete job (#10353)
---
 be/src/agent/agent_server.cpp                      |   1 -
 be/src/olap/task/engine_batch_load_task.cpp        |   7 +-
 .../java/org/apache/doris/master/MasterImpl.java   | 178 +--------------------
 .../org/apache/doris/master/ReportHandler.java     |   4 -
 .../java/org/apache/doris/task/AgentBatchTask.java |   5 +-
 .../java/org/apache/doris/task/AgentTaskQueue.java |   4 -
 .../main/java/org/apache/doris/task/PushTask.java  |  11 --
 .../java/org/apache/doris/task/AgentTaskTest.java  |  15 --
 .../apache/doris/utframe/MockedBackendFactory.java |   1 -
 9 files changed, 5 insertions(+), 221 deletions(-)

diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index 34cdfe5731..c907aafd23 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -164,7 +164,6 @@ void AgentServer::submit_tasks(TAgentResult& agent_result,
                 break;
             }
             if (task.push_req.push_type == TPushType::LOAD ||
-                task.push_req.push_type == TPushType::LOAD_DELETE ||
                 task.push_req.push_type == TPushType::LOAD_V2) {
                 _push_workers->submit_task(task);
             } else if (task.push_req.push_type == TPushType::DELETE) {
diff --git a/be/src/olap/task/engine_batch_load_task.cpp b/be/src/olap/task/engine_batch_load_task.cpp
index 7add215823..2fb942e0ef 100644
--- a/be/src/olap/task/engine_batch_load_task.cpp
+++ b/be/src/olap/task/engine_batch_load_task.cpp
@@ -65,8 +65,7 @@ EngineBatchLoadTask::~EngineBatchLoadTask() {}
 Status EngineBatchLoadTask::execute() {
     SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::STORAGE, _mem_tracker);
     Status status = Status::OK();
-    if (_push_req.push_type == TPushType::LOAD || _push_req.push_type == TPushType::LOAD_DELETE ||
-        _push_req.push_type == TPushType::LOAD_V2) {
+    if (_push_req.push_type == TPushType::LOAD || _push_req.push_type == TPushType::LOAD_V2) {
         status = _init();
         if (status.ok()) {
             uint32_t retry_time = 0;
@@ -301,9 +300,7 @@ Status EngineBatchLoadTask::_push(const TPushReq& request,
     }
 
     PushType type = PUSH_NORMAL;
-    if (request.push_type == TPushType::LOAD_DELETE) {
-        type = PUSH_FOR_LOAD_DELETE;
-    } else if (request.push_type == TPushType::LOAD_V2) {
+    if (request.push_type == TPushType::LOAD_V2) {
         type = PUSH_NORMAL_V2;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
index 2810156003..1593dd737b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -33,7 +33,6 @@ import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.load.DeleteJob;
 import org.apache.doris.load.LoadJob;
 import org.apache.doris.load.loadv2.SparkLoadJob;
-import org.apache.doris.persist.ReplicaPersistInfo;
 import org.apache.doris.system.Backend;
 import org.apache.doris.task.AgentTask;
 import org.apache.doris.task.AgentTaskQueue;
@@ -67,7 +66,6 @@ import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
 
 import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -150,11 +148,6 @@ public class MasterImpl {
                     Preconditions.checkState(request.isSetReportVersion());
                     finishCreateReplica(task, request);
                     break;
-                case PUSH:
-                    checkHasTabletInfo(request);
-                    Preconditions.checkState(request.isSetReportVersion());
-                    finishPush(task, request);
-                    break;
                 case REALTIME_PUSH:
                     checkHasTabletInfo(request);
                     Preconditions.checkState(request.isSetReportVersion());
@@ -361,7 +354,7 @@ public class MasterImpl {
 
             // handle load job
             // TODO yiguolei: why delete should check request version and task version?
-            if (pushTask.getPushType() == TPushType.LOAD || pushTask.getPushType() == TPushType.LOAD_DELETE) {
+            if (pushTask.getPushType() == TPushType.LOAD) {
                 long loadJobId = pushTask.getLoadJobId();
                 LoadJob job = Catalog.getCurrentCatalog().getLoadInstance().getLoadJob(loadJobId);
                 if (job == null) {
@@ -479,127 +472,6 @@ public class MasterImpl {
         return replica;
     }
 
-    private void finishPush(AgentTask task, TFinishTaskRequest request) {
-        List<TTabletInfo> finishTabletInfos = request.getFinishTabletInfos();
-        Preconditions.checkState(finishTabletInfos != null && !finishTabletInfos.isEmpty());
-
-        PushTask pushTask = (PushTask) task;
-        // if replica report already update replica version and load checker add new version push task,
-        // we might get new version push task, so check task version first
-        // all tablets in tablet infos should have same version and version hash
-        long finishVersion = finishTabletInfos.get(0).getVersion();
-        long taskVersion = pushTask.getVersion();
-        if (finishVersion != taskVersion) {
-            LOG.debug("finish tablet version is not consistent with task. "
-                    + "finish version: {}, task: {}",
-                    finishVersion, pushTask);
-            return;
-        }
-
-        long dbId = pushTask.getDbId();
-        long backendId = pushTask.getBackendId();
-        long signature = task.getSignature();
-        Database db = Catalog.getCurrentCatalog().getDbNullable(dbId);
-        if (db == null) {
-            AgentTaskQueue.removePushTask(backendId, signature, finishVersion,
-                                          pushTask.getPushType(), pushTask.getTaskType());
-            return;
-        }
-
-        long tableId = pushTask.getTableId();
-        long partitionId = pushTask.getPartitionId();
-        long pushIndexId = pushTask.getIndexId();
-        long pushTabletId = pushTask.getTabletId();
-
-        // push finish type:
-        //                  numOfFinishTabletInfos  tabletId schemaHash
-        // Normal:                     1                   /          /
-        // SchemaChangeHandler         2                 same      diff
-        // RollupHandler               2                 diff      diff
-        //
-        // reuse enum 'PartitionState' here as 'push finish type'
-        PartitionState pushState = null;
-        if (finishTabletInfos.size() == 1) {
-            pushState = PartitionState.NORMAL;
-        } else if (finishTabletInfos.size() == 2) {
-            if (finishTabletInfos.get(0).getTabletId() == finishTabletInfos.get(1).getTabletId()) {
-                pushState = PartitionState.SCHEMA_CHANGE;
-            } else {
-                pushState = PartitionState.ROLLUP;
-            }
-        } else {
-            LOG.warn("invalid push report infos. finishTabletInfos' size: " + finishTabletInfos.size());
-            return;
-        }
-
-        LOG.debug("push report state: {}", pushState.name());
-
-        OlapTable olapTable = (OlapTable) db.getTableNullable(tableId);
-        if (olapTable == null || !olapTable.writeLockIfExist()) {
-            AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature);
-            LOG.warn("finish push replica error, cannot find table[" + tableId + "] when push finished");
-            return;
-        }
-        try {
-            Partition partition = olapTable.getPartition(partitionId);
-            if (partition == null) {
-                throw new MetaNotFoundException("cannot find partition[" + partitionId + "] when push finished");
-            }
-
-            // update replica version
-            List<ReplicaPersistInfo> infos = new LinkedList<ReplicaPersistInfo>();
-            List<Long> tabletIds = finishTabletInfos.stream().map(
-                    finishTabletInfo -> finishTabletInfo.getTabletId()).collect(Collectors.toList());
-            List<TabletMeta> tabletMetaList = Catalog.getCurrentInvertedIndex().getTabletMetaList(tabletIds);
-            for (int i = 0; i < tabletMetaList.size(); i++) {
-                TabletMeta tabletMeta = tabletMetaList.get(i);
-                TTabletInfo tTabletInfo = finishTabletInfos.get(i);
-                long indexId = tabletMeta.getIndexId();
-                ReplicaPersistInfo info = updateReplicaInfo(olapTable, partition,
-                        backendId, pushIndexId, indexId,
-                        tTabletInfo, pushState);
-                if (info != null) {
-                    infos.add(info);
-                }
-            }
-
-            // should be done before addReplicaPersistInfos and countDownLatch
-            long reportVersion = request.getReportVersion();
-            Catalog.getCurrentSystemInfo().updateBackendReportVersion(task.getBackendId(), reportVersion,
-                                                                       task.getDbId(), task.getTableId());
-
-            if (pushTask.getPushType() == TPushType.LOAD || pushTask.getPushType() == TPushType.LOAD_DELETE) {
-                // handle load job
-                long loadJobId = pushTask.getLoadJobId();
-                LoadJob job = Catalog.getCurrentCatalog().getLoadInstance().getLoadJob(loadJobId);
-                if (job == null) {
-                    throw new MetaNotFoundException("cannot find load job, job[" + loadJobId + "]");
-                }
-
-                Preconditions.checkState(!infos.isEmpty());
-                for (ReplicaPersistInfo info : infos) {
-                    job.addReplicaPersistInfos(info);
-                }
-            } else if (pushTask.getPushType() == TPushType.DELETE) {
-                // report delete task must match version and version hash
-                if (pushTask.getVersion() != request.getRequestVersion()) {
-                    throw new MetaNotFoundException("delete task is not match. [" + pushTask.getVersion() + "-"
-                            + request.getRequestVersion() + "]");
-                }
-            }
-
-            AgentTaskQueue.removePushTask(backendId, signature, finishVersion,
-                                          pushTask.getPushType(), pushTask.getTaskType());
-            LOG.debug("finish push replica. tabletId: {}, backendId: {}", pushTabletId, backendId);
-        } catch (MetaNotFoundException e) {
-            AgentTaskQueue.removePushTask(backendId, signature, finishVersion,
-                                          pushTask.getPushType(), pushTask.getTaskType());
-            LOG.warn("finish push replica error", e);
-        } finally {
-            olapTable.writeUnlock();
-        }
-    }
-
     private void finishClearAlterTask(AgentTask task, TFinishTaskRequest request) {
         ClearAlterTask clearAlterTask = (ClearAlterTask) task;
         clearAlterTask.setFinished(true);
@@ -632,54 +504,6 @@ public class MasterImpl {
                                   publishVersionTask.getSignature());
     }
 
-    private ReplicaPersistInfo updateReplicaInfo(OlapTable olapTable, Partition partition,
-                                                 long backendId, long pushIndexId, long indexId,
-                                                 TTabletInfo tTabletInfo, PartitionState pushState)
-            throws MetaNotFoundException {
-        long tabletId = tTabletInfo.getTabletId();
-        int schemaHash = tTabletInfo.getSchemaHash();
-        long version = tTabletInfo.getVersion();
-        long rowCount = tTabletInfo.getRowCount();
-        long dataSize = tTabletInfo.getDataSize();
-
-        if (indexId != pushIndexId) {
-            // this may be a rollup tablet
-            if (pushState != PartitionState.ROLLUP && indexId != TabletInvertedIndex.NOT_EXIST_VALUE) {
-                // this probably should not happened. add log to observe(cmy)
-                LOG.warn("push task report tablet[{}] with different index[{}] and is not in ROLLUP. push index[{}]",
-                         tabletId, indexId, pushIndexId);
-                return null;
-            }
-
-            if (indexId == TabletInvertedIndex.NOT_EXIST_VALUE) {
-                LOG.warn("tablet[{}] may be dropped. push index[{}]", tabletId, pushIndexId);
-                return null;
-            }
-            return null;
-        }
-
-        MaterializedIndex materializedIndex = partition.getIndex(pushIndexId);
-        if (materializedIndex == null) {
-            throw new MetaNotFoundException("Cannot find index[" + pushIndexId + "]");
-        }
-        Tablet tablet = materializedIndex.getTablet(tabletId);
-        if (tablet == null) {
-            throw new MetaNotFoundException("Cannot find tablet[" + tabletId + "]");
-        }
-
-        // update replica info
-        Replica replica = tablet.getReplicaByBackendId(backendId);
-        if (replica == null) {
-            throw new MetaNotFoundException("cannot find replica in tablet[" + tabletId + "], backend[" + backendId
-                    + "]");
-        }
-        replica.updateVersionInfo(version, dataSize, rowCount);
-
-        LOG.debug("replica[{}] report schemaHash:{}", replica.getId(), schemaHash);
-        return ReplicaPersistInfo.createForLoad(olapTable.getId(), partition.getId(), pushIndexId, tabletId,
-                replica.getId(), version, schemaHash, dataSize, rowCount);
-    }
-
     private void finishDropReplica(AgentTask task) {
         AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.DROP, task.getSignature());
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index 1f5e146bd4..79f9d131ac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -57,14 +57,12 @@ import org.apache.doris.task.CreateReplicaTask;
 import org.apache.doris.task.DropReplicaTask;
 import org.apache.doris.task.MasterTask;
 import org.apache.doris.task.PublishVersionTask;
-import org.apache.doris.task.PushTask;
 import org.apache.doris.task.StorageMediaMigrationTask;
 import org.apache.doris.task.UpdateTabletMetaInfoTask;
 import org.apache.doris.thrift.TBackend;
 import org.apache.doris.thrift.TDisk;
 import org.apache.doris.thrift.TMasterResult;
 import org.apache.doris.thrift.TPartitionVersionInfo;
-import org.apache.doris.thrift.TPushType;
 import org.apache.doris.thrift.TReportRequest;
 import org.apache.doris.thrift.TStatus;
 import org.apache.doris.thrift.TStatusCode;
@@ -359,8 +357,6 @@ public class ReportHandler extends Daemon {
             // 3. CHECK_CONSISTENCY
             // 4. STORAGE_MDEIUM_MIGRATE
             if (task.getTaskType() == TTaskType.CREATE
-                    || (task.getTaskType() == TTaskType.PUSH && ((PushTask) task).getPushType() == TPushType.DELETE
-                    && ((PushTask) task).isSyncDelete())
                     || task.getTaskType() == TTaskType.CHECK_CONSISTENCY
                     || task.getTaskType() == TTaskType.STORAGE_MEDIUM_MIGRATE) {
                 continue;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
index bd7380839c..43b1267091 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
@@ -210,15 +210,14 @@ public class AgentBatchTask implements Runnable {
                 tAgentTaskRequest.setDropTabletReq(request);
                 return tAgentTaskRequest;
             }
-            case REALTIME_PUSH:
-            case PUSH: {
+            case REALTIME_PUSH: {
                 PushTask pushTask = (PushTask) task;
                 TPushReq request = pushTask.toThrift();
                 if (LOG.isDebugEnabled()) {
                     LOG.debug(request.toString());
                 }
                 tAgentTaskRequest.setPushReq(request);
-                if (pushTask.getPushType() == TPushType.LOAD || pushTask.getPushType() == TPushType.LOAD_DELETE) {
+                if (pushTask.getPushType() == TPushType.LOAD) {
                     tAgentTaskRequest.setResourceInfo(pushTask.getResourceInfo());
                 }
                 tAgentTaskRequest.setPriority(pushTask.getPriority());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
index 6406fbc18c..1677701272 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
@@ -67,10 +67,6 @@ public class AgentTaskQueue {
         signatureMap.put(signature, task);
         ++taskNum;
         LOG.debug("add task: type[{}], backend[{}], signature[{}]", type, backendId, signature);
-        if (type == TTaskType.PUSH) {
-            PushTask pushTask = (PushTask) task;
-            LOG.debug("push task info: version[{}]", pushTask.getVersion());
-        }
         return true;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
index c24189141d..1c32c9680b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
@@ -94,16 +94,6 @@ public class PushTask extends AgentTask {
         this.tDescriptorTable = null;
     }
 
-    public PushTask(TResourceInfo resourceInfo, long backendId, long dbId, long tableId, long partitionId,
-            long indexId, long tabletId, long replicaId, int schemaHash, long version,
-            String filePath, long fileSize, int timeoutSecond, long loadJobId, TPushType pushType,
-            List<Predicate> conditions, boolean needDecompress, TPriority priority) {
-        this(resourceInfo, backendId, dbId, tableId, partitionId, indexId,
-                tabletId, replicaId, schemaHash, version, filePath,
-                fileSize, timeoutSecond, loadJobId, pushType, conditions, needDecompress,
-                priority, TTaskType.PUSH, -1, tableId);
-    }
-
     // for load v2 (SparkLoadJob)
     public PushTask(long backendId, long dbId, long tableId, long partitionId, long indexId, long tabletId,
                     long replicaId, int schemaHash, int timeoutSecond, long loadJobId, TPushType pushType,
@@ -126,7 +116,6 @@ public class PushTask extends AgentTask {
         request.setIsSchemaChanging(isSchemaChanging);
         switch (pushType) {
             case LOAD:
-            case LOAD_DELETE:
                 request.setHttpFilePath(filePath);
                 if (fileSize != -1) {
                     request.setHttpFileSize(fileSize);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
index 0cb1a8d4f8..a0ba30612a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
@@ -29,8 +29,6 @@ import org.apache.doris.common.MarkedCountDownLatch;
 import org.apache.doris.thrift.TAgentTaskRequest;
 import org.apache.doris.thrift.TBackend;
 import org.apache.doris.thrift.TCompressionType;
-import org.apache.doris.thrift.TPriority;
-import org.apache.doris.thrift.TPushType;
 import org.apache.doris.thrift.TStorageMedium;
 import org.apache.doris.thrift.TStorageType;
 import org.apache.doris.thrift.TTabletType;
@@ -83,7 +81,6 @@ public class AgentTaskTest {
 
     private AgentTask createReplicaTask;
     private AgentTask dropTask;
-    private AgentTask pushTask;
     private AgentTask cloneTask;
     private AgentTask cancelDeleteTask;
     private AgentTask storageMediaMigrationTask;
@@ -117,12 +114,6 @@ public class AgentTaskTest {
         // drop
         dropTask = new DropReplicaTask(backendId1, tabletId1, replicaId1, schemaHash1);
 
-        // push
-        pushTask =
-                new PushTask(null, backendId1, dbId, tableId, partitionId, indexId1, tabletId1,
-                             replicaId1, schemaHash1, version, "/home/a", 10L, 200, 80000L,
-                             TPushType.LOAD, null, false, TPriority.NORMAL);
-
         // clone
         cloneTask =
                 new CloneTask(backendId1, dbId, tableId, partitionId, indexId1, tabletId1, replicaId1, schemaHash1,
@@ -175,12 +166,6 @@ public class AgentTaskTest {
         Assert.assertEquals(dropTask.getSignature(), request2.getSignature());
         Assert.assertNotNull(request2.getDropTabletReq());
 
-        // push
-        TAgentTaskRequest request3 = (TAgentTaskRequest) toAgentTaskRequest.invoke(agentBatchTask, pushTask);
-        Assert.assertEquals(TTaskType.PUSH, request3.getTaskType());
-        Assert.assertEquals(pushTask.getSignature(), request3.getSignature());
-        Assert.assertNotNull(request3.getPushReq());
-
         // clone
         TAgentTaskRequest request4 = (TAgentTaskRequest) toAgentTaskRequest.invoke(agentBatchTask, cloneTask);
         Assert.assertEquals(TTaskType.CLONE, request4.getTaskType());
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
index 6a184f36ab..7c9c513572 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
@@ -164,7 +164,6 @@ public class MockedBackendFactory {
                             TTaskType taskType = request.getTaskType();
                             switch (taskType) {
                                 case CREATE:
-                                case PUSH:
                                 case ALTER:
                                     ++reportVersion;
                                     break;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org