You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/12/22 04:25:36 UTC

[dolphinscheduler] branch dev updated: Remove dao module in worker (#13242)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 14ec4a2398 Remove dao module in worker (#13242)
14ec4a2398 is described below

commit 14ec4a239813be6dfeedb919f5fbff28eb5ec9dc
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Thu Dec 22 12:25:29 2022 +0800

    Remove dao module in worker (#13242)
---
 .gitignore                                         |   1 +
 docs/docs/en/architecture/design.md                |   4 +-
 docs/docs/zh/architecture/design.md                |   4 +-
 dolphinscheduler-api/pom.xml                       |   4 +
 .../dolphinscheduler/api/ApiApplicationServer.java |   2 +-
 .../api/controller/DataSourceController.java       |   2 +-
 .../dto/resources/visitor/ResourceTreeVisitor.java |   2 +-
 .../dolphinscheduler/api/python/PythonGateway.java |   2 +-
 .../api/service/ResourcesService.java              |   2 +-
 .../api/service/impl/MonitorServiceImpl.java       |   2 +-
 .../service/impl/ProcessDefinitionServiceImpl.java |   2 +-
 .../service/impl/ProcessInstanceServiceImpl.java   |   2 +-
 .../api/service/impl/ResourcesServiceImpl.java     |   4 +-
 .../service/impl/TaskDefinitionServiceImpl.java    |   2 +-
 .../api/service/impl/TenantServiceImpl.java        |   2 +-
 .../api/service/impl/UdfFuncServiceImpl.java       |   2 +-
 .../api/service/impl/UsersServiceImpl.java         |   2 +-
 .../api/service/impl/WorkerGroupServiceImpl.java   |   2 +-
 .../api/controller/WorkerGroupControllerTest.java  |   2 +-
 .../resources/visitor/ResourceTreeVisitorTest.java |   2 +-
 .../api/python/PythonGatewayTest.java              |   2 +-
 .../api/service/BaseServiceTest.java               |   4 +-
 .../api/service/MonitorServiceTest.java            |   2 +-
 .../api/service/ProcessInstanceServiceTest.java    |   2 +-
 .../api/service/ResourcesServiceTest.java          |   4 +-
 .../api/service/TaskDefinitionServiceImplTest.java |   2 +-
 .../api/service/TenantServiceTest.java             |   2 +-
 .../api/service/UdfFuncServiceTest.java            |   2 +-
 .../api/service/UsersServiceTest.java              |   2 +-
 .../api/service/WorkerGroupServiceTest.java        |   2 +-
 .../common/constants/Constants.java                |   6 -
 .../plugin/datasource/api/utils/CommonUtils.java   |  10 +
 dolphinscheduler-master/pom.xml                    |   5 +
 .../server/master/MasterServer.java                |   2 +-
 .../registry/MasterConnectionStateListener.java    |   2 +-
 .../master/registry/MasterRegistryClient.java      |   2 +-
 .../server/master/registry/MasterStopStrategy.java |   2 +-
 .../master/registry/MasterWaitingStrategy.java     |   2 +-
 .../server/master/registry/ServerNodeManager.java  |   2 +-
 .../server/master/rpc/MasterRPCServer.java         |   2 +-
 .../master/runner/StreamTaskExecuteRunnable.java   |   2 +-
 .../master/runner/task/BaseTaskProcessor.java      |   9 +-
 .../master/service/MasterFailoverService.java      |   2 +-
 .../master/service/WorkerFailoverService.java      |   2 +-
 .../server/master/task/MasterHeartBeatTask.java    |   2 +-
 .../src/main/resources/logback-spring.xml          |   4 +-
 .../master/registry/MasterRegistryClientTest.java  |   2 +-
 .../server/master/service/FailoverServiceTest.java |   2 +-
 .../dolphinscheduler-registry-api/pom.xml          |   7 +
 .../registry/api}/RegistryClient.java              |  26 +-
 .../remote/processor}/LoggerRequestProcessor.java  |  21 +-
 dolphinscheduler-service/pom.xml                   | 198 ----------
 .../service/bean/SpringApplicationContext.java     |   9 +
 .../service/cache/impl/CacheNotifyServiceImpl.java |   2 +-
 .../service/process/ProcessServiceImpl.java        |   2 +-
 .../service/storage/StoreConfiguration.java        |  60 ---
 .../service/utils/CommonUtils.java                 |  77 ----
 .../dolphinscheduler/service/utils/LogUtils.java   |   2 +-
 .../service/utils/LoggerUtils.java                 |  26 +-
 .../service/utils/ProcessUtils.java                |  33 +-
 .../service/cache/CacheNotifyServiceTest.java      |   2 +-
 .../service/log/LoggerRequestProcessorTest.java    |   1 +
 .../service/log/MasterLogFilterTest.java           |  56 ---
 .../service/log/TaskLogDiscriminatorTest.java      |   1 +
 .../service/log/TaskLogFilterTest.java             |   1 +
 .../service/log/WorkerLogFilterTest.java           |  66 ----
 .../service/process/ProcessServiceTest.java        |   2 +-
 .../service/utils/CommonUtilsTest.java             |  19 -
 .../service/utils/LogUtilsTest.java                |   2 +-
 .../service/utils/ProcessUtilsTest.java            |  28 --
 .../src/main/resources/logback-spring.xml          |   4 +-
 .../dolphinscheduler-storage-all/pom.xml           |  52 +++
 .../dolphinscheduler-storage-api/pom.xml           |  36 ++
 .../plugin/storage/api/StorageConfiguration.java   |  50 +++
 .../plugin/storage/api}/StorageEntity.java         |   2 +-
 .../plugin/storage/api}/StorageOperate.java        |  14 +-
 .../plugin/storage/api/StorageOperateFactory.java  |  20 +-
 .../plugin/storage/api/StorageType.java            |  42 ++-
 .../dolphinscheduler-storage-hdfs}/pom.xml         |  58 +--
 .../plugin/storage/hdfs/HdfsStorageOperator.java   |  99 +----
 .../storage/hdfs/HdfsStorageOperatorFactory.java   |  40 +-
 .../storage/hdfs/HdfsStorageOperatorTest.java      |  16 +-
 .../dolphinscheduler-storage-oss/pom.xml           |  42 +++
 .../plugin/storage/oss/OssStorageOperator.java     |  13 +-
 .../storage/oss/OssStorageOperatorFactory.java     |  40 +-
 .../plugin/storage/oss/OssStorageOperatorTest.java |   8 +-
 .../dolphinscheduler-storage-s3/pom.xml            |  46 +++
 .../plugin/storage/s3/S3StorageOperator.java       |  95 ++---
 .../storage/s3/S3StorageOperatorFactory.java       |  26 +-
 dolphinscheduler-storage-plugin/pom.xml            |  49 +++
 .../plugin/task/api/AbstractCommandExecutor.java   |   3 +-
 .../plugin/task/api/ProcessUtils.java              | 106 ------
 .../plugin/task/api}/TaskPluginManager.java        |   5 +-
 .../plugin/task/api}/log/TaskLogDiscriminator.java |   2 +-
 .../plugin/task/api}/log/TaskLogFilter.java        |   2 +-
 .../plugin/task/api/utils/LogUtils.java            | 107 ++++++
 .../plugin/task/api/utils/ProcessUtils.java        | 402 +++++++++++++++++++++
 dolphinscheduler-worker/pom.xml                    |  13 +-
 .../server/worker/WorkerServer.java                |  29 +-
 .../server/worker/message/MessageRetryRunner.java  |   6 +-
 .../worker/processor/TaskDispatchProcessor.java    |  17 +-
 .../processor/TaskExecuteResultAckProcessor.java   |   6 +-
 .../processor/TaskExecuteRunningAckProcessor.java  |   6 +-
 .../server/worker/processor/TaskKillProcessor.java |  25 +-
 .../worker/processor/TaskRejectAckProcessor.java   |   6 +-
 .../worker/processor/TaskSavePointProcessor.java   |   6 +-
 .../registry/WorkerConnectionStateListener.java    |   4 -
 .../worker/registry/WorkerRegistryClient.java      |   4 +-
 .../server/worker/registry/WorkerStopStrategy.java |   2 +-
 .../worker/registry/WorkerWaitingStrategy.java     |   2 +-
 .../server/worker/rpc/WorkerRpcServer.java         |   2 +-
 .../DefaultWorkerDelayTaskExecuteRunnable.java     |  17 +-
 ...faultWorkerDelayTaskExecuteRunnableFactory.java |  19 +-
 .../runner/WorkerDelayTaskExecuteRunnable.java     |  17 +-
 .../WorkerDelayTaskExecuteRunnableFactory.java     |  12 +-
 .../worker/runner/WorkerTaskExecuteRunnable.java   |  43 ++-
 .../WorkerTaskExecuteRunnableFactoryBuilder.java   |  10 +-
 .../server/worker/task/WorkerHeartBeatTask.java    |   2 +-
 .../worker/utils/TaskExecutionCheckerUtils.java    |   2 +-
 .../worker/utils/TaskFilesTransferUtils.java       |   2 +-
 .../src/main/resources/application.yaml            |  29 --
 .../src/main/resources/logback-spring.xml          |   4 +-
 .../server/worker/config/BeanConfigTest.java       |  40 --
 .../processor/TaskDispatchProcessorTest.java       |   8 +-
 .../worker/registry/WorkerRegistryClientTest.java  |   2 +-
 .../DefaultWorkerDelayTaskExecuteRunnableTest.java |   8 +-
 .../worker/utils/TaskFilesTransferUtilsTest.java   |   2 +-
 pom.xml                                            |  12 +
 128 files changed, 1228 insertions(+), 1280 deletions(-)

diff --git a/.gitignore b/.gitignore
index a51ce35064..342f13bd6a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -39,6 +39,7 @@ config.gypi
 test/coverage
 /docs/zh_CN/介绍
 /docs/zh_CN/贡献代码.md
+derby.log
 dolphinscheduler-common/src/main/resources/zookeeper.properties
 dolphinscheduler-dao/src/main/resources/dao/data_source.properties
 dolphinscheduler-alert/logs/
diff --git a/docs/docs/en/architecture/design.md b/docs/docs/en/architecture/design.md
index 26f60d9761..dfd443e963 100644
--- a/docs/docs/en/architecture/design.md
+++ b/docs/docs/en/architecture/design.md
@@ -199,8 +199,8 @@ In the early schedule design, if there is no priority design and use the fair sc
 ```xml
 <conversionRule conversionWord="message" converterClass="org.apache.dolphinscheduler.common.log.SensitiveDataConverter"/>
 <appender name="TASKLOGFILE" class="ch.qos.logback.classic.sift.SiftingAppender">
-    <filter class="org.apache.dolphinscheduler.service.log.TaskLogFilter"/>
-    <Discriminator class="org.apache.dolphinscheduler.service.log.TaskLogDiscriminator">
+    <filter class="org.apache.dolphinscheduler.plugin.task.api.log.TaskLogFilter"/>
+    <Discriminator class="org.apache.dolphinscheduler.plugin.task.api.log.TaskLogDiscriminator">
         <key>taskAppId</key>
         <logBase>${log.base}</logBase>
     </Discriminator>
diff --git a/docs/docs/zh/architecture/design.md b/docs/docs/zh/architecture/design.md
index dbfb1c1613..246bd6b1ac 100644
--- a/docs/docs/zh/architecture/design.md
+++ b/docs/docs/zh/architecture/design.md
@@ -197,8 +197,8 @@
 ```xml
 <conversionRule conversionWord="message" converterClass="org.apache.dolphinscheduler.common.log.SensitiveDataConverter"/>
 <appender name="TASKLOGFILE" class="ch.qos.logback.classic.sift.SiftingAppender">
-    <filter class="org.apache.dolphinscheduler.service.log.TaskLogFilter"/>
-    <Discriminator class="org.apache.dolphinscheduler.service.log.TaskLogDiscriminator">
+    <filter class="org.apache.dolphinscheduler.plugin.task.api.log.TaskLogFilter"/>
+    <Discriminator class="org.apache.dolphinscheduler.plugin.task.api.log.TaskLogDiscriminator">
         <key>taskAppId</key>
         <logBase>${log.base}</logBase>
     </Discriminator>
diff --git a/dolphinscheduler-api/pom.xml b/dolphinscheduler-api/pom.xml
index f8e6b44cb6..8badbf4e74 100644
--- a/dolphinscheduler-api/pom.xml
+++ b/dolphinscheduler-api/pom.xml
@@ -77,6 +77,10 @@
             <groupId>org.apache.dolphinscheduler</groupId>
             <artifactId>dolphinscheduler-scheduler-all</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-storage-all</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>org.codehaus.janino</groupId>
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
index 2994208ea2..7824dc2a5d 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
@@ -21,7 +21,7 @@ import org.apache.dolphinscheduler.common.enums.PluginType;
 import org.apache.dolphinscheduler.dao.PluginDao;
 import org.apache.dolphinscheduler.dao.entity.PluginDefine;
 import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
-import org.apache.dolphinscheduler.service.task.TaskPluginManager;
+import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
 import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer;
 import org.apache.dolphinscheduler.spi.params.base.PluginParams;
 
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/DataSourceController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/DataSourceController.java
index 169d9bddf8..6418995d43 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/DataSourceController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/DataSourceController.java
@@ -38,9 +38,9 @@ import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
+import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils;
 import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
 import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
-import org.apache.dolphinscheduler.service.utils.CommonUtils;
 import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
 import org.apache.dolphinscheduler.spi.enums.DbType;
 
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/visitor/ResourceTreeVisitor.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/visitor/ResourceTreeVisitor.java
index 1f404c31e1..ccd133ed7e 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/visitor/ResourceTreeVisitor.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/visitor/ResourceTreeVisitor.java
@@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.api.dto.resources.visitor;
 import org.apache.dolphinscheduler.api.dto.resources.Directory;
 import org.apache.dolphinscheduler.api.dto.resources.FileLeaf;
 import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent;
-import org.apache.dolphinscheduler.service.storage.StorageEntity;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity;
 
 import java.util.ArrayList;
 import java.util.List;
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
index cbada02e42..893e2bd83c 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
@@ -58,7 +58,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectUserMapper;
 import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
-import org.apache.dolphinscheduler.service.storage.StorageEntity;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity;
 import org.apache.dolphinscheduler.spi.enums.ResourceType;
 
 import py4j.GatewayServer;
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
index b4e105a8d6..094a60e6db 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
@@ -21,7 +21,7 @@ import org.apache.dolphinscheduler.api.dto.resources.DeleteDataTransferResponse;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.enums.ProgramType;
 import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.service.storage.StorageEntity;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity;
 import org.apache.dolphinscheduler.spi.enums.ResourceType;
 
 import java.io.IOException;
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MonitorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MonitorServiceImpl.java
index d0c7720b1c..2b13f4d5c9 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MonitorServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MonitorServiceImpl.java
@@ -26,7 +26,7 @@ import org.apache.dolphinscheduler.common.model.WorkerServerModel;
 import org.apache.dolphinscheduler.dao.MonitorDBDao;
 import org.apache.dolphinscheduler.dao.entity.MonitorRecord;
 import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
+import org.apache.dolphinscheduler.registry.api.RegistryClient;
 
 import java.util.HashMap;
 import java.util.List;
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 4b94af1ee7..f6c331d249 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -111,6 +111,7 @@ import org.apache.dolphinscheduler.dao.mapper.UserMapper;
 import org.apache.dolphinscheduler.dao.model.PageListingResult;
 import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
 import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
+import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
 import org.apache.dolphinscheduler.plugin.task.api.enums.SqlType;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
@@ -118,7 +119,6 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters;
 import org.apache.dolphinscheduler.service.model.TaskNode;
 import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.task.TaskPluginManager;
 
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
index be1ff200dd..5d0b00f721 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
@@ -72,6 +72,7 @@ import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
 import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.dao.utils.WorkflowUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
@@ -79,7 +80,6 @@ import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
 import org.apache.dolphinscheduler.service.expand.CuringParamsService;
 import org.apache.dolphinscheduler.service.model.TaskNode;
 import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.task.TaskPluginManager;
 
 import org.apache.commons.lang3.StringUtils;
 
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
index cc495aa257..a3d5c16846 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
@@ -61,10 +61,10 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
 import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
 import org.apache.dolphinscheduler.dao.mapper.UserMapper;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
 import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
 import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.storage.StorageEntity;
-import org.apache.dolphinscheduler.service.storage.StorageOperate;
 import org.apache.dolphinscheduler.spi.enums.ResourceType;
 
 import org.apache.commons.collections4.CollectionUtils;
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index 9abbb07253..6e10493200 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -63,9 +63,9 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
 import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.task.TaskPluginManager;
 
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.MapUtils;
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
index 503e1c9d1d..a0bfd599d4 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
@@ -41,7 +41,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
 import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
 import org.apache.dolphinscheduler.dao.mapper.UserMapper;
-import org.apache.dolphinscheduler.service.storage.StorageOperate;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
 
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UdfFuncServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UdfFuncServiceImpl.java
index a7973eff94..fcee1608f8 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UdfFuncServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UdfFuncServiceImpl.java
@@ -30,7 +30,7 @@ import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.ResourceMapper;
 import org.apache.dolphinscheduler.dao.mapper.UDFUserMapper;
 import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
-import org.apache.dolphinscheduler.service.storage.StorageOperate;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
 
 import org.apache.commons.lang3.StringUtils;
 
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java
index f6fbbc5a70..54dc99bc71 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java
@@ -55,7 +55,7 @@ import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
 import org.apache.dolphinscheduler.dao.mapper.UDFUserMapper;
 import org.apache.dolphinscheduler.dao.mapper.UserMapper;
 import org.apache.dolphinscheduler.dao.utils.ResourceProcessDefinitionUtils;
-import org.apache.dolphinscheduler.service.storage.StorageOperate;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
 
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
index 4e46298b24..daf8c06338 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
@@ -38,8 +38,8 @@ import org.apache.dolphinscheduler.dao.mapper.EnvironmentWorkerGroupRelationMapp
 import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
 import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
 import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
+import org.apache.dolphinscheduler.registry.api.RegistryClient;
 import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java
index d3236786e2..73abb63fa0 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java
@@ -29,7 +29,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
 import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
 import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
+import org.apache.dolphinscheduler.registry.api.RegistryClient;
 
 import java.util.HashMap;
 import java.util.Map;
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/dto/resources/visitor/ResourceTreeVisitorTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/dto/resources/visitor/ResourceTreeVisitorTest.java
index c51145b3a9..49fe5c9b66 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/dto/resources/visitor/ResourceTreeVisitorTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/dto/resources/visitor/ResourceTreeVisitorTest.java
@@ -17,7 +17,7 @@
 package org.apache.dolphinscheduler.api.dto.resources.visitor;
 
 import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent;
-import org.apache.dolphinscheduler.service.storage.StorageEntity;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity;
 
 import java.util.ArrayList;
 import java.util.List;
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java
index a015d6e01c..173ae98854 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java
@@ -26,7 +26,7 @@ import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
-import org.apache.dolphinscheduler.service.storage.StorageEntity;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity;
 import org.apache.dolphinscheduler.spi.enums.ResourceType;
 
 import java.util.Date;
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/BaseServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/BaseServiceTest.java
index ba2577f933..ea5988f560 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/BaseServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/BaseServiceTest.java
@@ -23,7 +23,7 @@ import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.enums.UserType;
 import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.service.storage.impl.HadoopUtils;
+import org.apache.dolphinscheduler.plugin.storage.hdfs.HdfsStorageOperator;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -48,7 +48,7 @@ public class BaseServiceTest {
     private BaseServiceImpl baseService;
 
     @Mock
-    private HadoopUtils hadoopUtils;
+    private HdfsStorageOperator hdfsStorageOperator;
 
     @BeforeEach
     public void setUp() {
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/MonitorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/MonitorServiceTest.java
index a61a7481a2..0d80d288f7 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/MonitorServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/MonitorServiceTest.java
@@ -30,7 +30,7 @@ import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.dao.MonitorDBDao;
 import org.apache.dolphinscheduler.dao.entity.MonitorRecord;
 import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
+import org.apache.dolphinscheduler.registry.api.RegistryClient;
 import org.apache.dolphinscheduler.spi.enums.DbType;
 
 import org.apache.commons.collections4.CollectionUtils;
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
index abf29bbc5b..2ddefced0c 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
@@ -58,12 +58,12 @@ import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
 import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
 import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.service.expand.CuringParamsService;
 import org.apache.dolphinscheduler.service.model.TaskNode;
 import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.task.TaskPluginManager;
 
 import java.io.IOException;
 import java.text.MessageFormat;
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java
index a588a5fbb7..6c64133216 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java
@@ -40,8 +40,8 @@ import org.apache.dolphinscheduler.dao.mapper.ResourceUserMapper;
 import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
 import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
 import org.apache.dolphinscheduler.dao.mapper.UserMapper;
-import org.apache.dolphinscheduler.service.storage.StorageEntity;
-import org.apache.dolphinscheduler.service.storage.StorageOperate;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
 import org.apache.dolphinscheduler.spi.enums.ResourceType;
 
 import org.apache.commons.collections4.CollectionUtils;
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
index b6668dff15..a066b81124 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
@@ -56,9 +56,9 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.process.ProcessServiceImpl;
-import org.apache.dolphinscheduler.service.task.TaskPluginManager;
 
 import java.text.MessageFormat;
 import java.util.ArrayList;
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java
index 739196eea0..2b9fc0125b 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java
@@ -40,7 +40,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
 import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
 import org.apache.dolphinscheduler.dao.mapper.UserMapper;
-import org.apache.dolphinscheduler.service.storage.StorageOperate;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
 
 import org.apache.commons.collections4.CollectionUtils;
 
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UdfFuncServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UdfFuncServiceTest.java
index f3e6471950..f3015c6e0e 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UdfFuncServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UdfFuncServiceTest.java
@@ -34,7 +34,7 @@ import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.ResourceMapper;
 import org.apache.dolphinscheduler.dao.mapper.UDFUserMapper;
 import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
-import org.apache.dolphinscheduler.service.storage.StorageOperate;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
 
 import org.apache.commons.collections4.CollectionUtils;
 
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java
index 353dc22012..67862e1557 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java
@@ -48,7 +48,7 @@ import org.apache.dolphinscheduler.dao.mapper.ResourceUserMapper;
 import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
 import org.apache.dolphinscheduler.dao.mapper.UDFUserMapper;
 import org.apache.dolphinscheduler.dao.mapper.UserMapper;
-import org.apache.dolphinscheduler.service.storage.StorageOperate;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
 import org.apache.dolphinscheduler.spi.enums.ResourceType;
 
 import org.apache.commons.collections4.CollectionUtils;
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
index ea8aaf6dc3..4393a42c07 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
@@ -37,8 +37,8 @@ import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
 import org.apache.dolphinscheduler.dao.mapper.EnvironmentWorkerGroupRelationMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
 import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
+import org.apache.dolphinscheduler.registry.api.RegistryClient;
 import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
 import java.util.ArrayList;
 import java.util.HashMap;
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
index 8d2c1e2e15..b72520f14e 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
@@ -51,12 +51,6 @@ public final class Constants {
 
     public static final String RESOURCE_TYPE_UDF = "udfs";
 
-    public static final String STORAGE_S3 = "S3";
-
-    public static final String STORAGE_OSS = "OSS";
-
-    public static final String STORAGE_HDFS = "HDFS";
-
     public static final String EMPTY_STRING = "";
 
     /**
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/CommonUtils.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/CommonUtils.java
index 0841e33eeb..35432774bf 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/CommonUtils.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/CommonUtils.java
@@ -28,6 +28,7 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.LOGIN_US
 import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.LOGIN_USER_KEY_TAB_USERNAME;
 import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RESOURCE_UPLOAD_PATH;
 
+import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.enums.ResUploadType;
 import org.apache.dolphinscheduler.common.utils.PropertyUtils;
 
@@ -46,6 +47,15 @@ public class CommonUtils {
         throw new UnsupportedOperationException("Construct CommonUtils");
     }
 
+    private static final boolean IS_DEVELOP_MODE = PropertyUtils.getBoolean(Constants.DEVELOPMENT_STATE, true);
+
+    /**
+     * @return {@code true} if develop mode is enabled, {@code false} otherwise
+     */
+    public static boolean isDevelopMode() {
+        return IS_DEVELOP_MODE;
+    }
+
     /**
      * if upload resource is HDFS and kerberos startup is true , else false
      *
diff --git a/dolphinscheduler-master/pom.xml b/dolphinscheduler-master/pom.xml
index 7443ae1196..be975407e9 100644
--- a/dolphinscheduler-master/pom.xml
+++ b/dolphinscheduler-master/pom.xml
@@ -68,6 +68,11 @@
             <artifactId>dolphinscheduler-task-all</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-storage-all</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-cache</artifactId>
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 7e302096f9..353f2ae129 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.IStoppable;
 import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
 import org.apache.dolphinscheduler.scheduler.api.SchedulerApi;
 import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
 import org.apache.dolphinscheduler.server.master.rpc.MasterRPCServer;
@@ -28,7 +29,6 @@ import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
 import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread;
 import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerBootstrap;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.task.TaskPluginManager;
 
 import javax.annotation.PostConstruct;
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java
index 545112a699..b223c7e8c0 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java
@@ -20,8 +20,8 @@ package org.apache.dolphinscheduler.server.master.registry;
 import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.registry.api.ConnectionListener;
 import org.apache.dolphinscheduler.registry.api.ConnectionState;
+import org.apache.dolphinscheduler.registry.api.RegistryClient;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
 import lombok.NonNull;
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index c00be5556b..c319d393ed 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -25,11 +25,11 @@ import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.registry.api.RegistryClient;
 import org.apache.dolphinscheduler.registry.api.RegistryException;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.service.FailoverService;
 import org.apache.dolphinscheduler.server.master.task.MasterHeartBeatTask;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
 import org.apache.commons.lang3.StringUtils;
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterStopStrategy.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterStopStrategy.java
index da2aff1377..5ac361ddca 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterStopStrategy.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterStopStrategy.java
@@ -17,9 +17,9 @@
 
 package org.apache.dolphinscheduler.server.master.registry;
 
+import org.apache.dolphinscheduler.registry.api.RegistryClient;
 import org.apache.dolphinscheduler.registry.api.StrategyType;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterWaitingStrategy.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterWaitingStrategy.java
index 1c962f2920..a007e221b8 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterWaitingStrategy.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterWaitingStrategy.java
@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleException;
 import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.common.lifecycle.ServerStatus;
 import org.apache.dolphinscheduler.registry.api.Registry;
+import org.apache.dolphinscheduler.registry.api.RegistryClient;
 import org.apache.dolphinscheduler.registry.api.RegistryException;
 import org.apache.dolphinscheduler.registry.api.StrategyType;
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
@@ -28,7 +29,6 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.event.WorkflowEventQueue;
 import org.apache.dolphinscheduler.server.master.rpc.MasterRPCServer;
 import org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
 import java.time.Duration;
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
index 8d0f83abda..adac739b06 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
@@ -30,11 +30,11 @@ import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
 import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
 import org.apache.dolphinscheduler.registry.api.Event;
 import org.apache.dolphinscheduler.registry.api.Event.Type;
+import org.apache.dolphinscheduler.registry.api.RegistryClient;
 import org.apache.dolphinscheduler.registry.api.SubscribeListener;
 import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.service.queue.MasterPriorityQueue;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.ArrayUtils;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
index 8036c2d6c8..8255e9a3b0 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.rpc;
 import org.apache.dolphinscheduler.remote.NettyRemotingServer;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
+import org.apache.dolphinscheduler.remote.processor.LoggerRequestProcessor;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.processor.CacheProcessor;
 import org.apache.dolphinscheduler.server.master.processor.StateEventProcessor;
@@ -30,7 +31,6 @@ import org.apache.dolphinscheduler.server.master.processor.TaskExecuteStartProce
 import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
 import org.apache.dolphinscheduler.server.master.processor.TaskRecallProcessor;
 import org.apache.dolphinscheduler.server.master.processor.WorkflowExecutingDataRequestProcessor;
-import org.apache.dolphinscheduler.service.log.LoggerRequestProcessor;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
index 8850c89e0a..0fbece94ac 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
@@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
@@ -57,7 +58,6 @@ import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
 import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.task.TaskPluginManager;
 import org.apache.dolphinscheduler.service.utils.LoggerUtils;
 import org.apache.dolphinscheduler.spi.enums.ResourceType;
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index cc51b96d4d..524585794c 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -49,11 +49,13 @@ import org.apache.dolphinscheduler.dao.entity.Tenant;
 import org.apache.dolphinscheduler.dao.entity.UdfFunc;
 import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
 import org.apache.dolphinscheduler.plugin.task.api.DataQualityTaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.plugin.task.api.enums.dp.ConnectorType;
 import org.apache.dolphinscheduler.plugin.task.api.enums.dp.ExecuteSqlType;
@@ -75,8 +77,6 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.expand.CuringParamsService;
 import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.storage.impl.HadoopUtils;
-import org.apache.dolphinscheduler.service.task.TaskPluginManager;
 import org.apache.dolphinscheduler.service.utils.LoggerUtils;
 import org.apache.dolphinscheduler.spi.enums.DbType;
 import org.apache.dolphinscheduler.spi.enums.ResourceType;
@@ -124,6 +124,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
 
     protected ProcessInstanceDao processInstanceDao;
 
+    protected StorageOperate storageOperate;
+
     protected MasterConfig masterConfig;
 
     protected TaskPluginManager taskPluginManager;
@@ -140,6 +142,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
         taskPluginManager = SpringApplicationContext.getBean(TaskPluginManager.class);
         curingParamsService = SpringApplicationContext.getBean(CuringParamsService.class);
         taskInstanceDao = SpringApplicationContext.getBean(TaskInstanceDao.class);
+        storageOperate = SpringApplicationContext.getBean(StorageOperate.class, null);
         this.taskInstance = taskInstance;
         this.processInstance = processInstance;
         this.maxRetryTimes = masterConfig.getTaskCommitRetryTimes();
@@ -395,7 +398,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
         udfFuncList.forEach(udfFunc -> {
             UdfFuncParameters udfFuncParameters =
                     JSONUtils.parseObject(JSONUtils.toJsonString(udfFunc), UdfFuncParameters.class);
-            udfFuncParameters.setDefaultFS(HadoopUtils.getInstance().getDefaultFS());
+            udfFuncParameters.setDefaultFS(PropertyUtils.getString(Constants.FS_DEFAULT_FS));
             String tenantCode = processService.queryTenantCodeByResName(udfFunc.getResourceName(), ResourceType.UDF);
             udfFuncParameters.setTenantCode(tenantCode);
             map.put(udfFunc.getId(), udfFuncParameters);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
index df39c9f461..b3caa4709b 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
@@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import org.apache.dolphinscheduler.registry.api.RegistryClient;
 import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.server.master.builder.TaskExecutionContextBuilder;
@@ -39,7 +40,6 @@ import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
 import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
 import org.apache.dolphinscheduler.service.log.LogClient;
 import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
 import org.apache.dolphinscheduler.service.utils.LoggerUtils;
 import org.apache.dolphinscheduler.service.utils.ProcessUtils;
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
index 30522b8e4a..e3ed6daa01 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
@@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import org.apache.dolphinscheduler.registry.api.RegistryClient;
 import org.apache.dolphinscheduler.server.master.builder.TaskExecutionContextBuilder;
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
@@ -37,7 +38,6 @@ import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPoo
 import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
 import org.apache.dolphinscheduler.service.log.LogClient;
 import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
 import org.apache.dolphinscheduler.service.utils.LoggerUtils;
 import org.apache.dolphinscheduler.service.utils.ProcessUtils;
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java
index e7dad575ec..cacad78c3b 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java
@@ -22,8 +22,8 @@ import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask;
 import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
+import org.apache.dolphinscheduler.registry.api.RegistryClient;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
diff --git a/dolphinscheduler-master/src/main/resources/logback-spring.xml b/dolphinscheduler-master/src/main/resources/logback-spring.xml
index 9926eff21d..c3b342bffb 100644
--- a/dolphinscheduler-master/src/main/resources/logback-spring.xml
+++ b/dolphinscheduler-master/src/main/resources/logback-spring.xml
@@ -30,8 +30,8 @@
     <conversionRule conversionWord="message"
                     converterClass="org.apache.dolphinscheduler.common.log.SensitiveDataConverter"/>
     <appender name="TASKLOGFILE" class="ch.qos.logback.classic.sift.SiftingAppender">
-        <filter class="org.apache.dolphinscheduler.service.log.TaskLogFilter"/>
-        <Discriminator class="org.apache.dolphinscheduler.service.log.TaskLogDiscriminator">
+        <filter class="org.apache.dolphinscheduler.plugin.task.api.log.TaskLogFilter"/>
+        <Discriminator class="org.apache.dolphinscheduler.plugin.task.api.log.TaskLogDiscriminator">
             <key>taskAppId</key>
             <logBase>${log.base}</logBase>
         </Discriminator>
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
index 658cd3ae58..cf894a0fcb 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
@@ -24,10 +24,10 @@ import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.registry.api.RegistryClient;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.task.MasterHeartBeatTask;
 import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
 import java.util.Date;
 
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
index 73b6f5d285..1bb9e4dbeb 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
@@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import org.apache.dolphinscheduler.registry.api.RegistryClient;
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
@@ -41,7 +42,6 @@ import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPoo
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.log.LogClient;
 import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
 import java.lang.reflect.Field;
 import java.util.ArrayList;
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/pom.xml b/dolphinscheduler-registry/dolphinscheduler-registry-api/pom.xml
index e798920ceb..8ee7b8f6f8 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-api/pom.xml
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/pom.xml
@@ -27,4 +27,11 @@
     </parent>
 
     <artifactId>dolphinscheduler-registry-api</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-common</artifactId>
+        </dependency>
+    </dependencies>
 </project>
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java
similarity index 86%
rename from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
rename to dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java
index 470c37a26c..2ef7d74c59 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java
@@ -15,13 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.service.registry;
+package org.apache.dolphinscheduler.registry.api;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.dolphinscheduler.common.constants.Constants.COLON;
-import static org.apache.dolphinscheduler.common.constants.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
-import static org.apache.dolphinscheduler.common.constants.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
-import static org.apache.dolphinscheduler.common.constants.Constants.SINGLE_SLASH;
 
 import org.apache.dolphinscheduler.common.IStoppable;
 import org.apache.dolphinscheduler.common.constants.Constants;
@@ -30,10 +26,6 @@ import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
 import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.registry.api.ConnectionListener;
-import org.apache.dolphinscheduler.registry.api.Registry;
-import org.apache.dolphinscheduler.registry.api.RegistryException;
-import org.apache.dolphinscheduler.registry.api.SubscribeListener;
 
 import org.apache.commons.lang3.StringUtils;
 
@@ -126,7 +118,7 @@ public class RegistryClient {
             // todo: add host, port in heartBeat Info, so that we don't need to parse this again
             server.setZkDirectory(parentPath + "/" + serverPath);
             // set host and port
-            String[] hostAndPort = serverPath.split(COLON);
+            String[] hostAndPort = serverPath.split(Constants.COLON);
             // fetch the last one
             server.setHost(hostAndPort[0]);
             server.setPort(Integer.parseInt(hostAndPort[1]));
@@ -144,7 +136,7 @@ public class RegistryClient {
             String path = rootNodePath(nodeType);
             Collection<String> serverList = getServerNodes(nodeType);
             for (String server : serverList) {
-                serverMap.putIfAbsent(server, get(path + SINGLE_SLASH + server));
+                serverMap.putIfAbsent(server, get(path + Constants.SINGLE_SLASH + server));
             }
         } catch (Exception e) {
             logger.error("get server list failed", e);
@@ -160,7 +152,7 @@ public class RegistryClient {
     }
 
     public Collection<String> getMasterNodesDirectly() {
-        return getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_MASTERS);
+        return getChildrenKeys(Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS);
     }
 
     /**
@@ -172,7 +164,7 @@ public class RegistryClient {
     public String getHostByEventDataPath(String path) {
         checkArgument(!Strings.isNullOrEmpty(path), "path cannot be null or empty");
 
-        final String[] pathArray = path.split(SINGLE_SLASH);
+        final String[] pathArray = path.split(Constants.SINGLE_SLASH);
 
         checkArgument(pathArray.length >= 1, "cannot parse path: %s", path);
 
@@ -224,11 +216,11 @@ public class RegistryClient {
     }
 
     public boolean isMasterPath(String path) {
-        return path != null && path.startsWith(REGISTRY_DOLPHINSCHEDULER_MASTERS);
+        return path != null && path.startsWith(Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS);
     }
 
     public boolean isWorkerPath(String path) {
-        return path != null && path.startsWith(REGISTRY_DOLPHINSCHEDULER_WORKERS);
+        return path != null && path.startsWith(Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS);
     }
 
     public Collection<String> getChildrenKeys(final String key) {
@@ -244,8 +236,8 @@ public class RegistryClient {
     }
 
     private void initNodes() {
-        registry.put(REGISTRY_DOLPHINSCHEDULER_MASTERS, EMPTY, false);
-        registry.put(REGISTRY_DOLPHINSCHEDULER_WORKERS, EMPTY, false);
+        registry.put(Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS, EMPTY, false);
+        registry.put(Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS, EMPTY, false);
     }
 
     private String rootNodePath(NodeType type) {
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LoggerRequestProcessor.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/LoggerRequestProcessor.java
similarity index 93%
rename from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LoggerRequestProcessor.java
rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/LoggerRequestProcessor.java
index 26a9401e9c..e695fd4494 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LoggerRequestProcessor.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/LoggerRequestProcessor.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.service.log;
+package org.apache.dolphinscheduler.remote.processor;
 
 import static org.apache.dolphinscheduler.common.constants.Constants.APPID_COLLECT;
 import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_COLLECT_WAY;
@@ -35,10 +35,6 @@ import org.apache.dolphinscheduler.remote.command.log.RollViewLogRequestCommand;
 import org.apache.dolphinscheduler.remote.command.log.RollViewLogResponseCommand;
 import org.apache.dolphinscheduler.remote.command.log.ViewLogRequestCommand;
 import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand;
-import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
-import org.apache.dolphinscheduler.remote.utils.Constants;
-import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
-import org.apache.dolphinscheduler.service.utils.LoggerUtils;
 
 import org.apache.commons.lang3.StringUtils;
 
@@ -52,8 +48,6 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -71,13 +65,6 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
 
     private final Logger logger = LoggerFactory.getLogger(LoggerRequestProcessor.class);
 
-    private final ExecutorService executor;
-
-    public LoggerRequestProcessor() {
-        this.executor = Executors.newFixedThreadPool(Constants.CPUS * 2 + 1,
-                new NamedThreadFactory("Log-Request-Process-Thread"));
-    }
-
     @Override
     public void process(Channel channel, Command command) {
         logger.info("received command : {}", command);
@@ -103,7 +90,7 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
                 if (!checkPathSecurity(viewLogPath)) {
                     throw new IllegalArgumentException("Illegal path: " + viewLogPath);
                 }
-                String msg = LoggerUtils.readWholeFileContent(viewLogPath);
+                String msg = LogUtils.readWholeFileContent(viewLogPath);
                 ViewLogResponseCommand viewLogResponse = new ViewLogResponseCommand(msg);
                 channel.writeAndFlush(viewLogResponse.convert2Command(command.getOpaque()));
                 break;
@@ -199,10 +186,6 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
         }
     }
 
-    public ExecutorService getExecutor() {
-        return this.executor;
-    }
-
     /**
      * get files content bytes for download file
      *
diff --git a/dolphinscheduler-service/pom.xml b/dolphinscheduler-service/pom.xml
index 21f0e8b793..18995ecdf0 100644
--- a/dolphinscheduler-service/pom.xml
+++ b/dolphinscheduler-service/pom.xml
@@ -74,207 +74,9 @@
             <scope>provided</scope>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-            <scope>provided</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>jdk.tools</groupId>
-                    <artifactId>jdk.tools</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>javax.servlet</groupId>
-                    <artifactId>servlet-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>javax.servlet</groupId>
-                    <artifactId>servlet-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.curator</groupId>
-                    <artifactId>curator-client</artifactId>
-                </exclusion>
-
-                <exclusion>
-                    <groupId>commons-configuration</groupId>
-                    <artifactId>commons-configuration</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>io.grpc</groupId>
-                    <artifactId>grpc-protobuf</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>io.netty</groupId>
-                    <artifactId>netty</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.codehaus.jackson</groupId>
-                    <artifactId>jackson-core-asl</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.codehaus.jackson</groupId>
-                    <artifactId>jackson-mapper-asl</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.google.protobuf</groupId>
-                    <artifactId>jackson-mapper-asl</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.google.code.gson</groupId>
-                    <artifactId>gson</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>xmlenc</groupId>
-                    <artifactId>xmlenc</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>commons-net</groupId>
-                    <artifactId>commons-net</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.avro</groupId>
-                    <artifactId>avro</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.zookeeper</groupId>
-                    <artifactId>zookeeper</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>javax.servlet.jsp</groupId>
-                    <artifactId>jsp-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.sun.jersey</groupId>
-                    <artifactId>jersey-json</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.sun.jersey</groupId>
-                    <artifactId>jersey-server</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.sun.jersey</groupId>
-                    <artifactId>jersey-core</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.mortbay.jetty</groupId>
-                    <artifactId>jetty</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
         <dependency>
             <groupId>org.apache.hbase.thirdparty</groupId>
             <artifactId>hbase-noop-htrace</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-client</artifactId>
-            <scope>provided</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>javax.servlet</groupId>
-                    <artifactId>servlet-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.codehaus.jackson</groupId>
-                    <artifactId>jackson-jaxrs</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.codehaus.jackson</groupId>
-                    <artifactId>jackson-xc</artifactId>
-                </exclusion>
-
-                <exclusion>
-                    <groupId>org.fusesource.leveldbjni</groupId>
-                    <artifactId>leveldbjni-all</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.zookeeper</groupId>
-                    <artifactId>zookeeper</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-mapreduce-client-shuffle</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.sun.jersey</groupId>
-                    <artifactId>jersey-client</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.sun.jersey</groupId>
-                    <artifactId>jersey-core</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>javax.xml.bind</groupId>
-                    <artifactId>jaxb-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-hdfs</artifactId>
-            <scope>provided</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>javax.servlet</groupId>
-                    <artifactId>servlet-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>io.netty</groupId>
-                    <artifactId>netty</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.google.protobuf</groupId>
-                    <artifactId>protobuf-java</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>xmlenc</groupId>
-                    <artifactId>xmlenc</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>io.netty</groupId>
-                    <artifactId>netty-all</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.fusesource.leveldbjni</groupId>
-                    <artifactId>leveldbjni-all</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.sun.jersey</groupId>
-                    <artifactId>jersey-core</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.sun.jersey</groupId>
-                    <artifactId>jersey-server</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>com.amazonaws</groupId>
-            <artifactId>aws-java-sdk-s3</artifactId>
-            <scope>provided</scope>
-        </dependency>
     </dependencies>
 </project>
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java
index 50d0f835f7..14d82b416e 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java
@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.service.bean;
 
 import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.NoSuchBeanDefinitionException;
 import org.springframework.context.ApplicationContext;
 import org.springframework.context.ApplicationContextAware;
 import org.springframework.context.support.AbstractApplicationContext;
@@ -44,4 +45,12 @@ public class SpringApplicationContext implements ApplicationContextAware, AutoCl
     public static <T> T getBean(Class<T> requiredType) {
         return applicationContext.getBean(requiredType);
     }
+
+    public static <T> T getBean(Class<T> requiredType, T defaultValue) {
+        try {
+            return applicationContext.getBean(requiredType);
+        } catch (NoSuchBeanDefinitionException e) {
+            return defaultValue;
+        }
+    }
 }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheNotifyServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheNotifyServiceImpl.java
index ffa9299ffc..8063946b4c 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheNotifyServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheNotifyServiceImpl.java
@@ -19,13 +19,13 @@ package org.apache.dolphinscheduler.service.cache.impl;
 
 import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.registry.api.RegistryClient;
 import org.apache.dolphinscheduler.remote.NettyRemotingClient;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
 import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.service.cache.CacheNotifyService;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
 import org.apache.commons.collections4.CollectionUtils;
 
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index ccf2f88c89..cefa059f83 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -116,6 +116,7 @@ import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
 import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.dao.utils.DqRuleUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
 import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.plugin.task.api.enums.dp.DqTaskState;
@@ -137,7 +138,6 @@ import org.apache.dolphinscheduler.service.exceptions.ServiceException;
 import org.apache.dolphinscheduler.service.expand.CuringParamsService;
 import org.apache.dolphinscheduler.service.log.LogClient;
 import org.apache.dolphinscheduler.service.model.TaskNode;
-import org.apache.dolphinscheduler.service.task.TaskPluginManager;
 import org.apache.dolphinscheduler.service.utils.ClusterConfUtils;
 import org.apache.dolphinscheduler.service.utils.DagHelper;
 import org.apache.dolphinscheduler.spi.enums.ResourceType;
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/storage/StoreConfiguration.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/storage/StoreConfiguration.java
deleted file mode 100644
index a219764d2e..0000000000
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/storage/StoreConfiguration.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.storage;
-
-import static org.apache.dolphinscheduler.common.constants.Constants.RESOURCE_STORAGE_TYPE;
-import static org.apache.dolphinscheduler.common.constants.Constants.STORAGE_HDFS;
-import static org.apache.dolphinscheduler.common.constants.Constants.STORAGE_OSS;
-import static org.apache.dolphinscheduler.common.constants.Constants.STORAGE_S3;
-
-import org.apache.dolphinscheduler.common.utils.PropertyUtils;
-import org.apache.dolphinscheduler.service.storage.impl.HadoopUtils;
-import org.apache.dolphinscheduler.service.storage.impl.OssOperator;
-import org.apache.dolphinscheduler.service.storage.impl.S3Utils;
-
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.stereotype.Component;
-
-/**
- * choose the impl of storage by RESOURCE_STORAGE_TYPE
- */
-
-@Component
-@Configuration
-public class StoreConfiguration {
-
-    @Bean
-    public StorageOperate storageOperate() {
-        switch (PropertyUtils.getUpperCaseString(RESOURCE_STORAGE_TYPE)) {
-            case STORAGE_OSS:
-                OssOperator ossOperator = new OssOperator();
-                // TODO: change to use ossOperator.init(ossConnection) after DS supports Configuration / Connection
-                // Center
-                ossOperator.init();
-                return ossOperator;
-            case STORAGE_S3:
-                return S3Utils.getInstance();
-            case STORAGE_HDFS:
-                return HadoopUtils.getInstance();
-            default:
-                return null;
-        }
-    }
-
-}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/CommonUtils.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/CommonUtils.java
index 1345e71867..3781716685 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/CommonUtils.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/CommonUtils.java
@@ -19,15 +19,11 @@ package org.apache.dolphinscheduler.service.utils;
 
 import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.constants.DataSourceConstants;
-import org.apache.dolphinscheduler.common.enums.ResUploadType;
 import org.apache.dolphinscheduler.common.utils.PropertyUtils;
 
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
 
-import java.io.IOException;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
 
@@ -66,79 +62,6 @@ public class CommonUtils {
         return envPath;
     }
 
-    /**
-     * @return is develop mode
-     */
-    public static boolean isDevelopMode() {
-        return PropertyUtils.getBoolean(Constants.DEVELOPMENT_STATE, true);
-    }
-
-    /**
-     * if upload resource is HDFS and kerberos startup is true , else false
-     *
-     * @return true if upload resource is HDFS and kerberos startup
-     */
-    public static boolean getKerberosStartupState() {
-        String resUploadStartupType = PropertyUtils.getUpperCaseString(Constants.RESOURCE_STORAGE_TYPE);
-        ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType);
-        Boolean kerberosStartupState =
-                PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false);
-        return resUploadType == ResUploadType.HDFS && kerberosStartupState;
-    }
-
-    /**
-     * load kerberos configuration
-     *
-     * @param configuration
-     * @return load kerberos config return true
-     * @throws IOException errors
-     */
-    public static boolean loadKerberosConf(Configuration configuration) throws IOException {
-        return loadKerberosConf(PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH),
-                PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME),
-                PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_PATH), configuration);
-    }
-
-    /**
-     * load kerberos configuration
-     *
-     * @param javaSecurityKrb5Conf javaSecurityKrb5Conf
-     * @param loginUserKeytabUsername loginUserKeytabUsername
-     * @param loginUserKeytabPath loginUserKeytabPath
-     * @throws IOException errors
-     */
-    public static void loadKerberosConf(String javaSecurityKrb5Conf, String loginUserKeytabUsername,
-                                        String loginUserKeytabPath) throws IOException {
-        loadKerberosConf(javaSecurityKrb5Conf, loginUserKeytabUsername, loginUserKeytabPath, new Configuration());
-    }
-
-    /**
-     * load kerberos configuration
-     *
-     * @param javaSecurityKrb5Conf javaSecurityKrb5Conf
-     * @param loginUserKeytabUsername loginUserKeytabUsername
-     * @param loginUserKeytabPath loginUserKeytabPath
-     * @param configuration configuration
-     * @return load kerberos config return true
-     * @throws IOException errors
-     */
-    public static boolean loadKerberosConf(String javaSecurityKrb5Conf, String loginUserKeytabUsername,
-                                           String loginUserKeytabPath, Configuration configuration) throws IOException {
-        if (CommonUtils.getKerberosStartupState()) {
-            System.setProperty(Constants.JAVA_SECURITY_KRB5_CONF, StringUtils.defaultIfBlank(javaSecurityKrb5Conf,
-                    PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH)));
-            configuration.set(Constants.HADOOP_SECURITY_AUTHENTICATION, Constants.KERBEROS);
-            UserGroupInformation.setConfiguration(configuration);
-            UserGroupInformation.loginUserFromKeytab(
-                    StringUtils.defaultIfBlank(loginUserKeytabUsername,
-                            PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME)),
-                    StringUtils.defaultIfBlank(loginUserKeytabPath,
-                            PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_PATH)));
-            return true;
-        }
-        return false;
-    }
-
     /**
      * encode password
      */
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/LogUtils.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/LogUtils.java
index 1db727b591..7f7188fb2c 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/LogUtils.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/LogUtils.java
@@ -21,7 +21,7 @@ import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.constants.DateConstants;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.service.log.TaskLogDiscriminator;
+import org.apache.dolphinscheduler.plugin.task.api.log.TaskLogDiscriminator;
 
 import java.nio.file.Path;
 import java.nio.file.Paths;
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/LoggerUtils.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/LoggerUtils.java
index 1d68dfbb53..af96dc97fa 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/LoggerUtils.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/LoggerUtils.java
@@ -35,18 +35,14 @@ import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 
 /**
- * logger utils
+ * Please use {@link org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils}.
  */
+@Deprecated
 @UtilityClass
 public class LoggerUtils {
 
     private static final Logger logger = LoggerFactory.getLogger(LoggerUtils.class);
 
-    /**
-     * build job id
-     *
-     * @return task id format
-     */
     public static String buildTaskId(Date firstSubmitTime,
                                      Long processDefineCode,
                                      int processDefineVersion,
@@ -79,28 +75,46 @@ public class LoggerUtils {
         return "";
     }
 
+    /**
+     * Please use {@link org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils}
+     */
     public static void setWorkflowAndTaskInstanceIDMDC(Integer workflowInstanceId, Integer taskInstanceId) {
         setWorkflowInstanceIdMDC(workflowInstanceId);
         setTaskInstanceIdMDC(taskInstanceId);
     }
 
+    /**
+     * Please use {@link org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils}
+     */
     public static void setWorkflowInstanceIdMDC(Integer workflowInstanceId) {
         MDC.put(Constants.WORKFLOW_INSTANCE_ID_MDC_KEY, String.valueOf(workflowInstanceId));
     }
 
+    /**
+     * Please use {@link org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils}
+     */
     public static void setTaskInstanceIdMDC(Integer taskInstanceId) {
         MDC.put(Constants.TASK_INSTANCE_ID_MDC_KEY, String.valueOf(taskInstanceId));
     }
 
+    /**
+     * Please use {@link org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils}
+     */
     public static void removeWorkflowAndTaskInstanceIdMDC() {
         removeWorkflowInstanceIdMDC();
         removeTaskInstanceIdMDC();
     }
 
+    /**
+     * Please use {@link org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils}
+     */
     public static void removeWorkflowInstanceIdMDC() {
         MDC.remove(Constants.WORKFLOW_INSTANCE_ID_MDC_KEY);
     }
 
+    /**
+     * Please use {@link org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils}
+     */
     public static void removeTaskInstanceIdMDC() {
         MDC.remove(Constants.TASK_INSTANCE_ID_MDC_KEY);
     }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessUtils.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessUtils.java
index 9d45e3651c..52d4fd6148 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessUtils.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessUtils.java
@@ -22,10 +22,8 @@ import org.apache.dolphinscheduler.common.utils.FileUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.common.utils.PropertyUtils;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.service.log.LogClient;
-import org.apache.dolphinscheduler.service.storage.impl.HadoopUtils;
 
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -64,34 +62,6 @@ public class ProcessUtils {
      */
     private static final Pattern WINDOWSATTERN = Pattern.compile("\\w+\\((\\d+)\\)");
 
-    /**
-     * kill yarn application.
-     *
-     * @param appIds app id list
-     * @param logger logger
-     * @param tenantCode tenant code
-     * @param executePath execute path
-     */
-    public static void cancelApplication(List<String> appIds, Logger logger, String tenantCode, String executePath) {
-        if (appIds == null || appIds.isEmpty()) {
-            return;
-        }
-
-        for (String appId : appIds) {
-            try {
-                TaskExecutionStatus applicationStatus = HadoopUtils.getInstance().getApplicationStatus(appId);
-
-                if (!applicationStatus.isFinished()) {
-                    String commandFile = String.format("%s/%s.kill", executePath, appId);
-                    String cmd = getKerberosInitCommand() + "yarn application -kill " + appId;
-                    execYarnKillCommand(logger, tenantCode, appId, commandFile, cmd);
-                }
-            } catch (Exception e) {
-                logger.error("Get yarn application app id [{}}] status failed", appId, e);
-            }
-        }
-    }
-
     /**
      * get kerberos init command
      */
@@ -214,7 +184,8 @@ public class ProcessUtils {
                                     taskExecutionContext.getTaskInstanceId()));
                 }
                 FileUtils.createWorkDirIfAbsent(taskExecutionContext.getExecutePath());
-                cancelApplication(appIds, logger, taskExecutionContext.getTenantCode(),
+                org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils.cancelApplication(appIds, logger,
+                        taskExecutionContext.getTenantCode(),
                         taskExecutionContext.getExecutePath());
                 return appIds;
             } else {
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java
index 595e3cc071..e59b7e55c9 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java
@@ -21,13 +21,13 @@ import org.apache.dolphinscheduler.common.enums.CacheType;
 import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.registry.api.RegistryClient;
 import org.apache.dolphinscheduler.remote.NettyRemotingServer;
 import org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
 import org.apache.dolphinscheduler.service.cache.impl.CacheNotifyServiceImpl;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
 import java.util.ArrayList;
 import java.util.List;
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LoggerRequestProcessorTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LoggerRequestProcessorTest.java
index 8bfd721709..167f0a54d9 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LoggerRequestProcessorTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LoggerRequestProcessorTest.java
@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.log.ViewLogRequestCommand;
+import org.apache.dolphinscheduler.remote.processor.LoggerRequestProcessor;
 import org.apache.dolphinscheduler.service.utils.LoggerUtils;
 
 import org.junit.jupiter.api.AfterEach;
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/MasterLogFilterTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/MasterLogFilterTest.java
deleted file mode 100644
index 5238565088..0000000000
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/MasterLogFilterTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.service.log;
-
-import org.apache.dolphinscheduler.common.constants.Constants;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-import ch.qos.logback.classic.Level;
-import ch.qos.logback.classic.spi.LoggingEvent;
-import ch.qos.logback.core.spi.FilterReply;
-
-public class MasterLogFilterTest {
-
-    @Test
-    public void decide() {
-        MasterLogFilter masterLogFilter = new MasterLogFilter();
-
-        FilterReply filterReply = masterLogFilter.decide(new LoggingEvent() {
-
-            @Override
-            public String getThreadName() {
-                return Constants.THREAD_NAME_MASTER_SERVER;
-            }
-
-            @Override
-            public Level getLevel() {
-                return Level.INFO;
-            }
-
-            @Override
-            public String getMessage() {
-                return "master insert into queue success, task : shell2";
-            }
-
-        });
-
-        Assertions.assertEquals(FilterReply.ACCEPT, filterReply);
-
-    }
-}
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/TaskLogDiscriminatorTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/TaskLogDiscriminatorTest.java
index e06041d605..d8a79c8465 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/TaskLogDiscriminatorTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/TaskLogDiscriminatorTest.java
@@ -17,6 +17,7 @@
 package org.apache.dolphinscheduler.service.log;
 
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.log.TaskLogDiscriminator;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/TaskLogFilterTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/TaskLogFilterTest.java
index 9c1c9f80c9..0c9c43fd10 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/TaskLogFilterTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/TaskLogFilterTest.java
@@ -17,6 +17,7 @@
 package org.apache.dolphinscheduler.service.log;
 
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.log.TaskLogFilter;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/WorkerLogFilterTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/WorkerLogFilterTest.java
deleted file mode 100644
index 323184a442..0000000000
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/WorkerLogFilterTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.service.log;
-
-import org.apache.dolphinscheduler.common.constants.Constants;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-import ch.qos.logback.classic.Level;
-import ch.qos.logback.classic.spi.LoggingEvent;
-import ch.qos.logback.core.spi.FilterReply;
-
-public class WorkerLogFilterTest {
-
-    @Test
-    public void decide() {
-        WorkerLogFilter workerLogFilter = new WorkerLogFilter();
-
-        FilterReply filterReply = workerLogFilter.decide(new LoggingEvent() {
-
-            @Override
-            public String getThreadName() {
-                return Constants.THREAD_NAME_WORKER_SERVER;
-            }
-
-            @Override
-            public Level getLevel() {
-                return Level.INFO;
-            }
-
-            @Override
-            public String getMessage() {
-                return "consume tasks: [2_177_2_704_-1],there still have 0 tasks need to be executed";
-            }
-
-            @Override
-            public Object[] getArgumentArray() {
-                return new Object[0];
-            }
-
-            @Override
-            public String getFormattedMessage() {
-                return "consume tasks: [2_177_2_704_-1],there still have 0 tasks need to be executed";
-            }
-
-        });
-
-        Assertions.assertEquals(FilterReply.ACCEPT, filterReply);
-
-    }
-}
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index ac83839d50..7900a63186 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -70,6 +70,7 @@ import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
 import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
 import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
 import org.apache.dolphinscheduler.plugin.task.api.enums.dp.DqTaskState;
 import org.apache.dolphinscheduler.plugin.task.api.enums.dp.ExecuteSqlType;
 import org.apache.dolphinscheduler.plugin.task.api.enums.dp.InputType;
@@ -82,7 +83,6 @@ import org.apache.dolphinscheduler.service.exceptions.CronParseException;
 import org.apache.dolphinscheduler.service.exceptions.ServiceException;
 import org.apache.dolphinscheduler.service.expand.CuringParamsService;
 import org.apache.dolphinscheduler.service.model.TaskNode;
-import org.apache.dolphinscheduler.service.task.TaskPluginManager;
 import org.apache.dolphinscheduler.spi.params.base.FormType;
 
 import java.util.ArrayList;
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/CommonUtilsTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/CommonUtilsTest.java
index a53b3e0699..cab280a702 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/CommonUtilsTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/CommonUtilsTest.java
@@ -18,7 +18,6 @@
 package org.apache.dolphinscheduler.service.utils;
 
 import org.apache.dolphinscheduler.common.utils.FileUtils;
-import org.apache.dolphinscheduler.service.storage.impl.HadoopUtils;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
@@ -45,18 +44,6 @@ public class CommonUtilsTest {
         Assertions.assertEquals("/etc/profile", envPath);
     }
 
-    @Test
-    public void isDevelopMode() {
-        logger.info("develop mode: {}", CommonUtils.isDevelopMode());
-        Assertions.assertTrue(true);
-    }
-
-    @Test
-    public void getHdfsDataBasePath() {
-        logger.info(HadoopUtils.getHdfsDataBasePath());
-        Assertions.assertTrue(true);
-    }
-
     @Test
     public void getDownloadFilename() {
         logger.info(FileUtils.getDownloadFilename("a.txt"));
@@ -69,12 +56,6 @@ public class CommonUtilsTest {
         Assertions.assertTrue(true);
     }
 
-    @Test
-    public void getHdfsDir() {
-        logger.info(HadoopUtils.getHdfsResDir("1234"));
-        Assertions.assertTrue(true);
-    }
-
     @Test
     public void test() {
         InetAddress ip;
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/LogUtilsTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/LogUtilsTest.java
index bfb8a523b0..f03801091d 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/LogUtilsTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/LogUtilsTest.java
@@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.service.utils;
 import org.apache.dolphinscheduler.common.constants.DateConstants;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.service.log.TaskLogDiscriminator;
+import org.apache.dolphinscheduler.plugin.task.api.log.TaskLogDiscriminator;
 
 import java.nio.file.Path;
 import java.nio.file.Paths;
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/ProcessUtilsTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/ProcessUtilsTest.java
index 22cf4ea55e..5fc7d0d69c 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/ProcessUtilsTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/ProcessUtilsTest.java
@@ -22,11 +22,6 @@ import static org.mockito.ArgumentMatchers.anyString;
 import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.common.utils.PropertyUtils;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
-import org.apache.dolphinscheduler.service.storage.impl.HadoopUtils;
-
-import java.util.ArrayList;
-import java.util.List;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -80,27 +75,4 @@ public class ProcessUtilsTest {
         }
     }
 
-    @Test
-    public void testCancelApplication() {
-        List<String> appIds = new ArrayList<>();
-        appIds.add("application_1585532379175_228491");
-        appIds.add("application_1598885606600_3677");
-        String tenantCode = "dev";
-        String executePath = "/ds-exec/1/1/1";
-        TaskExecutionStatus running = TaskExecutionStatus.RUNNING_EXECUTION;
-
-        try (MockedStatic<HadoopUtils> mockedStaticHadoopUtils = Mockito.mockStatic(HadoopUtils.class)) {
-            HadoopUtils hadoop = HadoopUtils.getInstance();
-
-            try {
-                Mockito.when(hadoop.getApplicationStatus("application_1585532379175_228491")).thenReturn(running);
-                Mockito.when(hadoop.getApplicationStatus("application_1598885606600_3677")).thenReturn(running);
-            } catch (Exception e) {
-                e.printStackTrace();
-                ProcessUtils.cancelApplication(appIds, logger, tenantCode, executePath);
-            }
-
-            Assertions.assertNotNull(appIds);
-        }
-    }
 }
diff --git a/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml b/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml
index ee7c5414f4..4191bc47ab 100644
--- a/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml
+++ b/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml
@@ -50,8 +50,8 @@
     <conversionRule conversionWord="message"
                     converterClass="org.apache.dolphinscheduler.common.log.SensitiveDataConverter"/>
     <appender name="TASKLOGFILE" class="ch.qos.logback.classic.sift.SiftingAppender">
-        <filter class="org.apache.dolphinscheduler.service.log.TaskLogFilter"/>
-        <Discriminator class="org.apache.dolphinscheduler.service.log.TaskLogDiscriminator">
+        <filter class="org.apache.dolphinscheduler.plugin.task.api.log.TaskLogFilter"/>
+        <Discriminator class="org.apache.dolphinscheduler.plugin.task.api.log.TaskLogDiscriminator">
             <key>taskAppId</key>
             <logBase>${log.base}</logBase>
         </Discriminator>
diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-all/pom.xml b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-all/pom.xml
new file mode 100644
index 0000000000..88f09cfd26
--- /dev/null
+++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-all/pom.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.dolphinscheduler</groupId>
+        <artifactId>dolphinscheduler-storage-plugin</artifactId>
+        <version>dev-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>dolphinscheduler-storage-all</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-storage-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-storage-s3</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-storage-hdfs</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-storage-oss</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/pom.xml b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/pom.xml
new file mode 100644
index 0000000000..5e5311dadc
--- /dev/null
+++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/pom.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.dolphinscheduler</groupId>
+        <artifactId>dolphinscheduler-storage-plugin</artifactId>
+        <version>dev-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>dolphinscheduler-storage-api</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-spi</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageConfiguration.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageConfiguration.java
new file mode 100644
index 0000000000..d34e6d01de
--- /dev/null
+++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageConfiguration.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.storage.api;
+
+import static org.apache.dolphinscheduler.common.constants.Constants.RESOURCE_STORAGE_TYPE;
+
+import org.apache.dolphinscheduler.common.utils.PropertyUtils;
+
+import java.util.Optional;
+import java.util.ServiceLoader;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+// TODO: revisit this bean once the storage config is moved to YAML (the storage type is currently read through PropertyUtils).
+@Configuration
+public class StorageConfiguration {
+
+    @Bean
+    public StorageOperate storageOperate() {
+        Optional<StorageType> storageTypeOptional =
+                StorageType.getStorageType(PropertyUtils.getUpperCaseString(RESOURCE_STORAGE_TYPE));
+        Optional<StorageOperate> storageOperate = storageTypeOptional.map(storageType -> {
+            ServiceLoader<StorageOperateFactory> storageOperateFactories =
+                    ServiceLoader.load(StorageOperateFactory.class);
+            for (StorageOperateFactory storageOperateFactory : storageOperateFactories) {
+                if (storageOperateFactory.getStorageOperate() == storageType) {
+                    return storageOperateFactory.createStorageOperate();
+                }
+            }
+            return null;
+        });
+        return storageOperate.orElse(null);
+    }
+}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/storage/StorageEntity.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageEntity.java
similarity index 97%
rename from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/storage/StorageEntity.java
rename to dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageEntity.java
index e8ae58df48..2474fbad18 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/storage/StorageEntity.java
+++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageEntity.java
@@ -18,7 +18,7 @@
  *
  */
 
-package org.apache.dolphinscheduler.service.storage;
+package org.apache.dolphinscheduler.plugin.storage.api;
 
 import org.apache.dolphinscheduler.spi.enums.ResourceType;
 
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/storage/StorageOperate.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageOperate.java
similarity index 90%
rename from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/storage/StorageOperate.java
rename to dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageOperate.java
index e194811291..ef22ea4886 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/storage/StorageOperate.java
+++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageOperate.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.service.storage;
+package org.apache.dolphinscheduler.plugin.storage.api;
 
 import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.enums.ResUploadType;
@@ -177,18 +177,18 @@ public interface StorageOperate {
     /**
      * return files and folders in the current directory and subdirectories
      * */
-    public List<StorageEntity> listFilesStatusRecursively(String path, String defaultPath, String tenantCode,
-                                                          ResourceType type);
+    List<StorageEntity> listFilesStatusRecursively(String path, String defaultPath, String tenantCode,
+                                                   ResourceType type);
 
     /**
     * return files and folders in the current directory
     * */
-    public List<StorageEntity> listFilesStatus(String path, String defaultPath, String tenantCode,
-                                               ResourceType type) throws Exception;
+    List<StorageEntity> listFilesStatus(String path, String defaultPath, String tenantCode,
+                                        ResourceType type) throws Exception;
 
     /**
      * return a file status
      * */
-    public StorageEntity getFileStatus(String path, String defaultPath, String tenantCode,
-                                       ResourceType type) throws Exception;
+    StorageEntity getFileStatus(String path, String defaultPath, String tenantCode,
+                                ResourceType type) throws Exception;
 }
diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTestConfig.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageOperateFactory.java
similarity index 63%
copy from dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTestConfig.java
copy to dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageOperateFactory.java
index 33a4d8c21d..b3a60888c9 100644
--- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTestConfig.java
+++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageOperateFactory.java
@@ -15,23 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.worker.processor;
+package org.apache.dolphinscheduler.plugin.storage.api;
 
-import org.apache.dolphinscheduler.service.process.ProcessService;
+public interface StorageOperateFactory {
 
-import org.mockito.Mockito;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-/**
- * dependency config
- */
-@Configuration
-public class TaskCallbackServiceTestConfig {
-
-    @Bean
-    public ProcessService processService() {
-        return Mockito.mock(ProcessService.class);
-    }
+    StorageOperate createStorageOperate(); // implementations may hand back a shared instance rather than a new one
 
+    StorageType getStorageOperate(); // NOTE: despite the name, this returns the StorageType, not a StorageOperate
 }
diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTestConfig.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageType.java
similarity index 52%
rename from dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTestConfig.java
rename to dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageType.java
index 33a4d8c21d..f2cfc62e10 100644
--- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTestConfig.java
+++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageType.java
@@ -15,23 +15,39 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.worker.processor;
+package org.apache.dolphinscheduler.plugin.storage.api;
 
-import org.apache.dolphinscheduler.service.process.ProcessService;
+import java.util.Optional;
 
-import org.mockito.Mockito;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
+public enum StorageType {
 
-/**
- * dependency config
- */
-@Configuration
-public class TaskCallbackServiceTestConfig {
+    HDFS(0, "HDFS"),
+    OSS(1, "OSS"),
+    S3(2, "S3"),
+    ;
+
+    private final int code;
+    private final String name;
+
+    StorageType(int code, String name) {
+        this.code = code;
+        this.name = name;
+    }
 
-    @Bean
-    public ProcessService processService() {
-        return Mockito.mock(ProcessService.class);
+    public int getCode() {
+        return code;
     }
 
+    public String getName() {
+        return name;
+    }
+
+    public static Optional<StorageType> getStorageType(String name) { // look up a constant by its getName() value
+        for (StorageType storageType : StorageType.values()) {
+            if (storageType.getName().equals(name)) { // exact, case-sensitive match; a null name never matches
+                return Optional.of(storageType);
+            }
+        }
+        return Optional.empty(); // no constant carries this name
+    }
 }
diff --git a/dolphinscheduler-service/pom.xml b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/pom.xml
similarity index 83%
copy from dolphinscheduler-service/pom.xml
copy to dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/pom.xml
index 21f0e8b793..2a07584948 100644
--- a/dolphinscheduler-service/pom.xml
+++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/pom.xml
@@ -20,60 +20,22 @@
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.dolphinscheduler</groupId>
-        <artifactId>dolphinscheduler</artifactId>
+        <artifactId>dolphinscheduler-storage-plugin</artifactId>
         <version>dev-SNAPSHOT</version>
     </parent>
 
-    <artifactId>dolphinscheduler-service</artifactId>
-
-    <name>dolphinscheduler-service</name>
-
-    <dependencyManagement>
-        <dependencies>
-            <dependency>
-                <groupId>org.apache.dolphinscheduler</groupId>
-                <artifactId>dolphinscheduler-bom</artifactId>
-                <version>${project.version}</version>
-                <type>pom</type>
-                <scope>import</scope>
-            </dependency>
-        </dependencies>
-    </dependencyManagement>
+    <artifactId>dolphinscheduler-storage-hdfs</artifactId>
 
     <dependencies>
-        <!-- dolphinscheduler -->
+        <!-- This is used to load the Kerberos configuration -->
         <dependency>
             <groupId>org.apache.dolphinscheduler</groupId>
-            <artifactId>dolphinscheduler-remote</artifactId>
+            <artifactId>dolphinscheduler-datasource-api</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.dolphinscheduler</groupId>
-            <artifactId>dolphinscheduler-dao</artifactId>
+            <artifactId>dolphinscheduler-storage-api</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.apache.dolphinscheduler</groupId>
-            <artifactId>dolphinscheduler-spi</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.dolphinscheduler</groupId>
-            <artifactId>dolphinscheduler-registry-api</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.dolphinscheduler</groupId>
-            <artifactId>dolphinscheduler-task-api</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>com.cronutils</groupId>
-            <artifactId>cron-utils</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>io.micrometer</groupId>
-            <artifactId>micrometer-core</artifactId>
-            <scope>provided</scope>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>
@@ -170,10 +132,6 @@
                 </exclusion>
             </exclusions>
         </dependency>
-        <dependency>
-            <groupId>org.apache.hbase.thirdparty</groupId>
-            <artifactId>hbase-noop-htrace</artifactId>
-        </dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-client</artifactId>
@@ -270,11 +228,5 @@
                 </exclusion>
             </exclusions>
         </dependency>
-
-        <dependency>
-            <groupId>com.amazonaws</groupId>
-            <artifactId>aws-java-sdk-s3</artifactId>
-            <scope>provided</scope>
-        </dependency>
     </dependencies>
 </project>
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/storage/impl/HadoopUtils.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperator.java
similarity index 89%
rename from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/storage/impl/HadoopUtils.java
rename to dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperator.java
index b0e0614a42..db32b1b69f 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/storage/impl/HadoopUtils.java
+++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperator.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.service.storage.impl;
+package org.apache.dolphinscheduler.plugin.storage.hdfs;
 
 import static org.apache.dolphinscheduler.common.constants.Constants.FOLDER_SEPARATOR;
 import static org.apache.dolphinscheduler.common.constants.Constants.FORMAT_S_S;
@@ -29,10 +29,9 @@ import org.apache.dolphinscheduler.common.utils.HttpUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.KerberosHttpClient;
 import org.apache.dolphinscheduler.common.utils.PropertyUtils;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
-import org.apache.dolphinscheduler.service.storage.StorageEntity;
-import org.apache.dolphinscheduler.service.storage.StorageOperate;
-import org.apache.dolphinscheduler.service.utils.CommonUtils;
+import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
 import org.apache.dolphinscheduler.spi.enums.ResourceType;
 
 import org.apache.commons.io.IOUtils;
@@ -75,14 +74,9 @@ import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 
-/**
- * hadoop utils
- * single instance
- * By default, directory path does NOT end with '/'
- */
-public class HadoopUtils implements Closeable, StorageOperate {
+public class HdfsStorageOperator implements Closeable, StorageOperate {
 
-    private static final Logger logger = LoggerFactory.getLogger(HadoopUtils.class);
+    private static final Logger logger = LoggerFactory.getLogger(HdfsStorageOperator.class);
     private String hdfsUser;
     public static final String RM_HA_IDS = PropertyUtils.getString(Constants.YARN_RESOURCEMANAGER_HA_RM_IDS);
     public static final String APP_ADDRESS = PropertyUtils.getString(Constants.YARN_APPLICATION_STATUS_ADDRESS);
@@ -91,14 +85,14 @@ public class HadoopUtils implements Closeable, StorageOperate {
             PropertyUtils.getInt(Constants.HADOOP_RESOURCE_MANAGER_HTTPADDRESS_PORT, 8088);
     private static final String HADOOP_UTILS_KEY = "HADOOP_UTILS_KEY";
 
-    private static final LoadingCache<String, HadoopUtils> cache = CacheBuilder
+    private static final LoadingCache<String, HdfsStorageOperator> cache = CacheBuilder
             .newBuilder()
             .expireAfterWrite(PropertyUtils.getInt(Constants.KERBEROS_EXPIRE_TIME, 2), TimeUnit.HOURS)
-            .build(new CacheLoader<String, HadoopUtils>() {
+            .build(new CacheLoader<String, HdfsStorageOperator>() {
 
                 @Override
-                public HadoopUtils load(String key) throws Exception {
-                    return new HadoopUtils();
+                public HdfsStorageOperator load(String key) throws Exception {
+                    return new HdfsStorageOperator();
                 }
             });
 
@@ -107,13 +101,13 @@ public class HadoopUtils implements Closeable, StorageOperate {
     private Configuration configuration;
     private FileSystem fs;
 
-    private HadoopUtils() {
+    private HdfsStorageOperator() {
         hdfsUser = PropertyUtils.getString(Constants.HDFS_ROOT_USER);
         init();
         initHdfsPath();
     }
 
-    public static HadoopUtils getInstance() {
+    public static HdfsStorageOperator getInstance() {
         return cache.getUnchecked(HADOOP_UTILS_KEY);
     }
 
@@ -601,75 +595,6 @@ public class HadoopUtils implements Closeable, StorageOperate {
         return yarnEnabled;
     }
 
-    /**
-     * get the state of an application
-     *
-     * @param applicationId application id
-     * @return the return may be null or there may be other parse exceptions
-     */
-    public TaskExecutionStatus getApplicationStatus(String applicationId) throws BaseException {
-        if (StringUtils.isEmpty(applicationId)) {
-            return null;
-        }
-
-        String result;
-        String applicationUrl = getApplicationUrl(applicationId);
-        logger.debug("generate yarn application url, applicationUrl={}", applicationUrl);
-
-        String responseContent = Boolean.TRUE
-                .equals(PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false))
-                        ? KerberosHttpClient.get(applicationUrl)
-                        : HttpUtils.get(applicationUrl);
-        if (responseContent != null) {
-            ObjectNode jsonObject = JSONUtils.parseObject(responseContent);
-            if (!jsonObject.has("app")) {
-                return TaskExecutionStatus.FAILURE;
-            }
-            result = jsonObject.path("app").path("finalStatus").asText();
-
-        } else {
-            // may be in job history
-            String jobHistoryUrl = getJobHistoryUrl(applicationId);
-            logger.debug("generate yarn job history application url, jobHistoryUrl={}", jobHistoryUrl);
-            responseContent = Boolean.TRUE
-                    .equals(PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false))
-                            ? KerberosHttpClient.get(jobHistoryUrl)
-                            : HttpUtils.get(jobHistoryUrl);
-
-            if (null != responseContent) {
-                ObjectNode jsonObject = JSONUtils.parseObject(responseContent);
-                if (!jsonObject.has("job")) {
-                    return TaskExecutionStatus.FAILURE;
-                }
-                result = jsonObject.path("job").path("state").asText();
-            } else {
-                return TaskExecutionStatus.FAILURE;
-            }
-        }
-
-        return getExecutionStatus(result);
-    }
-
-    private TaskExecutionStatus getExecutionStatus(String result) {
-        switch (result) {
-            case Constants.ACCEPTED:
-                return TaskExecutionStatus.SUBMITTED_SUCCESS;
-            case Constants.SUCCEEDED:
-            case Constants.ENDED:
-                return TaskExecutionStatus.SUCCESS;
-            case Constants.NEW:
-            case Constants.NEW_SAVING:
-            case Constants.SUBMITTED:
-            case Constants.FAILED:
-                return TaskExecutionStatus.FAILURE;
-            case Constants.KILLED:
-                return TaskExecutionStatus.KILL;
-            case Constants.RUNNING:
-            default:
-                return TaskExecutionStatus.RUNNING_EXECUTION;
-        }
-    }
-
     /**
      * get data hdfs path
      *
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/WorkerLogFilter.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperatorFactory.java
similarity index 52%
rename from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/WorkerLogFilter.java
rename to dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperatorFactory.java
index a1f4733a72..169a645f2d 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/WorkerLogFilter.java
+++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperatorFactory.java
@@ -14,37 +14,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dolphinscheduler.service.log;
 
-import ch.qos.logback.classic.Level;
-import ch.qos.logback.classic.spi.ILoggingEvent;
-import ch.qos.logback.core.filter.Filter;
-import ch.qos.logback.core.spi.FilterReply;
+package org.apache.dolphinscheduler.plugin.storage.hdfs;
 
-/**
- *  worker log filter
- */
-public class WorkerLogFilter extends Filter<ILoggingEvent> {
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperateFactory;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageType;
 
-    /**
-     * level
-     */
-    Level level;
+import com.google.auto.service.AutoService;
 
-    /**
-     * Accept or reject based on thread name
-     * @param event event
-     * @return FilterReply
-     */
-    @Override
-    public FilterReply decide(ILoggingEvent event) {
-        if (event.getThreadName().startsWith("Worker-")) {
-            return FilterReply.ACCEPT;
-        }
+@AutoService(StorageOperateFactory.class)
+public class HdfsStorageOperatorFactory implements StorageOperateFactory {
 
-        return FilterReply.DENY;
+    @Override
+    public StorageOperate createStorageOperate() {
+        return HdfsStorageOperator.getInstance(); // shared instance served from HdfsStorageOperator's cache
     }
-    public void setLevel(String level) {
-        this.level = Level.toLevel(level);
+
+    @Override
+    public StorageType getStorageOperate() {
+        return StorageType.HDFS;
     }
 }
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/storage/impl/HadoopUtilsTest.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/test/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperatorTest.java
similarity index 78%
rename from dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/storage/impl/HadoopUtilsTest.java
rename to dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/test/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperatorTest.java
index a7398d7953..4194cbbf60 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/storage/impl/HadoopUtilsTest.java
+++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/test/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperatorTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.service.storage.impl;
+package org.apache.dolphinscheduler.plugin.storage.hdfs;
 
 import org.apache.dolphinscheduler.common.utils.HttpUtils;
 import org.apache.dolphinscheduler.spi.enums.ResourceType;
@@ -33,31 +33,31 @@ import org.slf4j.LoggerFactory;
  * hadoop utils test
  */
 @ExtendWith(MockitoExtension.class)
-public class HadoopUtilsTest {
+public class HdfsStorageOperatorTest {
 
-    private static final Logger logger = LoggerFactory.getLogger(HadoopUtilsTest.class);
+    private static final Logger logger = LoggerFactory.getLogger(HdfsStorageOperatorTest.class);
 
     @Test
     public void getHdfsTenantDir() {
-        logger.info(HadoopUtils.getHdfsTenantDir("1234"));
+        logger.info(HdfsStorageOperator.getHdfsTenantDir("1234"));
         Assertions.assertTrue(true);
     }
 
     @Test
     public void getHdfsUdfFileName() {
-        logger.info(HadoopUtils.getHdfsUdfFileName("admin", "file_name"));
+        logger.info(HdfsStorageOperator.getHdfsUdfFileName("admin", "file_name"));
         Assertions.assertTrue(true);
     }
 
     @Test
     public void getHdfsResourceFileName() {
-        logger.info(HadoopUtils.getHdfsResourceFileName("admin", "file_name"));
+        logger.info(HdfsStorageOperator.getHdfsResourceFileName("admin", "file_name"));
         Assertions.assertTrue(true);
     }
 
     @Test
     public void getHdfsFileName() {
-        logger.info(HadoopUtils.getHdfsFileName(ResourceType.FILE, "admin", "file_name"));
+        logger.info(HdfsStorageOperator.getHdfsFileName(ResourceType.FILE, "admin", "file_name"));
         Assertions.assertTrue(true);
     }
 
@@ -66,7 +66,7 @@ public class HadoopUtilsTest {
         try (MockedStatic<HttpUtils> mockedHttpUtils = Mockito.mockStatic(HttpUtils.class)) {
             mockedHttpUtils.when(() -> HttpUtils.get("http://ds1:8088/ws/v1/cluster/info"))
                     .thenReturn("{\"clusterInfo\":{\"state\":\"STARTED\",\"haState\":\"ACTIVE\"}}");
-            logger.info(HadoopUtils.getAppAddress("http://ds1:8088/ws/v1/cluster/apps/%s", "ds1,ds2"));
+            logger.info(HdfsStorageOperator.getAppAddress("http://ds1:8088/ws/v1/cluster/apps/%s", "ds1,ds2"));
             Assertions.assertTrue(true);
         }
     }
diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/pom.xml b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/pom.xml
new file mode 100644
index 0000000000..90fa9cb5e0
--- /dev/null
+++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.dolphinscheduler</groupId>
+        <artifactId>dolphinscheduler-storage-plugin</artifactId>
+        <version>dev-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>dolphinscheduler-storage-oss</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-storage-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-task-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/storage/impl/OssOperator.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/main/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperator.java
similarity index 97%
rename from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/storage/impl/OssOperator.java
rename to dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/main/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperator.java
index d076112927..f2d2f256cc 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/storage/impl/OssOperator.java
+++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/main/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperator.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.service.storage.impl;
+package org.apache.dolphinscheduler.plugin.storage.oss;
 
 import static org.apache.dolphinscheduler.common.constants.Constants.FOLDER_SEPARATOR;
 import static org.apache.dolphinscheduler.common.constants.Constants.FORMAT_S_S;
@@ -27,9 +27,9 @@ import org.apache.dolphinscheduler.common.enums.ResUploadType;
 import org.apache.dolphinscheduler.common.factory.OssClientFactory;
 import org.apache.dolphinscheduler.common.model.OssConnection;
 import org.apache.dolphinscheduler.common.utils.PropertyUtils;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
-import org.apache.dolphinscheduler.service.storage.StorageEntity;
-import org.apache.dolphinscheduler.service.storage.StorageOperate;
 import org.apache.dolphinscheduler.spi.enums.ResourceType;
 
 import org.apache.commons.lang3.StringUtils;
@@ -62,9 +62,9 @@ import com.aliyun.oss.model.ObjectMetadata;
 import com.aliyun.oss.model.PutObjectRequest;
 
 @Data
-public class OssOperator implements Closeable, StorageOperate {
+public class OssStorageOperator implements Closeable, StorageOperate {
 
-    private static final Logger logger = LoggerFactory.getLogger(OssOperator.class);
+    private static final Logger logger = LoggerFactory.getLogger(OssStorageOperator.class);
 
     private String accessKeyId;
 
@@ -80,7 +80,7 @@ public class OssOperator implements Closeable, StorageOperate {
 
     private OSS ossClient;
 
-    public OssOperator() {
+    public OssStorageOperator() {
     }
 
     public void init() {
@@ -360,5 +360,4 @@ public class OssOperator implements Closeable, StorageOperate {
     protected OSS buildOssClient() {
         return OssClientFactory.buildOssClient(ossConnection);
     }
-
 }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/MasterLogFilter.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/main/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperatorFactory.java
similarity index 52%
rename from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/MasterLogFilter.java
rename to dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/main/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperatorFactory.java
index a9d352de11..783faf59ea 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/MasterLogFilter.java
+++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/main/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperatorFactory.java
@@ -14,37 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dolphinscheduler.service.log;
 
-import ch.qos.logback.classic.Level;
-import ch.qos.logback.classic.spi.ILoggingEvent;
-import ch.qos.logback.core.filter.Filter;
-import ch.qos.logback.core.spi.FilterReply;
+package org.apache.dolphinscheduler.plugin.storage.oss;
 
-/**
- * master log filter
- */
-public class MasterLogFilter extends Filter<ILoggingEvent> {
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperateFactory;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageType;
+
+import com.google.auto.service.AutoService;
 
-    /**
-     * log level
-     */
-    Level level;
+@AutoService(StorageOperateFactory.class)
+public class OssStorageOperatorFactory implements StorageOperateFactory {
 
-    /**
-     * Accept or reject based on thread name
-     * @param event event
-     * @return FilterReply
-     */
     @Override
-    public FilterReply decide(ILoggingEvent event) {
-        if (event.getThreadName().startsWith("Master-")) {
-            return FilterReply.ACCEPT;
-        }
-        return FilterReply.DENY;
+    public StorageOperate createStorageOperate() {
+        OssStorageOperator ossOperator = new OssStorageOperator(); // a new operator is built on every call
+        ossOperator.init(); // the constructor is empty, so init() has to be called explicitly
+        return ossOperator;
     }
 
-    public void setLevel(String level) {
-        this.level = Level.toLevel(level);
+    @Override
+    public StorageType getStorageOperate() {
+        return StorageType.OSS;
     }
 }
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/storage/impl/OssOperatorTest.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/test/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperatorTest.java
similarity index 98%
rename from dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/storage/impl/OssOperatorTest.java
rename to dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/test/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperatorTest.java
index 4d35807d6e..eb59c5f667 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/storage/impl/OssOperatorTest.java
+++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/test/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperatorTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.service.storage.impl;
+package org.apache.dolphinscheduler.plugin.storage.oss;
 
 import static org.apache.dolphinscheduler.common.constants.Constants.FOLDER_SEPARATOR;
 import static org.apache.dolphinscheduler.common.constants.Constants.FORMAT_S_S;
@@ -41,7 +41,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
 import com.aliyun.oss.OSS;
 
 @ExtendWith(MockitoExtension.class)
-public class OssOperatorTest {
+public class OssStorageOperatorTest {
 
     private static final String ACCESS_KEY_ID_MOCK = "ACCESS_KEY_ID_MOCK";
     private static final String ACCESS_KEY_SECRET_MOCK = "ACCESS_KEY_SECRET_MOCK";
@@ -56,11 +56,11 @@ public class OssOperatorTest {
     @Mock
     private OSS ossClientMock;
 
-    private OssOperator ossOperator;
+    private OssStorageOperator ossOperator;
 
     @BeforeEach
     public void setUp() throws Exception {
-        ossOperator = spy(new OssOperator());
+        ossOperator = spy(new OssStorageOperator());
         doReturn(ACCESS_KEY_ID_MOCK).when(ossOperator)
                 .readOssAccessKeyID();
         doReturn(ACCESS_KEY_SECRET_MOCK).when(ossOperator)
diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/pom.xml b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/pom.xml
new file mode 100644
index 0000000000..b9dfd5d1c9
--- /dev/null
+++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/pom.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.dolphinscheduler</groupId>
+        <artifactId>dolphinscheduler-storage-plugin</artifactId>
+        <version>dev-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>dolphinscheduler-storage-s3</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-task-api</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-storage-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-s3</artifactId>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/storage/impl/S3Utils.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/main/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperator.java
similarity index 88%
rename from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/storage/impl/S3Utils.java
rename to dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/main/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperator.java
index f8ffafae89..81c0d2f7e6 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/storage/impl/S3Utils.java
+++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/main/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperator.java
@@ -15,22 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.service.storage.impl;
+package org.apache.dolphinscheduler.plugin.storage.s3;
 
 import static org.apache.dolphinscheduler.common.constants.Constants.AWS_END_POINT;
 import static org.apache.dolphinscheduler.common.constants.Constants.FOLDER_SEPARATOR;
 import static org.apache.dolphinscheduler.common.constants.Constants.FORMAT_S_S;
-import static org.apache.dolphinscheduler.common.constants.Constants.RESOURCE_STORAGE_TYPE;
 import static org.apache.dolphinscheduler.common.constants.Constants.RESOURCE_TYPE_FILE;
 import static org.apache.dolphinscheduler.common.constants.Constants.RESOURCE_TYPE_UDF;
-import static org.apache.dolphinscheduler.common.constants.Constants.STORAGE_S3;
 
 import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.enums.ResUploadType;
 import org.apache.dolphinscheduler.common.utils.PropertyUtils;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
-import org.apache.dolphinscheduler.service.storage.StorageEntity;
-import org.apache.dolphinscheduler.service.storage.StorageOperate;
 import org.apache.dolphinscheduler.spi.enums.ResourceType;
 
 import org.apache.commons.lang3.StringUtils;
@@ -78,67 +76,40 @@ import com.amazonaws.services.s3.transfer.TransferManager;
 import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
 import com.google.common.base.Joiner;
 
-/**
- * By default, directory path does end with '/'
- */
-public class S3Utils implements Closeable, StorageOperate {
-
-    private static final Logger logger = LoggerFactory.getLogger(S3Utils.class);
+public class S3StorageOperator implements Closeable, StorageOperate {
 
-    public static final String ACCESS_KEY_ID = PropertyUtils.getString(TaskConstants.AWS_ACCESS_KEY_ID);
+    private static final Logger logger = LoggerFactory.getLogger(S3StorageOperator.class);
 
-    public static final String SECRET_KEY_ID = PropertyUtils.getString(TaskConstants.AWS_SECRET_ACCESS_KEY);
+    // todo: move to s3
+    private static final String ACCESS_KEY_ID = PropertyUtils.getString(TaskConstants.AWS_ACCESS_KEY_ID);
 
-    public static final String REGION = PropertyUtils.getString(TaskConstants.AWS_REGION);
+    private static final String SECRET_KEY_ID = PropertyUtils.getString(TaskConstants.AWS_SECRET_ACCESS_KEY);
 
-    public static final String BUCKET_NAME = PropertyUtils.getString(Constants.AWS_S3_BUCKET_NAME);
+    private static final String REGION = PropertyUtils.getString(TaskConstants.AWS_REGION);
 
-    private AmazonS3 s3Client = null;
+    private static final String BUCKET_NAME = PropertyUtils.getString(Constants.AWS_S3_BUCKET_NAME);
 
-    private S3Utils() {
-        if (PropertyUtils.getString(RESOURCE_STORAGE_TYPE).equals(STORAGE_S3)) {
+    private final AmazonS3 s3Client;
 
-            if (!StringUtils.isEmpty(PropertyUtils.getString(AWS_END_POINT))) {
-                s3Client = AmazonS3ClientBuilder
-                        .standard()
-                        .withPathStyleAccessEnabled(true)
-                        .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
-                                PropertyUtils.getString(AWS_END_POINT), Regions.fromName(REGION).getName()))
-                        .withCredentials(
-                                new AWSStaticCredentialsProvider(new BasicAWSCredentials(ACCESS_KEY_ID, SECRET_KEY_ID)))
-                        .build();
-            } else {
-                s3Client = AmazonS3ClientBuilder
-                        .standard()
-                        .withCredentials(
-                                new AWSStaticCredentialsProvider(new BasicAWSCredentials(ACCESS_KEY_ID, SECRET_KEY_ID)))
-                        .withRegion(Regions.fromName(REGION))
-                        .build();
-            }
-            checkBucketNameExists(BUCKET_NAME);
-        }
-    }
-
-    /**
-     * S3Utils single
-     */
-    private enum S3Singleton {
-
-        INSTANCE;
-
-        private final S3Utils instance;
-
-        S3Singleton() {
-            instance = new S3Utils();
-        }
-
-        private S3Utils getInstance() {
-            return instance;
+    public S3StorageOperator() {
+        if (!StringUtils.isEmpty(PropertyUtils.getString(AWS_END_POINT))) {
+            s3Client = AmazonS3ClientBuilder
+                    .standard()
+                    .withPathStyleAccessEnabled(true)
+                    .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
+                            PropertyUtils.getString(AWS_END_POINT), Regions.fromName(REGION).getName()))
+                    .withCredentials(
+                            new AWSStaticCredentialsProvider(new BasicAWSCredentials(ACCESS_KEY_ID, SECRET_KEY_ID)))
+                    .build();
+        } else {
+            s3Client = AmazonS3ClientBuilder
+                    .standard()
+                    .withCredentials(
+                            new AWSStaticCredentialsProvider(new BasicAWSCredentials(ACCESS_KEY_ID, SECRET_KEY_ID)))
+                    .withRegion(Regions.fromName(REGION))
+                    .build();
         }
-    }
-
-    public static S3Utils getInstance() {
-        return S3Singleton.INSTANCE.getInstance();
+        checkBucketNameExists(BUCKET_NAME);
     }
 
     @Override
@@ -148,8 +119,8 @@ public class S3Utils implements Closeable, StorageOperate {
 
     @Override
     public void createTenantDirIfNotExists(String tenantCode) throws Exception {
-        getInstance().mkdir(tenantCode, getS3ResDir(tenantCode));
-        getInstance().mkdir(tenantCode, getS3UdfDir(tenantCode));
+        mkdir(tenantCode, getS3ResDir(tenantCode));
+        mkdir(tenantCode, getS3UdfDir(tenantCode));
     }
 
     @Override
@@ -371,7 +342,7 @@ public class S3Utils implements Closeable, StorageOperate {
      * upload local directory to S3
      *
      * @param tenantCode
-     * @param keyPrefix the name of directory
+     * @param keyPrefix  the name of directory
      * @param strPath
      */
     private void uploadDirectory(String tenantCode, String keyPrefix, String strPath) {
@@ -383,7 +354,7 @@ public class S3Utils implements Closeable, StorageOperate {
      * download S3 Directory to local
      *
      * @param tenantCode
-     * @param keyPrefix the name of directory
+     * @param keyPrefix  the name of directory
      * @param srcPath
      */
     private void downloadDirectory(String tenantCode, String keyPrefix, String srcPath) {
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/BeanConfig.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/main/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperatorFactory.java
similarity index 56%
rename from dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/BeanConfig.java
rename to dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/main/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperatorFactory.java
index 7b4c6d89c0..c3b0821cdd 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/BeanConfig.java
+++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/main/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperatorFactory.java
@@ -15,20 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.worker.config;
+package org.apache.dolphinscheduler.plugin.storage.s3;
 
-import org.apache.dolphinscheduler.service.alert.AlertClientService;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperateFactory;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageType;
 
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
+import com.google.auto.service.AutoService;
 
-@Configuration
-public class BeanConfig {
+@AutoService(StorageOperateFactory.class)
+public class S3StorageOperatorFactory implements StorageOperateFactory {
 
-    @Bean
-    public AlertClientService alertClientService(WorkerConfig workerConfig) {
-        return new AlertClientService(
-                workerConfig.getAlertListenHost(),
-                workerConfig.getAlertListenPort());
+    @Override
+    public StorageOperate createStorageOperate() {
+        return new S3StorageOperator();
+    }
+
+    @Override
+    public StorageType getStorageOperate() {
+        return StorageType.S3;
     }
 }
diff --git a/dolphinscheduler-storage-plugin/pom.xml b/dolphinscheduler-storage-plugin/pom.xml
new file mode 100644
index 0000000000..39e68ea7c2
--- /dev/null
+++ b/dolphinscheduler-storage-plugin/pom.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.dolphinscheduler</groupId>
+        <artifactId>dolphinscheduler</artifactId>
+        <version>dev-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>dolphinscheduler-storage-plugin</artifactId>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>dolphinscheduler-storage-api</module>
+        <module>dolphinscheduler-storage-all</module>
+        <module>dolphinscheduler-storage-s3</module>
+        <module>dolphinscheduler-storage-hdfs</module>
+        <module>dolphinscheduler-storage-oss</module>
+    </modules>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.apache.dolphinscheduler</groupId>
+                <artifactId>dolphinscheduler-bom</artifactId>
+                <version>${project.version}</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+</project>
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
index 8674723911..c79b4a8746 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
@@ -17,14 +17,15 @@
 
 package org.apache.dolphinscheduler.plugin.task.api;
 
-import static org.apache.dolphinscheduler.plugin.task.api.ProcessUtils.getPidsStr;
 import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
 import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
+import static org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils.getPidsStr;
 
 import org.apache.dolphinscheduler.common.utils.PropertyUtils;
 import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
 import org.apache.dolphinscheduler.plugin.task.api.utils.AbstractCommandExecutorConstants;
 import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.SystemUtils;
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ProcessUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ProcessUtils.java
deleted file mode 100644
index 22518f7553..0000000000
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ProcessUtils.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.plugin.task.api;
-
-import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils;
-
-import org.apache.commons.lang3.SystemUtils;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import lombok.NonNull;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public final class ProcessUtils {
-
-    private static final Logger logger = LoggerFactory.getLogger(ProcessUtils.class);
-
-    private ProcessUtils() {
-        throw new IllegalStateException("Utility class");
-    }
-
-    /**
-     * Initialization regularization, solve the problem of pre-compilation performance,
-     * avoid the thread safety problem of multi-thread operation
-     */
-    private static final Pattern MACPATTERN = Pattern.compile("-[+|-]-\\s(\\d+)");
-
-    /**
-     * Expression of PID recognition in Windows scene
-     */
-    private static final Pattern WINDOWSATTERN = Pattern.compile("(\\d+)");
-
-    /**
-     * kill tasks according to different task types.
-     */
-    public static boolean kill(@NonNull TaskExecutionContext request) {
-        try {
-            logger.info("Begin kill task instance, processId: {}", request.getProcessId());
-            int processId = request.getProcessId();
-            if (processId == 0) {
-                logger.error("Task instance kill failed, processId is not exist");
-                return false;
-            }
-
-            String cmd = String.format("kill -9 %s", getPidsStr(processId));
-            cmd = OSUtils.getSudoCmd(request.getTenantCode(), cmd);
-            logger.info("process id:{}, cmd:{}", processId, cmd);
-
-            OSUtils.exeCmd(cmd);
-            logger.info("Success kill task instance, processId: {}", request.getProcessId());
-            return true;
-        } catch (Exception e) {
-            logger.error("Kill task instance error, processId: {}", request.getProcessId(), e);
-            return false;
-        }
-    }
-
-    /**
-     * get pids str.
-     *
-     * @param processId process id
-     * @return pids pid String
-     * @throws Exception exception
-     */
-    public static String getPidsStr(int processId) throws Exception {
-        StringBuilder sb = new StringBuilder();
-        Matcher mat = null;
-        // pstree pid get sub pids
-        if (SystemUtils.IS_OS_MAC) {
-            String pids = OSUtils.exeCmd(String.format("%s -sp %d", TaskConstants.PSTREE, processId));
-            if (null != pids) {
-                mat = MACPATTERN.matcher(pids);
-            }
-        } else {
-            String pids = OSUtils.exeCmd(String.format("%s -p %d", TaskConstants.PSTREE, processId));
-            mat = WINDOWSATTERN.matcher(pids);
-        }
-
-        if (null != mat) {
-            while (mat.find()) {
-                sb.append(mat.group(1)).append(" ");
-            }
-        }
-
-        return sb.toString().trim();
-    }
-
-}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskPluginManager.java
similarity index 95%
rename from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java
rename to dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskPluginManager.java
index d7e35c3f28..72ff327cf7 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskPluginManager.java
@@ -15,12 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.service.task;
+package org.apache.dolphinscheduler.plugin.task.api;
 
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
-import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
-import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.BlockingParameters;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters;
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/TaskLogDiscriminator.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogDiscriminator.java
similarity index 98%
rename from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/TaskLogDiscriminator.java
rename to dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogDiscriminator.java
index f52604a776..7a8db804e4 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/TaskLogDiscriminator.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogDiscriminator.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dolphinscheduler.service.log;
+package org.apache.dolphinscheduler.plugin.task.api.log;
 
 import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/TaskLogFilter.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogFilter.java
similarity index 97%
rename from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/TaskLogFilter.java
rename to dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogFilter.java
index 6f7a920e79..652bcfe5c5 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/TaskLogFilter.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogFilter.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.service.log;
+package org.apache.dolphinscheduler.plugin.task.api.log;
 
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java
index 36251fe9e6..16849d3837 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java
@@ -17,18 +17,29 @@
 
 package org.apache.dolphinscheduler.plugin.task.api.utils;
 
+import org.apache.dolphinscheduler.common.constants.Constants;
+import org.apache.dolphinscheduler.common.constants.DateConstants;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.log.TaskLogDiscriminator;
 
 import org.apache.commons.lang3.StringUtils;
 
+import java.io.BufferedReader;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -39,11 +50,18 @@ import lombok.experimental.UtilityClass;
 import lombok.extern.slf4j.Slf4j;
 
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+import ch.qos.logback.classic.sift.SiftingAppender;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.spi.AppenderAttachable;
 
 @Slf4j
 @UtilityClass
 public class LogUtils {
 
+    private static final String LOG_TAILFIX = ".log";
     private static final Pattern APPLICATION_REGEX = Pattern.compile(TaskConstants.YARN_APPLICATION_REGEX);
 
     public List<String> getAppIds(@NonNull String logPath, @NonNull String appInfoPath, String fetchWay) {
@@ -56,6 +74,55 @@ public class LogUtils {
         }
     }
 
+    public static String getTaskLogPath(TaskExecutionContext taskExecutionContext) {
+        return getTaskLogPath(DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()),
+                taskExecutionContext.getProcessDefineCode(),
+                taskExecutionContext.getProcessDefineVersion(),
+                taskExecutionContext.getProcessInstanceId(),
+                taskExecutionContext.getTaskInstanceId());
+    }
+
+    public static String getTaskLogPath(Date firstSubmitTime,
+                                        Long processDefineCode,
+                                        int processDefineVersion,
+                                        int processInstanceId,
+                                        int taskInstanceId) {
+        // format: <logBase>/YYYYMMDD/definitionCode_definitionVersion-processInstanceId-taskInstanceId.log
+        final String taskLogFileName = new StringBuilder(String.valueOf(processDefineCode))
+                .append(Constants.UNDERLINE)
+                .append(processDefineVersion)
+                .append(Constants.SUBTRACT_CHAR)
+                .append(processInstanceId)
+                .append(Constants.SUBTRACT_CHAR)
+                .append(taskInstanceId)
+                .append(LOG_TAILFIX)
+                .toString();
+        // each map step is skipped once an earlier step returns null (e.g. no "TASKLOGFILE" appender), ending in orElse("")
+        return Optional.of(LoggerFactory.getILoggerFactory())
+                .map(e -> (AppenderAttachable<ILoggingEvent>) (e.getLogger("ROOT")))
+                .map(e -> (SiftingAppender) (e.getAppender("TASKLOGFILE")))
+                .map(e -> ((TaskLogDiscriminator) (e.getDiscriminator())))
+                .map(TaskLogDiscriminator::getLogBase)
+                .map(e -> Paths.get(e)
+                        .toAbsolutePath()
+                        .resolve(DateUtils.format(firstSubmitTime, DateConstants.YYYYMMDD, null))
+                        .resolve(taskLogFileName))
+                .map(Path::toString)
+                .orElse("");
+    }
+
+    public static String buildTaskId(Date firstSubmitTime,
+                                     Long processDefineCode,
+                                     int processDefineVersion,
+                                     int processInstId,
+                                     int taskId) {
+        // <key>=<prefix>-<submit date>-<definitionCode>_<definitionVersion>-<processInstId>-<taskId>, e.g. TaskAppId=TASK-20211107-798_1-4084-15210
+        String firstSubmitTimeStr = DateUtils.format(firstSubmitTime, DateConstants.YYYYMMDD, null);
+        return String.format("%s=%s-%s-%s_%s-%s-%s",
+                TaskConstants.TASK_APPID_LOG_FORMAT, TaskConstants.TASK_LOGGER_INFO_PREFIX, firstSubmitTimeStr,
+                processDefineCode, processDefineVersion, processInstId, taskId);
+    }
+
     public List<String> getAppIdsFromAppInfoFile(@NonNull String appInfoPath, Logger logger) {
         File appInfoFile = new File(appInfoPath);
         if (!appInfoFile.exists() || !appInfoFile.isFile()) {
@@ -96,4 +163,44 @@ public class LogUtils {
             return Collections.emptyList();
         }
     }
+
+    public static String readWholeFileContent(String filePath) {
+        String line;
+        StringBuilder sb = new StringBuilder();
+        try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)))) {
+            while ((line = br.readLine()) != null) { // NOTE(review): no charset is passed to InputStreamReader, so the JVM default charset decodes the file
+                sb.append(line + "\r\n"); // lines are re-joined with CRLF, whatever separator the file used
+            }
+            return sb.toString();
+        } catch (IOException e) {
+            log.error("read file error", e); // best-effort: the failure is only logged, then "" is returned below
+        }
+        return "";
+    }
+
+    public static void setWorkflowAndTaskInstanceIDMDC(Integer workflowInstanceId, Integer taskInstanceId) {
+        setWorkflowInstanceIdMDC(workflowInstanceId);
+        setTaskInstanceIdMDC(taskInstanceId);
+    }
+
+    public static void setWorkflowInstanceIdMDC(Integer workflowInstanceId) {
+        MDC.put(Constants.WORKFLOW_INSTANCE_ID_MDC_KEY, String.valueOf(workflowInstanceId)); // a null id is stored as the text "null"
+    }
+
+    public static void setTaskInstanceIdMDC(Integer taskInstanceId) {
+        MDC.put(Constants.TASK_INSTANCE_ID_MDC_KEY, String.valueOf(taskInstanceId)); // a null id is stored as the text "null"
+    }
+
+    public static void removeWorkflowAndTaskInstanceIdMDC() {
+        removeWorkflowInstanceIdMDC();
+        removeTaskInstanceIdMDC();
+    }
+
+    public static void removeWorkflowInstanceIdMDC() {
+        MDC.remove(Constants.WORKFLOW_INSTANCE_ID_MDC_KEY);
+    }
+
+    public static void removeTaskInstanceIdMDC() {
+        MDC.remove(Constants.TASK_INSTANCE_ID_MDC_KEY);
+    }
 }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java
new file mode 100644
index 0000000000..21015c34dc
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.utils;
+
+import org.apache.dolphinscheduler.common.constants.Constants;
+import org.apache.dolphinscheduler.common.exception.BaseException;
+import org.apache.dolphinscheduler.common.utils.HttpUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.KerberosHttpClient;
+import org.apache.dolphinscheduler.common.utils.PropertyUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.SystemUtils;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import lombok.NonNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+public final class ProcessUtils {
+
+    private static final Logger logger = LoggerFactory.getLogger(ProcessUtils.class);
+
+    private ProcessUtils() {
+        throw new IllegalStateException("Utility class");
+    }
+
+    /**
+     * Compiled once and shared (Pattern is immutable). Applied to the macOS {@code pstree -sp} output in
+     * getPidsStr: captures the digits after "-+-", "-|-" or "---" followed by one whitespace character.
+     */
+    private static final Pattern MACPATTERN = Pattern.compile("-[+|-]-\\s(\\d+)");
+
+    /**
+     * Applied on every non-macOS system despite its name: captures each digit run of the {@code pstree -p} output.
+     */
+    private static final Pattern WINDOWSATTERN = Pattern.compile("(\\d+)");
+
+    private static final String RM_HA_IDS = PropertyUtils.getString(Constants.YARN_RESOURCEMANAGER_HA_RM_IDS);
+    private static final String APP_ADDRESS = PropertyUtils.getString(Constants.YARN_APPLICATION_STATUS_ADDRESS);
+    private static final String JOB_HISTORY_ADDRESS =
+            PropertyUtils.getString(Constants.YARN_JOB_HISTORY_STATUS_ADDRESS);
+    private static final int HADOOP_RESOURCE_MANAGER_HTTP_ADDRESS_PORT_VALUE =
+            PropertyUtils.getInt(Constants.HADOOP_RESOURCE_MANAGER_HTTPADDRESS_PORT, 8088);
+
+    /**
+     * Run kill -9 (wrapped by OSUtils.getSudoCmd) on the pids from getPidsStr; false if processId is 0 or on error.
+     */
+    public static boolean kill(@NonNull TaskExecutionContext request) {
+        try {
+            logger.info("Begin kill task instance, processId: {}", request.getProcessId());
+            int processId = request.getProcessId();
+            if (processId == 0) {
+                logger.error("Task instance kill failed, processId is not exist");
+                return false;
+            }
+
+            String cmd = String.format("kill -9 %s", getPidsStr(processId));
+            cmd = OSUtils.getSudoCmd(request.getTenantCode(), cmd);
+            logger.info("process id:{}, cmd:{}", processId, cmd);
+
+            OSUtils.exeCmd(cmd);
+            logger.info("Success kill task instance, processId: {}", request.getProcessId());
+            return true;
+        } catch (Exception e) {
+            logger.error("Kill task instance error, processId: {}", request.getProcessId(), e);
+            return false;
+        }
+    }
+
+    /**
+     * Collect the pids that pstree prints for the given process.
+     * <p>
+     * macOS output ({@code pstree -sp}) is parsed with MACPATTERN, the output of every other
+     * system ({@code pstree -p}) with WINDOWSATTERN.
+     *
+     * @param processId process id
+     * @return the matched pids separated by a single space, or an empty string when nothing matched
+     *         or the command returned null
+     * @throws Exception propagated from OSUtils.exeCmd
+     */
+    public static String getPidsStr(int processId) throws Exception {
+        boolean isMac = SystemUtils.IS_OS_MAC;
+        String pstreeFormat = isMac ? "%s -sp %d" : "%s -p %d";
+        // pstree pid get sub pids
+        String pids = OSUtils.exeCmd(String.format(pstreeFormat, TaskConstants.PSTREE, processId));
+        if (null == pids) {
+            // the non-macOS branch used to hand null straight to Pattern.matcher, which throws NPE
+            return "";
+        }
+
+        Matcher mat = (isMac ? MACPATTERN : WINDOWSATTERN).matcher(pids);
+        StringBuilder sb = new StringBuilder();
+        while (mat.find()) {
+            sb.append(mat.group(1)).append(" ");
+        }
+
+        return sb.toString().trim();
+    }
+
+    /**
+     * Kill every yarn application in the list that has not finished yet.
+     *
+     * @param appIds      app id list
+     * @param logger      logger
+     * @param tenantCode  tenant code
+     * @param executePath directory in which the generated {@code <appId>.kill} script is placed
+     */
+    public static void cancelApplication(List<String> appIds, Logger logger, String tenantCode, String executePath) {
+        if (appIds == null || appIds.isEmpty()) {
+            return;
+        }
+
+        for (String appId : appIds) {
+            try {
+                TaskExecutionStatus applicationStatus = getApplicationStatus(appId);
+                // status is null for an empty app id: nothing to kill, and dereferencing it would throw
+                if (applicationStatus != null && !applicationStatus.isFinished()) {
+                    String commandFile = String.format("%s/%s.kill", executePath, appId);
+                    String cmd = getKerberosInitCommand() + "yarn application -kill " + appId;
+                    execYarnKillCommand(logger, tenantCode, appId, commandFile, cmd);
+                }
+            } catch (Exception e) {
+                logger.error("Get yarn application app id [{}] status failed", appId, e);
+            }
+        }
+    }
+
+    /**
+     * Look up the state of a yarn application.
+     * <p>
+     * The resource manager is asked first; when it returns no content the job history server is asked.
+     *
+     * @param applicationId application id
+     * @return null for an empty id; FAILURE when the answer lacks the "app" / "job" node or the job
+     *         history server returns nothing; otherwise the status mapped by getExecutionStatus
+     * @throws BaseException propagated from getApplicationUrl
+     */
+    public static TaskExecutionStatus getApplicationStatus(String applicationId) throws BaseException {
+        if (StringUtils.isEmpty(applicationId)) {
+            return null;
+        }
+
+        String applicationUrl = getApplicationUrl(applicationId);
+        logger.debug("generate yarn application url, applicationUrl={}", applicationUrl);
+        String appResponse = requestYarn(applicationUrl);
+        if (appResponse != null) {
+            ObjectNode appJson = JSONUtils.parseObject(appResponse);
+            if (!appJson.has("app")) {
+                return TaskExecutionStatus.FAILURE;
+            }
+            return getExecutionStatus(appJson.path("app").path("finalStatus").asText());
+        }
+
+        // may be in job history
+        String jobHistoryUrl = getJobHistoryUrl(applicationId);
+        logger.debug("generate yarn job history application url, jobHistoryUrl={}", jobHistoryUrl);
+        String jobResponse = requestYarn(jobHistoryUrl);
+        if (jobResponse == null) {
+            return TaskExecutionStatus.FAILURE;
+        }
+        ObjectNode jobJson = JSONUtils.parseObject(jobResponse);
+        if (!jobJson.has("job")) {
+            return TaskExecutionStatus.FAILURE;
+        }
+        return getExecutionStatus(jobJson.path("job").path("state").asText());
+    }
+
+    /**
+     * Send a GET request: KerberosHttpClient when the hadoop kerberos startup state is true, else HttpUtils.
+     */
+    private static String requestYarn(String url) {
+        boolean kerberosEnabled = Boolean.TRUE
+                .equals(PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false));
+        return kerberosEnabled ? KerberosHttpClient.get(url) : HttpUtils.get(url);
+    }
+
+    /**
+     * Build the resource manager REST url of an application from the configured address template.
+     * When resource manager HA ids are configured, the template is first rewritten by getAppAddress.
+     *
+     * @param applicationId application id
+     * @return url of application
+     * @throws BaseException when no address template could be resolved
+     */
+    private static String getApplicationUrl(String applicationId) throws BaseException {
+        String addressTemplate = APP_ADDRESS;
+        if (!StringUtils.isEmpty(RM_HA_IDS)) {
+            addressTemplate = getAppAddress(APP_ADDRESS, RM_HA_IDS);
+        }
+        if (StringUtils.isBlank(addressTemplate)) {
+            throw new BaseException("yarn application url generation failed");
+        }
+        logger.debug("yarn application url:{}, applicationId:{}", addressTemplate, applicationId);
+        return String.format(addressTemplate, HADOOP_RESOURCE_MANAGER_HTTP_ADDRESS_PORT_VALUE, applicationId);
+    }
+
+    private static String getJobHistoryUrl(String applicationId) {
+        // the history address template is formatted with the job id rather than the application id,
+        // eg:application_1587475402360_712719 -> job_1587475402360_712719
+        return String.format(JOB_HISTORY_ADDRESS, applicationId.replace("application", "job"));
+    }
+
+    /**
+     * Run the kill command of a yarn application through a generated shell script.
+     * <p>
+     * The script is written to commandFile unless that file already exists, then executed with
+     * Constants.SH after being wrapped by OSUtils.getSudoCmd. Failures are logged, never rethrown.
+     *
+     * @param logger logger
+     * @param tenantCode tenant code
+     * @param appId app id
+     * @param commandFile command file
+     * @param cmd cmd
+     */
+    private static void execYarnKillCommand(Logger logger, String tenantCode, String appId, String commandFile,
+                                            String cmd) {
+        try {
+            File f = new File(commandFile);
+            if (!f.exists()) {
+                StringBuilder sb = new StringBuilder();
+                sb.append("#!/bin/sh\n");
+                sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n");
+                sb.append("cd $BASEDIR\n");
+                sb.append("\n\n");
+                sb.append(cmd);
+                org.apache.commons.io.FileUtils.writeStringToFile(f, sb.toString(), StandardCharsets.UTF_8);
+            }
+
+            String runCmd = String.format("%s %s", Constants.SH, commandFile);
+            runCmd = org.apache.dolphinscheduler.common.utils.OSUtils.getSudoCmd(tenantCode, runCmd);
+            logger.info("kill cmd:{}", runCmd);
+            org.apache.dolphinscheduler.common.utils.OSUtils.exeCmd(runCmd);
+        } catch (Exception e) {
+            // pass the throwable itself so the stack trace is kept, not only e.getMessage()
+            logger.error("Kill yarn application app id [{}] failed", appId, e);
+        }
+    }
+
+    private static TaskExecutionStatus getExecutionStatus(String result) {
+        switch (result) {
+            case Constants.ACCEPTED:
+                return TaskExecutionStatus.SUBMITTED_SUCCESS;
+            case Constants.SUCCEEDED:
+            case Constants.ENDED:
+                return TaskExecutionStatus.SUCCESS;
+            case Constants.NEW:
+            case Constants.NEW_SAVING:
+            case Constants.SUBMITTED:
+            case Constants.FAILED:
+                return TaskExecutionStatus.FAILURE;
+            case Constants.KILLED:
+                return TaskExecutionStatus.KILL;
+            case Constants.RUNNING:
+            default:
+                return TaskExecutionStatus.RUNNING_EXECUTION;
+        }
+    }
+
+    /**
+     * Rebuild the application address so that it points at the active resource manager.
+     *
+     * @param appAddress app address, split on Constants.DOUBLE_SLASH and then on Constants.COLON
+     * @param rmHa       resource manager ha
+     * @return app address, or null when the address is empty, a split does not yield exactly two
+     *         parts, or no active resource manager is found
+     */
+    private static String getAppAddress(String appAddress, String rmHa) {
+        if (StringUtils.isEmpty(appAddress)) {
+            // an unset address property would otherwise fail with NPE on split
+            return null;
+        }
+
+        String[] split1 = appAddress.split(Constants.DOUBLE_SLASH);
+        if (split1.length != 2) {
+            return null;
+        }
+        String start = split1[0] + Constants.DOUBLE_SLASH;
+
+        String[] split2 = split1[1].split(Constants.COLON);
+        if (split2.length != 2) {
+            return null;
+        }
+        String end = Constants.COLON + split2[1];
+
+        // get active ResourceManager
+        String activeRM = YarnHAAdminUtils.getActiveRMName(start, rmHa);
+        if (StringUtils.isEmpty(activeRM)) {
+            return null;
+        }
+        return start + activeRM + end;
+    }
+
+    /**
+     * Build the shell lines that export KRB5_CONFIG and run kinit; empty when kerberos is disabled.
+     */
+    private static String getKerberosInitCommand() {
+        logger.info("get kerberos init command");
+        if (!PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false)) {
+            return "";
+        }
+
+        String krb5ConfPath = PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH);
+        String kinitCommand = String.format("kinit -k -t %s %s || true",
+                PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_PATH),
+                PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME));
+
+        StringBuilder kerberosCommandBuilder = new StringBuilder()
+                .append("export KRB5_CONFIG=").append(krb5ConfPath).append("\n\n")
+                .append(kinitCommand).append("\n\n");
+        logger.info("kerberos init command: {}", kerberosCommandBuilder);
+        return kerberosCommandBuilder.toString();
+    }
+
+    /**
+     * yarn ha admin utils
+     */
+    private static final class YarnHAAdminUtils {
+
+        /**
+         * Find the active resourcemanager by requesting the cluster info of every configured id.
+         * A node whose request or response handling throws is logged and skipped, so the
+         * remaining ids are still checked.
+         *
+         * @param protocol http protocol
+         * @param rmIds    yarn ha ids
+         * @return the first id whose state equals Constants.HADOOP_RM_STATE_ACTIVE, or null
+         */
+        public static String getActiveRMName(String protocol, String rmIds) {
+
+            String yarnUrl = protocol + "%s:" + HADOOP_RESOURCE_MANAGER_HTTP_ADDRESS_PORT_VALUE + "/ws/v1/cluster/info";
+
+            for (String rmId : rmIds.split(Constants.COMMA)) {
+                try {
+                    // send http get request to rm
+                    String state = getRMState(String.format(yarnUrl, rmId));
+                    if (Constants.HADOOP_RM_STATE_ACTIVE.equals(state)) {
+                        return rmId;
+                    }
+                } catch (Exception e) {
+                    // catch per node: one failing node must not abort the lookup; keep the stack trace
+                    logger.error("yarn ha application url generation failed, rmId:{}", rmId, e);
+                }
+            }
+
+            return null;
+        }
+
+        /**
+         * get ResourceManager state
+         *
+         * @param url cluster info url of one resourcemanager
+         * @return the haState text of clusterInfo, or null when the response is empty or lacks clusterInfo
+         */
+        public static String getRMState(String url) {
+
+            String retStr = Boolean.TRUE
+                    .equals(PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false))
+                            ? KerberosHttpClient.get(url)
+                            : HttpUtils.get(url);
+
+            if (StringUtils.isEmpty(retStr)) {
+                return null;
+            }
+            // to json
+            ObjectNode jsonObject = JSONUtils.parseObject(retStr);
+
+            // get ResourceManager state
+            if (jsonObject == null || !jsonObject.has("clusterInfo")) {
+                return null;
+            }
+            return jsonObject.get("clusterInfo").path("haState").asText();
+        }
+    }
+
+}
diff --git a/dolphinscheduler-worker/pom.xml b/dolphinscheduler-worker/pom.xml
index 5386d78263..8c67b8cc56 100644
--- a/dolphinscheduler-worker/pom.xml
+++ b/dolphinscheduler-worker/pom.xml
@@ -44,15 +44,11 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.dolphinscheduler</groupId>
-            <artifactId>dolphinscheduler-service</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.dolphinscheduler</groupId>
-            <artifactId>dolphinscheduler-spi</artifactId>
+            <artifactId>dolphinscheduler-common</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.dolphinscheduler</groupId>
-            <artifactId>dolphinscheduler-common</artifactId>
+            <artifactId>dolphinscheduler-remote</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.dolphinscheduler</groupId>
@@ -68,6 +64,11 @@
             <artifactId>dolphinscheduler-task-all</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-storage-all</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter</artifactId>
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 80aab4ee2a..be093dc3b1 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -21,19 +21,17 @@ import org.apache.dolphinscheduler.common.IStoppable;
 import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
-import org.apache.dolphinscheduler.plugin.task.api.ProcessUtils;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
+import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
+import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
 import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
 import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
 import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
-import org.apache.dolphinscheduler.service.alert.AlertClientService;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.task.TaskPluginManager;
-import org.apache.dolphinscheduler.service.utils.LoggerUtils;
 
 import org.apache.commons.collections4.CollectionUtils;
 
@@ -65,19 +63,6 @@ public class WorkerServer implements IStoppable {
      */
     private static final Logger logger = LoggerFactory.getLogger(WorkerServer.class);
 
-    /**
-     * spring application context
-     * only use it for initialization
-     */
-    @Autowired
-    private SpringApplicationContext springApplicationContext;
-
-    /**
-     * alert model netty remote server
-     */
-    @Autowired
-    private AlertClientService alertClientService;
-
     @Autowired
     private WorkerManagerThread workerManagerThread;
 
@@ -144,9 +129,7 @@ public class WorkerServer implements IStoppable {
 
         try (
                 WorkerRpcServer closedWorkerRpcServer = workerRpcServer;
-                WorkerRegistryClient closedRegistryClient = workerRegistryClient;
-                AlertClientService closedAlertClientService = alertClientService;
-                SpringApplicationContext closedSpringContext = springApplicationContext;) {
+                WorkerRegistryClient closedRegistryClient = workerRegistryClient) {
             logger.info("Worker server is stopping, current cause : {}", cause);
             // kill running tasks
             this.killAllRunningTasks();
@@ -175,13 +158,13 @@ public class WorkerServer implements IStoppable {
         for (TaskExecutionContext taskRequest : taskRequests) {
             // kill task when it's not finished yet
             try {
-                LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getProcessInstanceId(),
+                LogUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getProcessInstanceId(),
                         taskRequest.getTaskInstanceId());
                 if (ProcessUtils.kill(taskRequest)) {
                     killNumber++;
                 }
             } finally {
-                LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
+                LogUtils.removeWorkflowAndTaskInstanceIdMDC();
             }
         }
         logger.info("Worker after kill all cache task, task size: {}, killed number: {}", taskRequests.size(),
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
index 6695c4623c..b18cb3bfee 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
@@ -20,9 +20,9 @@ package org.apache.dolphinscheduler.server.worker.message;
 import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
+import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
 import org.apache.dolphinscheduler.remote.command.BaseCommand;
 import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.service.utils.LoggerUtils;
 
 import org.apache.commons.collections4.MapUtils;
 
@@ -119,7 +119,7 @@ public class MessageRetryRunner extends BaseDaemonThread {
                         iterator.remove();
                         continue;
                     }
-                    LoggerUtils.setTaskInstanceIdMDC(taskInstanceId);
+                    LogUtils.setTaskInstanceIdMDC(taskInstanceId);
                     try {
                         for (Map.Entry<CommandType, BaseCommand> messageEntry : retryMessageMap.entrySet()) {
                             CommandType messageType = messageEntry.getKey();
@@ -134,7 +134,7 @@ public class MessageRetryRunner extends BaseDaemonThread {
                     } catch (Exception e) {
                         logger.warn("Retry send message to master error", e);
                     } finally {
-                        LoggerUtils.removeTaskInstanceIdMDC();
+                        LogUtils.removeTaskInstanceIdMDC();
                     }
                 }
                 Thread.sleep(Constants.SLEEP_TIME_MILLIS);
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java
index ec5bd2ebad..dcf6449820 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java
@@ -19,9 +19,12 @@ package org.apache.dolphinscheduler.server.worker.processor;
 
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
+import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
@@ -29,14 +32,10 @@ import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.metrics.TaskMetrics;
 import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerDelayTaskExecuteRunnable;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnableFactoryBuilder;
-import org.apache.dolphinscheduler.service.alert.AlertClientService;
-import org.apache.dolphinscheduler.service.storage.StorageOperate;
-import org.apache.dolphinscheduler.service.task.TaskPluginManager;
-import org.apache.dolphinscheduler.service.utils.LogUtils;
-import org.apache.dolphinscheduler.service.utils.LoggerUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,7 +69,7 @@ public class TaskDispatchProcessor implements NettyRequestProcessor {
      * alert client service
      */
     @Autowired
-    private AlertClientService alertClientService;
+    private WorkerRpcClient workerRpcClient;
 
     @Autowired
     private TaskPluginManager taskPluginManager;
@@ -107,7 +106,7 @@ public class TaskDispatchProcessor implements NettyRequestProcessor {
             return;
         }
         try {
-            LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
+            LogUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
                     taskExecutionContext.getTaskInstanceId());
             TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType());
             // set cache, it will be used when kill task
@@ -132,7 +131,7 @@ public class TaskDispatchProcessor implements NettyRequestProcessor {
                             workerConfig,
                             workflowMasterAddress,
                             workerMessageSender,
-                            alertClientService,
+                            workerRpcClient,
                             taskPluginManager,
                             storageOperate)
                     .createWorkerTaskExecuteRunnable();
@@ -149,7 +148,7 @@ public class TaskDispatchProcessor implements NettyRequestProcessor {
                         workerManager.getWaitSubmitQueueSize());
             }
         } finally {
-            LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
+            LogUtils.removeWorkflowAndTaskInstanceIdMDC();
         }
     }
 
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResultAckProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResultAckProcessor.java
index e2ff1d005a..5b58e22e38 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResultAckProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResultAckProcessor.java
@@ -18,12 +18,12 @@
 package org.apache.dolphinscheduler.server.worker.processor;
 
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
-import org.apache.dolphinscheduler.service.utils.LoggerUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,7 +58,7 @@ public class TaskExecuteResultAckProcessor implements NettyRequestProcessor {
         }
 
         try {
-            LoggerUtils.setTaskInstanceIdMDC(taskExecuteAckMessage.getTaskInstanceId());
+            LogUtils.setTaskInstanceIdMDC(taskExecuteAckMessage.getTaskInstanceId());
             logger.info("Receive task execute response ack command : {}", taskExecuteAckMessage);
             if (taskExecuteAckMessage.isSuccess()) {
                 messageRetryRunner.removeRetryMessage(taskExecuteAckMessage.getTaskInstanceId(),
@@ -70,7 +70,7 @@ public class TaskExecuteResultAckProcessor implements NettyRequestProcessor {
                         taskExecuteAckMessage);
             }
         } finally {
-            LoggerUtils.removeTaskInstanceIdMDC();
+            LogUtils.removeTaskInstanceIdMDC();
 
         }
     }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteRunningAckProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteRunningAckProcessor.java
index 9cbde24d9e..5698229f90 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteRunningAckProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteRunningAckProcessor.java
@@ -18,12 +18,12 @@
 package org.apache.dolphinscheduler.server.worker.processor;
 
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckMessage;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
-import org.apache.dolphinscheduler.service.utils.LoggerUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,7 +56,7 @@ public class TaskExecuteRunningAckProcessor implements NettyRequestProcessor {
             return;
         }
         try {
-            LoggerUtils.setTaskInstanceIdMDC(runningAckCommand.getTaskInstanceId());
+            LogUtils.setTaskInstanceIdMDC(runningAckCommand.getTaskInstanceId());
             logger.info("task execute running ack command : {}", runningAckCommand);
 
             if (runningAckCommand.isSuccess()) {
@@ -64,7 +64,7 @@ public class TaskExecuteRunningAckProcessor implements NettyRequestProcessor {
                         CommandType.TASK_EXECUTE_RUNNING);
             }
         } finally {
-            LoggerUtils.removeTaskInstanceIdMDC();
+            LogUtils.removeTaskInstanceIdMDC();
         }
     }
 
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index 60a4c35f5f..45fbf9fce4 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -17,13 +17,19 @@
 
 package org.apache.dolphinscheduler.server.worker.processor;
 
+import static org.apache.dolphinscheduler.common.constants.Constants.APPID_COLLECT;
+import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_COLLECT_WAY;
+
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
+import org.apache.dolphinscheduler.common.utils.PropertyUtils;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
@@ -34,9 +40,8 @@ import org.apache.dolphinscheduler.remote.utils.Pair;
 import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnable;
-import org.apache.dolphinscheduler.service.log.LogClient;
-import org.apache.dolphinscheduler.service.utils.LoggerUtils;
-import org.apache.dolphinscheduler.service.utils.ProcessUtils;
+
+import org.apache.commons.collections4.CollectionUtils;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -47,7 +52,6 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 
@@ -70,9 +74,6 @@ public class TaskKillProcessor implements NettyRequestProcessor {
     @Autowired
     private MessageRetryRunner messageRetryRunner;
 
-    @Autowired
-    private LogClient logClient;
-
     /**
      * task kill process
      *
@@ -92,7 +93,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
 
         int taskInstanceId = killCommand.getTaskInstanceId();
         try {
-            LoggerUtils.setTaskInstanceIdMDC(taskInstanceId);
+            LogUtils.setTaskInstanceIdMDC(taskInstanceId);
             TaskExecutionContext taskExecutionContext =
                     TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
             if (taskExecutionContext == null) {
@@ -125,7 +126,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
 
             logger.info("remove REMOTE_CHANNELS, task instance id:{}", killCommand.getTaskInstanceId());
         } finally {
-            LoggerUtils.removeTaskInstanceIdMDC();
+            LogUtils.removeTaskInstanceIdMDC();
         }
     }
 
@@ -238,7 +239,8 @@ public class TaskKillProcessor implements NettyRequestProcessor {
         }
         try {
             logger.info("Get appIds from worker {}:{} taskLogPath: {}", host.getIp(), host.getPort(), logPath);
-            List<String> appIds = logClient.getAppIds(host.getIp(), host.getPort(), logPath, appInfoPath);
+            List<String> appIds = LogUtils.getAppIds(logPath, appInfoPath,
+                    PropertyUtils.getString(APPID_COLLECT, DEFAULT_COLLECT_WAY));
             if (CollectionUtils.isEmpty(appIds)) {
                 logger.info("The appId is empty");
                 return Pair.of(true, Collections.emptyList());
@@ -246,9 +248,6 @@ public class TaskKillProcessor implements NettyRequestProcessor {
 
             ProcessUtils.cancelApplication(appIds, logger, tenantCode, executePath);
             return Pair.of(true, appIds);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            logger.error("kill yarn job error, the current thread has been interrtpted", e);
         } catch (Exception e) {
             logger.error("Kill yarn job error, host: {}, logPath: {}, executePath: {}, tenantCode: {}", host, logPath,
                     executePath, tenantCode, e);
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRejectAckProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRejectAckProcessor.java
index 68c88332e2..c4f5330ee4 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRejectAckProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRejectAckProcessor.java
@@ -18,12 +18,12 @@
 package org.apache.dolphinscheduler.server.worker.processor;
 
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.TaskRejectAckCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
-import org.apache.dolphinscheduler.service.utils.LoggerUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,7 +54,7 @@ public class TaskRejectAckProcessor implements NettyRequestProcessor {
         }
 
         try {
-            LoggerUtils.setTaskInstanceIdMDC(taskRejectAckMessage.getTaskInstanceId());
+            LogUtils.setTaskInstanceIdMDC(taskRejectAckMessage.getTaskInstanceId());
             logger.info("Receive task reject response ack command: {}", taskRejectAckMessage);
             if (taskRejectAckMessage.isSuccess()) {
                 messageRetryRunner.removeRetryMessage(taskRejectAckMessage.getTaskInstanceId(),
@@ -65,7 +65,7 @@ public class TaskRejectAckProcessor implements NettyRequestProcessor {
                         taskRejectAckMessage);
             }
         } finally {
-            LoggerUtils.removeTaskInstanceIdMDC();
+            LogUtils.removeTaskInstanceIdMDC();
         }
     }
 }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskSavePointProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskSavePointProcessor.java
index 3190af4b4f..3ec5c28376 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskSavePointProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskSavePointProcessor.java
@@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.plugin.task.api.stream.StreamTask;
+import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.TaskSavePointRequestCommand;
@@ -29,7 +30,6 @@ import org.apache.dolphinscheduler.remote.command.TaskSavePointResponseCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnable;
-import org.apache.dolphinscheduler.service.utils.LoggerUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -84,12 +84,12 @@ public class TaskSavePointProcessor implements NettyRequestProcessor {
         }
 
         try {
-            LoggerUtils.setTaskInstanceIdMDC(taskInstanceId);
+            LogUtils.setTaskInstanceIdMDC(taskInstanceId);
             doSavePoint(taskInstanceId);
 
             sendTaskSavePointResponseCommand(channel, taskExecutionContext);
         } finally {
-            LoggerUtils.removeTaskInstanceIdMDC();
+            LogUtils.removeTaskInstanceIdMDC();
         }
     }
 
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectionStateListener.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectionStateListener.java
index a8f8cad6bc..7e549a458c 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectionStateListener.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectionStateListener.java
@@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.registry.api.ConnectionListener;
 import org.apache.dolphinscheduler.registry.api.ConnectionState;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
 import lombok.NonNull;
 
@@ -32,14 +31,11 @@ public class WorkerConnectionStateListener implements ConnectionListener {
 
     private final Logger logger = LoggerFactory.getLogger(WorkerConnectionStateListener.class);
     private final WorkerConfig workerConfig;
-    private final RegistryClient registryClient;
     private final WorkerConnectStrategy workerConnectStrategy;
 
     public WorkerConnectionStateListener(@NonNull WorkerConfig workerConfig,
-                                         @NonNull RegistryClient registryClient,
                                          @NonNull WorkerConnectStrategy workerConnectStrategy) {
         this.workerConfig = workerConfig;
-        this.registryClient = registryClient;
         this.workerConnectStrategy = workerConnectStrategy;
     }
 
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
index ee8d80bb2e..4a67307144 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
@@ -25,11 +25,11 @@ import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.registry.api.RegistryClient;
 import org.apache.dolphinscheduler.registry.api.RegistryException;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.server.worker.task.WorkerHeartBeatTask;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
 import java.io.IOException;
 
@@ -70,7 +70,7 @@ public class WorkerRegistryClient implements AutoCloseable {
         try {
             registry();
             registryClient.addConnectionStateListener(
-                    new WorkerConnectionStateListener(workerConfig, registryClient, workerConnectStrategy));
+                    new WorkerConnectionStateListener(workerConfig, workerConnectStrategy));
         } catch (Exception ex) {
             throw new RegistryException("Worker registry client start up error", ex);
         }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStopStrategy.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStopStrategy.java
index 7e7e294ecc..88433c9465 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStopStrategy.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStopStrategy.java
@@ -17,9 +17,9 @@
 
 package org.apache.dolphinscheduler.server.worker.registry;
 
+import org.apache.dolphinscheduler.registry.api.RegistryClient;
 import org.apache.dolphinscheduler.registry.api.StrategyType;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java
index 2d1c3359b2..1549385fe9 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.worker.registry;
 
 import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleException;
 import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
+import org.apache.dolphinscheduler.registry.api.RegistryClient;
 import org.apache.dolphinscheduler.registry.api.RegistryException;
 import org.apache.dolphinscheduler.registry.api.StrategyType;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
@@ -26,7 +27,6 @@ import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
 import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
 import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
 import java.time.Duration;
 
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
index 9cae7b9f6a..26b2e93166 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker.rpc;
 import org.apache.dolphinscheduler.remote.NettyRemotingServer;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
+import org.apache.dolphinscheduler.remote.processor.LoggerRequestProcessor;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.processor.HostUpdateProcessor;
 import org.apache.dolphinscheduler.server.worker.processor.TaskDispatchProcessor;
@@ -28,7 +29,6 @@ import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAck
 import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
 import org.apache.dolphinscheduler.server.worker.processor.TaskRejectAckProcessor;
 import org.apache.dolphinscheduler.server.worker.processor.TaskSavePointProcessor;
-import org.apache.dolphinscheduler.service.log.LoggerRequestProcessor;
 
 import java.io.Closeable;
 
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnable.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnable.java
index e926b8d9bb..583f1713a2 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnable.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnable.java
@@ -17,14 +17,14 @@
 
 package org.apache.dolphinscheduler.server.worker.runner;
 
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
 import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
-import org.apache.dolphinscheduler.service.alert.AlertClientService;
-import org.apache.dolphinscheduler.service.storage.StorageOperate;
-import org.apache.dolphinscheduler.service.task.TaskPluginManager;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
 
 import javax.annotation.Nullable;
 
@@ -36,11 +36,16 @@ public class DefaultWorkerDelayTaskExecuteRunnable extends WorkerDelayTaskExecut
                                                  @NonNull WorkerConfig workerConfig,
                                                  @NonNull String workflowMaster,
                                                  @NonNull WorkerMessageSender workerMessageSender,
-                                                 @NonNull AlertClientService alertClientService,
+                                                 @NonNull WorkerRpcClient workerRpcClient,
                                                  @NonNull TaskPluginManager taskPluginManager,
                                                  @Nullable StorageOperate storageOperate) {
-        super(taskExecutionContext, workerConfig, workflowMaster, workerMessageSender, alertClientService,
-                taskPluginManager, storageOperate);
+        super(taskExecutionContext,
+                workerConfig,
+                workflowMaster,
+                workerMessageSender,
+                workerRpcClient,
+                taskPluginManager,
+                storageOperate);
     }
 
     @Override
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableFactory.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableFactory.java
index dad1421675..151800742c 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableFactory.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableFactory.java
@@ -17,12 +17,12 @@
 
 package org.apache.dolphinscheduler.server.worker.runner;
 
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
-import org.apache.dolphinscheduler.service.alert.AlertClientService;
-import org.apache.dolphinscheduler.service.storage.StorageOperate;
-import org.apache.dolphinscheduler.service.task.TaskPluginManager;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
 
 import javax.annotation.Nullable;
 
@@ -36,11 +36,16 @@ public class DefaultWorkerDelayTaskExecuteRunnableFactory
                                                            @NonNull WorkerConfig workerConfig,
                                                            @NonNull String workflowMasterAddress,
                                                            @NonNull WorkerMessageSender workerMessageSender,
-                                                           @NonNull AlertClientService alertClientService,
+                                                           @NonNull WorkerRpcClient workerRpcClient,
                                                            @NonNull TaskPluginManager taskPluginManager,
                                                            @Nullable StorageOperate storageOperate) {
-        super(taskExecutionContext, workerConfig, workflowMasterAddress, workerMessageSender, alertClientService,
-                taskPluginManager, storageOperate);
+        super(taskExecutionContext,
+                workerConfig,
+                workflowMasterAddress,
+                workerMessageSender,
+                workerRpcClient,
+                taskPluginManager,
+                storageOperate);
     }
 
     @Override
@@ -50,7 +55,7 @@ public class DefaultWorkerDelayTaskExecuteRunnableFactory
                 workerConfig,
                 workflowMasterAddress,
                 workerMessageSender,
-                alertClientService,
+                workerRpcClient,
                 taskPluginManager,
                 storageOperate);
     }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnable.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnable.java
index e322aa9cf0..7fdfc405ef 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnable.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnable.java
@@ -18,12 +18,12 @@
 package org.apache.dolphinscheduler.server.worker.runner;
 
 import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
-import org.apache.dolphinscheduler.service.alert.AlertClientService;
-import org.apache.dolphinscheduler.service.storage.StorageOperate;
-import org.apache.dolphinscheduler.service.task.TaskPluginManager;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
 
 import java.util.concurrent.Delayed;
 import java.util.concurrent.TimeUnit;
@@ -38,11 +38,16 @@ public abstract class WorkerDelayTaskExecuteRunnable extends WorkerTaskExecuteRu
                                              @NonNull WorkerConfig workerConfig,
                                              @NonNull String masterAddress,
                                              @NonNull WorkerMessageSender workerMessageSender,
-                                             @NonNull AlertClientService alertClientService,
+                                             @NonNull WorkerRpcClient workerRpcClient,
                                              @NonNull TaskPluginManager taskPluginManager,
                                              @Nullable StorageOperate storageOperate) {
-        super(taskExecutionContext, workerConfig, masterAddress, workerMessageSender, alertClientService,
-                taskPluginManager, storageOperate);
+        super(taskExecutionContext,
+                workerConfig,
+                masterAddress,
+                workerMessageSender,
+                workerRpcClient,
+                taskPluginManager,
+                storageOperate);
     }
 
     @Override
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnableFactory.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnableFactory.java
index d8677b4680..9144a2f2e0 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnableFactory.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnableFactory.java
@@ -17,12 +17,12 @@
 
 package org.apache.dolphinscheduler.server.worker.runner;
 
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
-import org.apache.dolphinscheduler.service.alert.AlertClientService;
-import org.apache.dolphinscheduler.service.storage.StorageOperate;
-import org.apache.dolphinscheduler.service.task.TaskPluginManager;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
 
 import javax.annotation.Nullable;
 
@@ -36,7 +36,7 @@ public abstract class WorkerDelayTaskExecuteRunnableFactory<T extends WorkerDela
     protected final @NonNull WorkerConfig workerConfig;
     protected final @NonNull String workflowMasterAddress;
     protected final @NonNull WorkerMessageSender workerMessageSender;
-    protected final @NonNull AlertClientService alertClientService;
+    protected final @NonNull WorkerRpcClient workerRpcClient;
     protected final @NonNull TaskPluginManager taskPluginManager;
     protected final @Nullable StorageOperate storageOperate;
 
@@ -45,14 +45,14 @@ public abstract class WorkerDelayTaskExecuteRunnableFactory<T extends WorkerDela
                                                     @NonNull WorkerConfig workerConfig,
                                                     @NonNull String workflowMasterAddress,
                                                     @NonNull WorkerMessageSender workerMessageSender,
-                                                    @NonNull AlertClientService alertClientService,
+                                                    @NonNull WorkerRpcClient workerRpcClient,
                                                     @NonNull TaskPluginManager taskPluginManager,
                                                     @Nullable StorageOperate storageOperate) {
         this.taskExecutionContext = taskExecutionContext;
         this.workerConfig = workerConfig;
         this.workflowMasterAddress = workflowMasterAddress;
         this.workerMessageSender = workerMessageSender;
-        this.alertClientService = alertClientService;
+        this.workerRpcClient = workerRpcClient;
         this.taskPluginManager = taskPluginManager;
         this.storageOperate = storageOperate;
     }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java
index a005fd1d0c..da84f91027 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java
@@ -26,6 +26,8 @@ import org.apache.dolphinscheduler.common.enums.WarningType;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.PropertyUtils;
+import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
 import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
 import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
@@ -34,20 +36,22 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo;
 import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils;
 import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.alert.AlertSendRequestCommand;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
 import org.apache.dolphinscheduler.server.worker.utils.TaskExecutionCheckerUtils;
 import org.apache.dolphinscheduler.server.worker.utils.TaskFilesTransferUtils;
-import org.apache.dolphinscheduler.service.alert.AlertClientService;
-import org.apache.dolphinscheduler.service.storage.StorageOperate;
-import org.apache.dolphinscheduler.service.task.TaskPluginManager;
-import org.apache.dolphinscheduler.service.utils.CommonUtils;
-import org.apache.dolphinscheduler.service.utils.LoggerUtils;
-import org.apache.dolphinscheduler.service.utils.ProcessUtils;
+
+import org.apache.commons.collections4.CollectionUtils;
 
 import java.io.File;
 import java.io.IOException;
@@ -61,7 +65,6 @@ import lombok.NonNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
 import com.google.common.base.Strings;
 
 public abstract class WorkerTaskExecuteRunnable implements Runnable {
@@ -73,9 +76,9 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable {
     protected final WorkerConfig workerConfig;
     protected final String masterAddress;
     protected final WorkerMessageSender workerMessageSender;
-    protected final AlertClientService alertClientService;
     protected final TaskPluginManager taskPluginManager;
     protected final @Nullable StorageOperate storageOperate;
+    protected final WorkerRpcClient workerRpcClient;
 
     protected @Nullable AbstractTask task;
 
@@ -84,18 +87,18 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable {
                                         @NonNull WorkerConfig workerConfig,
                                         @NonNull String masterAddress,
                                         @NonNull WorkerMessageSender workerMessageSender,
-                                        @NonNull AlertClientService alertClientService,
+                                        @NonNull WorkerRpcClient workerRpcClient,
                                         @NonNull TaskPluginManager taskPluginManager,
                                         @Nullable StorageOperate storageOperate) {
         this.taskExecutionContext = taskExecutionContext;
         this.workerConfig = workerConfig;
         this.masterAddress = masterAddress;
         this.workerMessageSender = workerMessageSender;
-        this.alertClientService = alertClientService;
+        this.workerRpcClient = workerRpcClient;
         this.taskPluginManager = taskPluginManager;
         this.storageOperate = storageOperate;
         String taskLogName =
-                LoggerUtils.buildTaskId(DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()),
+                LogUtils.buildTaskId(DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()),
                         taskExecutionContext.getProcessDefineCode(),
                         taskExecutionContext.getProcessDefineVersion(),
                         taskExecutionContext.getProcessInstanceId(),
@@ -156,7 +159,7 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable {
             // set the thread name to make sure the log be written to the task log file
             Thread.currentThread().setName(taskExecutionContext.getTaskLogName());
 
-            LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
+            LogUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
                     taskExecutionContext.getTaskInstanceId());
             logger.info("Begin to pulling task");
 
@@ -185,7 +188,7 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable {
             logger.error("Task execute failed, due to meet an exception", ex);
             afterThrowing(ex);
         } finally {
-            LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
+            LogUtils.removeWorkflowAndTaskInstanceIdMDC();
         }
     }
 
@@ -249,9 +252,17 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable {
         TaskAlertInfo taskAlertInfo = task.getTaskAlertInfo();
         int strategy =
                 status == TaskExecutionStatus.SUCCESS ? WarningType.SUCCESS.getCode() : WarningType.FAILURE.getCode();
-        alertClientService.sendAlert(taskAlertInfo.getAlertGroupId(), taskAlertInfo.getTitle(),
-                taskAlertInfo.getContent(), strategy);
-        logger.info("Success send alert");
+        AlertSendRequestCommand alertCommand = new AlertSendRequestCommand(
+                taskAlertInfo.getAlertGroupId(),
+                taskAlertInfo.getTitle(),
+                taskAlertInfo.getContent(),
+                strategy);
+        try {
+            workerRpcClient.send(Host.of(workerConfig.getAlertListenHost()), alertCommand.convert2Command());
+            logger.info("Success send alert");
+        } catch (RemotingException e) {
+            logger.error("Send alert failed, alertCommand: {}", alertCommand, e);
+        }
     }
 
     protected void sendTaskResult() {
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactoryBuilder.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactoryBuilder.java
index 8ec586397f..50135c35b4 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactoryBuilder.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactoryBuilder.java
@@ -17,12 +17,12 @@
 
 package org.apache.dolphinscheduler.server.worker.runner;
 
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
-import org.apache.dolphinscheduler.service.alert.AlertClientService;
-import org.apache.dolphinscheduler.service.storage.StorageOperate;
-import org.apache.dolphinscheduler.service.task.TaskPluginManager;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
 
 import javax.annotation.Nullable;
 
@@ -36,14 +36,14 @@ public class WorkerTaskExecuteRunnableFactoryBuilder {
                                                                                                        @NonNull WorkerConfig workerConfig,
                                                                                                        @NonNull String workflowMasterAddress,
                                                                                                        @NonNull WorkerMessageSender workerMessageSender,
-                                                                                                       @NonNull AlertClientService alertClientService,
+                                                                                                       @NonNull WorkerRpcClient workerRpcClient,
                                                                                                        @NonNull TaskPluginManager taskPluginManager,
                                                                                                        @Nullable StorageOperate storageOperate) {
         return new DefaultWorkerDelayTaskExecuteRunnableFactory(taskExecutionContext,
                 workerConfig,
                 workflowMasterAddress,
                 workerMessageSender,
-                alertClientService,
+                workerRpcClient,
                 taskPluginManager,
                 storageOperate);
     }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
index 2909bd686b..d1a732f0e1 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
@@ -23,8 +23,8 @@ import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask;
 import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
+import org.apache.dolphinscheduler.registry.api.RegistryClient;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
 import java.util.function.Supplier;
 
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java
index a9c5f09b8c..1248aa4b57 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java
@@ -21,11 +21,11 @@ import org.apache.dolphinscheduler.common.exception.StorageOperateNoConfiguredEx
 import org.apache.dolphinscheduler.common.utils.FileUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.common.utils.PropertyUtils;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
-import org.apache.dolphinscheduler.service.storage.StorageOperate;
 
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.MapUtils;
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java
index 9e3238a0d1..cee09d1085 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java
@@ -19,13 +19,13 @@ package org.apache.dolphinscheduler.server.worker.utils;
 
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
 import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
-import org.apache.dolphinscheduler.service.storage.StorageOperate;
 
 import org.apache.commons.lang3.StringUtils;
 
diff --git a/dolphinscheduler-worker/src/main/resources/application.yaml b/dolphinscheduler-worker/src/main/resources/application.yaml
index 2d06ea3d7c..a73f829070 100644
--- a/dolphinscheduler-worker/src/main/resources/application.yaml
+++ b/dolphinscheduler-worker/src/main/resources/application.yaml
@@ -20,22 +20,6 @@ spring:
   jackson:
     time-zone: UTC
     date-format: "yyyy-MM-dd HH:mm:ss"
-  datasource:
-    driver-class-name: org.postgresql.Driver
-    url: jdbc:postgresql://127.0.0.1:5432/dolphinscheduler
-    username: root
-    password: root
-    hikari:
-      connection-test-query: select 1
-      minimum-idle: 5
-      auto-commit: true
-      validation-timeout: 3000
-      pool-name: DolphinScheduler
-      maximum-pool-size: 50
-      connection-timeout: 30000
-      idle-timeout: 600000
-      leak-detection-threshold: 0
-      initialization-fail-timeout: 1
 
 registry:
   type: zookeeper
@@ -101,16 +85,3 @@ management:
 
 metrics:
   enabled: true
-
-# Override by profile
-
----
-spring:
-  config:
-    activate:
-      on-profile: mysql
-  datasource:
-    driver-class-name: com.mysql.cj.jdbc.Driver
-    url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler
-    username: root
-    password: root
diff --git a/dolphinscheduler-worker/src/main/resources/logback-spring.xml b/dolphinscheduler-worker/src/main/resources/logback-spring.xml
index bd01f46502..8be620d7aa 100644
--- a/dolphinscheduler-worker/src/main/resources/logback-spring.xml
+++ b/dolphinscheduler-worker/src/main/resources/logback-spring.xml
@@ -31,8 +31,8 @@
     <conversionRule conversionWord="message"
                     converterClass="org.apache.dolphinscheduler.common.log.SensitiveDataConverter"/>
     <appender name="TASKLOGFILE" class="ch.qos.logback.classic.sift.SiftingAppender">
-        <filter class="org.apache.dolphinscheduler.service.log.TaskLogFilter"/>
-        <Discriminator class="org.apache.dolphinscheduler.service.log.TaskLogDiscriminator">
+        <filter class="org.apache.dolphinscheduler.plugin.task.api.log.TaskLogFilter"/>
+        <Discriminator class="org.apache.dolphinscheduler.plugin.task.api.log.TaskLogDiscriminator">
             <key>taskAppId</key>
             <logBase>${log.base}</logBase>
         </Discriminator>
diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/config/BeanConfigTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/config/BeanConfigTest.java
deleted file mode 100644
index 2323e591bf..0000000000
--- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/config/BeanConfigTest.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.worker.config;
-
-import org.apache.dolphinscheduler.service.alert.AlertClientService;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.context.junit.jupiter.SpringExtension;
-
-@ExtendWith(SpringExtension.class)
-@SpringBootTest(classes = {BeanConfig.class, WorkerConfig.class})
-public class BeanConfigTest {
-
-    @Autowired
-    private AlertClientService alertClientService;
-
-    @Test
-    public void alertClientService() {
-        Assertions.assertNotNull(alertClientService);
-    }
-}
diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessorTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessorTest.java
index c2e0ce4d8a..26c0dc2806 100644
--- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessorTest.java
+++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessorTest.java
@@ -17,16 +17,16 @@
 
 package org.apache.dolphinscheduler.server.worker.processor;
 
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
-import org.apache.dolphinscheduler.service.alert.AlertClientService;
-import org.apache.dolphinscheduler.service.storage.StorageOperate;
-import org.apache.dolphinscheduler.service.task.TaskPluginManager;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -53,7 +53,7 @@ public class TaskDispatchProcessorTest {
     private WorkerMessageSender workerMessageSender;
 
     @Mock
-    private AlertClientService alertClientService;
+    private WorkerRpcClient workerRpcClient;
 
     @Mock
     private TaskPluginManager taskPluginManager;
diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
index f39b8a9ad7..bbfeed5a50 100644
--- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
+++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
@@ -21,9 +21,9 @@ import static org.mockito.BDDMockito.given;
 
 import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.registry.api.RegistryClient;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
 import java.time.Duration;
 import java.util.Set;
diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableTest.java
index a09b13753c..4063f14cac 100644
--- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableTest.java
+++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableTest.java
@@ -18,13 +18,13 @@
 package org.apache.dolphinscheduler.server.worker.runner;
 
 import org.apache.dolphinscheduler.common.constants.Constants;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
-import org.apache.dolphinscheduler.service.alert.AlertClientService;
-import org.apache.dolphinscheduler.service.storage.StorageOperate;
-import org.apache.dolphinscheduler.service.task.TaskPluginManager;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -40,7 +40,7 @@ public class DefaultWorkerDelayTaskExecuteRunnableTest {
 
     private WorkerMessageSender workerMessageSender = Mockito.mock(WorkerMessageSender.class);
 
-    private AlertClientService alertClientService = Mockito.mock(AlertClientService.class);
+    private WorkerRpcClient alertClientService = Mockito.mock(WorkerRpcClient.class);
 
     private TaskPluginManager taskPluginManager = Mockito.mock(TaskPluginManager.class);
 
diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtilsTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtilsTest.java
index 94a6a36ae1..9b8d5a9129 100644
--- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtilsTest.java
+++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtilsTest.java
@@ -18,11 +18,11 @@
 package org.apache.dolphinscheduler.server.worker.utils;
 
 import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
 import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
-import org.apache.dolphinscheduler.service.storage.StorageOperate;
 
 import org.apache.curator.shaded.com.google.common.io.Files;
 
diff --git a/pom.xml b/pom.xml
index d5a96ba4b1..79f9ae6684 100755
--- a/pom.xml
+++ b/pom.xml
@@ -55,6 +55,7 @@
         <module>dolphinscheduler-tools</module>
         <module>dolphinscheduler-ui</module>
         <module>dolphinscheduler-scheduler-plugin</module>
+        <module>dolphinscheduler-storage-plugin</module>
     </modules>
 
     <properties>
@@ -240,6 +241,17 @@
                 <version>${project.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>org.apache.dolphinscheduler</groupId>
+                <artifactId>dolphinscheduler-storage-api</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.dolphinscheduler</groupId>
+                <artifactId>dolphinscheduler-storage-all</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+
             <dependency>
                 <groupId>org.junit</groupId>
                 <artifactId>junit-bom</artifactId>