You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/06/23 16:04:45 UTC
[doris] branch master updated: [refactor]Remove load_delete job (#10353)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b8d2c96842 [refactor]Remove load_delete job (#10353)
b8d2c96842 is described below
commit b8d2c96842b1e58631b63cb40ebd9f864b612614
Author: yiguolei <67...@qq.com>
AuthorDate: Fri Jun 24 00:04:38 2022 +0800
[refactor]Remove load_delete job (#10353)
---
be/src/agent/agent_server.cpp | 1 -
be/src/olap/task/engine_batch_load_task.cpp | 7 +-
.../java/org/apache/doris/master/MasterImpl.java | 178 +--------------------
.../org/apache/doris/master/ReportHandler.java | 4 -
.../java/org/apache/doris/task/AgentBatchTask.java | 5 +-
.../java/org/apache/doris/task/AgentTaskQueue.java | 4 -
.../main/java/org/apache/doris/task/PushTask.java | 11 --
.../java/org/apache/doris/task/AgentTaskTest.java | 15 --
.../apache/doris/utframe/MockedBackendFactory.java | 1 -
9 files changed, 5 insertions(+), 221 deletions(-)
diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index 34cdfe5731..c907aafd23 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -164,7 +164,6 @@ void AgentServer::submit_tasks(TAgentResult& agent_result,
break;
}
if (task.push_req.push_type == TPushType::LOAD ||
- task.push_req.push_type == TPushType::LOAD_DELETE ||
task.push_req.push_type == TPushType::LOAD_V2) {
_push_workers->submit_task(task);
} else if (task.push_req.push_type == TPushType::DELETE) {
diff --git a/be/src/olap/task/engine_batch_load_task.cpp b/be/src/olap/task/engine_batch_load_task.cpp
index 7add215823..2fb942e0ef 100644
--- a/be/src/olap/task/engine_batch_load_task.cpp
+++ b/be/src/olap/task/engine_batch_load_task.cpp
@@ -65,8 +65,7 @@ EngineBatchLoadTask::~EngineBatchLoadTask() {}
Status EngineBatchLoadTask::execute() {
SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::STORAGE, _mem_tracker);
Status status = Status::OK();
- if (_push_req.push_type == TPushType::LOAD || _push_req.push_type == TPushType::LOAD_DELETE ||
- _push_req.push_type == TPushType::LOAD_V2) {
+ if (_push_req.push_type == TPushType::LOAD || _push_req.push_type == TPushType::LOAD_V2) {
status = _init();
if (status.ok()) {
uint32_t retry_time = 0;
@@ -301,9 +300,7 @@ Status EngineBatchLoadTask::_push(const TPushReq& request,
}
PushType type = PUSH_NORMAL;
- if (request.push_type == TPushType::LOAD_DELETE) {
- type = PUSH_FOR_LOAD_DELETE;
- } else if (request.push_type == TPushType::LOAD_V2) {
+ if (request.push_type == TPushType::LOAD_V2) {
type = PUSH_NORMAL_V2;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
index 2810156003..1593dd737b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -33,7 +33,6 @@ import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.load.DeleteJob;
import org.apache.doris.load.LoadJob;
import org.apache.doris.load.loadv2.SparkLoadJob;
-import org.apache.doris.persist.ReplicaPersistInfo;
import org.apache.doris.system.Backend;
import org.apache.doris.task.AgentTask;
import org.apache.doris.task.AgentTaskQueue;
@@ -67,7 +66,6 @@ import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import java.util.ArrayList;
-import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
@@ -150,11 +148,6 @@ public class MasterImpl {
Preconditions.checkState(request.isSetReportVersion());
finishCreateReplica(task, request);
break;
- case PUSH:
- checkHasTabletInfo(request);
- Preconditions.checkState(request.isSetReportVersion());
- finishPush(task, request);
- break;
case REALTIME_PUSH:
checkHasTabletInfo(request);
Preconditions.checkState(request.isSetReportVersion());
@@ -361,7 +354,7 @@ public class MasterImpl {
// handle load job
// TODO yiguolei: why delete should check request version and task version?
- if (pushTask.getPushType() == TPushType.LOAD || pushTask.getPushType() == TPushType.LOAD_DELETE) {
+ if (pushTask.getPushType() == TPushType.LOAD) {
long loadJobId = pushTask.getLoadJobId();
LoadJob job = Catalog.getCurrentCatalog().getLoadInstance().getLoadJob(loadJobId);
if (job == null) {
@@ -479,127 +472,6 @@ public class MasterImpl {
return replica;
}
- private void finishPush(AgentTask task, TFinishTaskRequest request) {
- List<TTabletInfo> finishTabletInfos = request.getFinishTabletInfos();
- Preconditions.checkState(finishTabletInfos != null && !finishTabletInfos.isEmpty());
-
- PushTask pushTask = (PushTask) task;
- // if replica report already update replica version and load checker add new version push task,
- // we might get new version push task, so check task version first
- // all tablets in tablet infos should have same version and version hash
- long finishVersion = finishTabletInfos.get(0).getVersion();
- long taskVersion = pushTask.getVersion();
- if (finishVersion != taskVersion) {
- LOG.debug("finish tablet version is not consistent with task. "
- + "finish version: {}, task: {}",
- finishVersion, pushTask);
- return;
- }
-
- long dbId = pushTask.getDbId();
- long backendId = pushTask.getBackendId();
- long signature = task.getSignature();
- Database db = Catalog.getCurrentCatalog().getDbNullable(dbId);
- if (db == null) {
- AgentTaskQueue.removePushTask(backendId, signature, finishVersion,
- pushTask.getPushType(), pushTask.getTaskType());
- return;
- }
-
- long tableId = pushTask.getTableId();
- long partitionId = pushTask.getPartitionId();
- long pushIndexId = pushTask.getIndexId();
- long pushTabletId = pushTask.getTabletId();
-
- // push finish type:
- // numOfFinishTabletInfos tabletId schemaHash
- // Normal: 1 / /
- // SchemaChangeHandler 2 same diff
- // RollupHandler 2 diff diff
- //
- // reuse enum 'PartitionState' here as 'push finish type'
- PartitionState pushState = null;
- if (finishTabletInfos.size() == 1) {
- pushState = PartitionState.NORMAL;
- } else if (finishTabletInfos.size() == 2) {
- if (finishTabletInfos.get(0).getTabletId() == finishTabletInfos.get(1).getTabletId()) {
- pushState = PartitionState.SCHEMA_CHANGE;
- } else {
- pushState = PartitionState.ROLLUP;
- }
- } else {
- LOG.warn("invalid push report infos. finishTabletInfos' size: " + finishTabletInfos.size());
- return;
- }
-
- LOG.debug("push report state: {}", pushState.name());
-
- OlapTable olapTable = (OlapTable) db.getTableNullable(tableId);
- if (olapTable == null || !olapTable.writeLockIfExist()) {
- AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature);
- LOG.warn("finish push replica error, cannot find table[" + tableId + "] when push finished");
- return;
- }
- try {
- Partition partition = olapTable.getPartition(partitionId);
- if (partition == null) {
- throw new MetaNotFoundException("cannot find partition[" + partitionId + "] when push finished");
- }
-
- // update replica version
- List<ReplicaPersistInfo> infos = new LinkedList<ReplicaPersistInfo>();
- List<Long> tabletIds = finishTabletInfos.stream().map(
- finishTabletInfo -> finishTabletInfo.getTabletId()).collect(Collectors.toList());
- List<TabletMeta> tabletMetaList = Catalog.getCurrentInvertedIndex().getTabletMetaList(tabletIds);
- for (int i = 0; i < tabletMetaList.size(); i++) {
- TabletMeta tabletMeta = tabletMetaList.get(i);
- TTabletInfo tTabletInfo = finishTabletInfos.get(i);
- long indexId = tabletMeta.getIndexId();
- ReplicaPersistInfo info = updateReplicaInfo(olapTable, partition,
- backendId, pushIndexId, indexId,
- tTabletInfo, pushState);
- if (info != null) {
- infos.add(info);
- }
- }
-
- // should be done before addReplicaPersistInfos and countDownLatch
- long reportVersion = request.getReportVersion();
- Catalog.getCurrentSystemInfo().updateBackendReportVersion(task.getBackendId(), reportVersion,
- task.getDbId(), task.getTableId());
-
- if (pushTask.getPushType() == TPushType.LOAD || pushTask.getPushType() == TPushType.LOAD_DELETE) {
- // handle load job
- long loadJobId = pushTask.getLoadJobId();
- LoadJob job = Catalog.getCurrentCatalog().getLoadInstance().getLoadJob(loadJobId);
- if (job == null) {
- throw new MetaNotFoundException("cannot find load job, job[" + loadJobId + "]");
- }
-
- Preconditions.checkState(!infos.isEmpty());
- for (ReplicaPersistInfo info : infos) {
- job.addReplicaPersistInfos(info);
- }
- } else if (pushTask.getPushType() == TPushType.DELETE) {
- // report delete task must match version and version hash
- if (pushTask.getVersion() != request.getRequestVersion()) {
- throw new MetaNotFoundException("delete task is not match. [" + pushTask.getVersion() + "-"
- + request.getRequestVersion() + "]");
- }
- }
-
- AgentTaskQueue.removePushTask(backendId, signature, finishVersion,
- pushTask.getPushType(), pushTask.getTaskType());
- LOG.debug("finish push replica. tabletId: {}, backendId: {}", pushTabletId, backendId);
- } catch (MetaNotFoundException e) {
- AgentTaskQueue.removePushTask(backendId, signature, finishVersion,
- pushTask.getPushType(), pushTask.getTaskType());
- LOG.warn("finish push replica error", e);
- } finally {
- olapTable.writeUnlock();
- }
- }
-
private void finishClearAlterTask(AgentTask task, TFinishTaskRequest request) {
ClearAlterTask clearAlterTask = (ClearAlterTask) task;
clearAlterTask.setFinished(true);
@@ -632,54 +504,6 @@ public class MasterImpl {
publishVersionTask.getSignature());
}
- private ReplicaPersistInfo updateReplicaInfo(OlapTable olapTable, Partition partition,
- long backendId, long pushIndexId, long indexId,
- TTabletInfo tTabletInfo, PartitionState pushState)
- throws MetaNotFoundException {
- long tabletId = tTabletInfo.getTabletId();
- int schemaHash = tTabletInfo.getSchemaHash();
- long version = tTabletInfo.getVersion();
- long rowCount = tTabletInfo.getRowCount();
- long dataSize = tTabletInfo.getDataSize();
-
- if (indexId != pushIndexId) {
- // this may be a rollup tablet
- if (pushState != PartitionState.ROLLUP && indexId != TabletInvertedIndex.NOT_EXIST_VALUE) {
- // this probably should not happened. add log to observe(cmy)
- LOG.warn("push task report tablet[{}] with different index[{}] and is not in ROLLUP. push index[{}]",
- tabletId, indexId, pushIndexId);
- return null;
- }
-
- if (indexId == TabletInvertedIndex.NOT_EXIST_VALUE) {
- LOG.warn("tablet[{}] may be dropped. push index[{}]", tabletId, pushIndexId);
- return null;
- }
- return null;
- }
-
- MaterializedIndex materializedIndex = partition.getIndex(pushIndexId);
- if (materializedIndex == null) {
- throw new MetaNotFoundException("Cannot find index[" + pushIndexId + "]");
- }
- Tablet tablet = materializedIndex.getTablet(tabletId);
- if (tablet == null) {
- throw new MetaNotFoundException("Cannot find tablet[" + tabletId + "]");
- }
-
- // update replica info
- Replica replica = tablet.getReplicaByBackendId(backendId);
- if (replica == null) {
- throw new MetaNotFoundException("cannot find replica in tablet[" + tabletId + "], backend[" + backendId
- + "]");
- }
- replica.updateVersionInfo(version, dataSize, rowCount);
-
- LOG.debug("replica[{}] report schemaHash:{}", replica.getId(), schemaHash);
- return ReplicaPersistInfo.createForLoad(olapTable.getId(), partition.getId(), pushIndexId, tabletId,
- replica.getId(), version, schemaHash, dataSize, rowCount);
- }
-
private void finishDropReplica(AgentTask task) {
AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.DROP, task.getSignature());
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index 1f5e146bd4..79f9d131ac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -57,14 +57,12 @@ import org.apache.doris.task.CreateReplicaTask;
import org.apache.doris.task.DropReplicaTask;
import org.apache.doris.task.MasterTask;
import org.apache.doris.task.PublishVersionTask;
-import org.apache.doris.task.PushTask;
import org.apache.doris.task.StorageMediaMigrationTask;
import org.apache.doris.task.UpdateTabletMetaInfoTask;
import org.apache.doris.thrift.TBackend;
import org.apache.doris.thrift.TDisk;
import org.apache.doris.thrift.TMasterResult;
import org.apache.doris.thrift.TPartitionVersionInfo;
-import org.apache.doris.thrift.TPushType;
import org.apache.doris.thrift.TReportRequest;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
@@ -359,8 +357,6 @@ public class ReportHandler extends Daemon {
// 3. CHECK_CONSISTENCY
// 4. STORAGE_MDEIUM_MIGRATE
if (task.getTaskType() == TTaskType.CREATE
- || (task.getTaskType() == TTaskType.PUSH && ((PushTask) task).getPushType() == TPushType.DELETE
- && ((PushTask) task).isSyncDelete())
|| task.getTaskType() == TTaskType.CHECK_CONSISTENCY
|| task.getTaskType() == TTaskType.STORAGE_MEDIUM_MIGRATE) {
continue;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
index bd7380839c..43b1267091 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
@@ -210,15 +210,14 @@ public class AgentBatchTask implements Runnable {
tAgentTaskRequest.setDropTabletReq(request);
return tAgentTaskRequest;
}
- case REALTIME_PUSH:
- case PUSH: {
+ case REALTIME_PUSH: {
PushTask pushTask = (PushTask) task;
TPushReq request = pushTask.toThrift();
if (LOG.isDebugEnabled()) {
LOG.debug(request.toString());
}
tAgentTaskRequest.setPushReq(request);
- if (pushTask.getPushType() == TPushType.LOAD || pushTask.getPushType() == TPushType.LOAD_DELETE) {
+ if (pushTask.getPushType() == TPushType.LOAD) {
tAgentTaskRequest.setResourceInfo(pushTask.getResourceInfo());
}
tAgentTaskRequest.setPriority(pushTask.getPriority());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
index 6406fbc18c..1677701272 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
@@ -67,10 +67,6 @@ public class AgentTaskQueue {
signatureMap.put(signature, task);
++taskNum;
LOG.debug("add task: type[{}], backend[{}], signature[{}]", type, backendId, signature);
- if (type == TTaskType.PUSH) {
- PushTask pushTask = (PushTask) task;
- LOG.debug("push task info: version[{}]", pushTask.getVersion());
- }
return true;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
index c24189141d..1c32c9680b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
@@ -94,16 +94,6 @@ public class PushTask extends AgentTask {
this.tDescriptorTable = null;
}
- public PushTask(TResourceInfo resourceInfo, long backendId, long dbId, long tableId, long partitionId,
- long indexId, long tabletId, long replicaId, int schemaHash, long version,
- String filePath, long fileSize, int timeoutSecond, long loadJobId, TPushType pushType,
- List<Predicate> conditions, boolean needDecompress, TPriority priority) {
- this(resourceInfo, backendId, dbId, tableId, partitionId, indexId,
- tabletId, replicaId, schemaHash, version, filePath,
- fileSize, timeoutSecond, loadJobId, pushType, conditions, needDecompress,
- priority, TTaskType.PUSH, -1, tableId);
- }
-
// for load v2 (SparkLoadJob)
public PushTask(long backendId, long dbId, long tableId, long partitionId, long indexId, long tabletId,
long replicaId, int schemaHash, int timeoutSecond, long loadJobId, TPushType pushType,
@@ -126,7 +116,6 @@ public class PushTask extends AgentTask {
request.setIsSchemaChanging(isSchemaChanging);
switch (pushType) {
case LOAD:
- case LOAD_DELETE:
request.setHttpFilePath(filePath);
if (fileSize != -1) {
request.setHttpFileSize(fileSize);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
index 0cb1a8d4f8..a0ba30612a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
@@ -29,8 +29,6 @@ import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.thrift.TAgentTaskRequest;
import org.apache.doris.thrift.TBackend;
import org.apache.doris.thrift.TCompressionType;
-import org.apache.doris.thrift.TPriority;
-import org.apache.doris.thrift.TPushType;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TStorageType;
import org.apache.doris.thrift.TTabletType;
@@ -83,7 +81,6 @@ public class AgentTaskTest {
private AgentTask createReplicaTask;
private AgentTask dropTask;
- private AgentTask pushTask;
private AgentTask cloneTask;
private AgentTask cancelDeleteTask;
private AgentTask storageMediaMigrationTask;
@@ -117,12 +114,6 @@ public class AgentTaskTest {
// drop
dropTask = new DropReplicaTask(backendId1, tabletId1, replicaId1, schemaHash1);
- // push
- pushTask =
- new PushTask(null, backendId1, dbId, tableId, partitionId, indexId1, tabletId1,
- replicaId1, schemaHash1, version, "/home/a", 10L, 200, 80000L,
- TPushType.LOAD, null, false, TPriority.NORMAL);
-
// clone
cloneTask =
new CloneTask(backendId1, dbId, tableId, partitionId, indexId1, tabletId1, replicaId1, schemaHash1,
@@ -175,12 +166,6 @@ public class AgentTaskTest {
Assert.assertEquals(dropTask.getSignature(), request2.getSignature());
Assert.assertNotNull(request2.getDropTabletReq());
- // push
- TAgentTaskRequest request3 = (TAgentTaskRequest) toAgentTaskRequest.invoke(agentBatchTask, pushTask);
- Assert.assertEquals(TTaskType.PUSH, request3.getTaskType());
- Assert.assertEquals(pushTask.getSignature(), request3.getSignature());
- Assert.assertNotNull(request3.getPushReq());
-
// clone
TAgentTaskRequest request4 = (TAgentTaskRequest) toAgentTaskRequest.invoke(agentBatchTask, cloneTask);
Assert.assertEquals(TTaskType.CLONE, request4.getTaskType());
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
index 6a184f36ab..7c9c513572 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
@@ -164,7 +164,6 @@ public class MockedBackendFactory {
TTaskType taskType = request.getTaskType();
switch (taskType) {
case CREATE:
- case PUSH:
case ALTER:
++reportVersion;
break;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org