You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ki...@apache.org on 2021/02/18 15:17:16 UTC

[incubator-dolphinscheduler] branch dev updated: [Improvement-3369][api] Introduce resources, scheduler and taskinstance service interface for clear code (#4766)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9ae29a7  [Improvement-3369][api] Introduce resources, scheduler and taskinstance service interface for clear code (#4766)
9ae29a7 is described below

commit 9ae29a756f0aeed894c80f5e495d786ccf03f41f
Author: Shiwen Cheng <ch...@gmail.com>
AuthorDate: Thu Feb 18 23:17:01 2021 +0800

    [Improvement-3369][api] Introduce resources, scheduler and taskinstance service interface for clear code (#4766)
    
    * [Improvement-3369][api] Introduce resources, scheduler and taskinstance service interface for clear code
---
 .../api/service/ResourcesService.java              | 1200 +-------------------
 .../api/service/SchedulerService.java              |  509 +--------
 .../api/service/TaskInstanceService.java           |  154 +--
 .../api/service/impl/AccessTokenServiceImpl.java   |   10 +-
 .../api/service/impl/DataAnalysisServiceImpl.java  |    6 +-
 .../service/impl/ProcessDefinitionServiceImpl.java |   19 +-
 .../ResourcesServiceImpl.java}                     |  557 +++++----
 .../SchedulerServiceImpl.java}                     |   48 +-
 .../TaskInstanceServiceImpl.java}                  |   17 +-
 .../api/service/impl/TenantServiceImpl.java        |   12 +-
 .../api/service/impl/UsersServiceImpl.java         |   25 +-
 .../dolphinscheduler/api/utils/RegexUtils.java     |    9 +
 .../api/service/ResourcesServiceTest.java          |    3 +-
 .../api/service/SchedulerServiceTest.java          |    3 +-
 .../api/service/TaskInstanceServiceTest.java       |    3 +-
 .../dolphinscheduler/common/utils/StringUtils.java |    8 +-
 16 files changed, 423 insertions(+), 2160 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
index e7d8906..bb778dd 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
@@ -14,68 +14,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.api.service;
 
-import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import org.apache.commons.collections.BeanMap;
-import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent;
-import org.apache.dolphinscheduler.api.dto.resources.filter.ResourceFilter;
-import org.apache.dolphinscheduler.api.dto.resources.visitor.ResourceTreeVisitor;
-import org.apache.dolphinscheduler.api.dto.resources.visitor.Visitor;
-import org.apache.dolphinscheduler.api.enums.Status;
-import org.apache.dolphinscheduler.api.exceptions.ServiceException;
-import org.apache.dolphinscheduler.api.utils.PageInfo;
 import org.apache.dolphinscheduler.api.utils.Result;
-import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ProgramType;
 import org.apache.dolphinscheduler.common.enums.ResourceType;
-import org.apache.dolphinscheduler.common.utils.*;
-import org.apache.dolphinscheduler.dao.entity.*;
-import org.apache.dolphinscheduler.dao.mapper.*;
-import org.apache.dolphinscheduler.dao.utils.ResourceProcessDefinitionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.dao.DuplicateKeyException;
-import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
-import org.springframework.web.multipart.MultipartFile;
+import org.apache.dolphinscheduler.dao.entity.User;
 
 import java.io.IOException;
-import java.text.MessageFormat;
-import java.util.*;
-import java.util.regex.Matcher;
-import java.util.stream.Collectors;
+import java.util.Map;
 
-import static org.apache.dolphinscheduler.common.Constants.*;
+import org.springframework.web.multipart.MultipartFile;
 
 /**
  * resources service
  */
-@Service
-public class ResourcesService extends BaseService {
-
-    private static final Logger logger = LoggerFactory.getLogger(ResourcesService.class);
-
-    @Autowired
-    private ResourceMapper resourcesMapper;
-
-    @Autowired
-    private UdfFuncMapper udfFunctionMapper;
-
-    @Autowired
-    private TenantMapper tenantMapper;
-
-    @Autowired
-    private UserMapper userMapper;
-
-    @Autowired
-    private ResourceUserMapper resourceUserMapper;
-
-    @Autowired
-    private ProcessDefinitionMapper processDefinitionMapper;
+public interface ResourcesService {
 
     /**
      * create directory
@@ -88,74 +43,12 @@ public class ResourcesService extends BaseService {
      * @param currentDir current directory
      * @return create directory result
      */
-    @Transactional(rollbackFor = Exception.class)
-    public Result createDirectory(User loginUser,
-                                 String name,
-                                 String description,
-                                 ResourceType type,
-                                 int pid,
-                                 String currentDir) {
-        Result result = new Result();
-        // if hdfs not startup
-        if (!PropertyUtils.getResUploadStartupState()){
-            logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
-            putMsg(result, Status.HDFS_NOT_STARTUP);
-            return result;
-        }
-        String fullName = currentDir.equals("/") ? String.format("%s%s",currentDir,name):String.format("%s/%s",currentDir,name);
-        result = verifyResourceName(fullName,type,loginUser);
-        if (!result.getCode().equals(Status.SUCCESS.getCode())) {
-            return result;
-        }
-        if (pid != -1) {
-            Resource parentResource = resourcesMapper.selectById(pid);
-
-            if (parentResource == null) {
-                putMsg(result, Status.PARENT_RESOURCE_NOT_EXIST);
-                return result;
-            }
-
-            if (!hasPerm(loginUser, parentResource.getUserId())) {
-                putMsg(result, Status.USER_NO_OPERATION_PERM);
-                return result;
-            }
-        }
-
-
-        if (checkResourceExists(fullName, 0, type.ordinal())) {
-            logger.error("resource directory {} has exist, can't recreate", fullName);
-            putMsg(result, Status.RESOURCE_EXIST);
-            return result;
-        }
-
-        Date now = new Date();
-
-        Resource resource = new Resource(pid,name,fullName,true,description,name,loginUser.getId(),type,0,now,now);
-
-        try {
-            resourcesMapper.insert(resource);
-
-            putMsg(result, Status.SUCCESS);
-            Map<Object, Object> dataMap = new BeanMap(resource);
-            Map<String, Object> resultMap = new HashMap<String, Object>();
-            for (Map.Entry<Object, Object> entry: dataMap.entrySet()) {
-                if (!"class".equalsIgnoreCase(entry.getKey().toString())) {
-                    resultMap.put(entry.getKey().toString(), entry.getValue());
-                }
-            }
-            result.setData(resultMap);
-        } catch (DuplicateKeyException e) {
-            logger.error("resource directory {} has exist, can't recreate", fullName);
-            putMsg(result, Status.RESOURCE_EXIST);
-            return result;
-        } catch (Exception e) {
-            logger.error("resource already exists, can't recreate ", e);
-            throw new RuntimeException("resource already exists, can't recreate");
-        }
-        //create directory in hdfs
-        createDirecotry(loginUser,fullName,type,result);
-        return result;
-    }
+    Result<Object> createDirectory(User loginUser,
+                                   String name,
+                                   String description,
+                                   ResourceType type,
+                                   int pid,
+                                   String currentDir);
 
     /**
      * create resource
@@ -169,121 +62,13 @@ public class ResourcesService extends BaseService {
      * @param currentDir current directory
      * @return create result code
      */
-    @Transactional(rollbackFor = Exception.class)
-    public Result createResource(User loginUser,
-                                 String name,
-                                 String desc,
-                                 ResourceType type,
-                                 MultipartFile file,
-                                 int pid,
-                                 String currentDir) {
-        Result result = new Result();
-
-        // if hdfs not startup
-        if (!PropertyUtils.getResUploadStartupState()){
-            logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
-            putMsg(result, Status.HDFS_NOT_STARTUP);
-            return result;
-        }
-
-        if (pid != -1) {
-            Resource parentResource = resourcesMapper.selectById(pid);
-
-            if (parentResource == null) {
-                putMsg(result, Status.PARENT_RESOURCE_NOT_EXIST);
-                return result;
-            }
-
-            if (!hasPerm(loginUser, parentResource.getUserId())) {
-                putMsg(result, Status.USER_NO_OPERATION_PERM);
-                return result;
-            }
-        }
-
-        // file is empty
-        if (file.isEmpty()) {
-            logger.error("file is empty: {}", file.getOriginalFilename());
-            putMsg(result, Status.RESOURCE_FILE_IS_EMPTY);
-            return result;
-        }
-
-        // file suffix
-        String fileSuffix = FileUtils.suffix(file.getOriginalFilename());
-        String nameSuffix = FileUtils.suffix(name);
-
-        // determine file suffix
-        if (!(StringUtils.isNotEmpty(fileSuffix) && fileSuffix.equalsIgnoreCase(nameSuffix))) {
-            /**
-             * rename file suffix and original suffix must be consistent
-             */
-            logger.error("rename file suffix and original suffix must be consistent: {}", file.getOriginalFilename());
-            putMsg(result, Status.RESOURCE_SUFFIX_FORBID_CHANGE);
-            return result;
-        }
-
-        //If resource type is UDF, only jar packages are allowed to be uploaded, and the suffix must be .jar
-        if (Constants.UDF.equals(type.name()) && !JAR.equalsIgnoreCase(fileSuffix)) {
-            logger.error(Status.UDF_RESOURCE_SUFFIX_NOT_JAR.getMsg());
-            putMsg(result, Status.UDF_RESOURCE_SUFFIX_NOT_JAR);
-            return result;
-        }
-        if (file.getSize() > Constants.MAX_FILE_SIZE) {
-            logger.error("file size is too large: {}", file.getOriginalFilename());
-            putMsg(result, Status.RESOURCE_SIZE_EXCEED_LIMIT);
-            return result;
-        }
-
-        // check resoure name exists
-        String fullName = currentDir.equals("/") ? String.format("%s%s",currentDir,name):String.format("%s/%s",currentDir,name);
-        if (checkResourceExists(fullName, 0, type.ordinal())) {
-            logger.error("resource {} has exist, can't recreate", name);
-            putMsg(result, Status.RESOURCE_EXIST);
-            return result;
-        }
-
-        Date now = new Date();
-        Resource resource = new Resource(pid,name,fullName,false,desc,file.getOriginalFilename(),loginUser.getId(),type,file.getSize(),now,now);
-
-        try {
-            resourcesMapper.insert(resource);
-
-            putMsg(result, Status.SUCCESS);
-            Map<Object, Object> dataMap = new BeanMap(resource);
-            Map<String, Object> resultMap = new HashMap<>();
-            for (Map.Entry<Object, Object> entry: dataMap.entrySet()) {
-                if (!"class".equalsIgnoreCase(entry.getKey().toString())) {
-                    resultMap.put(entry.getKey().toString(), entry.getValue());
-                }
-            }
-            result.setData(resultMap);
-        } catch (Exception e) {
-            logger.error("resource already exists, can't recreate ", e);
-            throw new RuntimeException("resource already exists, can't recreate");
-        }
-
-        // fail upload
-        if (!upload(loginUser, fullName, file, type)) {
-            logger.error("upload resource: {} file: {} failed.", name, file.getOriginalFilename());
-            putMsg(result, Status.HDFS_OPERATION_ERROR);
-            throw new RuntimeException(String.format("upload resource: %s file: %s failed.", name, file.getOriginalFilename()));
-        }
-        return result;
-    }
-
-    /**
-     * check resource is exists
-     *
-     * @param fullName  fullName
-     * @param userId    user id
-     * @param type      type
-     * @return true if resource exists
-     */
-    private boolean checkResourceExists(String fullName, int userId, int type ){
-
-        List<Resource> resources = resourcesMapper.queryResourceList(fullName, userId, type);
-        return resources != null && resources.size() > 0;
-    }
-
+    Result<Object> createResource(User loginUser,
+                                  String name,
+                                  String desc,
+                                  ResourceType type,
+                                  MultipartFile file,
+                                  int pid,
+                                  String currentDir);
 
     /**
      * update resource
@@ -295,239 +80,12 @@ public class ResourcesService extends BaseService {
      * @param file          resource file
      * @return  update result code
      */
-    @Transactional(rollbackFor = Exception.class)
-    public Result updateResource(User loginUser,
-                                 int resourceId,
-                                 String name,
-                                 String desc,
-                                 ResourceType type,
-                                 MultipartFile file) {
-        Result result = new Result();
-
-        // if resource upload startup
-        if (!PropertyUtils.getResUploadStartupState()){
-            logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
-            putMsg(result, Status.HDFS_NOT_STARTUP);
-            return result;
-        }
-
-        Resource resource = resourcesMapper.selectById(resourceId);
-        if (resource == null) {
-            putMsg(result, Status.RESOURCE_NOT_EXIST);
-            return result;
-        }
-        if (!hasPerm(loginUser, resource.getUserId())) {
-            putMsg(result, Status.USER_NO_OPERATION_PERM);
-            return result;
-        }
-
-        if (file == null && name.equals(resource.getAlias()) && desc.equals(resource.getDescription())) {
-            putMsg(result, Status.SUCCESS);
-            return result;
-        }
-
-        //check resource aleady exists
-        String originFullName = resource.getFullName();
-        String originResourceName = resource.getAlias();
-
-        String fullName = String.format("%s%s",originFullName.substring(0,originFullName.lastIndexOf("/")+1),name);
-        if (!originResourceName.equals(name) && checkResourceExists(fullName, 0, type.ordinal())) {
-            logger.error("resource {} already exists, can't recreate", name);
-            putMsg(result, Status.RESOURCE_EXIST);
-            return result;
-        }
-
-        if (file != null) {
-
-            // file is empty
-            if (file.isEmpty()) {
-                logger.error("file is empty: {}", file.getOriginalFilename());
-                putMsg(result, Status.RESOURCE_FILE_IS_EMPTY);
-                return result;
-            }
-
-            // file suffix
-            String fileSuffix = FileUtils.suffix(file.getOriginalFilename());
-            String nameSuffix = FileUtils.suffix(name);
-
-            // determine file suffix
-            if (!(StringUtils.isNotEmpty(fileSuffix) && fileSuffix.equalsIgnoreCase(nameSuffix))) {
-                /**
-                 * rename file suffix and original suffix must be consistent
-                 */
-                logger.error("rename file suffix and original suffix must be consistent: {}", file.getOriginalFilename());
-                putMsg(result, Status.RESOURCE_SUFFIX_FORBID_CHANGE);
-                return result;
-            }
-
-            //If resource type is UDF, only jar packages are allowed to be uploaded, and the suffix must be .jar
-            if (Constants.UDF.equals(type.name()) && !JAR.equalsIgnoreCase(FileUtils.suffix(originFullName))) {
-                logger.error(Status.UDF_RESOURCE_SUFFIX_NOT_JAR.getMsg());
-                putMsg(result, Status.UDF_RESOURCE_SUFFIX_NOT_JAR);
-                return result;
-            }
-            if (file.getSize() > Constants.MAX_FILE_SIZE) {
-                logger.error("file size is too large: {}", file.getOriginalFilename());
-                putMsg(result, Status.RESOURCE_SIZE_EXCEED_LIMIT);
-                return result;
-            }
-        }
-
-        // query tenant by user id
-        String tenantCode = getTenantCode(resource.getUserId(),result);
-        if (StringUtils.isEmpty(tenantCode)){
-            return result;
-        }
-        // verify whether the resource exists in storage
-        // get the path of origin file in storage
-        String originHdfsFileName = HadoopUtils.getHdfsFileName(resource.getType(),tenantCode,originFullName);
-        try {
-            if (!HadoopUtils.getInstance().exists(originHdfsFileName)) {
-                logger.error("{} not exist", originHdfsFileName);
-                putMsg(result,Status.RESOURCE_NOT_EXIST);
-                return result;
-            }
-        } catch (IOException e) {
-            logger.error(e.getMessage(),e);
-            throw new ServiceException(Status.HDFS_OPERATION_ERROR);
-        }
-
-        if (!resource.isDirectory()) {
-            //get the origin file suffix
-            String originSuffix = FileUtils.suffix(originFullName);
-            String suffix = FileUtils.suffix(fullName);
-            boolean suffixIsChanged = false;
-            if (StringUtils.isBlank(suffix) && StringUtils.isNotBlank(originSuffix)) {
-                suffixIsChanged = true;
-            }
-            if (StringUtils.isNotBlank(suffix) && !suffix.equals(originSuffix)) {
-                suffixIsChanged = true;
-            }
-            //verify whether suffix is changed
-            if (suffixIsChanged) {
-                //need verify whether this resource is authorized to other users
-                Map<String, Object> columnMap = new HashMap<>();
-                columnMap.put("resources_id", resourceId);
-
-                List<ResourcesUser> resourcesUsers = resourceUserMapper.selectByMap(columnMap);
-                if (CollectionUtils.isNotEmpty(resourcesUsers)) {
-                    List<Integer> userIds = resourcesUsers.stream().map(ResourcesUser::getUserId).collect(Collectors.toList());
-                    List<User> users = userMapper.selectBatchIds(userIds);
-                    String userNames = users.stream().map(User::getUserName).collect(Collectors.toList()).toString();
-                    logger.error("resource is authorized to user {},suffix not allowed to be modified", userNames);
-                    putMsg(result,Status.RESOURCE_IS_AUTHORIZED,userNames);
-                    return result;
-                }
-            }
-        }
-
-        // updateResource data
-        Date now = new Date();
-
-        resource.setAlias(name);
-        resource.setFullName(fullName);
-        resource.setDescription(desc);
-        resource.setUpdateTime(now);
-        if (file != null) {
-            resource.setFileName(file.getOriginalFilename());
-            resource.setSize(file.getSize());
-        }
-
-        try {
-            resourcesMapper.updateById(resource);
-            if (resource.isDirectory()) {
-                List<Integer> childrenResource = listAllChildren(resource,false);
-                if (CollectionUtils.isNotEmpty(childrenResource)) {
-                    String matcherFullName = Matcher.quoteReplacement(fullName);
-                    List<Resource> childResourceList = new ArrayList<>();
-                    Integer[] childResIdArray = childrenResource.toArray(new Integer[childrenResource.size()]);
-                    List<Resource> resourceList = resourcesMapper.listResourceByIds(childResIdArray);
-                    childResourceList = resourceList.stream().map(t -> {
-                        t.setFullName(t.getFullName().replaceFirst(originFullName, matcherFullName));
-                        t.setUpdateTime(now);
-                        return t;
-                    }).collect(Collectors.toList());
-                    resourcesMapper.batchUpdateResource(childResourceList);
-
-                    if (ResourceType.UDF.equals(resource.getType())) {
-                        List<UdfFunc> udfFuncs = udfFunctionMapper.listUdfByResourceId(childResIdArray);
-                        if (CollectionUtils.isNotEmpty(udfFuncs)) {
-                            udfFuncs = udfFuncs.stream().map(t -> {
-                                t.setResourceName(t.getResourceName().replaceFirst(originFullName, matcherFullName));
-                                t.setUpdateTime(now);
-                                return t;
-                            }).collect(Collectors.toList());
-                            udfFunctionMapper.batchUpdateUdfFunc(udfFuncs);
-                        }
-                    }
-                }
-            } else if (ResourceType.UDF.equals(resource.getType())) {
-                List<UdfFunc> udfFuncs = udfFunctionMapper.listUdfByResourceId(new Integer[]{resourceId});
-                if (CollectionUtils.isNotEmpty(udfFuncs)) {
-                    udfFuncs = udfFuncs.stream().map(t -> {
-                        t.setResourceName(fullName);
-                        t.setUpdateTime(now);
-                        return t;
-                    }).collect(Collectors.toList());
-                    udfFunctionMapper.batchUpdateUdfFunc(udfFuncs);
-                }
-
-            }
-
-            putMsg(result, Status.SUCCESS);
-            Map<Object, Object> dataMap = new BeanMap(resource);
-            Map<String, Object> resultMap = new HashMap<>(5);
-            for (Map.Entry<Object, Object> entry: dataMap.entrySet()) {
-                if (!Constants.CLASS.equalsIgnoreCase(entry.getKey().toString())) {
-                    resultMap.put(entry.getKey().toString(), entry.getValue());
-                }
-            }
-            result.setData(resultMap);
-        } catch (Exception e) {
-            logger.error(Status.UPDATE_RESOURCE_ERROR.getMsg(), e);
-            throw new ServiceException(Status.UPDATE_RESOURCE_ERROR);
-        }
-
-        // if name unchanged, return directly without moving on HDFS
-        if (originResourceName.equals(name) && file == null) {
-            return result;
-        }
-
-        if (file != null) {
-            // fail upload
-            if (!upload(loginUser, fullName, file, type)) {
-                logger.error("upload resource: {} file: {} failed.", name, file.getOriginalFilename());
-                putMsg(result, Status.HDFS_OPERATION_ERROR);
-                throw new RuntimeException(String.format("upload resource: %s file: %s failed.", name, file.getOriginalFilename()));
-            }
-            if (!fullName.equals(originFullName)) {
-                try {
-                    HadoopUtils.getInstance().delete(originHdfsFileName,false);
-                } catch (IOException e) {
-                    logger.error(e.getMessage(),e);
-                    throw new RuntimeException(String.format("delete resource: %s failed.", originFullName));
-                }
-            }
-            return result;
-        }
-
-
-        // get the path of dest file in hdfs
-        String destHdfsFileName = HadoopUtils.getHdfsFileName(resource.getType(),tenantCode,fullName);
-
-
-        try {
-            logger.info("start hdfs copy {} -> {}", originHdfsFileName, destHdfsFileName);
-            HadoopUtils.getInstance().copy(originHdfsFileName, destHdfsFileName, true, true);
-        } catch (Exception e) {
-            logger.error(MessageFormat.format("hdfs copy {0} -> {1} fail", originHdfsFileName, destHdfsFileName), e);
-            putMsg(result,Status.HDFS_COPY_FAIL);
-            throw new ServiceException(Status.HDFS_COPY_FAIL);
-        }
-
-        return result;
-
-    }
+    Result<Object> updateResource(User loginUser,
+                                  int resourceId,
+                                  String name,
+                                  String desc,
+                                  ResourceType type,
+                                  MultipartFile file);
 
     /**
      * query resources list paging
@@ -539,99 +97,7 @@ public class ResourcesService extends BaseService {
      * @param pageSize page size
      * @return resource list page
      */
-    public Map<String, Object> queryResourceListPaging(User loginUser, int direcotryId, ResourceType type, String searchVal, Integer pageNo, Integer pageSize) {
-
-        HashMap<String, Object> result = new HashMap<>(5);
-        Page<Resource> page = new Page(pageNo, pageSize);
-        int userId = loginUser.getId();
-        if (isAdmin(loginUser)) {
-            userId= 0;
-        }
-        if (direcotryId != -1) {
-            Resource directory = resourcesMapper.selectById(direcotryId);
-            if (directory == null) {
-                putMsg(result, Status.RESOURCE_NOT_EXIST);
-                return result;
-            }
-        }
-
-        IPage<Resource> resourceIPage = resourcesMapper.queryResourcePaging(page,
-                userId,direcotryId, type.ordinal(), searchVal);
-        PageInfo pageInfo = new PageInfo<Resource>(pageNo, pageSize);
-        pageInfo.setTotalCount((int)resourceIPage.getTotal());
-        pageInfo.setLists(resourceIPage.getRecords());
-        result.put(Constants.DATA_LIST, pageInfo);
-        putMsg(result,Status.SUCCESS);
-        return result;
-    }
-
-    /**
-     * create direcoty
-     * @param loginUser login user
-     * @param fullName  full name
-     * @param type      resource type
-     * @param result    Result
-     */
-    private void createDirecotry(User loginUser,String fullName,ResourceType type,Result result){
-        // query tenant
-        String tenantCode = tenantMapper.queryById(loginUser.getTenantId()).getTenantCode();
-        String directoryName = HadoopUtils.getHdfsFileName(type,tenantCode,fullName);
-        String resourceRootPath = HadoopUtils.getHdfsDir(type,tenantCode);
-        try {
-            if (!HadoopUtils.getInstance().exists(resourceRootPath)) {
-                createTenantDirIfNotExists(tenantCode);
-            }
-
-            if (!HadoopUtils.getInstance().mkdir(directoryName)) {
-                logger.error("create resource directory {} of hdfs failed",directoryName);
-                putMsg(result,Status.HDFS_OPERATION_ERROR);
-                throw new RuntimeException(String.format("create resource directory: %s failed.", directoryName));
-            }
-        } catch (Exception e) {
-            logger.error("create resource directory {} of hdfs failed",directoryName);
-            putMsg(result,Status.HDFS_OPERATION_ERROR);
-            throw new RuntimeException(String.format("create resource directory: %s failed.", directoryName));
-        }
-    }
-
-    /**
-     * upload file to hdfs
-     *
-     * @param loginUser login user
-     * @param fullName  full name
-     * @param file      file
-     */
-    private boolean upload(User loginUser, String fullName, MultipartFile file, ResourceType type) {
-        // save to local
-        String fileSuffix = FileUtils.suffix(file.getOriginalFilename());
-        String nameSuffix = FileUtils.suffix(fullName);
-
-        // determine file suffix
-        if (!(StringUtils.isNotEmpty(fileSuffix) && fileSuffix.equalsIgnoreCase(nameSuffix))) {
-            return false;
-        }
-        // query tenant
-        String tenantCode = tenantMapper.queryById(loginUser.getTenantId()).getTenantCode();
-        // random file name
-        String localFilename = FileUtils.getUploadFilename(tenantCode, UUID.randomUUID().toString());
-
-
-        // save file to hdfs, and delete original file
-        String hdfsFilename = HadoopUtils.getHdfsFileName(type,tenantCode,fullName);
-        String resourcePath = HadoopUtils.getHdfsDir(type,tenantCode);
-        try {
-            // if tenant dir not exists
-            if (!HadoopUtils.getInstance().exists(resourcePath)) {
-                createTenantDirIfNotExists(tenantCode);
-            }
-            org.apache.dolphinscheduler.api.utils.FileUtils.copyFile(file, localFilename);
-            HadoopUtils.getInstance().copyLocalToHdfs(localFilename, hdfsFilename, true, true);
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
-            return false;
-        }
-        return true;
-    }
+    Map<String, Object> queryResourceListPaging(User loginUser, int directoryId, ResourceType type, String searchVal, Integer pageNo, Integer pageSize);
 
     /**
      * query resource list
@@ -640,21 +106,7 @@ public class ResourcesService extends BaseService {
      * @param type resource type
      * @return resource list
      */
-    public Map<String, Object> queryResourceList(User loginUser, ResourceType type) {
-
-        Map<String, Object> result = new HashMap<>(5);
-
-        int userId = loginUser.getId();
-        if(isAdmin(loginUser)){
-            userId = 0;
-        }
-        List<Resource> allResourceList = resourcesMapper.queryResourceListAuthored(userId, type.ordinal(),0);
-        Visitor resourceTreeVisitor = new ResourceTreeVisitor(allResourceList);
-        result.put(Constants.DATA_LIST, resourceTreeVisitor.visit().getChildren());
-        putMsg(result,Status.SUCCESS);
-
-        return result;
-    }
+    Map<String, Object> queryResourceList(User loginUser, ResourceType type);
 
     /**
      * query resource list by program type
@@ -663,33 +115,7 @@ public class ResourcesService extends BaseService {
      * @param type resource type
      * @return resource list
      */
-    public Map<String, Object> queryResourceByProgramType(User loginUser, ResourceType type, ProgramType programType) {
-
-        Map<String, Object> result = new HashMap<>(5);
-        String suffix = ".jar";
-        int userId = loginUser.getId();
-        if(isAdmin(loginUser)){
-            userId = 0;
-        }
-        if (programType != null) {
-            switch (programType) {
-                case JAVA:
-                    break;
-                case SCALA:
-                    break;
-                case PYTHON:
-                    suffix = ".py";
-                    break;
-            }
-        }
-        List<Resource> allResourceList = resourcesMapper.queryResourceListAuthored(userId, type.ordinal(),0);
-        List<Resource> resources = new ResourceFilter(suffix,new ArrayList<>(allResourceList)).filter();
-        Visitor resourceTreeVisitor = new ResourceTreeVisitor(resources);
-        result.put(Constants.DATA_LIST, resourceTreeVisitor.visit().getChildren());
-        putMsg(result,Status.SUCCESS);
-
-        return result;
-    }
+    Map<String, Object> queryResourceByProgramType(User loginUser, ResourceType type, ProgramType programType);
 
     /**
      * delete resource
@@ -697,82 +123,9 @@ public class ResourcesService extends BaseService {
      * @param loginUser login user
      * @param resourceId resource id
      * @return delete result code
-     * @throws Exception exception
+     * @throws IOException exception
      */
-    @Transactional(rollbackFor = Exception.class)
-    public Result delete(User loginUser, int resourceId) throws Exception {
-        Result result = new Result();
-
-        // if resource upload startup
-        if (!PropertyUtils.getResUploadStartupState()){
-            logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
-            putMsg(result, Status.HDFS_NOT_STARTUP);
-            return result;
-        }
-
-        //get resource and  hdfs path
-        Resource resource = resourcesMapper.selectById(resourceId);
-        if (resource == null) {
-            logger.error("resource file not exist,  resource id {}", resourceId);
-            putMsg(result, Status.RESOURCE_NOT_EXIST);
-            return result;
-        }
-        if (!hasPerm(loginUser, resource.getUserId())) {
-            putMsg(result, Status.USER_NO_OPERATION_PERM);
-            return result;
-        }
-
-        String tenantCode = getTenantCode(resource.getUserId(),result);
-        if (StringUtils.isEmpty(tenantCode)){
-            return  result;
-        }
-
-        // get all resource id of process definitions those is released
-        List<Map<String, Object>> list = processDefinitionMapper.listResources();
-        Map<Integer, Set<Integer>> resourceProcessMap = ResourceProcessDefinitionUtils.getResourceProcessDefinitionMap(list);
-        Set<Integer> resourceIdSet = resourceProcessMap.keySet();
-        // get all children of the resource
-        List<Integer> allChildren = listAllChildren(resource,true);
-        Integer[] needDeleteResourceIdArray = allChildren.toArray(new Integer[allChildren.size()]);
-
-        //if resource type is UDF,need check whether it is bound by UDF functon
-        if (resource.getType() == (ResourceType.UDF)) {
-            List<UdfFunc> udfFuncs = udfFunctionMapper.listUdfByResourceId(needDeleteResourceIdArray);
-            if (CollectionUtils.isNotEmpty(udfFuncs)) {
-                logger.error("can't be deleted,because it is bound by UDF functions:{}",udfFuncs.toString());
-                putMsg(result,Status.UDF_RESOURCE_IS_BOUND,udfFuncs.get(0).getFuncName());
-                return result;
-            }
-        }
-
-        if (resourceIdSet.contains(resource.getPid())) {
-            logger.error("can't be deleted,because it is used of process definition");
-            putMsg(result, Status.RESOURCE_IS_USED);
-            return result;
-        }
-        resourceIdSet.retainAll(allChildren);
-        if (CollectionUtils.isNotEmpty(resourceIdSet)) {
-            logger.error("can't be deleted,because it is used of process definition");
-            for (Integer resId : resourceIdSet) {
-                logger.error("resource id:{} is used of process definition {}",resId,resourceProcessMap.get(resId));
-            }
-            putMsg(result, Status.RESOURCE_IS_USED);
-            return result;
-        }
-
-        // get hdfs file by type
-        String hdfsFilename = HadoopUtils.getHdfsFileName(resource.getType(), tenantCode, resource.getFullName());
-
-        //delete data in database
-        resourcesMapper.deleteIds(needDeleteResourceIdArray);
-        resourceUserMapper.deleteResourceUserArray(0, needDeleteResourceIdArray);
-
-        //delete file on hdfs
-        HadoopUtils.getInstance().delete(hdfsFilename, true);
-        putMsg(result, Status.SUCCESS);
-
-        return result;
-    }
+    Result<Object> delete(User loginUser, int resourceId) throws IOException;
 
     /**
      * verify resource by name and type
@@ -781,37 +134,7 @@ public class ResourcesService extends BaseService {
      * @param type      resource type
      * @return true if the resource name not exists, otherwise return false
      */
-    public Result verifyResourceName(String fullName, ResourceType type,User loginUser) {
-        Result result = new Result();
-        putMsg(result, Status.SUCCESS);
-        if (checkResourceExists(fullName, 0, type.ordinal())) {
-            logger.error("resource type:{} name:{} has exist, can't create again.", type, fullName);
-            putMsg(result, Status.RESOURCE_EXIST);
-        } else {
-            // query tenant
-            Tenant tenant = tenantMapper.queryById(loginUser.getTenantId());
-            if(tenant != null){
-                String tenantCode = tenant.getTenantCode();
-
-                try {
-                    String hdfsFilename = HadoopUtils.getHdfsFileName(type,tenantCode,fullName);
-                    if(HadoopUtils.getInstance().exists(hdfsFilename)){
-                        logger.error("resource type:{} name:{} has exist in hdfs {}, can't create again.", type, fullName,hdfsFilename);
-                        putMsg(result, Status.RESOURCE_FILE_EXIST,hdfsFilename);
-                    }
-
-                } catch (Exception e) {
-                    logger.error(e.getMessage(),e);
-                    putMsg(result,Status.HDFS_OPERATION_ERROR);
-                }
-            }else{
-                putMsg(result,Status.TENANT_NOT_EXIST);
-            }
-        }
-
-
-        return result;
-    }
+    Result<Object> verifyResourceName(String fullName, ResourceType type,User loginUser);
 
     /**
      * verify resource by full name or pid and type
@@ -820,40 +143,7 @@ public class ResourcesService extends BaseService {
      * @param type      resource type
      * @return true if the resource full name or pid not exists, otherwise return false
      */
-    public Result queryResource(String fullName,Integer id,ResourceType type) {
-        Result result = new Result();
-        if (StringUtils.isBlank(fullName) && id == null) {
-            logger.error("You must input one of fullName and pid");
-            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR);
-            return result;
-        }
-        if (StringUtils.isNotBlank(fullName)) {
-            List<Resource> resourceList = resourcesMapper.queryResource(fullName,type.ordinal());
-            if (CollectionUtils.isEmpty(resourceList)) {
-                logger.error("resource file not exist,  resource full name {} ", fullName);
-                putMsg(result, Status.RESOURCE_NOT_EXIST);
-                return result;
-            }
-            putMsg(result, Status.SUCCESS);
-            result.setData(resourceList.get(0));
-        } else {
-            Resource resource = resourcesMapper.selectById(id);
-            if (resource == null) {
-                logger.error("resource file not exist,  resource id {}", id);
-                putMsg(result, Status.RESOURCE_NOT_EXIST);
-                return result;
-            }
-            Resource parentResource = resourcesMapper.selectById(resource.getPid());
-            if (parentResource == null) {
-                logger.error("parent resource file not exist,  resource id {}", id);
-                putMsg(result, Status.RESOURCE_NOT_EXIST);
-                return result;
-            }
-            putMsg(result, Status.SUCCESS);
-            result.setData(parentResource);
-        }
-        return result;
-    }
+    Result<Object> queryResource(String fullName,Integer id,ResourceType type);
 
     /**
      * view resource file online
@@ -863,64 +153,7 @@ public class ResourcesService extends BaseService {
      * @param limit limit
      * @return resource content
      */
-    public Result readResource(int resourceId, int skipLineNum, int limit) {
-        Result result = new Result();
-
-        // if resource upload startup
-        if (!PropertyUtils.getResUploadStartupState()){
-            logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
-            putMsg(result, Status.HDFS_NOT_STARTUP);
-            return result;
-        }
-
-        // get resource by id
-        Resource resource = resourcesMapper.selectById(resourceId);
-        if (resource == null) {
-            logger.error("resource file not exist,  resource id {}", resourceId);
-            putMsg(result, Status.RESOURCE_NOT_EXIST);
-            return result;
-        }
-        //check preview or not by file suffix
-        String nameSuffix = FileUtils.suffix(resource.getAlias());
-        String resourceViewSuffixs = FileUtils.getResourceViewSuffixs();
-        if (StringUtils.isNotEmpty(resourceViewSuffixs)) {
-            List<String> strList = Arrays.asList(resourceViewSuffixs.split(","));
-            if (!strList.contains(nameSuffix)) {
-                logger.error("resource suffix {} not support view,  resource id {}", nameSuffix, resourceId);
-                putMsg(result, Status.RESOURCE_SUFFIX_NOT_SUPPORT_VIEW);
-                return result;
-            }
-        }
-
-        String tenantCode = getTenantCode(resource.getUserId(),result);
-        if (StringUtils.isEmpty(tenantCode)){
-            return  result;
-        }
-
-        // hdfs path
-        String hdfsFileName = HadoopUtils.getHdfsResourceFileName(tenantCode, resource.getFullName());
-        logger.info("resource hdfs path is {} ", hdfsFileName);
-        try {
-            if(HadoopUtils.getInstance().exists(hdfsFileName)){
-                List<String> content = HadoopUtils.getInstance().catFile(hdfsFileName, skipLineNum, limit);
-
-                putMsg(result, Status.SUCCESS);
-                Map<String, Object> map = new HashMap<>();
-                map.put(ALIAS, resource.getAlias());
-                map.put(CONTENT, String.join("\n", content));
-                result.setData(map);
-            }else{
-                logger.error("read file {} not exist in hdfs", hdfsFileName);
-                putMsg(result, Status.RESOURCE_FILE_NOT_EXIST,hdfsFileName);
-            }
-
-        } catch (Exception e) {
-            logger.error("Resource {} read failed", hdfsFileName, e);
-            putMsg(result, Status.HDFS_OPERATION_ERROR);
-        }
-
-        return result;
-    }
+    Result<Object> readResource(int resourceId, int skipLineNum, int limit);
 
     /**
      * create resource file online
@@ -933,73 +166,7 @@ public class ResourcesService extends BaseService {
      * @param content content
      * @return create result code
      */
-    @Transactional(rollbackFor = Exception.class)
-    public Result onlineCreateResource(User loginUser, ResourceType type, String fileName, String fileSuffix, String desc, String content,int pid,String currentDirectory) {
-        Result result = new Result();
-        // if resource upload startup
-        if (!PropertyUtils.getResUploadStartupState()){
-            logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
-            putMsg(result, Status.HDFS_NOT_STARTUP);
-            return result;
-        }
-
-        //check file suffix
-        String nameSuffix = fileSuffix.trim();
-        String resourceViewSuffixs = FileUtils.getResourceViewSuffixs();
-        if (StringUtils.isNotEmpty(resourceViewSuffixs)) {
-            List<String> strList = Arrays.asList(resourceViewSuffixs.split(","));
-            if (!strList.contains(nameSuffix)) {
-                logger.error("resouce suffix {} not support create", nameSuffix);
-                putMsg(result, Status.RESOURCE_SUFFIX_NOT_SUPPORT_VIEW);
-                return result;
-            }
-        }
-
-        String name = fileName.trim() + "." + nameSuffix;
-        String fullName = currentDirectory.equals("/") ? String.format("%s%s",currentDirectory,name):String.format("%s/%s",currentDirectory,name);
-
-        result = verifyResourceName(fullName,type,loginUser);
-        if (!result.getCode().equals(Status.SUCCESS.getCode())) {
-            return result;
-        }
-        if (pid != -1) {
-            Resource parentResource = resourcesMapper.selectById(pid);
-
-            if (parentResource == null) {
-                putMsg(result, Status.PARENT_RESOURCE_NOT_EXIST);
-                return result;
-            }
-
-            if (!hasPerm(loginUser, parentResource.getUserId())) {
-                putMsg(result, Status.USER_NO_OPERATION_PERM);
-                return result;
-            }
-        }
-
-        // save data
-        Date now = new Date();
-        Resource resource = new Resource(pid,name,fullName,false,desc,name,loginUser.getId(),type,content.getBytes().length,now,now);
-
-        resourcesMapper.insert(resource);
-
-        putMsg(result, Status.SUCCESS);
-        Map<Object, Object> dataMap = new BeanMap(resource);
-        Map<String, Object> resultMap = new HashMap<>();
-        for (Map.Entry<Object, Object> entry: dataMap.entrySet()) {
-            if (!Constants.CLASS.equalsIgnoreCase(entry.getKey().toString())) {
-                resultMap.put(entry.getKey().toString(), entry.getValue());
-            }
-        }
-        result.setData(resultMap);
-
-        String tenantCode = tenantMapper.queryById(loginUser.getTenantId()).getTenantCode();
-
-        result = uploadContentToHdfs(fullName, tenantCode, content);
-        if (!result.getCode().equals(Status.SUCCESS.getCode())) {
-            throw new RuntimeException(result.getMsg());
-        }
-        return result;
-    }
+    Result<Object> onlineCreateResource(User loginUser, ResourceType type, String fileName, String fileSuffix, String desc, String content,int pid,String currentDirectory);
 
     /**
      * updateProcessInstance resource
@@ -1008,145 +175,16 @@ public class ResourcesService extends BaseService {
      * @param content content
      * @return update result cod
      */
-    @Transactional(rollbackFor = Exception.class)
-    public Result updateResourceContent(int resourceId, String content) {
-        Result result = new Result();
-
-        // if resource upload startup
-        if (!PropertyUtils.getResUploadStartupState()){
-            logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
-            putMsg(result, Status.HDFS_NOT_STARTUP);
-            return result;
-        }
-
-        Resource resource = resourcesMapper.selectById(resourceId);
-        if (resource == null) {
-            logger.error("read file not exist,  resource id {}", resourceId);
-            putMsg(result, Status.RESOURCE_NOT_EXIST);
-            return result;
-        }
-        //check can edit by file suffix
-        String nameSuffix = FileUtils.suffix(resource.getAlias());
-        String resourceViewSuffixs = FileUtils.getResourceViewSuffixs();
-        if (StringUtils.isNotEmpty(resourceViewSuffixs)) {
-            List<String> strList = Arrays.asList(resourceViewSuffixs.split(","));
-            if (!strList.contains(nameSuffix)) {
-                logger.error("resource suffix {} not support updateProcessInstance,  resource id {}", nameSuffix, resourceId);
-                putMsg(result, Status.RESOURCE_SUFFIX_NOT_SUPPORT_VIEW);
-                return result;
-            }
-        }
-
-        String tenantCode = getTenantCode(resource.getUserId(),result);
-        if (StringUtils.isEmpty(tenantCode)){
-            return  result;
-        }
-        resource.setSize(content.getBytes().length);
-        resource.setUpdateTime(new Date());
-        resourcesMapper.updateById(resource);
-
-
-        result = uploadContentToHdfs(resource.getFullName(), tenantCode, content);
-        if (!result.getCode().equals(Status.SUCCESS.getCode())) {
-            throw new RuntimeException(result.getMsg());
-        }
-        return result;
-    }
-
-    /**
-     * @param resourceName  resource name
-     * @param tenantCode    tenant code
-     * @param content       content
-     * @return result
-     */
-    private Result uploadContentToHdfs(String resourceName, String tenantCode, String content) {
-        Result result = new Result();
-        String localFilename = "";
-        String hdfsFileName = "";
-        try {
-            localFilename = FileUtils.getUploadFilename(tenantCode, UUID.randomUUID().toString());
-
-            if (!FileUtils.writeContent2File(content, localFilename)) {
-                // write file fail
-                logger.error("file {} fail, content is {}", localFilename, content);
-                putMsg(result, Status.RESOURCE_NOT_EXIST);
-                return result;
-            }
-
-            // get resource file hdfs path
-            hdfsFileName = HadoopUtils.getHdfsResourceFileName(tenantCode, resourceName);
-            String resourcePath = HadoopUtils.getHdfsResDir(tenantCode);
-            logger.info("resource hdfs path is {} ", hdfsFileName);
-
-            HadoopUtils hadoopUtils = HadoopUtils.getInstance();
-            if (!hadoopUtils.exists(resourcePath)) {
-                // create if tenant dir not exists
-                createTenantDirIfNotExists(tenantCode);
-            }
-            if (hadoopUtils.exists(hdfsFileName)) {
-                hadoopUtils.delete(hdfsFileName, false);
-            }
-
-            hadoopUtils.copyLocalToHdfs(localFilename, hdfsFileName, true, true);
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
-            result.setCode(Status.HDFS_OPERATION_ERROR.getCode());
-            result.setMsg(String.format("copy %s to hdfs %s fail", localFilename, hdfsFileName));
-            return result;
-        }
-        putMsg(result, Status.SUCCESS);
-        return result;
-    }
-
+    Result<Object> updateResourceContent(int resourceId, String content);
 
     /**
      * download file
      *
      * @param resourceId resource id
      * @return resource content
-     * @throws Exception exception
+     * @throws IOException exception
      */
-    public org.springframework.core.io.Resource downloadResource(int resourceId) throws Exception {
-        // if resource upload startup
-        if (!PropertyUtils.getResUploadStartupState()){
-            logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
-            throw new RuntimeException("hdfs not startup");
-        }
-
-        Resource resource = resourcesMapper.selectById(resourceId);
-        if (resource == null) {
-            logger.error("download file not exist,  resource id {}", resourceId);
-            return null;
-        }
-        if (resource.isDirectory()) {
-            logger.error("resource id {} is directory,can't download it", resourceId);
-            throw new RuntimeException("cant't download directory");
-        }
-
-        int userId = resource.getUserId();
-        User user = userMapper.selectById(userId);
-        if(user == null){
-            logger.error("user id {} not exists", userId);
-            throw new RuntimeException(String.format("resource owner id %d not exist",userId));
-        }
-
-        Tenant tenant = tenantMapper.queryById(user.getTenantId());
-        if(tenant == null){
-            logger.error("tenant id {} not exists", user.getTenantId());
-            throw new RuntimeException(String.format("The tenant id %d of resource owner not exist",user.getTenantId()));
-        }
-
-        String tenantCode = tenant.getTenantCode();
-
-        String hdfsFileName = HadoopUtils.getHdfsFileName(resource.getType(), tenantCode, resource.getFullName());
-
-        String localFileName = FileUtils.getDownloadFilename(resource.getAlias());
-        logger.info("resource hdfs path is {} ", hdfsFileName);
-
-        HadoopUtils.getInstance().copyHdfsToLocal(hdfsFileName, localFileName, false, true);
-        return org.apache.dolphinscheduler.api.utils.FileUtils.file2Resource(localFileName);
-    }
-
+    org.springframework.core.io.Resource downloadResource(int resourceId) throws IOException;
 
     /**
      * list all file
@@ -1155,25 +193,7 @@ public class ResourcesService extends BaseService {
      * @param userId user id
      * @return unauthorized result code
      */
-    public Map<String, Object> authorizeResourceTree(User loginUser, Integer userId) {
-
-        Map<String, Object> result = new HashMap<>();
-        if (isNotAdmin(loginUser, result)) {
-            return result;
-        }
-        List<Resource> resourceList = resourcesMapper.queryResourceExceptUserId(userId);
-        List<ResourceComponent> list;
-        if (CollectionUtils.isNotEmpty(resourceList)) {
-            Visitor visitor = new ResourceTreeVisitor(resourceList);
-            list = visitor.visit().getChildren();
-        } else {
-            list = new ArrayList<>(0);
-        }
-
-        result.put(Constants.DATA_LIST, list);
-        putMsg(result, Status.SUCCESS);
-        return result;
-    }
+    Map<String, Object> authorizeResourceTree(User loginUser, Integer userId);
 
     /**
      * unauthorized file
@@ -1182,28 +202,7 @@ public class ResourcesService extends BaseService {
      * @param userId user id
      * @return unauthorized result code
      */
-    public Map<String, Object> unauthorizedFile(User loginUser, Integer userId) {
-
-        Map<String, Object> result = new HashMap<>();
-        if (isNotAdmin(loginUser, result)) {
-            return result;
-        }
-        List<Resource> resourceList = resourcesMapper.queryResourceExceptUserId(userId);
-        List<Resource> list;
-        if (resourceList != null && resourceList.size() > 0) {
-            Set<Resource> resourceSet = new HashSet<>(resourceList);
-            List<Resource> authedResourceList = resourcesMapper.queryAuthorizedResourceList(userId);
-
-            getAuthorizedResourceList(resourceSet, authedResourceList);
-            list = new ArrayList<>(resourceSet);
-        } else {
-            list = new ArrayList<>(0);
-        }
-        Visitor visitor = new ResourceTreeVisitor(list);
-        result.put(Constants.DATA_LIST, visitor.visit().getChildren());
-        putMsg(result, Status.SUCCESS);
-        return result;
-    }
+    Map<String, Object> unauthorizedFile(User loginUser, Integer userId);
 
     /**
      * unauthorized udf function
@@ -1212,29 +211,7 @@ public class ResourcesService extends BaseService {
      * @param userId user id
      * @return unauthorized result code
      */
-    public Map<String, Object> unauthorizedUDFFunction(User loginUser, Integer userId) {
-        Map<String, Object> result = new HashMap<>(5);
-        //only admin can operate
-        if (isNotAdmin(loginUser, result)) {
-            return result;
-        }
-
-        List<UdfFunc> udfFuncList = udfFunctionMapper.queryUdfFuncExceptUserId(userId);
-        List<UdfFunc> resultList = new ArrayList<>();
-        Set<UdfFunc> udfFuncSet = null;
-        if (CollectionUtils.isNotEmpty(udfFuncList)) {
-            udfFuncSet = new HashSet<>(udfFuncList);
-
-            List<UdfFunc> authedUDFFuncList = udfFunctionMapper.queryAuthedUdfFunc(userId);
-
-            getAuthorizedResourceList(udfFuncSet, authedUDFFuncList);
-            resultList = new ArrayList<>(udfFuncSet);
-        }
-        result.put(Constants.DATA_LIST, resultList);
-        putMsg(result, Status.SUCCESS);
-        return result;
-    }
-
+    Map<String, Object> unauthorizedUDFFunction(User loginUser, Integer userId);
 
     /**
      * authorized udf function
@@ -1243,17 +220,7 @@ public class ResourcesService extends BaseService {
      * @param userId user id
      * @return authorized result code
      */
-    public Map<String, Object> authorizedUDFFunction(User loginUser, Integer userId) {
-        Map<String, Object> result = new HashMap<>();
-        if (isNotAdmin(loginUser, result)) {
-            return result;
-        }
-        List<UdfFunc> udfFuncs = udfFunctionMapper.queryAuthedUdfFunc(userId);
-        result.put(Constants.DATA_LIST, udfFuncs);
-        putMsg(result, Status.SUCCESS);
-        return result;
-    }
-
+    Map<String, Object> authorizedUDFFunction(User loginUser, Integer userId);
 
     /**
      * authorized file
@@ -1262,91 +229,6 @@ public class ResourcesService extends BaseService {
      * @param userId user id
      * @return authorized result
      */
-    public Map<String, Object> authorizedFile(User loginUser, Integer userId) {
-        Map<String, Object> result = new HashMap<>(5);
-        if (isNotAdmin(loginUser, result)) {
-            return result;
-        }
-        List<Resource> authedResources = resourcesMapper.queryAuthorizedResourceList(userId);
-        Visitor visitor = new ResourceTreeVisitor(authedResources);
-        String visit = JSONUtils.toJsonString(visitor.visit(), SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS);
-        logger.info(visit);
-        String jsonTreeStr = JSONUtils.toJsonString(visitor.visit().getChildren(), SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS);
-        logger.info(jsonTreeStr);
-        result.put(Constants.DATA_LIST, visitor.visit().getChildren());
-        putMsg(result,Status.SUCCESS);
-        return result;
-    }
-
-    /**
-     * get authorized resource list
-     *
-     * @param resourceSet resource set
-     * @param authedResourceList authorized resource list
-     */
-    private void getAuthorizedResourceList(Set<?> resourceSet, List<?> authedResourceList) {
-        Set<?> authedResourceSet = null;
-        if (CollectionUtils.isNotEmpty(authedResourceList)) {
-            authedResourceSet = new HashSet<>(authedResourceList);
-            resourceSet.removeAll(authedResourceSet);
-        }
-    }
-
-    /**
-     * get tenantCode by UserId
-     *
-     * @param userId user id
-     * @param result return result
-     * @return
-     */
-    private String getTenantCode(int userId,Result result){
-
-        User user = userMapper.selectById(userId);
-        if (user == null) {
-            logger.error("user {} not exists", userId);
-            putMsg(result, Status.USER_NOT_EXIST,userId);
-            return null;
-        }
-
-        Tenant tenant = tenantMapper.queryById(user.getTenantId());
-        if (tenant == null){
-            logger.error("tenant not exists");
-            putMsg(result, Status.TENANT_NOT_EXIST);
-            return null;
-        }
-        return tenant.getTenantCode();
-    }
-
-    /**
-     * list all children id
-     * @param resource    resource
-     * @param containSelf whether add self to children list
-     * @return all children id
-     */
-    List<Integer> listAllChildren(Resource resource,boolean containSelf){
-        List<Integer> childList = new ArrayList<>();
-        if (resource.getId() != -1 && containSelf) {
-            childList.add(resource.getId());
-        }
-
-        if(resource.isDirectory()){
-            listAllChildren(resource.getId(),childList);
-        }
-        return childList;
-    }
-
-    /**
-     * list all children id
-     * @param resourceId    resource id
-     * @param childList     child list
-     */
-    void listAllChildren(int resourceId,List<Integer> childList){
-
-        List<Integer> children = resourcesMapper.listChildren(resourceId);
-        for(int chlidId:children){
-            childList.add(chlidId);
-            listAllChildren(chlidId,childList);
-        }
-    }
+    Map<String, Object> authorizedFile(User loginUser, Integer userId);
 
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java
index 55880ad..18f3ebf 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java
@@ -17,77 +17,18 @@
 
 package org.apache.dolphinscheduler.api.service;
 
-import org.apache.dolphinscheduler.api.dto.ScheduleParam;
-import org.apache.dolphinscheduler.api.enums.Status;
-import org.apache.dolphinscheduler.api.exceptions.ServiceException;
-import org.apache.dolphinscheduler.api.utils.PageInfo;
-import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.FailureStrategy;
 import org.apache.dolphinscheduler.common.enums.Priority;
 import org.apache.dolphinscheduler.common.enums.ReleaseState;
-import org.apache.dolphinscheduler.common.enums.UserType;
 import org.apache.dolphinscheduler.common.enums.WarningType;
-import org.apache.dolphinscheduler.common.model.Server;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
-import org.apache.dolphinscheduler.dao.entity.Project;
-import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
-import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
-import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
-import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob;
-import org.apache.dolphinscheduler.service.quartz.QuartzExecutors;
-import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
 
-import java.text.ParseException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
-import org.quartz.CronExpression;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
-
-import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
-
 /**
  * scheduler service
  */
-@Service
-public class SchedulerService extends BaseService {
-
-    private static final Logger logger = LoggerFactory.getLogger(SchedulerService.class);
-
-    @Autowired
-    private ProjectService projectService;
-
-    @Autowired
-    private ExecutorService executorService;
-
-    @Autowired
-    private MonitorService monitorService;
-
-    @Autowired
-    private ProcessService processService;
-
-    @Autowired
-    private ScheduleMapper scheduleMapper;
-
-    @Autowired
-    private ProjectMapper projectMapper;
-
-    @Autowired
-    private ProcessDefinitionMapper processDefinitionMapper;
+public interface SchedulerService {
 
     /**
      * save schedule
@@ -103,80 +44,14 @@ public class SchedulerService extends BaseService {
      * @param workerGroup worker group
      * @return create result code
      */
-    @Transactional(rollbackFor = RuntimeException.class)
-    public Map<String, Object> insertSchedule(User loginUser, String projectName,
-                                              Integer processDefineId,
-                                              String schedule,
-                                              WarningType warningType,
-                                              int warningGroupId,
-                                              FailureStrategy failureStrategy,
-                                              Priority processInstancePriority,
-                                              String workerGroup) {
-
-        Map<String, Object> result = new HashMap();
-
-        Project project = projectMapper.queryByName(projectName);
-
-        // check project auth
-        boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result);
-        if (!hasProjectAndPerm) {
-            return result;
-        }
-
-        // check work flow define release state
-        ProcessDefinition processDefinition = processService.findProcessDefineById(processDefineId);
-        result = executorService.checkProcessDefinitionValid(processDefinition, processDefineId);
-        if (result.get(Constants.STATUS) != Status.SUCCESS) {
-            return result;
-        }
-
-        Schedule scheduleObj = new Schedule();
-        Date now = new Date();
-
-        scheduleObj.setProjectName(projectName);
-        scheduleObj.setProcessDefinitionId(processDefinition.getId());
-        scheduleObj.setProcessDefinitionName(processDefinition.getName());
-
-        ScheduleParam scheduleParam = JSONUtils.parseObject(schedule, ScheduleParam.class);
-        if (DateUtils.differSec(scheduleParam.getStartTime(), scheduleParam.getEndTime()) == 0) {
-            logger.warn("The start time must not be the same as the end");
-            putMsg(result, Status.SCHEDULE_START_TIME_END_TIME_SAME);
-            return result;
-        }
-        scheduleObj.setStartTime(scheduleParam.getStartTime());
-        scheduleObj.setEndTime(scheduleParam.getEndTime());
-        if (!org.quartz.CronExpression.isValidExpression(scheduleParam.getCrontab())) {
-            logger.error(scheduleParam.getCrontab() + " verify failure");
-
-            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, scheduleParam.getCrontab());
-            return result;
-        }
-        scheduleObj.setCrontab(scheduleParam.getCrontab());
-        scheduleObj.setWarningType(warningType);
-        scheduleObj.setWarningGroupId(warningGroupId);
-        scheduleObj.setFailureStrategy(failureStrategy);
-        scheduleObj.setCreateTime(now);
-        scheduleObj.setUpdateTime(now);
-        scheduleObj.setUserId(loginUser.getId());
-        scheduleObj.setUserName(loginUser.getUserName());
-        scheduleObj.setReleaseState(ReleaseState.OFFLINE);
-        scheduleObj.setProcessInstancePriority(processInstancePriority);
-        scheduleObj.setWorkerGroup(workerGroup);
-        scheduleMapper.insert(scheduleObj);
-
-        /**
-         * updateProcessInstance receivers and cc by process definition id
-         */
-        processDefinition.setWarningGroupId(warningGroupId);
-        processDefinitionMapper.updateById(processDefinition);
-
-        // return scheduler object with ID
-        result.put(Constants.DATA_LIST, scheduleMapper.selectById(scheduleObj.getId()));
-        putMsg(result, Status.SUCCESS);
-
-        result.put("scheduleId", scheduleObj.getId());
-        return result;
-    }
+    Map<String, Object> insertSchedule(User loginUser, String projectName,
+                                       Integer processDefineId,
+                                       String schedule,
+                                       WarningType warningType,
+                                       int warningGroupId,
+                                       FailureStrategy failureStrategy,
+                                       Priority processInstancePriority,
+                                       String workerGroup);
 
     /**
      * updateProcessInstance schedule
@@ -193,95 +68,16 @@ public class SchedulerService extends BaseService {
      * @param scheduleStatus schedule status
      * @return update result code
      */
-    @Transactional(rollbackFor = RuntimeException.class)
-    public Map<String, Object> updateSchedule(User loginUser,
-                                              String projectName,
-                                              Integer id,
-                                              String scheduleExpression,
-                                              WarningType warningType,
-                                              int warningGroupId,
-                                              FailureStrategy failureStrategy,
-                                              ReleaseState scheduleStatus,
-                                              Priority processInstancePriority,
-                                              String workerGroup) {
-        Map<String, Object> result = new HashMap<String, Object>(5);
-
-        Project project = projectMapper.queryByName(projectName);
-
-        // check project auth
-        boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result);
-        if (!hasProjectAndPerm) {
-            return result;
-        }
-
-        // check schedule exists
-        Schedule schedule = scheduleMapper.selectById(id);
-
-        if (schedule == null) {
-            putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, id);
-            return result;
-        }
-
-        ProcessDefinition processDefinition = processService.findProcessDefineById(schedule.getProcessDefinitionId());
-        if (processDefinition == null) {
-            putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, schedule.getProcessDefinitionId());
-            return result;
-        }
-
-        /**
-         * scheduling on-line status forbid modification
-         */
-        if (checkValid(result, schedule.getReleaseState() == ReleaseState.ONLINE, Status.SCHEDULE_CRON_ONLINE_FORBID_UPDATE)) {
-            return result;
-        }
-
-        Date now = new Date();
-
-        // updateProcessInstance param
-        if (StringUtils.isNotEmpty(scheduleExpression)) {
-            ScheduleParam scheduleParam = JSONUtils.parseObject(scheduleExpression, ScheduleParam.class);
-            if (DateUtils.differSec(scheduleParam.getStartTime(), scheduleParam.getEndTime()) == 0) {
-                logger.warn("The start time must not be the same as the end");
-                putMsg(result, Status.SCHEDULE_START_TIME_END_TIME_SAME);
-                return result;
-            }
-            schedule.setStartTime(scheduleParam.getStartTime());
-            schedule.setEndTime(scheduleParam.getEndTime());
-            if (!org.quartz.CronExpression.isValidExpression(scheduleParam.getCrontab())) {
-                putMsg(result, Status.SCHEDULE_CRON_CHECK_FAILED, scheduleParam.getCrontab());
-                return result;
-            }
-            schedule.setCrontab(scheduleParam.getCrontab());
-        }
-
-        if (warningType != null) {
-            schedule.setWarningType(warningType);
-        }
-
-        schedule.setWarningGroupId(warningGroupId);
-
-        if (failureStrategy != null) {
-            schedule.setFailureStrategy(failureStrategy);
-        }
-
-        if (scheduleStatus != null) {
-            schedule.setReleaseState(scheduleStatus);
-        }
-        schedule.setWorkerGroup(workerGroup);
-        schedule.setUpdateTime(now);
-        schedule.setProcessInstancePriority(processInstancePriority);
-        scheduleMapper.updateById(schedule);
-
-        /**
-         * updateProcessInstance recipients and cc by process definition ID
-         */
-        processDefinition.setWarningGroupId(warningGroupId);
-
-        processDefinitionMapper.updateById(processDefinition);
-
-        putMsg(result, Status.SUCCESS);
-        return result;
-    }
+    Map<String, Object> updateSchedule(User loginUser,
+                                       String projectName,
+                                       Integer id,
+                                       String scheduleExpression,
+                                       WarningType warningType,
+                                       int warningGroupId,
+                                       FailureStrategy failureStrategy,
+                                       ReleaseState scheduleStatus,
+                                       Priority processInstancePriority,
+                                       String workerGroup);
 
 
     /**
@@ -293,110 +89,10 @@ public class SchedulerService extends BaseService {
      * @param scheduleStatus schedule status
      * @return publish result code
      */
-    @Transactional(rollbackFor = RuntimeException.class)
-    public Map<String, Object> setScheduleState(User loginUser,
-                                                String projectName,
-                                                Integer id,
-                                                ReleaseState scheduleStatus) {
-
-        Map<String, Object> result = new HashMap<String, Object>(5);
-
-        Project project = projectMapper.queryByName(projectName);
-        // check project auth
-        boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result);
-        if (!hasProjectAndPerm) {
-            return result;
-        }
-
-        // check schedule exists
-        Schedule scheduleObj = scheduleMapper.selectById(id);
-
-        if (scheduleObj == null) {
-            putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, id);
-            return result;
-        }
-        // check schedule release state
-        if (scheduleObj.getReleaseState() == scheduleStatus) {
-            logger.info("schedule release is already {},needn't to change schedule id: {} from {} to {}",
-                    scheduleObj.getReleaseState(), scheduleObj.getId(), scheduleObj.getReleaseState(), scheduleStatus);
-            putMsg(result, Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, scheduleStatus);
-            return result;
-        }
-        ProcessDefinition processDefinition = processService.findProcessDefineById(scheduleObj.getProcessDefinitionId());
-        if (processDefinition == null) {
-            putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, scheduleObj.getProcessDefinitionId());
-            return result;
-        }
-
-        if (scheduleStatus == ReleaseState.ONLINE) {
-            // check process definition release state
-            if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
-                logger.info("not release process definition id: {} , name : {}",
-                        processDefinition.getId(), processDefinition.getName());
-                putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName());
-                return result;
-            }
-            // check sub process definition release state
-            List<Integer> subProcessDefineIds = new ArrayList<>();
-            processService.recurseFindSubProcessId(scheduleObj.getProcessDefinitionId(), subProcessDefineIds);
-            Integer[] idArray = subProcessDefineIds.toArray(new Integer[subProcessDefineIds.size()]);
-            if (subProcessDefineIds.size() > 0) {
-                List<ProcessDefinition> subProcessDefinitionList =
-                        processDefinitionMapper.queryDefinitionListByIdList(idArray);
-                if (subProcessDefinitionList != null && subProcessDefinitionList.size() > 0) {
-                    for (ProcessDefinition subProcessDefinition : subProcessDefinitionList) {
-                        /**
-                         * if there is no online process, exit directly
-                         */
-                        if (subProcessDefinition.getReleaseState() != ReleaseState.ONLINE) {
-                            logger.info("not release process definition id: {} , name : {}",
-                                    subProcessDefinition.getId(), subProcessDefinition.getName());
-                            putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, subProcessDefinition.getId());
-                            return result;
-                        }
-                    }
-                }
-            }
-        }
-
-        // check master server exists
-        List<Server> masterServers = monitorService.getServerListFromZK(true);
-
-        if (masterServers.size() == 0) {
-            putMsg(result, Status.MASTER_NOT_EXISTS);
-            return result;
-        }
-
-        // set status
-        scheduleObj.setReleaseState(scheduleStatus);
-
-        scheduleMapper.updateById(scheduleObj);
-
-        try {
-            switch (scheduleStatus) {
-                case ONLINE: {
-                    logger.info("Call master client set schedule online, project id: {}, flow id: {},host: {}", project.getId(), processDefinition.getId(), masterServers);
-                    setSchedule(project.getId(), scheduleObj);
-                    break;
-                }
-                case OFFLINE: {
-                    logger.info("Call master client set schedule offline, project id: {}, flow id: {},host: {}", project.getId(), processDefinition.getId(), masterServers);
-                    deleteSchedule(project.getId(), id);
-                    break;
-                }
-                default: {
-                    putMsg(result, Status.SCHEDULE_STATUS_UNKNOWN, scheduleStatus.toString());
-                    return result;
-                }
-            }
-        } catch (Exception e) {
-            result.put(Constants.MSG, scheduleStatus == ReleaseState.ONLINE ? "set online failure" : "set offline failure");
-            throw new ServiceException(result.get(Constants.MSG).toString());
-        }
-
-        putMsg(result, Status.SUCCESS);
-        return result;
-    }
+    Map<String, Object> setScheduleState(User loginUser,
+                                         String projectName,
+                                         Integer id,
+                                         ReleaseState scheduleStatus);
 
     /**
      * query schedule
@@ -409,36 +105,7 @@ public class SchedulerService extends BaseService {
      * @param searchVal search value
      * @return schedule list page
      */
-    public Map<String, Object> querySchedule(User loginUser, String projectName, Integer processDefineId, String searchVal, Integer pageNo, Integer pageSize) {
-
-        HashMap<String, Object> result = new HashMap<>();
-
-        Project project = projectMapper.queryByName(projectName);
-
-        // check project auth
-        boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result);
-        if (!hasProjectAndPerm) {
-            return result;
-        }
-
-        ProcessDefinition processDefinition = processService.findProcessDefineById(processDefineId);
-        if (processDefinition == null) {
-            putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefineId);
-            return result;
-        }
-        Page<Schedule> page = new Page(pageNo, pageSize);
-        IPage<Schedule> scheduleIPage = scheduleMapper.queryByProcessDefineIdPaging(
-                page, processDefineId, searchVal
-        );
-
-        PageInfo pageInfo = new PageInfo<Schedule>(pageNo, pageSize);
-        pageInfo.setTotalCount((int) scheduleIPage.getTotal());
-        pageInfo.setLists(scheduleIPage.getRecords());
-        result.put(Constants.DATA_LIST, pageInfo);
-        putMsg(result, Status.SUCCESS);
-
-        return result;
-    }
+    Map<String, Object> querySchedule(User loginUser, String projectName, Integer processDefineId, String searchVal, Integer pageNo, Integer pageSize);
 
     /**
      * query schedule list
@@ -447,41 +114,7 @@ public class SchedulerService extends BaseService {
      * @param projectName project name
      * @return schedule list
      */
-    public Map<String, Object> queryScheduleList(User loginUser, String projectName) {
-        Map<String, Object> result = new HashMap<>();
-        Project project = projectMapper.queryByName(projectName);
-
-        // check project auth
-        boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result);
-        if (!hasProjectAndPerm) {
-            return result;
-        }
-
-        List<Schedule> schedules = scheduleMapper.querySchedulerListByProjectName(projectName);
-
-        result.put(Constants.DATA_LIST, schedules);
-        putMsg(result, Status.SUCCESS);
-
-        return result;
-    }
-
-    public void setSchedule(int projectId, Schedule schedule) {
-
-        int scheduleId = schedule.getId();
-        logger.info("set schedule, project id: {}, scheduleId: {}", projectId, scheduleId);
-
-        Date startDate = schedule.getStartTime();
-        Date endDate = schedule.getEndTime();
-
-        String jobName = QuartzExecutors.buildJobName(scheduleId);
-        String jobGroupName = QuartzExecutors.buildJobGroupName(projectId);
-
-        Map<String, Object> dataMap = QuartzExecutors.buildDataMap(projectId, scheduleId, schedule);
-
-        QuartzExecutors.getInstance().addJob(ProcessScheduleJob.class, jobName, jobGroupName, startDate, endDate,
-                schedule.getCrontab(), dataMap);
-
-    }
+    Map<String, Object> queryScheduleList(User loginUser, String projectName);
 
     /**
      * delete schedule
@@ -490,35 +123,7 @@ public class SchedulerService extends BaseService {
      * @param scheduleId schedule id
      * @throws RuntimeException runtime exception
      */
-    public static void deleteSchedule(int projectId, int scheduleId) {
-        logger.info("delete schedules of project id:{}, schedule id:{}", projectId, scheduleId);
-
-        String jobName = QuartzExecutors.buildJobName(scheduleId);
-        String jobGroupName = QuartzExecutors.buildJobGroupName(projectId);
-
-        if (!QuartzExecutors.getInstance().deleteJob(jobName, jobGroupName)) {
-            logger.warn("set offline failure:projectId:{},scheduleId:{}", projectId, scheduleId);
-            throw new ServiceException("set offline failure");
-        }
-
-    }
-
-    /**
-     * check valid
-     *
-     * @param result result
-     * @param bool bool
-     * @param status status
-     * @return check result code
-     */
-    private boolean checkValid(Map<String, Object> result, boolean bool, Status status) {
-        // timeout is valid
-        if (bool) {
-            putMsg(result, status);
-            return true;
-        }
-        return false;
-    }
+    void deleteSchedule(int projectId, int scheduleId);
 
     /**
      * delete schedule by id
@@ -528,46 +133,7 @@ public class SchedulerService extends BaseService {
      * @param scheduleId scheule id
      * @return delete result code
      */
-    public Map<String, Object> deleteScheduleById(User loginUser, String projectName, Integer scheduleId) {
-
-        Map<String, Object> result = new HashMap<>();
-        Project project = projectMapper.queryByName(projectName);
-
-        Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
-        Status resultEnum = (Status) checkResult.get(Constants.STATUS);
-        if (resultEnum != Status.SUCCESS) {
-            return checkResult;
-        }
-
-        Schedule schedule = scheduleMapper.selectById(scheduleId);
-
-        if (schedule == null) {
-            putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, scheduleId);
-            return result;
-        }
-
-        // Determine if the login user is the owner of the schedule
-        if (loginUser.getId() != schedule.getUserId()
-                && loginUser.getUserType() != UserType.ADMIN_USER) {
-            putMsg(result, Status.USER_NO_OPERATION_PERM);
-            return result;
-        }
-
-        // check schedule is already online
-        if (schedule.getReleaseState() == ReleaseState.ONLINE) {
-            putMsg(result, Status.SCHEDULE_CRON_STATE_ONLINE, schedule.getId());
-            return result;
-        }
-
-        int delete = scheduleMapper.deleteById(scheduleId);
-
-        if (delete > 0) {
-            putMsg(result, Status.SUCCESS);
-        } else {
-            putMsg(result, Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR);
-        }
-        return result;
-    }
+    Map<String, Object> deleteScheduleById(User loginUser, String projectName, Integer scheduleId);
 
     /**
      * preview schedule
@@ -577,24 +143,5 @@ public class SchedulerService extends BaseService {
      * @param schedule schedule expression
      * @return the next five fire time
      */
-    public Map<String, Object> previewSchedule(User loginUser, String projectName, String schedule) {
-        Map<String, Object> result = new HashMap<>();
-        CronExpression cronExpression;
-        ScheduleParam scheduleParam = JSONUtils.parseObject(schedule, ScheduleParam.class);
-        Date now = new Date();
-
-        Date startTime = now.after(scheduleParam.getStartTime()) ? now : scheduleParam.getStartTime();
-        Date endTime = scheduleParam.getEndTime();
-        try {
-            cronExpression = CronUtils.parse2CronExpression(scheduleParam.getCrontab());
-        } catch (ParseException e) {
-            logger.error(e.getMessage(), e);
-            putMsg(result, Status.PARSE_TO_CRON_EXPRESSION_ERROR);
-            return result;
-        }
-        List<Date> selfFireDateList = CronUtils.getSelfFireDateList(startTime, endTime, cronExpression, Constants.PREVIEW_SCHEDULE_EXECUTE_COUNT);
-        result.put(Constants.DATA_LIST, selfFireDateList.stream().map(t -> DateUtils.dateToString(t)));
-        putMsg(result, Status.SUCCESS);
-        return result;
-    }
+    Map<String, Object> previewSchedule(User loginUser, String projectName, String schedule);
 }
\ No newline at end of file
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java
index 6c68202..cbbc89b 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java
@@ -17,57 +17,15 @@
 
 package org.apache.dolphinscheduler.api.service;
 
-import org.apache.dolphinscheduler.api.enums.Status;
-import org.apache.dolphinscheduler.api.utils.PageInfo;
-import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.common.utils.CollectionUtils;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.dao.entity.Project;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
-import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
-import org.apache.dolphinscheduler.service.process.ProcessService;
 
-import java.text.MessageFormat;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 
 /**
  * task instance service
  */
-@Service
-public class TaskInstanceService extends BaseService {
-
-    @Autowired
-    ProjectMapper projectMapper;
-
-    @Autowired
-    ProjectService projectService;
-
-    @Autowired
-    ProcessService processService;
-
-    @Autowired
-    TaskInstanceMapper taskInstanceMapper;
-
-    @Autowired
-    ProcessInstanceService processInstanceService;
-
-    @Autowired
-    UsersService usersService;
+public interface TaskInstanceService {
 
     /**
      * query task list by project, process instance, task name, task start time, task end time, task status, keyword paging
@@ -85,65 +43,10 @@ public class TaskInstanceService extends BaseService {
      * @param pageSize page size
      * @return task list page
      */
-    public Map<String, Object> queryTaskListPaging(User loginUser, String projectName,
-                                                   Integer processInstanceId, String processInstanceName, String taskName, String executorName, String startDate,
-                                                   String endDate, String searchVal, ExecutionStatus stateType, String host,
-                                                   Integer pageNo, Integer pageSize) {
-        Map<String, Object> result = new HashMap<>();
-        Project project = projectMapper.queryByName(projectName);
-
-        Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
-        Status status = (Status) checkResult.get(Constants.STATUS);
-        if (status != Status.SUCCESS) {
-            return checkResult;
-        }
-
-        int[] statusArray = null;
-        if (stateType != null) {
-            statusArray = new int[]{stateType.ordinal()};
-        }
-
-        Date start = null;
-        Date end = null;
-        if (StringUtils.isNotEmpty(startDate)) {
-            start = DateUtils.getScheduleDate(startDate);
-            if (start == null) {
-                return generateInvalidParamRes(result, "startDate");
-            }
-        }
-        if (StringUtils.isNotEmpty(endDate)) {
-            end = DateUtils.getScheduleDate(endDate);
-            if (end == null) {
-                return generateInvalidParamRes(result, "endDate");
-            }
-        }
-
-        Page<TaskInstance> page = new Page(pageNo, pageSize);
-        PageInfo pageInfo = new PageInfo<TaskInstance>(pageNo, pageSize);
-        int executorId = usersService.getUserIdByName(executorName);
-
-        IPage<TaskInstance> taskInstanceIPage = taskInstanceMapper.queryTaskInstanceListPaging(
-                page, project.getId(), processInstanceId, processInstanceName, searchVal, taskName, executorId, statusArray, host, start, end
-        );
-        Set<String> exclusionSet = new HashSet<>();
-        exclusionSet.add(Constants.CLASS);
-        exclusionSet.add("taskJson");
-        List<TaskInstance> taskInstanceList = taskInstanceIPage.getRecords();
-
-        for (TaskInstance taskInstance : taskInstanceList) {
-            taskInstance.setDuration(DateUtils.format2Duration(taskInstance.getStartTime(), taskInstance.getEndTime()));
-            User executor = usersService.queryUser(taskInstance.getExecutorId());
-            if (null != executor) {
-                taskInstance.setExecutorName(executor.getUserName());
-            }
-        }
-        pageInfo.setTotalCount((int) taskInstanceIPage.getTotal());
-        pageInfo.setLists(CollectionUtils.getListByExclusion(taskInstanceIPage.getRecords(), exclusionSet));
-        result.put(Constants.DATA_LIST, pageInfo);
-        putMsg(result, Status.SUCCESS);
-
-        return result;
-    }
+    Map<String, Object> queryTaskListPaging(User loginUser, String projectName,
+                                            Integer processInstanceId, String processInstanceName, String taskName, String executorName, String startDate,
+                                            String endDate, String searchVal, ExecutionStatus stateType, String host,
+                                            Integer pageNo, Integer pageSize);
 
     /**
      * change one task instance's state from failure to forced success
@@ -153,51 +56,6 @@ public class TaskInstanceService extends BaseService {
      * @param taskInstanceId task instance id
      * @return the result code and msg
      */
-    public Map<String, Object> forceTaskSuccess(User loginUser, String projectName, Integer taskInstanceId) {
-        Map<String, Object> result = new HashMap<>(5);
-        Project project = projectMapper.queryByName(projectName);
-
-        // check user auth
-        Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
-        Status status = (Status) checkResult.get(Constants.STATUS);
-        if (status != Status.SUCCESS) {
-            return checkResult;
-        }
+    Map<String, Object> forceTaskSuccess(User loginUser, String projectName, Integer taskInstanceId);
 
-        // check whether the task instance can be found
-        TaskInstance task = taskInstanceMapper.selectById(taskInstanceId);
-        if (task == null) {
-            putMsg(result, Status.TASK_INSTANCE_NOT_FOUND);
-            return result;
-        }
-
-        // check whether the task instance state type is failure
-        if (!task.getState().typeIsFailure()) {
-            putMsg(result, Status.TASK_INSTANCE_STATE_OPERATION_ERROR, taskInstanceId, task.getState().toString());
-            return result;
-        }
-
-        // change the state of the task instance
-        task.setState(ExecutionStatus.FORCED_SUCCESS);
-        int changedNum = taskInstanceMapper.updateById(task);
-        if (changedNum > 0) {
-            putMsg(result, Status.SUCCESS);
-        } else {
-            putMsg(result, Status.FORCE_TASK_SUCCESS_ERROR);
-        }
-
-        return result;
-    }
-
-    /***
-     * generate {@link org.apache.dolphinscheduler.api.enums.Status#REQUEST_PARAMS_NOT_VALID_ERROR} res with  param name
-     * @param result exist result map
-     * @param params invalid params name
-     * @return update result map
-     */
-    private Map<String, Object> generateInvalidParamRes(Map<String, Object> result, String params) {
-        result.put(Constants.STATUS, Status.REQUEST_PARAMS_NOT_VALID_ERROR);
-        result.put(Constants.MSG, MessageFormat.format(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getMsg(), params));
-        return result;
-    }
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/AccessTokenServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/AccessTokenServiceImpl.java
index da85621..ba0d32e 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/AccessTokenServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/AccessTokenServiceImpl.java
@@ -61,7 +61,7 @@ public class AccessTokenServiceImpl extends BaseService implements AccessTokenSe
      * @return token list for page number and page size
      */
     public Map<String, Object> queryAccessTokenList(User loginUser, String searchVal, Integer pageNo, Integer pageSize) {
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
 
         PageInfo<AccessToken> pageInfo = new PageInfo<>(pageNo, pageSize);
         Page<AccessToken> page = new Page<>(pageNo, pageSize);
@@ -87,7 +87,7 @@ public class AccessTokenServiceImpl extends BaseService implements AccessTokenSe
      * @return create result code
      */
     public Map<String, Object> createToken(User loginUser, int userId, String expireTime, String token) {
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
 
         if (!hasPerm(loginUser,userId)){
             putMsg(result, Status.USER_NO_OPERATION_PERM);
@@ -124,7 +124,7 @@ public class AccessTokenServiceImpl extends BaseService implements AccessTokenSe
      * @return token string
      */
     public Map<String, Object> generateToken(User loginUser, int userId, String expireTime) {
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
         if (!hasPerm(loginUser,userId)){
             putMsg(result, Status.USER_NO_OPERATION_PERM);
             return result;
@@ -143,7 +143,7 @@ public class AccessTokenServiceImpl extends BaseService implements AccessTokenSe
      * @return delete result code
      */
     public Map<String, Object> delAccessTokenById(User loginUser, int id) {
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
 
         AccessToken accessToken = accessTokenMapper.selectById(id);
 
@@ -174,7 +174,7 @@ public class AccessTokenServiceImpl extends BaseService implements AccessTokenSe
      * @return update result code
      */
     public Map<String, Object> updateToken(User loginUser, int id, int userId, String expireTime, String token) {
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
         if (!hasPerm(loginUser,userId)){
             putMsg(result, Status.USER_NO_OPERATION_PERM);
             return result;
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java
index 7254fc1..b84c279 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java
@@ -130,7 +130,7 @@ public class DataAnalysisServiceImpl extends BaseService implements DataAnalysis
 
     private Map<String, Object> countStateByProject(User loginUser, int projectId, String startDate, String endDate
             , TriFunction<Date, Date, Integer[], List<ExecuteStatusCount>> instanceStateCounter) {
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
         boolean checkProject = checkProject(loginUser, projectId, result);
         if (!checkProject) {
             return result;
@@ -193,7 +193,7 @@ public class DataAnalysisServiceImpl extends BaseService implements DataAnalysis
      */
     public Map<String, Object> countCommandState(User loginUser, int projectId, String startDate, String endDate) {
 
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
         boolean checkProject = checkProject(loginUser, projectId, result);
         if (!checkProject) {
             return result;
@@ -264,7 +264,7 @@ public class DataAnalysisServiceImpl extends BaseService implements DataAnalysis
      * @return queue state count data
      */
     public Map<String, Object> countQueueState(User loginUser, int projectId) {
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
 
         boolean checkProject = checkProject(loginUser, projectId, result);
         if (!checkProject) {
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 2a2ae78..3a92bef 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -146,6 +146,9 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
     @Autowired
     private ProcessService processService;
 
+    @Autowired
+    private SchedulerService schedulerService;
+
     /**
      * create process definition
      *
@@ -273,7 +276,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
     @Override
     public Map<String, Object> queryProcessDefinitionList(User loginUser, String projectName) {
 
-        HashMap<String, Object> result = new HashMap<>(5);
+        HashMap<String, Object> result = new HashMap<>();
         Project project = projectMapper.queryByName(projectName);
 
         Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
@@ -399,7 +402,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
                                                        String desc,
                                                        String locations,
                                                        String connects) {
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
 
         Project project = projectMapper.queryByName(projectName);
         Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
@@ -514,7 +517,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
     @Transactional(rollbackFor = RuntimeException.class)
     public Map<String, Object> deleteProcessDefinitionById(User loginUser, String projectName, Integer processDefinitionId) {
 
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
         Project project = projectMapper.queryByName(projectName);
 
         Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
@@ -634,7 +637,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
                     // set status
                     schedule.setReleaseState(ReleaseState.OFFLINE);
                     scheduleMapper.updateById(schedule);
-                    SchedulerService.deleteSchedule(project.getId(), schedule.getId());
+                    schedulerService.deleteSchedule(project.getId(), schedule.getId());
                 }
                 break;
             default:
@@ -823,7 +826,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
     @Override
     @Transactional(rollbackFor = RuntimeException.class)
     public Map<String, Object> importProcessDefinition(User loginUser, MultipartFile file, String currentProjectName) {
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
         String processMetaJson = FileUtils.file2String(file);
         List<ProcessMeta> processMetaList = JSONUtils.toList(processMetaJson, ProcessMeta.class);
 
@@ -992,7 +995,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
         }
 
         //recursive sub-process parameter correction map key for old process id value for new process id
-        Map<Integer, Integer> subProcessIdMap = new HashMap<>(20);
+        Map<Integer, Integer> subProcessIdMap = new HashMap<>();
 
         List<Object> subProcessList = StreamUtils.asStream(jsonArray.elements())
                 .filter(elem -> checkTaskHasSubProcess(JSONUtils.parseObject(elem.toString()).path("type").asText()))
@@ -1283,7 +1286,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
     @Override
     public Map<String, Object> queryProcessDefinitionAllByProjectId(Integer projectId) {
 
-        HashMap<String, Object> result = new HashMap<>(5);
+        HashMap<String, Object> result = new HashMap<>();
 
         List<ProcessDefinition> resourceList = processDefineMapper.queryAllDefinitionList(projectId);
         result.put(Constants.DATA_LIST, resourceList);
@@ -1494,7 +1497,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
                                                       Integer processId,
                                                       Project targetProject) throws JsonProcessingException {
 
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
 
         ProcessDefinition processDefinition = processDefineMapper.selectById(processId);
         if (processDefinition == null) {
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
similarity index 79%
copy from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
copy to dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
index e7d8906..bcc37cb 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
@@ -14,27 +14,62 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dolphinscheduler.api.service;
 
-import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import org.apache.commons.collections.BeanMap;
+package org.apache.dolphinscheduler.api.service.impl;
+
+import static org.apache.dolphinscheduler.common.Constants.ALIAS;
+import static org.apache.dolphinscheduler.common.Constants.CONTENT;
+import static org.apache.dolphinscheduler.common.Constants.JAR;
+
 import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent;
 import org.apache.dolphinscheduler.api.dto.resources.filter.ResourceFilter;
 import org.apache.dolphinscheduler.api.dto.resources.visitor.ResourceTreeVisitor;
 import org.apache.dolphinscheduler.api.dto.resources.visitor.Visitor;
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.exceptions.ServiceException;
+import org.apache.dolphinscheduler.api.service.BaseService;
+import org.apache.dolphinscheduler.api.service.ResourcesService;
 import org.apache.dolphinscheduler.api.utils.PageInfo;
+import org.apache.dolphinscheduler.api.utils.RegexUtils;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ProgramType;
 import org.apache.dolphinscheduler.common.enums.ResourceType;
-import org.apache.dolphinscheduler.common.utils.*;
-import org.apache.dolphinscheduler.dao.entity.*;
-import org.apache.dolphinscheduler.dao.mapper.*;
+import org.apache.dolphinscheduler.common.utils.CollectionUtils;
+import org.apache.dolphinscheduler.common.utils.FileUtils;
+import org.apache.dolphinscheduler.common.utils.HadoopUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.PropertyUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.dao.entity.Resource;
+import org.apache.dolphinscheduler.dao.entity.ResourcesUser;
+import org.apache.dolphinscheduler.dao.entity.Tenant;
+import org.apache.dolphinscheduler.dao.entity.UdfFunc;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.ResourceMapper;
+import org.apache.dolphinscheduler.dao.mapper.ResourceUserMapper;
+import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
+import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
+import org.apache.dolphinscheduler.dao.mapper.UserMapper;
 import org.apache.dolphinscheduler.dao.utils.ResourceProcessDefinitionUtils;
+
+import org.apache.commons.beanutils.BeanMap;
+
+import java.io.IOException;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.regex.Matcher;
+import java.util.stream.Collectors;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -43,21 +78,17 @@ import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 import org.springframework.web.multipart.MultipartFile;
 
-import java.io.IOException;
-import java.text.MessageFormat;
-import java.util.*;
-import java.util.regex.Matcher;
-import java.util.stream.Collectors;
-
-import static org.apache.dolphinscheduler.common.Constants.*;
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.fasterxml.jackson.databind.SerializationFeature;
 
 /**
- * resources service
+ * resources service impl
  */
 @Service
-public class ResourcesService extends BaseService {
+public class ResourcesServiceImpl extends BaseService implements ResourcesService {
 
-    private static final Logger logger = LoggerFactory.getLogger(ResourcesService.class);
+    private static final Logger logger = LoggerFactory.getLogger(ResourcesServiceImpl.class);
 
     @Autowired
     private ResourceMapper resourcesMapper;
@@ -89,38 +120,21 @@ public class ResourcesService extends BaseService {
      * @return create directory result
      */
     @Transactional(rollbackFor = Exception.class)
-    public Result createDirectory(User loginUser,
-                                 String name,
-                                 String description,
-                                 ResourceType type,
-                                 int pid,
-                                 String currentDir) {
-        Result result = new Result();
-        // if hdfs not startup
-        if (!PropertyUtils.getResUploadStartupState()){
-            logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
-            putMsg(result, Status.HDFS_NOT_STARTUP);
+    public Result<Object> createDirectory(User loginUser,
+                                          String name,
+                                          String description,
+                                          ResourceType type,
+                                          int pid,
+                                          String currentDir) {
+        Result<Object> result = checkResourceUploadStartupState();
+        if (!result.getCode().equals(Status.SUCCESS.getCode())) {
             return result;
         }
-        String fullName = currentDir.equals("/") ? String.format("%s%s",currentDir,name):String.format("%s/%s",currentDir,name);
-        result = verifyResourceName(fullName,type,loginUser);
+        String fullName = currentDir.equals("/") ? String.format("%s%s",currentDir,name) : String.format("%s/%s",currentDir,name);
+        result = verifyResource(loginUser, type, fullName, pid);
         if (!result.getCode().equals(Status.SUCCESS.getCode())) {
             return result;
         }
-        if (pid != -1) {
-            Resource parentResource = resourcesMapper.selectById(pid);
-
-            if (parentResource == null) {
-                putMsg(result, Status.PARENT_RESOURCE_NOT_EXIST);
-                return result;
-            }
-
-            if (!hasPerm(loginUser, parentResource.getUserId())) {
-                putMsg(result, Status.USER_NO_OPERATION_PERM);
-                return result;
-            }
-        }
-
 
         if (checkResourceExists(fullName, 0, type.ordinal())) {
             logger.error("resource directory {} has exist, can't recreate", fullName);
@@ -134,10 +148,9 @@ public class ResourcesService extends BaseService {
 
         try {
             resourcesMapper.insert(resource);
-
             putMsg(result, Status.SUCCESS);
             Map<Object, Object> dataMap = new BeanMap(resource);
-            Map<String, Object> resultMap = new HashMap<String, Object>();
+            Map<String, Object> resultMap = new HashMap<>();
             for (Map.Entry<Object, Object> entry: dataMap.entrySet()) {
                 if (!"class".equalsIgnoreCase(entry.getKey().toString())) {
                     resultMap.put(entry.getKey().toString(), entry.getValue());
@@ -150,10 +163,10 @@ public class ResourcesService extends BaseService {
             return result;
         } catch (Exception e) {
             logger.error("resource already exists, can't recreate ", e);
-            throw new RuntimeException("resource already exists, can't recreate");
+            throw new ServiceException("resource already exists, can't recreate");
         }
         //create directory in hdfs
-        createDirecotry(loginUser,fullName,type,result);
+        createDirectory(loginUser,fullName,type,result);
         return result;
     }
 
@@ -170,73 +183,32 @@ public class ResourcesService extends BaseService {
      * @return create result code
      */
     @Transactional(rollbackFor = Exception.class)
-    public Result createResource(User loginUser,
-                                 String name,
-                                 String desc,
-                                 ResourceType type,
-                                 MultipartFile file,
-                                 int pid,
-                                 String currentDir) {
-        Result result = new Result();
-
-        // if hdfs not startup
-        if (!PropertyUtils.getResUploadStartupState()){
-            logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
-            putMsg(result, Status.HDFS_NOT_STARTUP);
-            return result;
-        }
-
-        if (pid != -1) {
-            Resource parentResource = resourcesMapper.selectById(pid);
-
-            if (parentResource == null) {
-                putMsg(result, Status.PARENT_RESOURCE_NOT_EXIST);
-                return result;
-            }
-
-            if (!hasPerm(loginUser, parentResource.getUserId())) {
-                putMsg(result, Status.USER_NO_OPERATION_PERM);
-                return result;
-            }
-        }
-
-        // file is empty
-        if (file.isEmpty()) {
-            logger.error("file is empty: {}", file.getOriginalFilename());
-            putMsg(result, Status.RESOURCE_FILE_IS_EMPTY);
+    public Result<Object> createResource(User loginUser,
+                                         String name,
+                                         String desc,
+                                         ResourceType type,
+                                         MultipartFile file,
+                                         int pid,
+                                         String currentDir) {
+        Result<Object> result = checkResourceUploadStartupState();
+        if (!result.getCode().equals(Status.SUCCESS.getCode())) {
             return result;
         }
 
-        // file suffix
-        String fileSuffix = FileUtils.suffix(file.getOriginalFilename());
-        String nameSuffix = FileUtils.suffix(name);
-
-        // determine file suffix
-        if (!(StringUtils.isNotEmpty(fileSuffix) && fileSuffix.equalsIgnoreCase(nameSuffix))) {
-            /**
-             * rename file suffix and original suffix must be consistent
-             */
-            logger.error("rename file suffix and original suffix must be consistent: {}", file.getOriginalFilename());
-            putMsg(result, Status.RESOURCE_SUFFIX_FORBID_CHANGE);
+        result = verifyPid(loginUser, pid);
+        if (!result.getCode().equals(Status.SUCCESS.getCode())) {
             return result;
         }
 
-        //If resource type is UDF, only jar packages are allowed to be uploaded, and the suffix must be .jar
-        if (Constants.UDF.equals(type.name()) && !JAR.equalsIgnoreCase(fileSuffix)) {
-            logger.error(Status.UDF_RESOURCE_SUFFIX_NOT_JAR.getMsg());
-            putMsg(result, Status.UDF_RESOURCE_SUFFIX_NOT_JAR);
-            return result;
-        }
-        if (file.getSize() > Constants.MAX_FILE_SIZE) {
-            logger.error("file size is too large: {}", file.getOriginalFilename());
-            putMsg(result, Status.RESOURCE_SIZE_EXCEED_LIMIT);
+        result = verifyFile(name, type, file);
+        if (!result.getCode().equals(Status.SUCCESS.getCode())) {
             return result;
         }
 
-        // check resoure name exists
-        String fullName = currentDir.equals("/") ? String.format("%s%s",currentDir,name):String.format("%s/%s",currentDir,name);
+        // check resource name exists
+        String fullName = currentDir.equals("/") ? String.format("%s%s",currentDir,name) : String.format("%s/%s",currentDir,name);
         if (checkResourceExists(fullName, 0, type.ordinal())) {
-            logger.error("resource {} has exist, can't recreate", name);
+            logger.error("resource {} has exist, can't recreate", RegexUtils.escapeNRT(name));
             putMsg(result, Status.RESOURCE_EXIST);
             return result;
         }
@@ -246,7 +218,6 @@ public class ResourcesService extends BaseService {
 
         try {
             resourcesMapper.insert(resource);
-
             putMsg(result, Status.SUCCESS);
             Map<Object, Object> dataMap = new BeanMap(resource);
             Map<String, Object> resultMap = new HashMap<>();
@@ -258,14 +229,14 @@ public class ResourcesService extends BaseService {
             result.setData(resultMap);
         } catch (Exception e) {
             logger.error("resource already exists, can't recreate ", e);
-            throw new RuntimeException("resource already exists, can't recreate");
+            throw new ServiceException("resource already exists, can't recreate");
         }
 
         // fail upload
         if (!upload(loginUser, fullName, file, type)) {
-            logger.error("upload resource: {} file: {} failed.", name, file.getOriginalFilename());
+            logger.error("upload resource: {} file: {} failed.", RegexUtils.escapeNRT(name), RegexUtils.escapeNRT(file.getOriginalFilename()));
             putMsg(result, Status.HDFS_OPERATION_ERROR);
-            throw new RuntimeException(String.format("upload resource: %s file: %s failed.", name, file.getOriginalFilename()));
+            throw new ServiceException(String.format("upload resource: %s file: %s failed.", name, file.getOriginalFilename()));
         }
         return result;
     }
@@ -278,13 +249,11 @@ public class ResourcesService extends BaseService {
      * @param type      type
      * @return true if resource exists
      */
-    private boolean checkResourceExists(String fullName, int userId, int type ){
-
+    private boolean checkResourceExists(String fullName, int userId, int type) {
         List<Resource> resources = resourcesMapper.queryResourceList(fullName, userId, type);
-        return resources != null && resources.size() > 0;
+        return resources != null && !resources.isEmpty();
     }
 
-
     /**
      * update resource
      * @param loginUser     login user
@@ -296,18 +265,14 @@ public class ResourcesService extends BaseService {
      * @return  update result code
      */
     @Transactional(rollbackFor = Exception.class)
-    public Result updateResource(User loginUser,
-                                 int resourceId,
-                                 String name,
-                                 String desc,
-                                 ResourceType type,
-                                 MultipartFile file) {
-        Result result = new Result();
-
-        // if resource upload startup
-        if (!PropertyUtils.getResUploadStartupState()){
-            logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
-            putMsg(result, Status.HDFS_NOT_STARTUP);
+    public Result<Object> updateResource(User loginUser,
+                                         int resourceId,
+                                         String name,
+                                         String desc,
+                                         ResourceType type,
+                                         MultipartFile file) {
+        Result<Object> result = checkResourceUploadStartupState();
+        if (!result.getCode().equals(Status.SUCCESS.getCode())) {
             return result;
         }
 
@@ -326,56 +291,25 @@ public class ResourcesService extends BaseService {
             return result;
         }
 
-        //check resource aleady exists
+        //check resource already exists
         String originFullName = resource.getFullName();
         String originResourceName = resource.getAlias();
 
-        String fullName = String.format("%s%s",originFullName.substring(0,originFullName.lastIndexOf("/")+1),name);
+        String fullName = String.format("%s%s",originFullName.substring(0,originFullName.lastIndexOf("/") + 1),name);
         if (!originResourceName.equals(name) && checkResourceExists(fullName, 0, type.ordinal())) {
             logger.error("resource {} already exists, can't recreate", name);
             putMsg(result, Status.RESOURCE_EXIST);
             return result;
         }
 
-        if (file != null) {
-
-            // file is empty
-            if (file.isEmpty()) {
-                logger.error("file is empty: {}", file.getOriginalFilename());
-                putMsg(result, Status.RESOURCE_FILE_IS_EMPTY);
-                return result;
-            }
-
-            // file suffix
-            String fileSuffix = FileUtils.suffix(file.getOriginalFilename());
-            String nameSuffix = FileUtils.suffix(name);
-
-            // determine file suffix
-            if (!(StringUtils.isNotEmpty(fileSuffix) && fileSuffix.equalsIgnoreCase(nameSuffix))) {
-                /**
-                 * rename file suffix and original suffix must be consistent
-                 */
-                logger.error("rename file suffix and original suffix must be consistent: {}", file.getOriginalFilename());
-                putMsg(result, Status.RESOURCE_SUFFIX_FORBID_CHANGE);
-                return result;
-            }
-
-            //If resource type is UDF, only jar packages are allowed to be uploaded, and the suffix must be .jar
-            if (Constants.UDF.equals(type.name()) && !JAR.equalsIgnoreCase(FileUtils.suffix(originFullName))) {
-                logger.error(Status.UDF_RESOURCE_SUFFIX_NOT_JAR.getMsg());
-                putMsg(result, Status.UDF_RESOURCE_SUFFIX_NOT_JAR);
-                return result;
-            }
-            if (file.getSize() > Constants.MAX_FILE_SIZE) {
-                logger.error("file size is too large: {}", file.getOriginalFilename());
-                putMsg(result, Status.RESOURCE_SIZE_EXCEED_LIMIT);
-                return result;
-            }
+        result = verifyFile(name, type, file);
+        if (!result.getCode().equals(Status.SUCCESS.getCode())) {
+            return result;
         }
 
         // query tenant by user id
         String tenantCode = getTenantCode(resource.getUserId(),result);
-        if (StringUtils.isEmpty(tenantCode)){
+        if (StringUtils.isEmpty(tenantCode)) {
             return result;
         }
         // verify whether the resource exists in storage
@@ -439,7 +373,7 @@ public class ResourcesService extends BaseService {
                 List<Integer> childrenResource = listAllChildren(resource,false);
                 if (CollectionUtils.isNotEmpty(childrenResource)) {
                     String matcherFullName = Matcher.quoteReplacement(fullName);
-                    List<Resource> childResourceList = new ArrayList<>();
+                    List<Resource> childResourceList;
                     Integer[] childResIdArray = childrenResource.toArray(new Integer[childrenResource.size()]);
                     List<Resource> resourceList = resourcesMapper.listResourceByIds(childResIdArray);
                     childResourceList = resourceList.stream().map(t -> {
@@ -476,7 +410,7 @@ public class ResourcesService extends BaseService {
 
             putMsg(result, Status.SUCCESS);
             Map<Object, Object> dataMap = new BeanMap(resource);
-            Map<String, Object> resultMap = new HashMap<>(5);
+            Map<String, Object> resultMap = new HashMap<>();
             for (Map.Entry<Object, Object> entry: dataMap.entrySet()) {
                 if (!Constants.CLASS.equalsIgnoreCase(entry.getKey().toString())) {
                     resultMap.put(entry.getKey().toString(), entry.getValue());
@@ -496,26 +430,24 @@ public class ResourcesService extends BaseService {
         if (file != null) {
             // fail upload
             if (!upload(loginUser, fullName, file, type)) {
-                logger.error("upload resource: {} file: {} failed.", name, file.getOriginalFilename());
+                logger.error("upload resource: {} file: {} failed.", name, RegexUtils.escapeNRT(file.getOriginalFilename()));
                 putMsg(result, Status.HDFS_OPERATION_ERROR);
-                throw new RuntimeException(String.format("upload resource: %s file: %s failed.", name, file.getOriginalFilename()));
+                throw new ServiceException(String.format("upload resource: %s file: %s failed.", name, file.getOriginalFilename()));
             }
             if (!fullName.equals(originFullName)) {
                 try {
                     HadoopUtils.getInstance().delete(originHdfsFileName,false);
                 } catch (IOException e) {
                     logger.error(e.getMessage(),e);
-                    throw new RuntimeException(String.format("delete resource: %s failed.", originFullName));
+                    throw new ServiceException(String.format("delete resource: %s failed.", originFullName));
                 }
             }
             return result;
         }
 
-
         // get the path of dest file in hdfs
         String destHdfsFileName = HadoopUtils.getHdfsFileName(resource.getType(),tenantCode,fullName);
 
-
         try {
             logger.info("start hdfs copy {} -> {}", originHdfsFileName, destHdfsFileName);
             HadoopUtils.getInstance().copy(originHdfsFileName, destHdfsFileName, true, true);
@@ -526,7 +458,44 @@ public class ResourcesService extends BaseService {
         }
 
         return result;
+    }
+
+    private Result<Object> verifyFile(String name, ResourceType type, MultipartFile file) {
+        Result<Object> result = new Result<>();
+        putMsg(result, Status.SUCCESS);
+        if (file != null) {
+            // file is empty
+            if (file.isEmpty()) {
+                logger.error("file is empty: {}", RegexUtils.escapeNRT(file.getOriginalFilename()));
+                putMsg(result, Status.RESOURCE_FILE_IS_EMPTY);
+                return result;
+            }
+
+            // file suffix
+            String fileSuffix = FileUtils.suffix(file.getOriginalFilename());
+            String nameSuffix = FileUtils.suffix(name);
+
+            // determine file suffix
+            if (!(StringUtils.isNotEmpty(fileSuffix) && fileSuffix.equalsIgnoreCase(nameSuffix))) {
+                // rename file suffix and original suffix must be consistent
+                logger.error("rename file suffix and original suffix must be consistent: {}", RegexUtils.escapeNRT(file.getOriginalFilename()));
+                putMsg(result, Status.RESOURCE_SUFFIX_FORBID_CHANGE);
+                return result;
+            }
 
+            //If resource type is UDF, only jar packages are allowed to be uploaded, and the suffix must be .jar
+            if (Constants.UDF.equals(type.name()) && !JAR.equalsIgnoreCase(fileSuffix)) {
+                logger.error(Status.UDF_RESOURCE_SUFFIX_NOT_JAR.getMsg());
+                putMsg(result, Status.UDF_RESOURCE_SUFFIX_NOT_JAR);
+                return result;
+            }
+            if (file.getSize() > Constants.MAX_FILE_SIZE) {
+                logger.error("file size is too large: {}", RegexUtils.escapeNRT(file.getOriginalFilename()));
+                putMsg(result, Status.RESOURCE_SIZE_EXCEED_LIMIT);
+                return result;
+            }
+        }
+        return result;
     }
 
     /**
@@ -539,16 +508,16 @@ public class ResourcesService extends BaseService {
      * @param pageSize page size
      * @return resource list page
      */
-    public Map<String, Object> queryResourceListPaging(User loginUser, int direcotryId, ResourceType type, String searchVal, Integer pageNo, Integer pageSize) {
+    public Map<String, Object> queryResourceListPaging(User loginUser, int directoryId, ResourceType type, String searchVal, Integer pageNo, Integer pageSize) {
 
-        HashMap<String, Object> result = new HashMap<>(5);
-        Page<Resource> page = new Page(pageNo, pageSize);
+        HashMap<String, Object> result = new HashMap<>();
+        Page<Resource> page = new Page<>(pageNo, pageSize);
         int userId = loginUser.getId();
         if (isAdmin(loginUser)) {
-            userId= 0;
+            userId = 0;
         }
-        if (direcotryId != -1) {
-            Resource directory = resourcesMapper.selectById(direcotryId);
+        if (directoryId != -1) {
+            Resource directory = resourcesMapper.selectById(directoryId);
             if (directory == null) {
                 putMsg(result, Status.RESOURCE_NOT_EXIST);
                 return result;
@@ -556,8 +525,8 @@ public class ResourcesService extends BaseService {
         }
 
         IPage<Resource> resourceIPage = resourcesMapper.queryResourcePaging(page,
-                userId,direcotryId, type.ordinal(), searchVal);
-        PageInfo pageInfo = new PageInfo<Resource>(pageNo, pageSize);
+                userId,directoryId, type.ordinal(), searchVal);
+        PageInfo<Resource> pageInfo = new PageInfo<>(pageNo, pageSize);
         pageInfo.setTotalCount((int)resourceIPage.getTotal());
         pageInfo.setLists(resourceIPage.getRecords());
         result.put(Constants.DATA_LIST, pageInfo);
@@ -566,14 +535,13 @@ public class ResourcesService extends BaseService {
     }
 
     /**
-     * create direcoty
+     * create directory
      * @param loginUser login user
      * @param fullName  full name
      * @param type      resource type
      * @param result    Result
      */
-    private void createDirecotry(User loginUser,String fullName,ResourceType type,Result result){
-        // query tenant
+    private void createDirectory(User loginUser,String fullName,ResourceType type,Result<Object> result) {
         String tenantCode = tenantMapper.queryById(loginUser.getTenantId()).getTenantCode();
         String directoryName = HadoopUtils.getHdfsFileName(type,tenantCode,fullName);
         String resourceRootPath = HadoopUtils.getHdfsDir(type,tenantCode);
@@ -585,12 +553,12 @@ public class ResourcesService extends BaseService {
             if (!HadoopUtils.getInstance().mkdir(directoryName)) {
                 logger.error("create resource directory {} of hdfs failed",directoryName);
                 putMsg(result,Status.HDFS_OPERATION_ERROR);
-                throw new RuntimeException(String.format("create resource directory: %s failed.", directoryName));
+                throw new ServiceException(String.format("create resource directory: %s failed.", directoryName));
             }
         } catch (Exception e) {
             logger.error("create resource directory {} of hdfs failed",directoryName);
             putMsg(result,Status.HDFS_OPERATION_ERROR);
-            throw new RuntimeException(String.format("create resource directory: %s failed.", directoryName));
+            throw new ServiceException(String.format("create resource directory: %s failed.", directoryName));
         }
     }
 
@@ -615,7 +583,6 @@ public class ResourcesService extends BaseService {
         // random file name
         String localFilename = FileUtils.getUploadFilename(tenantCode, UUID.randomUUID().toString());
 
-
         // save file to hdfs, and delete original file
         String hdfsFilename = HadoopUtils.getHdfsFileName(type,tenantCode,fullName);
         String resourcePath = HadoopUtils.getHdfsDir(type,tenantCode);
@@ -641,11 +608,10 @@ public class ResourcesService extends BaseService {
      * @return resource list
      */
     public Map<String, Object> queryResourceList(User loginUser, ResourceType type) {
-
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
 
         int userId = loginUser.getId();
-        if(isAdmin(loginUser)){
+        if (isAdmin(loginUser)) {
             userId = 0;
         }
         List<Resource> allResourceList = resourcesMapper.queryResourceListAuthored(userId, type.ordinal(),0);
@@ -664,22 +630,21 @@ public class ResourcesService extends BaseService {
      * @return resource list
      */
     public Map<String, Object> queryResourceByProgramType(User loginUser, ResourceType type, ProgramType programType) {
-
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
         String suffix = ".jar";
         int userId = loginUser.getId();
-        if(isAdmin(loginUser)){
+        if (isAdmin(loginUser)) {
             userId = 0;
         }
         if (programType != null) {
             switch (programType) {
                 case JAVA:
-                    break;
                 case SCALA:
                     break;
                 case PYTHON:
                     suffix = ".py";
                     break;
+                default:
             }
         }
         List<Resource> allResourceList = resourcesMapper.queryResourceListAuthored(userId, type.ordinal(),0);
@@ -697,23 +662,18 @@ public class ResourcesService extends BaseService {
      * @param loginUser login user
      * @param resourceId resource id
      * @return delete result code
-     * @throws Exception exception
+     * @throws IOException exception
      */
     @Transactional(rollbackFor = Exception.class)
-    public Result delete(User loginUser, int resourceId) throws Exception {
-        Result result = new Result();
-
-        // if resource upload startup
-        if (!PropertyUtils.getResUploadStartupState()){
-            logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
-            putMsg(result, Status.HDFS_NOT_STARTUP);
+    public Result<Object> delete(User loginUser, int resourceId) throws IOException {
+        Result<Object> result = checkResourceUploadStartupState();
+        if (!result.getCode().equals(Status.SUCCESS.getCode())) {
             return result;
         }
 
-        //get resource and  hdfs path
+        // get resource by id
         Resource resource = resourcesMapper.selectById(resourceId);
         if (resource == null) {
-            logger.error("resource file not exist,  resource id {}", resourceId);
             putMsg(result, Status.RESOURCE_NOT_EXIST);
             return result;
         }
@@ -723,7 +683,7 @@ public class ResourcesService extends BaseService {
         }
 
         String tenantCode = getTenantCode(resource.getUserId(),result);
-        if (StringUtils.isEmpty(tenantCode)){
+        if (StringUtils.isEmpty(tenantCode)) {
             return  result;
         }
 
@@ -735,11 +695,11 @@ public class ResourcesService extends BaseService {
         List<Integer> allChildren = listAllChildren(resource,true);
         Integer[] needDeleteResourceIdArray = allChildren.toArray(new Integer[allChildren.size()]);
 
-        //if resource type is UDF,need check whether it is bound by UDF functon
+        //if resource type is UDF,need check whether it is bound by UDF function
         if (resource.getType() == (ResourceType.UDF)) {
             List<UdfFunc> udfFuncs = udfFunctionMapper.listUdfByResourceId(needDeleteResourceIdArray);
             if (CollectionUtils.isNotEmpty(udfFuncs)) {
-                logger.error("can't be deleted,because it is bound by UDF functions:{}",udfFuncs.toString());
+                logger.error("can't be deleted,because it is bound by UDF functions:{}", udfFuncs);
                 putMsg(result,Status.UDF_RESOURCE_IS_BOUND,udfFuncs.get(0).getFuncName());
                 return result;
             }
@@ -781,22 +741,22 @@ public class ResourcesService extends BaseService {
      * @param type      resource type
      * @return true if the resource name not exists, otherwise return false
      */
-    public Result verifyResourceName(String fullName, ResourceType type,User loginUser) {
-        Result result = new Result();
+    public Result<Object> verifyResourceName(String fullName, ResourceType type, User loginUser) {
+        Result<Object> result = new Result<>();
         putMsg(result, Status.SUCCESS);
         if (checkResourceExists(fullName, 0, type.ordinal())) {
-            logger.error("resource type:{} name:{} has exist, can't create again.", type, fullName);
+            logger.error("resource type:{} name:{} has exist, can't create again.", type, RegexUtils.escapeNRT(fullName));
             putMsg(result, Status.RESOURCE_EXIST);
         } else {
             // query tenant
             Tenant tenant = tenantMapper.queryById(loginUser.getTenantId());
-            if(tenant != null){
+            if (tenant != null) {
                 String tenantCode = tenant.getTenantCode();
 
                 try {
                     String hdfsFilename = HadoopUtils.getHdfsFileName(type,tenantCode,fullName);
-                    if(HadoopUtils.getInstance().exists(hdfsFilename)){
-                        logger.error("resource type:{} name:{} has exist in hdfs {}, can't create again.", type, fullName,hdfsFilename);
+                    if (HadoopUtils.getInstance().exists(hdfsFilename)) {
+                        logger.error("resource type:{} name:{} has exist in hdfs {}, can't create again.", type, RegexUtils.escapeNRT(fullName), hdfsFilename);
                         putMsg(result, Status.RESOURCE_FILE_EXIST,hdfsFilename);
                     }
 
@@ -804,12 +764,11 @@ public class ResourcesService extends BaseService {
                     logger.error(e.getMessage(),e);
                     putMsg(result,Status.HDFS_OPERATION_ERROR);
                 }
-            }else{
+            } else {
                 putMsg(result,Status.TENANT_NOT_EXIST);
             }
         }
 
-
         return result;
     }
 
@@ -820,17 +779,15 @@ public class ResourcesService extends BaseService {
      * @param type      resource type
      * @return true if the resource full name or pid not exists, otherwise return false
      */
-    public Result queryResource(String fullName,Integer id,ResourceType type) {
-        Result result = new Result();
+    public Result<Object> queryResource(String fullName,Integer id,ResourceType type) {
+        Result<Object> result = new Result<>();
         if (StringUtils.isBlank(fullName) && id == null) {
-            logger.error("You must input one of fullName and pid");
             putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR);
             return result;
         }
         if (StringUtils.isNotBlank(fullName)) {
             List<Resource> resourceList = resourcesMapper.queryResource(fullName,type.ordinal());
             if (CollectionUtils.isEmpty(resourceList)) {
-                logger.error("resource file not exist,  resource full name {} ", fullName);
                 putMsg(result, Status.RESOURCE_NOT_EXIST);
                 return result;
             }
@@ -839,13 +796,11 @@ public class ResourcesService extends BaseService {
         } else {
             Resource resource = resourcesMapper.selectById(id);
             if (resource == null) {
-                logger.error("resource file not exist,  resource id {}", id);
                 putMsg(result, Status.RESOURCE_NOT_EXIST);
                 return result;
             }
             Resource parentResource = resourcesMapper.selectById(resource.getPid());
             if (parentResource == null) {
-                logger.error("parent resource file not exist,  resource id {}", id);
                 putMsg(result, Status.RESOURCE_NOT_EXIST);
                 return result;
             }
@@ -863,20 +818,15 @@ public class ResourcesService extends BaseService {
      * @param limit limit
      * @return resource content
      */
-    public Result readResource(int resourceId, int skipLineNum, int limit) {
-        Result result = new Result();
-
-        // if resource upload startup
-        if (!PropertyUtils.getResUploadStartupState()){
-            logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
-            putMsg(result, Status.HDFS_NOT_STARTUP);
+    public Result<Object> readResource(int resourceId, int skipLineNum, int limit) {
+        Result<Object> result = checkResourceUploadStartupState();
+        if (!result.getCode().equals(Status.SUCCESS.getCode())) {
             return result;
         }
 
         // get resource by id
         Resource resource = resourcesMapper.selectById(resourceId);
         if (resource == null) {
-            logger.error("resource file not exist,  resource id {}", resourceId);
             putMsg(result, Status.RESOURCE_NOT_EXIST);
             return result;
         }
@@ -893,15 +843,15 @@ public class ResourcesService extends BaseService {
         }
 
         String tenantCode = getTenantCode(resource.getUserId(),result);
-        if (StringUtils.isEmpty(tenantCode)){
+        if (StringUtils.isEmpty(tenantCode)) {
             return  result;
         }
 
         // hdfs path
         String hdfsFileName = HadoopUtils.getHdfsResourceFileName(tenantCode, resource.getFullName());
-        logger.info("resource hdfs path is {} ", hdfsFileName);
+        logger.info("resource hdfs path is {}", hdfsFileName);
         try {
-            if(HadoopUtils.getInstance().exists(hdfsFileName)){
+            if (HadoopUtils.getInstance().exists(hdfsFileName)) {
                 List<String> content = HadoopUtils.getInstance().catFile(hdfsFileName, skipLineNum, limit);
 
                 putMsg(result, Status.SUCCESS);
@@ -909,7 +859,7 @@ public class ResourcesService extends BaseService {
                 map.put(ALIAS, resource.getAlias());
                 map.put(CONTENT, String.join("\n", content));
                 result.setData(map);
-            }else{
+            } else {
                 logger.error("read file {} not exist in hdfs", hdfsFileName);
                 putMsg(result, Status.RESOURCE_FILE_NOT_EXIST,hdfsFileName);
             }
@@ -931,15 +881,14 @@ public class ResourcesService extends BaseService {
      * @param fileSuffix file suffix
      * @param desc description
      * @param content content
+     * @param pid pid
+     * @param currentDir current directory
      * @return create result code
      */
     @Transactional(rollbackFor = Exception.class)
-    public Result onlineCreateResource(User loginUser, ResourceType type, String fileName, String fileSuffix, String desc, String content,int pid,String currentDirectory) {
-        Result result = new Result();
-        // if resource upload startup
-        if (!PropertyUtils.getResUploadStartupState()){
-            logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
-            putMsg(result, Status.HDFS_NOT_STARTUP);
+    public Result<Object> onlineCreateResource(User loginUser, ResourceType type, String fileName, String fileSuffix, String desc, String content,int pid,String currentDir) {
+        Result<Object> result = checkResourceUploadStartupState();
+        if (!result.getCode().equals(Status.SUCCESS.getCode())) {
             return result;
         }
 
@@ -949,32 +898,18 @@ public class ResourcesService extends BaseService {
         if (StringUtils.isNotEmpty(resourceViewSuffixs)) {
             List<String> strList = Arrays.asList(resourceViewSuffixs.split(","));
             if (!strList.contains(nameSuffix)) {
-                logger.error("resouce suffix {} not support create", nameSuffix);
+                logger.error("resource suffix {} not support create", nameSuffix);
                 putMsg(result, Status.RESOURCE_SUFFIX_NOT_SUPPORT_VIEW);
                 return result;
             }
         }
 
         String name = fileName.trim() + "." + nameSuffix;
-        String fullName = currentDirectory.equals("/") ? String.format("%s%s",currentDirectory,name):String.format("%s/%s",currentDirectory,name);
-
-        result = verifyResourceName(fullName,type,loginUser);
+        String fullName = currentDir.equals("/") ? String.format("%s%s",currentDir,name) : String.format("%s/%s",currentDir,name);
+        result = verifyResource(loginUser, type, fullName, pid);
         if (!result.getCode().equals(Status.SUCCESS.getCode())) {
             return result;
         }
-        if (pid != -1) {
-            Resource parentResource = resourcesMapper.selectById(pid);
-
-            if (parentResource == null) {
-                putMsg(result, Status.PARENT_RESOURCE_NOT_EXIST);
-                return result;
-            }
-
-            if (!hasPerm(loginUser, parentResource.getUserId())) {
-                putMsg(result, Status.USER_NO_OPERATION_PERM);
-                return result;
-            }
-        }
 
         // save data
         Date now = new Date();
@@ -996,7 +931,44 @@ public class ResourcesService extends BaseService {
 
         result = uploadContentToHdfs(fullName, tenantCode, content);
         if (!result.getCode().equals(Status.SUCCESS.getCode())) {
-            throw new RuntimeException(result.getMsg());
+            throw new ServiceException(result.getMsg());
+        }
+        return result;
+    }
+
+    private Result<Object> checkResourceUploadStartupState() {
+        Result<Object> result = new Result<>();
+        putMsg(result, Status.SUCCESS);
+        // if resource upload startup
+        if (!PropertyUtils.getResUploadStartupState()) {
+            logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
+            putMsg(result, Status.HDFS_NOT_STARTUP);
+            return result;
+        }
+        return result;
+    }
+
+    private Result<Object> verifyResource(User loginUser, ResourceType type, String fullName, int pid) {
+        Result<Object> result = verifyResourceName(fullName, type, loginUser);
+        if (!result.getCode().equals(Status.SUCCESS.getCode())) {
+            return result;
+        }
+        return verifyPid(loginUser, pid);
+    }
+
+    private Result<Object> verifyPid(User loginUser, int pid) {
+        Result<Object> result = new Result<>();
+        putMsg(result, Status.SUCCESS);
+        if (pid != -1) {
+            Resource parentResource = resourcesMapper.selectById(pid);
+            if (parentResource == null) {
+                putMsg(result, Status.PARENT_RESOURCE_NOT_EXIST);
+                return result;
+            }
+            if (!hasPerm(loginUser, parentResource.getUserId())) {
+                putMsg(result, Status.USER_NO_OPERATION_PERM);
+                return result;
+            }
         }
         return result;
     }
@@ -1009,13 +981,9 @@ public class ResourcesService extends BaseService {
      * @return update result cod
      */
     @Transactional(rollbackFor = Exception.class)
-    public Result updateResourceContent(int resourceId, String content) {
-        Result result = new Result();
-
-        // if resource upload startup
-        if (!PropertyUtils.getResUploadStartupState()){
-            logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
-            putMsg(result, Status.HDFS_NOT_STARTUP);
+    public Result<Object> updateResourceContent(int resourceId, String content) {
+        Result<Object> result = checkResourceUploadStartupState();
+        if (!result.getCode().equals(Status.SUCCESS.getCode())) {
             return result;
         }
 
@@ -1038,17 +1006,16 @@ public class ResourcesService extends BaseService {
         }
 
         String tenantCode = getTenantCode(resource.getUserId(),result);
-        if (StringUtils.isEmpty(tenantCode)){
+        if (StringUtils.isEmpty(tenantCode)) {
             return  result;
         }
         resource.setSize(content.getBytes().length);
         resource.setUpdateTime(new Date());
         resourcesMapper.updateById(resource);
 
-
         result = uploadContentToHdfs(resource.getFullName(), tenantCode, content);
         if (!result.getCode().equals(Status.SUCCESS.getCode())) {
-            throw new RuntimeException(result.getMsg());
+            throw new ServiceException(result.getMsg());
         }
         return result;
     }
@@ -1059,8 +1026,8 @@ public class ResourcesService extends BaseService {
      * @param content       content
      * @return result
      */
-    private Result uploadContentToHdfs(String resourceName, String tenantCode, String content) {
-        Result result = new Result();
+    private Result<Object> uploadContentToHdfs(String resourceName, String tenantCode, String content) {
+        Result<Object> result = new Result<>();
         String localFilename = "";
         String hdfsFileName = "";
         try {
@@ -1068,7 +1035,7 @@ public class ResourcesService extends BaseService {
 
             if (!FileUtils.writeContent2File(content, localFilename)) {
                 // write file fail
-                logger.error("file {} fail, content is {}", localFilename, content);
+                logger.error("file {} fail, content is {}", localFilename, RegexUtils.escapeNRT(content));
                 putMsg(result, Status.RESOURCE_NOT_EXIST);
                 return result;
             }
@@ -1076,7 +1043,7 @@ public class ResourcesService extends BaseService {
             // get resource file hdfs path
             hdfsFileName = HadoopUtils.getHdfsResourceFileName(tenantCode, resourceName);
             String resourcePath = HadoopUtils.getHdfsResDir(tenantCode);
-            logger.info("resource hdfs path is {} ", hdfsFileName);
+            logger.info("resource hdfs path is {}, resource dir is {}", hdfsFileName, resourcePath);
 
             HadoopUtils hadoopUtils = HadoopUtils.getInstance();
             if (!hadoopUtils.exists(resourcePath)) {
@@ -1098,19 +1065,18 @@ public class ResourcesService extends BaseService {
         return result;
     }
 
-
     /**
      * download file
      *
      * @param resourceId resource id
      * @return resource content
-     * @throws Exception exception
+     * @throws IOException exception
      */
-    public org.springframework.core.io.Resource downloadResource(int resourceId) throws Exception {
+    public org.springframework.core.io.Resource downloadResource(int resourceId) throws IOException {
         // if resource upload startup
-        if (!PropertyUtils.getResUploadStartupState()){
+        if (!PropertyUtils.getResUploadStartupState()) {
             logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
-            throw new RuntimeException("hdfs not startup");
+            throw new ServiceException("hdfs not startup");
         }
 
         Resource resource = resourcesMapper.selectById(resourceId);
@@ -1120,20 +1086,20 @@ public class ResourcesService extends BaseService {
         }
         if (resource.isDirectory()) {
             logger.error("resource id {} is directory,can't download it", resourceId);
-            throw new RuntimeException("cant't download directory");
+            throw new ServiceException("can't download directory");
         }
 
         int userId = resource.getUserId();
         User user = userMapper.selectById(userId);
-        if(user == null){
+        if (user == null) {
             logger.error("user id {} not exists", userId);
-            throw new RuntimeException(String.format("resource owner id %d not exist",userId));
+            throw new ServiceException(String.format("resource owner id %d not exist",userId));
         }
 
         Tenant tenant = tenantMapper.queryById(user.getTenantId());
-        if(tenant == null){
+        if (tenant == null) {
             logger.error("tenant id {} not exists", user.getTenantId());
-            throw new RuntimeException(String.format("The tenant id %d of resource owner not exist",user.getTenantId()));
+            throw new ServiceException(String.format("The tenant id %d of resource owner not exist",user.getTenantId()));
         }
 
         String tenantCode = tenant.getTenantCode();
@@ -1141,13 +1107,12 @@ public class ResourcesService extends BaseService {
         String hdfsFileName = HadoopUtils.getHdfsFileName(resource.getType(), tenantCode, resource.getFullName());
 
         String localFileName = FileUtils.getDownloadFilename(resource.getAlias());
-        logger.info("resource hdfs path is {} ", hdfsFileName);
+        logger.info("resource hdfs path is {}, download local filename is {}", hdfsFileName, localFileName);
 
         HadoopUtils.getInstance().copyHdfsToLocal(hdfsFileName, localFileName, false, true);
         return org.apache.dolphinscheduler.api.utils.FileUtils.file2Resource(localFileName);
     }
 
-
     /**
      * list all file
      *
@@ -1190,7 +1155,7 @@ public class ResourcesService extends BaseService {
         }
         List<Resource> resourceList = resourcesMapper.queryResourceExceptUserId(userId);
         List<Resource> list;
-        if (resourceList != null && resourceList.size() > 0) {
+        if (resourceList != null && !resourceList.isEmpty()) {
             Set<Resource> resourceSet = new HashSet<>(resourceList);
             List<Resource> authedResourceList = resourcesMapper.queryAuthorizedResourceList(userId);
 
@@ -1213,7 +1178,7 @@ public class ResourcesService extends BaseService {
      * @return unauthorized result code
      */
     public Map<String, Object> unauthorizedUDFFunction(User loginUser, Integer userId) {
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
         //only admin can operate
         if (isNotAdmin(loginUser, result)) {
             return result;
@@ -1221,7 +1186,7 @@ public class ResourcesService extends BaseService {
 
         List<UdfFunc> udfFuncList = udfFunctionMapper.queryUdfFuncExceptUserId(userId);
         List<UdfFunc> resultList = new ArrayList<>();
-        Set<UdfFunc> udfFuncSet = null;
+        Set<UdfFunc> udfFuncSet;
         if (CollectionUtils.isNotEmpty(udfFuncList)) {
             udfFuncSet = new HashSet<>(udfFuncList);
 
@@ -1235,7 +1200,6 @@ public class ResourcesService extends BaseService {
         return result;
     }
 
-
     /**
      * authorized udf function
      *
@@ -1254,7 +1218,6 @@ public class ResourcesService extends BaseService {
         return result;
     }
 
-
     /**
      * authorized file
      *
@@ -1263,7 +1226,7 @@ public class ResourcesService extends BaseService {
      * @return authorized result
      */
     public Map<String, Object> authorizedFile(User loginUser, Integer userId) {
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
         if (isNotAdmin(loginUser, result)) {
             return result;
         }
@@ -1285,7 +1248,7 @@ public class ResourcesService extends BaseService {
      * @param authedResourceList authorized resource list
      */
     private void getAuthorizedResourceList(Set<?> resourceSet, List<?> authedResourceList) {
-        Set<?> authedResourceSet = null;
+        Set<?> authedResourceSet;
         if (CollectionUtils.isNotEmpty(authedResourceList)) {
             authedResourceSet = new HashSet<>(authedResourceList);
             resourceSet.removeAll(authedResourceSet);
@@ -1297,10 +1260,9 @@ public class ResourcesService extends BaseService {
      *
      * @param userId user id
      * @param result return result
-     * @return
+     * @return tenant code
      */
-    private String getTenantCode(int userId,Result result){
-
+    private String getTenantCode(int userId,Result<Object> result) {
         User user = userMapper.selectById(userId);
         if (user == null) {
             logger.error("user {} not exists", userId);
@@ -1309,7 +1271,7 @@ public class ResourcesService extends BaseService {
         }
 
         Tenant tenant = tenantMapper.queryById(user.getTenantId());
-        if (tenant == null){
+        if (tenant == null) {
             logger.error("tenant not exists");
             putMsg(result, Status.TENANT_NOT_EXIST);
             return null;
@@ -1323,13 +1285,13 @@ public class ResourcesService extends BaseService {
      * @param containSelf whether add self to children list
      * @return all children id
      */
-    List<Integer> listAllChildren(Resource resource,boolean containSelf){
+    List<Integer> listAllChildren(Resource resource,boolean containSelf) {
         List<Integer> childList = new ArrayList<>();
         if (resource.getId() != -1 && containSelf) {
             childList.add(resource.getId());
         }
 
-        if(resource.isDirectory()){
+        if (resource.isDirectory()) {
             listAllChildren(resource.getId(),childList);
         }
         return childList;
@@ -1340,12 +1302,11 @@ public class ResourcesService extends BaseService {
      * @param resourceId    resource id
      * @param childList     child list
      */
-    void listAllChildren(int resourceId,List<Integer> childList){
-
+    void listAllChildren(int resourceId,List<Integer> childList) {
         List<Integer> children = resourcesMapper.listChildren(resourceId);
-        for(int chlidId:children){
-            childList.add(chlidId);
-            listAllChildren(chlidId,childList);
+        for (int childId : children) {
+            childList.add(childId);
+            listAllChildren(childId, childList);
         }
     }
 
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
similarity index 94%
copy from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java
copy to dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
index 55880ad..f6fd467 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
@@ -15,11 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.api.service;
+package org.apache.dolphinscheduler.api.service.impl;
 
 import org.apache.dolphinscheduler.api.dto.ScheduleParam;
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.exceptions.ServiceException;
+import org.apache.dolphinscheduler.api.service.BaseService;
+import org.apache.dolphinscheduler.api.service.ExecutorService;
+import org.apache.dolphinscheduler.api.service.MonitorService;
+import org.apache.dolphinscheduler.api.service.ProjectService;
+import org.apache.dolphinscheduler.api.service.SchedulerService;
 import org.apache.dolphinscheduler.api.utils.PageInfo;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.FailureStrategy;
@@ -61,12 +66,12 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 
 /**
- * scheduler service
+ * scheduler service impl
  */
 @Service
-public class SchedulerService extends BaseService {
+public class SchedulerServiceImpl extends BaseService implements SchedulerService {
 
-    private static final Logger logger = LoggerFactory.getLogger(SchedulerService.class);
+    private static final Logger logger = LoggerFactory.getLogger(SchedulerServiceImpl.class);
 
     @Autowired
     private ProjectService projectService;
@@ -113,7 +118,7 @@ public class SchedulerService extends BaseService {
                                               Priority processInstancePriority,
                                               String workerGroup) {
 
-        Map<String, Object> result = new HashMap();
+        Map<String, Object> result = new HashMap<>();
 
         Project project = projectMapper.queryByName(projectName);
 
@@ -146,7 +151,7 @@ public class SchedulerService extends BaseService {
         scheduleObj.setStartTime(scheduleParam.getStartTime());
         scheduleObj.setEndTime(scheduleParam.getEndTime());
         if (!org.quartz.CronExpression.isValidExpression(scheduleParam.getCrontab())) {
-            logger.error(scheduleParam.getCrontab() + " verify failure");
+            logger.error("{} verify failure", scheduleParam.getCrontab());
 
             putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, scheduleParam.getCrontab());
             return result;
@@ -204,7 +209,7 @@ public class SchedulerService extends BaseService {
                                               ReleaseState scheduleStatus,
                                               Priority processInstancePriority,
                                               String workerGroup) {
-        Map<String, Object> result = new HashMap<String, Object>(5);
+        Map<String, Object> result = new HashMap<>();
 
         Project project = projectMapper.queryByName(projectName);
 
@@ -298,8 +303,7 @@ public class SchedulerService extends BaseService {
                                                 String projectName,
                                                 Integer id,
                                                 ReleaseState scheduleStatus) {
-
-        Map<String, Object> result = new HashMap<String, Object>(5);
+        Map<String, Object> result = new HashMap<>();
 
         Project project = projectMapper.queryByName(projectName);
         // check project auth
@@ -340,10 +344,10 @@ public class SchedulerService extends BaseService {
             List<Integer> subProcessDefineIds = new ArrayList<>();
             processService.recurseFindSubProcessId(scheduleObj.getProcessDefinitionId(), subProcessDefineIds);
             Integer[] idArray = subProcessDefineIds.toArray(new Integer[subProcessDefineIds.size()]);
-            if (subProcessDefineIds.size() > 0) {
+            if (!subProcessDefineIds.isEmpty()) {
                 List<ProcessDefinition> subProcessDefinitionList =
                         processDefinitionMapper.queryDefinitionListByIdList(idArray);
-                if (subProcessDefinitionList != null && subProcessDefinitionList.size() > 0) {
+                if (subProcessDefinitionList != null && !subProcessDefinitionList.isEmpty()) {
                     for (ProcessDefinition subProcessDefinition : subProcessDefinitionList) {
                         /**
                          * if there is no online process, exit directly
@@ -362,7 +366,7 @@ public class SchedulerService extends BaseService {
         // check master server exists
         List<Server> masterServers = monitorService.getServerListFromZK(true);
 
-        if (masterServers.size() == 0) {
+        if (masterServers.isEmpty()) {
             putMsg(result, Status.MASTER_NOT_EXISTS);
             return result;
         }
@@ -374,20 +378,17 @@ public class SchedulerService extends BaseService {
 
         try {
             switch (scheduleStatus) {
-                case ONLINE: {
+                case ONLINE:
                     logger.info("Call master client set schedule online, project id: {}, flow id: {},host: {}", project.getId(), processDefinition.getId(), masterServers);
                     setSchedule(project.getId(), scheduleObj);
                     break;
-                }
-                case OFFLINE: {
+                case OFFLINE:
                     logger.info("Call master client set schedule offline, project id: {}, flow id: {},host: {}", project.getId(), processDefinition.getId(), masterServers);
                     deleteSchedule(project.getId(), id);
                     break;
-                }
-                default: {
+                default:
                     putMsg(result, Status.SCHEDULE_STATUS_UNKNOWN, scheduleStatus.toString());
                     return result;
-                }
             }
         } catch (Exception e) {
             result.put(Constants.MSG, scheduleStatus == ReleaseState.ONLINE ? "set online failure" : "set offline failure");
@@ -426,12 +427,12 @@ public class SchedulerService extends BaseService {
             putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefineId);
             return result;
         }
-        Page<Schedule> page = new Page(pageNo, pageSize);
+        Page<Schedule> page = new Page<>(pageNo, pageSize);
         IPage<Schedule> scheduleIPage = scheduleMapper.queryByProcessDefineIdPaging(
                 page, processDefineId, searchVal
         );
 
-        PageInfo pageInfo = new PageInfo<Schedule>(pageNo, pageSize);
+        PageInfo<Schedule> pageInfo = new PageInfo<>(pageNo, pageSize);
         pageInfo.setTotalCount((int) scheduleIPage.getTotal());
         pageInfo.setLists(scheduleIPage.getRecords());
         result.put(Constants.DATA_LIST, pageInfo);
@@ -466,7 +467,6 @@ public class SchedulerService extends BaseService {
     }
 
     public void setSchedule(int projectId, Schedule schedule) {
-
         int scheduleId = schedule.getId();
         logger.info("set schedule, project id: {}, scheduleId: {}", projectId, scheduleId);
 
@@ -490,7 +490,7 @@ public class SchedulerService extends BaseService {
      * @param scheduleId schedule id
      * @throws RuntimeException runtime exception
      */
-    public static void deleteSchedule(int projectId, int scheduleId) {
+    public void deleteSchedule(int projectId, int scheduleId) {
         logger.info("delete schedules of project id:{}, schedule id:{}", projectId, scheduleId);
 
         String jobName = QuartzExecutors.buildJobName(scheduleId);
@@ -593,8 +593,8 @@ public class SchedulerService extends BaseService {
             return result;
         }
         List<Date> selfFireDateList = CronUtils.getSelfFireDateList(startTime, endTime, cronExpression, Constants.PREVIEW_SCHEDULE_EXECUTE_COUNT);
-        result.put(Constants.DATA_LIST, selfFireDateList.stream().map(t -> DateUtils.dateToString(t)));
+        result.put(Constants.DATA_LIST, selfFireDateList.stream().map(DateUtils::dateToString));
         putMsg(result, Status.SUCCESS);
         return result;
     }
-}
\ No newline at end of file
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
similarity index 91%
copy from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java
copy to dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
index 6c68202..6c91e20 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
@@ -15,9 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.api.service;
+package org.apache.dolphinscheduler.api.service.impl;
 
 import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.BaseService;
+import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
+import org.apache.dolphinscheduler.api.service.ProjectService;
+import org.apache.dolphinscheduler.api.service.TaskInstanceService;
+import org.apache.dolphinscheduler.api.service.UsersService;
 import org.apache.dolphinscheduler.api.utils.PageInfo;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
@@ -46,10 +51,10 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 
 /**
- * task instance service
+ * task instance service impl
  */
 @Service
-public class TaskInstanceService extends BaseService {
+public class TaskInstanceServiceImpl extends BaseService implements TaskInstanceService {
 
     @Autowired
     ProjectMapper projectMapper;
@@ -118,8 +123,8 @@ public class TaskInstanceService extends BaseService {
             }
         }
 
-        Page<TaskInstance> page = new Page(pageNo, pageSize);
-        PageInfo pageInfo = new PageInfo<TaskInstance>(pageNo, pageSize);
+        Page<TaskInstance> page = new Page<>(pageNo, pageSize);
+        PageInfo<Map<String, Object>> pageInfo = new PageInfo<>(pageNo, pageSize);
         int executorId = usersService.getUserIdByName(executorName);
 
         IPage<TaskInstance> taskInstanceIPage = taskInstanceMapper.queryTaskInstanceListPaging(
@@ -154,7 +159,7 @@ public class TaskInstanceService extends BaseService {
      * @return the result code and msg
      */
     public Map<String, Object> forceTaskSuccess(User loginUser, String projectName, Integer taskInstanceId) {
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
         Project project = projectMapper.queryByName(projectName);
 
         // check user auth
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
index 8aafe91..9f23428 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
@@ -87,7 +87,7 @@ public class TenantServiceImpl extends BaseService implements TenantService {
                                             int queueId,
                                             String desc) throws Exception {
 
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
         result.put(Constants.STATUS, false);
         if (isNotAdmin(loginUser, result)) {
             return result;
@@ -140,7 +140,7 @@ public class TenantServiceImpl extends BaseService implements TenantService {
      */
     public Map<String, Object> queryTenantList(User loginUser, String searchVal, Integer pageNo, Integer pageSize) {
 
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
         if (isNotAdmin(loginUser, result)) {
             return result;
         }
@@ -171,7 +171,7 @@ public class TenantServiceImpl extends BaseService implements TenantService {
     public Map<String, Object> updateTenant(User loginUser, int id, String tenantCode, int queueId,
                                             String desc) throws Exception {
 
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
         result.put(Constants.STATUS, false);
 
         if (isNotAdmin(loginUser, result)) {
@@ -233,7 +233,7 @@ public class TenantServiceImpl extends BaseService implements TenantService {
      */
     @Transactional(rollbackFor = Exception.class)
     public Map<String, Object> deleteTenantById(User loginUser, int id) throws Exception {
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
 
         if (isNotAdmin(loginUser, result)) {
             return result;
@@ -291,7 +291,7 @@ public class TenantServiceImpl extends BaseService implements TenantService {
      */
     public Map<String, Object> queryTenantList(String tenantCode) {
 
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
 
         List<Tenant> resourceList = tenantMapper.queryByTenantCode(tenantCode);
         if (CollectionUtils.isNotEmpty(resourceList)) {
@@ -311,7 +311,7 @@ public class TenantServiceImpl extends BaseService implements TenantService {
      */
     public Map<String, Object> queryTenantList(User loginUser) {
 
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
 
         List<Tenant> resourceList = tenantMapper.selectList(null);
         result.put(Constants.DATA_LIST, resourceList);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java
index d1416df..58e4912 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java
@@ -132,8 +132,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
                                           String phone,
                                           String queue,
                                           int state) throws IOException {
-
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
 
         //check all user params
         String msg = this.checkUserParams(userName, userPassword, email, phone);
@@ -295,7 +294,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
      * @return user list page
      */
     public Map<String, Object> queryUserList(User loginUser, String searchVal, Integer pageNo, Integer pageSize) {
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
 
         if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) {
             return result;
@@ -337,7 +336,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
                                           String phone,
                                           String queue,
                                           int state) throws IOException {
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
         result.put(Constants.STATUS, false);
 
         if (check(result, !hasPerm(loginUser, userId), Status.USER_NO_OPERATION_PERM)) {
@@ -461,7 +460,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
      * @throws Exception exception when operate hdfs
      */
     public Map<String, Object> deleteUserById(User loginUser, int id) throws IOException {
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
         //only admin can operate
         if (!isAdmin(loginUser)) {
             putMsg(result, Status.USER_NO_OPERATION_PERM, id);
@@ -501,7 +500,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
      */
     @Transactional(rollbackFor = RuntimeException.class)
     public Map<String, Object> grantProject(User loginUser, int userId, String projectIds) {
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
         result.put(Constants.STATUS, false);
 
         //only admin can operate
@@ -550,7 +549,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
      */
     @Transactional(rollbackFor = RuntimeException.class)
     public Map<String, Object> grantResources(User loginUser, int userId, String resourceIds) {
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
         //only admin can operate
         if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) {
             return result;
@@ -645,7 +644,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
      */
     @Transactional(rollbackFor = RuntimeException.class)
     public Map<String, Object> grantUDFFunction(User loginUser, int userId, String udfIds) {
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
 
         //only admin can operate
         if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) {
@@ -691,7 +690,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
      */
     @Transactional(rollbackFor = RuntimeException.class)
     public Map<String, Object> grantDataSource(User loginUser, int userId, String datasourceIds) {
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
         result.put(Constants.STATUS, false);
 
         //only admin can operate
@@ -771,7 +770,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
      * @return user list
      */
     public Map<String, Object> queryAllGeneralUsers(User loginUser) {
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
         //only admin can operate
         if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) {
             return result;
@@ -791,7 +790,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
      * @return user list
      */
     public Map<String, Object> queryUserList(User loginUser) {
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
         //only admin can operate
         if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) {
             return result;
@@ -832,7 +831,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
      */
     public Map<String, Object> unauthorizedUser(User loginUser, Integer alertgroupId) {
 
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
         //only admin can operate
         if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) {
             return result;
@@ -867,7 +866,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
      * @return authorized result code
      */
     public Map<String, Object> authorizedUser(User loginUser, Integer alertgroupId) {
-        Map<String, Object> result = new HashMap<>(5);
+        Map<String, Object> result = new HashMap<>();
         //only admin can operate
         if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) {
             return result;
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/RegexUtils.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/RegexUtils.java
index 9ff7fac..482cb55 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/RegexUtils.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/RegexUtils.java
@@ -44,4 +44,13 @@ public class RegexUtils {
         Matcher isNum = pattern.matcher(str);
         return isNum.matches();
     }
+
+    public static String escapeNRT(String str) {
+        // Logging should not be vulnerable to injection attacks: replace pattern-breaking
+        // characters (LF, CR, TAB) with '_'. Inside a character class '|' is a literal, not an
+        // alternation, so it is kept out of the pattern; otherwise every pipe would be rewritten.
+        // Null and empty input are handed back unchanged instead of collapsing "" to null.
+        return str == null || str.isEmpty() ? str : str.replaceAll("[\n\r\t]", "_");
+    }
+
 }
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java
index d430d3a..7b9c869 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java
@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.api.service;
 
 import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.impl.ResourcesServiceImpl;
 import org.apache.dolphinscheduler.api.utils.PageInfo;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.Constants;
@@ -68,7 +69,7 @@ public class ResourcesServiceTest {
     private static final Logger logger = LoggerFactory.getLogger(ResourcesServiceTest.class);
 
     @InjectMocks
-    private ResourcesService resourcesService;
+    private ResourcesServiceImpl resourcesService;
     @Mock
     private ResourceMapper resourcesMapper;
     @Mock
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java
index deadc21..389afed 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java
@@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.api.service;
 
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
+import org.apache.dolphinscheduler.api.service.impl.SchedulerServiceImpl;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ReleaseState;
 import org.apache.dolphinscheduler.common.model.Server;
@@ -53,7 +54,7 @@ public class SchedulerServiceTest {
 
 
     @InjectMocks
-    private SchedulerService schedulerService;
+    private SchedulerServiceImpl schedulerService;
 
     @Mock
     private MonitorService monitorService;
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
index b1989b4..0f0071c 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.when;
 import org.apache.dolphinscheduler.api.ApiApplicationServer;
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
+import org.apache.dolphinscheduler.api.service.impl.TaskInstanceServiceImpl;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.UserType;
@@ -62,7 +63,7 @@ public class TaskInstanceServiceTest {
     private static final Logger logger = LoggerFactory.getLogger(TaskInstanceServiceTest.class);
 
     @InjectMocks
-    private TaskInstanceService taskInstanceService;
+    private TaskInstanceServiceImpl taskInstanceService;
 
     @Mock
     ProjectMapper projectMapper;
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java
index 6bed928..de2d62d 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java
@@ -50,12 +50,8 @@ public class StringUtils {
         return !isBlank(s);
     }
 
-    public static String replaceNRTtoUnderline(String src) {
-        if (isBlank(src)) {
-            return src;
-        } else {
-            return src.replaceAll("[\n|\r|\t]", "_");
-        }
+    public static String replaceNRTtoUnderline(String str) {
+        return isBlank(str) ? str : str.replaceAll("[\n\r\t]", "_"); // '|' is literal inside [...]
     }
 
     public static String trim(String str) {