Posted to commits@dolphinscheduler.apache.org by li...@apache.org on 2020/03/28 07:43:18 UTC

[incubator-dolphinscheduler] branch dev updated: new feature for #404 add resource tree function (#2323)

This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 38c965d  new feature for #404 add resource tree function (#2323)
38c965d is described below

commit 38c965ddc990b68b9f3d4948fa5091af58a49a4b
Author: lgcareer <18...@163.com>
AuthorDate: Sat Mar 28 15:43:07 2020 +0800

    new feature for #404 add resource tree function (#2323)
    
    * add create resource directory
    
    * add create resource directory
    
    * update the resource test
    
    * add upgrade sql in version 1.2.2
    
    * Adding request parameter id to update queryResourceListPaging
    
    * set isDirectory value to false by default
    
    * add full name to update updateResource
    
    * remove request parameter isDirectory to update createResource method
    
    * update queryResourceListPaging with change get to post
    
    * update updateResource method with remove fullName
    
    * File management list modification (#1976)
    
    * add resource component
    
    * add resource tree visitor
    
    * return json string
    
    * update queryResourceList
    
    * upload file need fullName
    
    * add method rootNode
    
    * Shell task resources and authorization resources (#1989)
    
    * File management list modification
    
    * Shell task resources and authorization resources
    
    * download resource when execute task
    
    * download resource when execute task
    
    * update authorization type
    
    * download resource when execute task
    
    * Spark task resource changes (#1990)
    
    * File management list modification
    
    * Shell task resources and authorization resources
    
    * Spark task resource changes
    
    * download resource when execute task
    
    * update udf function service
    
    * add resource type in ResourceComponent
    
    * UDF resource tree and change DAG style (#2019)
    
    * File management list modification
    
    * Shell task resources and authorization resources
    
    * Spark task resource changes
    
    * UDF resource tree and change DAG style
    
    * add deleteIds method in ResourceMapper and ResourceMapperTest
    
    * Add comments on class and method
    
    * add queryResourceByName method in controller
    
    * update verify-name with change name to full name
    
    * update queryResource with add parameter pid
    
    * update queryResource with add parameter pid
    
    * add resource ids to the process definition; deleting a resource needs to check whether it is used by any process definition
    
    * Breadcrumb development (#2033)
    
    * File management list modification
    
    * Shell task resources and authorization resources
    
    * Spark task resource changes
    
    * UDF resource tree and change DAG style
    
    * Breadcrumb development
    
    * Breadcrumb development
    
    * Resource tree bug fix (#2040)
    
    * File management list modification
    
    * Shell task resources and authorization resources
    
    * Spark task resource changes
    
    * UDF resource tree and change DAG style
    
    * Breadcrumb development
    
    * Breadcrumb development
    
    * Resource tree bug fix
    
    * update resource service test
    
    * Fix github action rerun failed
    
    * add status of PARENT_RESOURCE_NOT_EXIST
    
    * Fix github action rerun failed (#2067)
    
    * update resource service test
    
    * Fix github action rerun failed
    
    * add status of PARENT_RESOURCE_NOT_EXIST
    
    * Change crumb position
    
    * Change crumb position (#2068)
    
    * build resource process definition map
    
    * UDF changed to multiple choice
    
    * UDF changed to multiple choice (#2077)
    
    * Change crumb position
    
    * UDF changed to multiple choice
    
    * build resource process definition map (#2076)
    
    * update resource service test
    
    * Fix github action rerun failed
    
    * add status of PARENT_RESOURCE_NOT_EXIST
    
    * build resource process definition map
    
    * update resource name also need update all the children full name
    
    * need add queryResource
    
    * update resource name also need update all the children full name (#2096)
    
    * update resource service test
    
    * Fix github action rerun failed
    
    * add status of PARENT_RESOURCE_NOT_EXIST
    
    * build resource process definition map
    
    * update resource name also need update all the children full name
    
    * need add queryResource
    
    * Limit customization file content to no more than 3000 lines
    
    * Limit customization file content to no more than 3000 lines(#2128)
    
    * Limit customization file content to no more than 3000 lines(#2128) (#2140)
    
    * Change crumb position
    
    * UDF changed to multiple choice
    
    * Limit customization file content to no more than 3000 lines
    
    * Limit customization file content to no more than 3000 lines(#2128)
    
    * add queryResourceJarList
    
    * add queryResourceJarList
    
    * add queryResourceJarList
    
    * add queryResourceJarList (#2192)
    
    * update resource service test
    
    * Fix github action rerun failed
    
    * add status of PARENT_RESOURCE_NOT_EXIST
    
    * build resource process definition map
    
    * update resource name also need update all the children full name
    
    * need add queryResource
    
    * add queryResourceJarList
    
    * add queryResourceJarList
    
    * add queryResourceJarList
    
    * Modify the main jar package
    
    * Modify the main jar package (#2200)
    
    * Change crumb position
    
    * UDF changed to multiple choice
    
    * Limit customization file content to no more than 3000 lines
    
    * Limit customization file content to no more than 3000 lines(#2128)
    
    * Modify the main jar package
    
    * add resource filter in order to get filtered resource
    
    * add comments of resource filter
    
    * update list children by resource
    
    * choose main jar with resource tree (#2220)
    
    * update resource service test
    
    * Fix github action rerun failed
    
    * add status of PARENT_RESOURCE_NOT_EXIST
    
    * build resource process definition map
    
    * update resource name also need update all the children full name
    
    * need add queryResource
    
    * add queryResourceJarList
    
    * add queryResourceJarList
    
    * add queryResourceJarList
    
    * add resource filter in order to get filtered resource
    
    * add comments of resource filter
    
    * update list children by resource
    
    * Return null if query resource list is empty
    
    * update queryResource method change parameter pid to id
    
    * getResourceId

    * Delete checksum and modify parameter name
    
    * revert .env
    
    * remove parameter pid
    
    * Delete request interface
    
    * go back to the last page
    
    * jar interface call
    
    * Fix issue #2234 and #2228
    
    * change resource name with full name
    
    * Fix issue #2234 and #2228 (#2246)
    
    * update resource service test
    
    * Fix github action rerun failed
    
    * add status of PARENT_RESOURCE_NOT_EXIST
    
    * build resource process definition map
    
    * update resource name also need update all the children full name
    
    * need add queryResource
    
    * add queryResourceJarList
    
    * add queryResourceJarList
    
    * add queryResourceJarList
    
    * add resource filter in order to get filtered resource
    
    * add comments of resource filter
    
    * update list children by resource
    
    * Return null if query resource list is empty
    
    * update queryResource method change parameter pid to id
    
    * revert .env
    
    * remove parameter pid
    
    * Fix issue #2234 and #2228
    
    * change resource name with full name
    
    * Fix list query value error
    
    * replace unauth-file with authorize-resource-tree
    
    * Repair data cannot be echoed
    
    * Repair data cannot be echoed
    
    * execute mr and spark task need query resource name before
    
    * Authorized resource interface replacement
    
    * Authorized resource interface replacement
    
    * Filter UDF resources
    
    * Change parameters
    
    * need query all authorized directory children when create task
    
    * Change normalize.scss import method and animation.scss license modification
    
    * Delete file list update processing
    
    * Fix: resource was not deleted from HDFS when deleting it
    
    * add tooltips
    
    * add tooltips (#2310)
    
    * Echo workflow name and modify udf management name
    
    * [new feature]add resource tree function
    
    * revert front code in order to be same as dev branch
    
    * revert front code in order to be same as dev branch
    
    * revert common.properties and application.properties
    
    * add super method
    
    * update flink parameter test
    
    * update flink parameter and unit test
    
    * update resource service test
    
    * If resource list is empty, initialize it
    
    * update flink parameter test
    
    Co-authored-by: break60 <79...@qq.com>
    Co-authored-by: xingchun-chen <55...@users.noreply.github.com>
    Co-authored-by: qiaozhanwei <qi...@outlook.com>
---
 .../api/controller/ResourcesController.java        | 152 +++++-
 .../api/dto/resources/Directory.java               |  14 +-
 .../api/dto/resources/FileLeaf.java                |  10 +-
 .../api/dto/resources/ResourceComponent.java       | 193 +++++++
 .../api/dto/resources/filter/IFilter.java          |  13 +-
 .../api/dto/resources/filter/ResourceFilter.java   | 100 ++++
 .../dto/resources/visitor/ResourceTreeVisitor.java | 130 +++++
 .../api/dto/resources/visitor/Visitor.java         |  18 +-
 .../apache/dolphinscheduler/api/enums/Status.java  |  12 +-
 .../api/service/ProcessDefinitionService.java      |  34 +-
 .../api/service/ResourcesService.java              | 559 ++++++++++++++++-----
 .../api/service/UdfFuncService.java                |   4 +-
 .../dto/resources/filter/ResourceFilterTest.java   |  58 +++
 .../resources/visitor/ResourceTreeVisitorTest.java |  82 +++
 .../api/service/ResourcesServiceTest.java          | 112 +++--
 .../dolphinscheduler/api/utils/CheckUtilsTest.java |   7 +-
 .../common/enums/AuthorizationType.java            |  12 +-
 .../common/process/ResourceInfo.java               |  10 +
 .../common/task/AbstractParameters.java            |   3 +-
 .../dolphinscheduler/common/task/IParameters.java  |   4 +-
 .../task/conditions/ConditionsParameters.java      |   3 +-
 .../common/task/datax/DataxParameters.java         |   3 +-
 .../common/task/dependent/DependentParameters.java |   3 +-
 .../common/task/flink/FlinkParameters.java         |  29 +-
 .../common/task/http/HttpParameters.java           |   3 +-
 .../common/task/mr/MapreduceParameters.java        |  20 +-
 .../common/task/procedure/ProcedureParameters.java |   3 +-
 .../common/task/python/PythonParameters.java       |  10 +-
 .../common/task/shell/ShellParameters.java         |   9 +-
 .../common/task/spark/SparkParameters.java         |  20 +-
 .../common/task/sql/SqlParameters.java             |   3 +-
 .../common/task/sqoop/SqoopParameters.java         |   3 +-
 .../task/subprocess/SubProcessParameters.java      |   3 +-
 .../dolphinscheduler/common/utils/HadoopUtils.java |  49 +-
 .../common/task/FlinkParametersTest.java           |  14 +-
 .../dao/entity/ProcessDefinition.java              |  15 +
 .../dolphinscheduler/dao/entity/Resource.java      |  68 ++-
 .../dao/mapper/ProcessDefinitionMapper.java        |  11 +-
 .../dao/mapper/ResourceMapper.java                 |  53 +-
 .../dolphinscheduler/dao/mapper/UdfFuncMapper.java |  15 +
 .../dao/mapper/ProcessDefinitionMapper.xml         |   7 +
 .../dolphinscheduler/dao/mapper/ResourceMapper.xml |  79 ++-
 .../dolphinscheduler/dao/mapper/UdfFuncMapper.xml  |  24 +
 .../dao/mapper/ResourceMapperTest.java             |  57 ++-
 .../server/worker/runner/TaskScheduleThread.java   |  50 +-
 .../server/worker/task/AbstractYarnTask.java       |   5 +
 .../server/worker/task/flink/FlinkTask.java        |  25 +
 .../server/worker/task/mr/MapReduceTask.java       |  26 +-
 .../server/worker/task/spark/SparkTask.java        |  26 +-
 .../server/worker/task/sqoop/SqoopTask.java        |   4 +
 .../service/permission/PermissionCheck.java        |  31 ++
 .../service/process/ProcessService.java            |  26 +-
 .../home/pages/dag/_source/plugIn/jsPlumbHandle.js |   2 +-
 .../resource/pages/file/pages/_source/common.js    |   0
 .../pages/file/pages/list/_source/list.vue         |   0
 .../pages/file/pages/list/_source/rename.vue       |   2 +-
 .../pages/resource/pages/file/pages/list/index.vue |   0
 .../src/js/conf/home/store/resource/actions.js     |   0
 .../js/module/components/fileUpdate/fileUpdate.vue |   0
 sql/soft_version                                   |   2 +-
 .../1.2.2_schema/mysql/dolphinscheduler_ddl.sql    |  82 ++-
 .../postgresql/dolphinscheduler_ddl.sql            |  85 +++-
 .../postgresql/dolphinscheduler_dml.sql            |   4 +-
 63 files changed, 2056 insertions(+), 345 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java
index f28ba10..40effb6 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java
@@ -67,6 +67,50 @@ public class ResourcesController extends BaseController{
      * @param alias alias
      * @param description description
      * @param type type
+     * @return create result code
+     */
+
+    /**
+     *
+     * @param loginUser     login user
+     * @param type          type
+     * @param alias         alias
+     * @param description   description
+     * @param pid           parent id
+     * @param currentDir    current directory
+     * @return
+     */
+    @ApiOperation(value = "createDirctory", notes= "CREATE_RESOURCE_NOTES")
+    @ApiImplicitParams({
+            @ApiImplicitParam(name = "type", value = "RESOURCE_TYPE", required = true, dataType ="ResourceType"),
+            @ApiImplicitParam(name = "name", value = "RESOURCE_NAME", required = true, dataType ="String"),
+            @ApiImplicitParam(name = "description", value = "RESOURCE_DESC",  dataType ="String"),
+            @ApiImplicitParam(name = "file", value = "RESOURCE_FILE", required = true, dataType = "MultipartFile")
+    })
+    @PostMapping(value = "/directory/create")
+    public Result createDirectory(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                                 @RequestParam(value = "type") ResourceType type,
+                                 @RequestParam(value ="name") String alias,
+                                 @RequestParam(value = "description", required = false) String description,
+                                 @RequestParam(value ="pid") int pid,
+                                 @RequestParam(value ="currentDir") String currentDir) {
+        try {
+            logger.info("login user {}, create resource, type: {}, resource alias: {}, desc: {}, file: {},{}",
+                    loginUser.getUserName(),type, alias, description,pid,currentDir);
+            return resourceService.createDirectory(loginUser,alias, description,type ,pid,currentDir);
+        } catch (Exception e) {
+            logger.error(CREATE_RESOURCE_ERROR.getMsg(),e);
+            return error(CREATE_RESOURCE_ERROR.getCode(), CREATE_RESOURCE_ERROR.getMsg());
+        }
+    }
+
+    /**
+     * create resource
+     *
+     * @param loginUser login user
+     * @param alias alias
+     * @param description description
+     * @param type type
      * @param file file
      * @return create result code
      */
@@ -80,13 +124,15 @@ public class ResourcesController extends BaseController{
     @PostMapping(value = "/create")
     public Result createResource(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
                                  @RequestParam(value = "type") ResourceType type,
-                                 @RequestParam(value ="name")String alias,
+                                 @RequestParam(value ="name") String alias,
                                  @RequestParam(value = "description", required = false) String description,
-                                 @RequestParam("file") MultipartFile file) {
+                                 @RequestParam("file") MultipartFile file,
+                                 @RequestParam(value ="pid") int pid,
+                                 @RequestParam(value ="currentDir") String currentDir) {
         try {
             logger.info("login user {}, create resource, type: {}, resource alias: {}, desc: {}, file: {},{}",
                     loginUser.getUserName(),type, alias, description, file.getName(), file.getOriginalFilename());
-            return resourceService.createResource(loginUser,alias, description,type ,file);
+            return resourceService.createResource(loginUser,alias, description,type ,file,pid,currentDir);
         } catch (Exception e) {
             logger.error(CREATE_RESOURCE_ERROR.getMsg(),e);
             return error(CREATE_RESOURCE_ERROR.getCode(), CREATE_RESOURCE_ERROR.getMsg());
@@ -120,7 +166,7 @@ public class ResourcesController extends BaseController{
         try {
             logger.info("login user {}, update resource, type: {}, resource alias: {}, desc: {}",
                     loginUser.getUserName(),type, alias, description);
-            return resourceService.updateResource(loginUser,resourceId,alias, description,type);
+            return resourceService.updateResource(loginUser,resourceId,alias,description,type);
         } catch (Exception e) {
             logger.error(UPDATE_RESOURCE_ERROR.getMsg(),e);
             return error(Status.UPDATE_RESOURCE_ERROR.getCode(), Status.UPDATE_RESOURCE_ERROR.getMsg());
@@ -166,6 +212,7 @@ public class ResourcesController extends BaseController{
     @ApiOperation(value = "queryResourceListPaging", notes= "QUERY_RESOURCE_LIST_PAGING_NOTES")
     @ApiImplicitParams({
             @ApiImplicitParam(name = "type", value = "RESOURCE_TYPE", required = true, dataType ="ResourceType"),
+            @ApiImplicitParam(name = "id", value = "RESOURCE_ID", required = true, dataType ="int"),
             @ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", dataType ="String"),
             @ApiImplicitParam(name = "pageNo", value = "PAGE_NO", dataType = "Int", example = "1"),
             @ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", dataType ="Int",example = "20")
@@ -174,6 +221,7 @@ public class ResourcesController extends BaseController{
     @ResponseStatus(HttpStatus.OK)
     public Result queryResourceListPaging(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
                                      @RequestParam(value ="type") ResourceType type,
+                                     @RequestParam(value ="id") int id,
                                      @RequestParam("pageNo") Integer pageNo,
                                      @RequestParam(value = "searchVal", required = false) String searchVal,
                                      @RequestParam("pageSize") Integer pageSize
@@ -187,7 +235,7 @@ public class ResourcesController extends BaseController{
             }
 
             searchVal = ParameterUtils.handleEscapes(searchVal);
-            result = resourceService.queryResourceListPaging(loginUser,type,searchVal,pageNo, pageSize);
+            result = resourceService.queryResourceListPaging(loginUser,id,type,searchVal,pageNo, pageSize);
             return returnDataListPaging(result);
         }catch (Exception e){
             logger.error(QUERY_RESOURCES_LIST_PAGING.getMsg(),e);
@@ -227,26 +275,26 @@ public class ResourcesController extends BaseController{
      * verify resource by alias and type
      *
      * @param loginUser login user
-     * @param alias resource name
-     * @param type resource type
+     * @param fullName  resource full name
+     * @param type      resource type
      * @return true if the resource name not exists, otherwise return false
      */
     @ApiOperation(value = "verifyResourceName", notes= "VERIFY_RESOURCE_NAME_NOTES")
     @ApiImplicitParams({
             @ApiImplicitParam(name = "type", value = "RESOURCE_TYPE", required = true, dataType ="ResourceType"),
-            @ApiImplicitParam(name = "name", value = "RESOURCE_NAME", required = true, dataType ="String")
+            @ApiImplicitParam(name = "fullName", value = "RESOURCE_FULL_NAME", required = true, dataType ="String")
     })
     @GetMapping(value = "/verify-name")
     @ResponseStatus(HttpStatus.OK)
     public Result verifyResourceName(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
-                                     @RequestParam(value ="name") String alias,
+                                     @RequestParam(value ="fullName") String fullName,
                                      @RequestParam(value ="type") ResourceType type
     ) {
         try {
             logger.info("login user {}, verfiy resource alias: {},resource type: {}",
-                    loginUser.getUserName(), alias,type);
+                    loginUser.getUserName(), fullName,type);
 
-            return resourceService.verifyResourceName(alias,type,loginUser);
+            return resourceService.verifyResourceName(fullName,type,loginUser);
         } catch (Exception e) {
             logger.error(VERIFY_RESOURCE_BY_NAME_AND_TYPE_ERROR.getMsg(), e);
             return error(Status.VERIFY_RESOURCE_BY_NAME_AND_TYPE_ERROR.getCode(), Status.VERIFY_RESOURCE_BY_NAME_AND_TYPE_ERROR.getMsg());
@@ -254,6 +302,63 @@ public class ResourcesController extends BaseController{
     }
 
     /**
+     * query resources jar list
+     *
+     * @param loginUser login user
+     * @param type resource type
+     * @return resource list
+     */
+    @ApiOperation(value = "queryResourceJarList", notes= "QUERY_RESOURCE_LIST_NOTES")
+    @ApiImplicitParams({
+            @ApiImplicitParam(name = "type", value = "RESOURCE_TYPE", required = true, dataType ="ResourceType")
+    })
+    @GetMapping(value="/list/jar")
+    @ResponseStatus(HttpStatus.OK)
+    public Result queryResourceJarList(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                                    @RequestParam(value ="type") ResourceType type
+    ){
+        try{
+            logger.info("query resource list, login user:{}, resource type:{}", loginUser.getUserName(), type.toString());
+            Map<String, Object> result = resourceService.queryResourceJarList(loginUser, type);
+            return returnDataList(result);
+        }catch (Exception e){
+            logger.error(QUERY_RESOURCES_LIST_ERROR.getMsg(),e);
+            return error(Status.QUERY_RESOURCES_LIST_ERROR.getCode(), Status.QUERY_RESOURCES_LIST_ERROR.getMsg());
+        }
+    }
+
+    /**
+     * query resource by full name and type
+     *
+     * @param loginUser login user
+     * @param fullName  resource full name
+     * @param type      resource type
+     * @return true if the resource name not exists, otherwise return false
+     */
+    @ApiOperation(value = "queryResource", notes= "QUERY_BY_RESOURCE_NAME")
+    @ApiImplicitParams({
+            @ApiImplicitParam(name = "type", value = "RESOURCE_TYPE", required = true, dataType ="ResourceType"),
+            @ApiImplicitParam(name = "fullName", value = "RESOURCE_FULL_NAME", required = true, dataType ="String")
+    })
+    @GetMapping(value = "/queryResource")
+    @ResponseStatus(HttpStatus.OK)
+    public Result queryResource(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                                     @RequestParam(value ="fullName",required = false) String fullName,
+                                     @RequestParam(value ="id",required = false) Integer id,
+                                     @RequestParam(value ="type") ResourceType type
+    ) {
+        try {
+            logger.info("login user {}, query resource by full name: {} or id: {},resource type: {}",
+                    loginUser.getUserName(), fullName,id,type);
+
+            return resourceService.queryResource(fullName,id,type);
+        } catch (Exception e) {
+            logger.error(RESOURCE_NOT_EXIST.getMsg(), e);
+            return error(Status.RESOURCE_NOT_EXIST.getCode(), Status.RESOURCE_NOT_EXIST.getMsg());
+        }
+    }
+
+    /**
      * view resource file online
      *
      * @param loginUser login user
@@ -310,16 +415,18 @@ public class ResourcesController extends BaseController{
                                        @RequestParam(value ="fileName")String fileName,
                                        @RequestParam(value ="suffix")String fileSuffix,
                                        @RequestParam(value = "description", required = false) String description,
-                                       @RequestParam(value = "content") String content
+                                       @RequestParam(value = "content") String content,
+                                       @RequestParam(value ="pid") int pid,
+                                       @RequestParam(value ="currentDir") String currentDir
     ) {
         try{
             logger.info("login user {}, online create resource! fileName : {}, type : {}, suffix : {},desc : {},content : {}",
-                    loginUser.getUserName(),fileName,type,fileSuffix,description,content);
+                    loginUser.getUserName(),fileName,type,fileSuffix,description,content,pid,currentDir);
             if(StringUtils.isEmpty(content)){
                 logger.error("resource file contents are not allowed to be empty");
                 return error(Status.RESOURCE_FILE_IS_EMPTY.getCode(), RESOURCE_FILE_IS_EMPTY.getMsg());
             }
-            return resourceService.onlineCreateResource(loginUser,type,fileName,fileSuffix,description,content);
+            return resourceService.onlineCreateResource(loginUser,type,fileName,fileSuffix,description,content,pid,currentDir);
         }catch (Exception e){
             logger.error(CREATE_RESOURCE_FILE_ON_LINE_ERROR.getMsg(),e);
             return error(Status.CREATE_RESOURCE_FILE_ON_LINE_ERROR.getCode(), Status.CREATE_RESOURCE_FILE_ON_LINE_ERROR.getMsg());
@@ -384,6 +491,9 @@ public class ResourcesController extends BaseController{
                     .ok()
                     .header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + file.getFilename() + "\"")
                     .body(file);
+        }catch (RuntimeException e){
+            logger.error(e.getMessage(),e);
+            return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(e.getMessage());
         }catch (Exception e){
             logger.error(DOWNLOAD_RESOURCE_FILE_ERROR.getMsg(),e);
             return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(Status.DOWNLOAD_RESOURCE_FILE_ERROR.getMsg());
@@ -658,21 +768,21 @@ public class ResourcesController extends BaseController{
      * @param userId user id
      * @return unauthorized result code
      */
-    @ApiOperation(value = "unauthorizedFile", notes= "UNAUTHORIZED_FILE_NOTES")
+    @ApiOperation(value = "authorizeResourceTree", notes= "AUTHORIZE_RESOURCE_TREE_NOTES")
     @ApiImplicitParams({
             @ApiImplicitParam(name = "userId", value = "USER_ID", required = true, dataType ="Int", example = "100")
     })
-    @GetMapping(value = "/unauth-file")
+    @GetMapping(value = "/authorize-resource-tree")
     @ResponseStatus(HttpStatus.CREATED)
-    public Result unauthorizedFile(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+    public Result authorizeResourceTree(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
                                    @RequestParam("userId") Integer userId) {
         try{
-            logger.info("resource unauthorized file, user:{}, unauthorized user id:{}", loginUser.getUserName(), userId);
-            Map<String, Object> result =  resourceService.unauthorizedFile(loginUser, userId);
+            logger.info("all resource file, user:{}, user id:{}", loginUser.getUserName(), userId);
+            Map<String, Object> result =  resourceService.authorizeResourceTree(loginUser, userId);
             return returnDataList(result);
         }catch (Exception e){
-            logger.error(UNAUTHORIZED_FILE_RESOURCE_ERROR.getMsg(),e);
-            return error(Status.UNAUTHORIZED_FILE_RESOURCE_ERROR.getCode(), Status.UNAUTHORIZED_FILE_RESOURCE_ERROR.getMsg());
+            logger.error(AUTHORIZE_RESOURCE_TREE.getMsg(),e);
+            return error(Status.AUTHORIZE_RESOURCE_TREE.getCode(), Status.AUTHORIZE_RESOURCE_TREE.getMsg());
         }
     }
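
To exercise the new directory endpoint added above, here is a minimal client-side sketch. The base URL, port and context path are assumptions (adjust to your deployment), authentication (session cookie or token) is omitted, and only the request parameter names match the controller declarations (type, name, description, pid, currentDir).

import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestTemplate;

public class CreateDirectoryExample {
    public static void main(String[] args) {
        RestTemplate restTemplate = new RestTemplate();

        MultiValueMap<String, String> params = new LinkedMultiValueMap<>();
        params.add("type", "FILE");          // ResourceType enum name
        params.add("name", "etl-scripts");   // directory alias
        params.add("description", "shared shell scripts");
        params.add("pid", "-1");             // -1 targets the resource root
        params.add("currentDir", "/");       // full name of the parent directory

        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
        // NOTE: session/token authentication is omitted; add it as your deployment requires.

        // URL below is an assumption about the deployment, not part of this commit
        String body = restTemplate.postForObject(
                "http://localhost:12345/dolphinscheduler/resources/directory/create",
                new HttpEntity<>(params, headers),
                String.class);
        System.out.println(body);
    }
}

Passing pid=-1 together with currentDir=/ creates the directory directly under the resource root, which mirrors the createDirectory service logic added later in this commit.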
 
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/_source/common.js b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/Directory.java
similarity index 80%
copy from dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/_source/common.js
copy to dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/Directory.java
index 8e534f9..289d506 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/_source/common.js
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/Directory.java
@@ -1,3 +1,5 @@
+package org.apache.dolphinscheduler.api.dto.resources;
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -14,10 +16,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 /**
- * Create file type
+ * directory
  */
-let filtTypeArr = ['txt', 'log', 'sh', 'conf', 'cfg', 'py', 'java', 'sql', 'xml', 'hql', 'properties']
+public class Directory extends ResourceComponent{
+
+    @Override
+    public boolean isDirctory() {
+        return true;
+    }
 
-export { filtTypeArr }
+}
diff --git a/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_dml.sql b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/FileLeaf.java
similarity index 85%
copy from sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_dml.sql
copy to dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/FileLeaf.java
index 38964cc..b9b9182 100644
--- a/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_dml.sql
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/FileLeaf.java
@@ -1,3 +1,5 @@
+package org.apache.dolphinscheduler.api.dto.resources;
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -13,4 +15,10 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
\ No newline at end of file
+ */
+/**
+ * file leaf
+ */
+public class FileLeaf extends ResourceComponent{
+
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/ResourceComponent.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/ResourceComponent.java
new file mode 100644
index 0000000..fb0da70
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/ResourceComponent.java
@@ -0,0 +1,193 @@
+package org.apache.dolphinscheduler.api.dto.resources;
+
+import com.alibaba.fastjson.annotation.JSONField;
+import com.alibaba.fastjson.annotation.JSONType;
+import org.apache.dolphinscheduler.common.enums.ResourceType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * resource component
+ */
+@JSONType(orders={"id","pid","name","fullName","description","isDirctory","children","type"})
+public abstract class ResourceComponent {
+    public ResourceComponent() {
+    }
+
+    public ResourceComponent(int id, int pid, String name, String fullName, String description, boolean isDirctory) {
+        this.id = id;
+        this.pid = pid;
+        this.name = name;
+        this.fullName = fullName;
+        this.description = description;
+        this.isDirctory = isDirctory;
+        int directoryFlag = isDirctory ? 1:0;
+        this.idValue = String.format("%s_%s",id,directoryFlag);
+    }
+
+
+    /**
+     * id
+     */
+    @JSONField(ordinal = 1)
+    protected int id;
+    /**
+     * parent id
+     */
+    @JSONField(ordinal = 2)
+    protected int pid;
+    /**
+     * name
+     */
+    @JSONField(ordinal = 3)
+    protected String name;
+    /**
+     * current directory
+     */
+    protected String currentDir;
+    /**
+     * full name
+     */
+    @JSONField(ordinal = 4)
+    protected String fullName;
+    /**
+     * description
+     */
+    @JSONField(ordinal = 5)
+    protected String description;
+    /**
+     * is directory
+     */
+    @JSONField(ordinal = 6)
+    protected boolean isDirctory;
+    /**
+     * id value
+     */
+    @JSONField(ordinal = 7)
+    protected String idValue;
+    /**
+     * resoruce type
+     */
+    @JSONField(ordinal = 8)
+    protected ResourceType type;
+    /**
+     * children
+     */
+    @JSONField(ordinal = 8)
+    protected List<ResourceComponent> children = new ArrayList<>();
+
+    /**
+     * add resource component
+     * @param resourceComponent resource component
+     */
+    public void add(ResourceComponent resourceComponent){
+        children.add(resourceComponent);
+    }
+
+    public String getName(){
+        return this.name;
+    }
+
+    public String getDescription(){
+        return this.description;
+    }
+
+    public int getId() {
+        return id;
+    }
+
+    public void setId(int id) {
+        this.id = id;
+    }
+
+    public int getPid() {
+        return pid;
+    }
+
+    public void setPid(int pid) {
+        this.pid = pid;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getFullName() {
+        return fullName;
+    }
+
+    public void setFullName(String fullName) {
+        this.fullName = fullName;
+    }
+
+    public void setDescription(String description) {
+        this.description = description;
+    }
+
+    public boolean isDirctory() {
+        return isDirctory;
+    }
+
+    public void setDirctory(boolean dirctory) {
+        isDirctory = dirctory;
+    }
+
+    public String getIdValue() {
+        return idValue;
+    }
+
+    public void setIdValue(int id,boolean isDirctory) {
+        int directoryFlag = isDirctory ? 1:0;
+        this.idValue = String.format("%s_%s",id,directoryFlag);
+    }
+
+    public ResourceType getType() {
+        return type;
+    }
+
+    public void setType(ResourceType type) {
+        this.type = type;
+    }
+
+    public List<ResourceComponent> getChildren() {
+        return children;
+    }
+
+    public void setChildren(List<ResourceComponent> children) {
+        this.children = children;
+    }
+
+    @Override
+    public String toString() {
+        return "ResourceComponent{" +
+                "id=" + id +
+                ", pid=" + pid +
+                ", name='" + name + '\'' +
+                ", currentDir='" + currentDir + '\'' +
+                ", fullName='" + fullName + '\'' +
+                ", description='" + description + '\'' +
+                ", isDirctory=" + isDirctory +
+                ", idValue='" + idValue + '\'' +
+                ", type=" + type +
+                ", children=" + children +
+                '}';
+    }
+
+}
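
ResourceComponent, Directory and FileLeaf form a small composite: directories carry children, files are leaves, and the fastjson annotations pin the key order of the JSON handed to the front end. A standalone sketch follows; all field values are made up for illustration.

import com.alibaba.fastjson.JSON;
import org.apache.dolphinscheduler.api.dto.resources.Directory;
import org.apache.dolphinscheduler.api.dto.resources.FileLeaf;
import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent;
import org.apache.dolphinscheduler.common.enums.ResourceType;

public class ResourceComponentExample {
    public static void main(String[] args) {
        // a directory node with one file child, mirroring what ResourceTreeVisitor builds
        ResourceComponent dir = new Directory();
        dir.setId(1);
        dir.setPid(-1);
        dir.setName("udf");
        dir.setFullName("udf");
        dir.setIdValue(1, true);   // becomes "1_1": id plus directory flag
        dir.setType(ResourceType.UDF);

        ResourceComponent jar = new FileLeaf();
        jar.setId(2);
        jar.setPid(1);
        jar.setName("lower.jar");
        jar.setFullName("udf/lower.jar");
        jar.setIdValue(2, false);  // becomes "2_0"
        jar.setType(ResourceType.UDF);

        dir.add(jar);

        // @JSONType(orders=...) on ResourceComponent controls the key order of the output
        System.out.println(JSON.toJSONString(dir, true));
    }
}
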
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/_source/common.js b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/filter/IFilter.java
similarity index 78%
copy from dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/_source/common.js
copy to dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/filter/IFilter.java
index 8e534f9..ce6ce3a 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/_source/common.js
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/filter/IFilter.java
@@ -14,10 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.dolphinscheduler.api.dto.resources.filter;
+
+import org.apache.dolphinscheduler.dao.entity.Resource;
+
+import java.util.List;
 
 /**
- * Create file type
+ * interface filter
  */
-let filtTypeArr = ['txt', 'log', 'sh', 'conf', 'cfg', 'py', 'java', 'sql', 'xml', 'hql', 'properties']
-
-export { filtTypeArr }
+public interface IFilter {
+    List<Resource> filter();
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/filter/ResourceFilter.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/filter/ResourceFilter.java
new file mode 100644
index 0000000..c918a16
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/filter/ResourceFilter.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.api.dto.resources.filter;
+
+import org.apache.dolphinscheduler.dao.entity.Resource;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * resource filter
+ */
+public class ResourceFilter implements IFilter {
+    /**
+     * resource suffix
+     */
+    private String suffix;
+    /**
+     * resource list
+     */
+    private List<Resource> resourceList;
+
+    /**
+     * parent list
+     */
+    //Set<Resource> parentList =  new HashSet<>();
+
+    /**
+     * constructor
+     * @param suffix        resource suffix
+     * @param resourceList  resource list
+     */
+    public ResourceFilter(String suffix, List<Resource> resourceList) {
+        this.suffix = suffix;
+        this.resourceList = resourceList;
+    }
+
+    /**
+     * file filter
+     * @return file filtered by suffix
+     */
+    public Set<Resource> fileFilter(){
+        Set<Resource> resources = resourceList.stream().filter(t -> {
+            String alias = t.getAlias();
+            return alias.endsWith(suffix);
+        }).collect(Collectors.toSet());
+        return resources;
+    }
+
+    /**
+     * list all parent dir
+     * @return parent resource dir set
+     */
+    Set<Resource> listAllParent(){
+        Set<Resource> parentList =  new HashSet<>();
+        Set<Resource> filterFileList = fileFilter();
+        for(Resource file:filterFileList){
+            parentList.add(file);
+            setAllParent(file,parentList);
+        }
+        return parentList;
+
+    }
+
+    /**
+     * list all parent dir
+     * @param resource  resource
+     * @return parent resource dir set
+     */
+    private void setAllParent(Resource resource,Set<Resource> parentList){
+        for (Resource resourceTemp : resourceList) {
+            if (resourceTemp.getId() == resource.getPid()) {
+                parentList.add(resourceTemp);
+                setAllParent(resourceTemp,parentList);
+            }
+        }
+    }
+
+    @Override
+    public List<Resource> filter() {
+        return new ArrayList<>(listAllParent());
+    }
+}
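
ResourceFilter keeps every resource whose alias ends with the given suffix plus all of its ancestor directories, so the filtered list can still be rendered as a tree (this is how the main-jar picker shows only .jar files). A minimal sketch follows; the Resource no-arg constructor and setter names are assumptions, since only the getters appear in this commit.

import org.apache.dolphinscheduler.api.dto.resources.filter.ResourceFilter;
import org.apache.dolphinscheduler.dao.entity.Resource;

import java.util.Arrays;
import java.util.List;

public class ResourceFilterExample {

    // helper that builds a Resource via setters; constructor and setter names are assumed
    private static Resource resource(int id, int pid, String alias, boolean directory) {
        Resource r = new Resource();
        r.setId(id);
        r.setPid(pid);
        r.setAlias(alias);
        r.setDirectory(directory);
        return r;
    }

    public static void main(String[] args) {
        List<Resource> flatList = Arrays.asList(
                resource(1, -1, "lib", true),
                resource(2, 1, "udf.jar", false),
                resource(3, 1, "notes.txt", false));

        // keeps udf.jar and its parent directory "lib", drops notes.txt
        List<Resource> jarsAndParents = new ResourceFilter(".jar", flatList).filter();
        jarsAndParents.forEach(r -> System.out.println(r.getAlias()));
    }
}
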
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/visitor/ResourceTreeVisitor.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/visitor/ResourceTreeVisitor.java
new file mode 100644
index 0000000..5cf1188
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/visitor/ResourceTreeVisitor.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.api.dto.resources.visitor;
+
+
+import org.apache.dolphinscheduler.api.dto.resources.Directory;
+import org.apache.dolphinscheduler.api.dto.resources.FileLeaf;
+import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent;
+import org.apache.dolphinscheduler.dao.entity.Resource;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * resource tree visitor
+ */
+public class ResourceTreeVisitor implements Visitor{
+
+    /**
+     * resource list
+     */
+    private List<Resource> resourceList;
+
+    public ResourceTreeVisitor() {
+    }
+
+    /**
+     * constructor
+     * @param resourceList resource list
+     */
+    public ResourceTreeVisitor(List<Resource> resourceList) {
+        this.resourceList = resourceList;
+    }
+
+    /**
+     * visit
+     * @return resoruce component
+     */
+    public ResourceComponent visit() {
+        ResourceComponent rootDirectory = new Directory();
+        for (Resource resource : resourceList) {
+            // judge whether is root node
+            if (rootNode(resource)){
+                ResourceComponent tempResourceComponent = getResourceComponent(resource);
+                rootDirectory.add(tempResourceComponent);
+                tempResourceComponent.setChildren(setChildren(tempResourceComponent.getId(),resourceList));
+            }
+        }
+        return rootDirectory;
+    }
+
+    /**
+     * set children
+     * @param id    id
+     * @param list  resource list
+     * @return resource component list
+     */
+    public static List<ResourceComponent> setChildren(int id, List<Resource> list ){
+        List<ResourceComponent> childList = new ArrayList<>();
+        for (Resource resource : list) {
+            if (id == resource.getPid()){
+                ResourceComponent tempResourceComponent = getResourceComponent(resource);
+                childList.add(tempResourceComponent);
+            }
+        }
+        for (ResourceComponent resourceComponent : childList) {
+            resourceComponent.setChildren(setChildren(resourceComponent.getId(),list));
+        }
+        if (childList.size()==0){
+            return new ArrayList<>();
+        }
+        return childList;
+    }
+
+    /**
+     * Determine whether it is the root node
+     * @param resource resource
+     * @return true if it is the root node
+     */
+    public boolean rootNode(Resource resource) {
+
+        boolean isRootNode = true;
+        if(resource.getPid() != -1 ){
+            for (Resource parent : resourceList) {
+                if (resource.getPid() == parent.getId()) {
+                    isRootNode = false;
+                    break;
+                }
+            }
+        }
+        return isRootNode;
+    }
+
+    /**
+     * get resource component by resource
+     * @param resource resource
+     * @return resource component
+     */
+    private static ResourceComponent getResourceComponent(Resource resource) {
+        ResourceComponent tempResourceComponent;
+        if(resource.isDirectory()){
+            tempResourceComponent = new Directory();
+        }else{
+            tempResourceComponent = new FileLeaf();
+        }
+        tempResourceComponent.setName(resource.getAlias());
+        tempResourceComponent.setFullName(resource.getFullName().replaceFirst("/",""));
+        tempResourceComponent.setId(resource.getId());
+        tempResourceComponent.setPid(resource.getPid());
+        tempResourceComponent.setIdValue(resource.getId(),resource.isDirectory());
+        tempResourceComponent.setDescription(resource.getDescription());
+        tempResourceComponent.setType(resource.getType());
+        return tempResourceComponent;
+    }
+
+}
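
ResourceTreeVisitor converts the flat List<Resource> returned by ResourceMapper into the nested structure serialized back to the UI: any resource whose parent is not in the list becomes a child of one synthetic root Directory, and children are attached recursively by pid. A minimal sketch, under the same assumption as above that the Resource entity exposes matching setters:

import com.alibaba.fastjson.JSON;
import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent;
import org.apache.dolphinscheduler.api.dto.resources.visitor.ResourceTreeVisitor;
import org.apache.dolphinscheduler.dao.entity.Resource;

import java.util.Arrays;
import java.util.List;

public class ResourceTreeExample {

    // Resource setter names are assumed; only the getters appear in this commit
    private static Resource resource(int id, int pid, String fullName, String alias, boolean directory) {
        Resource r = new Resource();
        r.setId(id);
        r.setPid(pid);
        r.setFullName(fullName);
        r.setAlias(alias);
        r.setDirectory(directory);
        return r;
    }

    public static void main(String[] args) {
        // flat rows as they would come back from ResourceMapper
        List<Resource> flatList = Arrays.asList(
                resource(1, -1, "/scripts", "scripts", true),
                resource(2, 1, "/scripts/clean.sh", "clean.sh", false),
                resource(3, -1, "/readme.md", "readme.md", false));

        // visit() wraps every root-level resource under one synthetic root Directory
        ResourceComponent tree = new ResourceTreeVisitor(flatList).visit();
        System.out.println(JSON.toJSONString(tree, true));
    }
}
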
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/_source/common.js b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/visitor/Visitor.java
similarity index 75%
copy from dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/_source/common.js
copy to dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/visitor/Visitor.java
index 8e534f9..3dfce7c 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/_source/common.js
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/visitor/Visitor.java
@@ -1,3 +1,8 @@
+package org.apache.dolphinscheduler.api.dto.resources.visitor;
+
+
+import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent;
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -14,10 +19,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 /**
- * Create file type
+ * Visitor
  */
-let filtTypeArr = ['txt', 'log', 'sh', 'conf', 'cfg', 'py', 'java', 'sql', 'xml', 'hql', 'properties']
-
-export { filtTypeArr }
+public interface Visitor {
+    /**
+     * visit
+     * @return resource component
+     */
+    ResourceComponent visit();
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index 9955463..416dc0e 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -97,7 +97,7 @@ public enum Status {
     VERIFY_UDF_FUNCTION_NAME_ERROR( 10070,"verify udf function name error", "UDF函数名称验证错误"),
     DELETE_UDF_FUNCTION_ERROR( 10071,"delete udf function error", "删除UDF函数错误"),
     AUTHORIZED_FILE_RESOURCE_ERROR( 10072,"authorized file resource error", "授权资源文件错误"),
-    UNAUTHORIZED_FILE_RESOURCE_ERROR( 10073,"unauthorized file resource error", "查询未授权资源错误"),
+    AUTHORIZE_RESOURCE_TREE( 10073,"authorize resource tree display error","授权资源目录树错误"),
     UNAUTHORIZED_UDF_FUNCTION_ERROR( 10074,"unauthorized udf function error", "查询未授权UDF函数错误"),
     AUTHORIZED_UDF_FUNCTION_ERROR(10075,"authorized udf function error", "授权UDF函数错误"),
     CREATE_SCHEDULE_ERROR(10076,"create schedule error", "创建调度配置错误"),
@@ -184,10 +184,12 @@ public enum Status {
     RESOURCE_SIZE_EXCEED_LIMIT(20007, "upload resource file size exceeds limit", "上传资源文件大小超过限制"),
     RESOURCE_SUFFIX_FORBID_CHANGE(20008, "resource suffix not allowed to be modified", "资源文件后缀不支持修改"),
     UDF_RESOURCE_SUFFIX_NOT_JAR(20009, "UDF resource suffix name must be jar", "UDF资源文件后缀名只支持[jar]"),
-    HDFS_COPY_FAIL(20009, "hdfs copy {0} -> {1} fail", "hdfs复制失败:[{0}] -> [{1}]"),
-    RESOURCE_FILE_EXIST(20010, "resource file {0} already exists in hdfs,please delete it or change name!", "资源文件[{0}]在hdfs中已存在,请删除或修改资源名"),
-    RESOURCE_FILE_NOT_EXIST(20011, "resource file {0} not exists in hdfs!", "资源文件[{0}]在hdfs中不存在"),
-
+    HDFS_COPY_FAIL(20010, "hdfs copy {0} -> {1} fail", "hdfs复制失败:[{0}] -> [{1}]"),
+    RESOURCE_FILE_EXIST(20011, "resource file {0} already exists in hdfs,please delete it or change name!", "资源文件[{0}]在hdfs中已存在,请删除或修改资源名"),
+    RESOURCE_FILE_NOT_EXIST(20012, "resource file {0} not exists in hdfs!", "资源文件[{0}]在hdfs中不存在"),
+    UDF_RESOURCE_IS_BOUND(20013, "udf resource file is bound by UDF functions:{0}","udf函数绑定了资源文件[{0}]"),
+    RESOURCE_IS_USED(20014, "resource file is used by process definition","资源文件被上线的流程定义使用了"),
+    PARENT_RESOURCE_NOT_EXIST(20015, "parent resource not exist","父资源文件不存在"),
 
 
     USER_NO_OPERATION_PERM(30001, "user has no operation privilege", "当前用户没有操作权限"),
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
index 44c2b44..734adb9 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
@@ -38,11 +38,9 @@ import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
 import org.apache.dolphinscheduler.common.process.ProcessDag;
 import org.apache.dolphinscheduler.common.process.Property;
+import org.apache.dolphinscheduler.common.task.AbstractParameters;
 import org.apache.dolphinscheduler.common.thread.Stopper;
-import org.apache.dolphinscheduler.common.utils.CollectionUtils;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.common.utils.*;
 import org.apache.dolphinscheduler.dao.entity.*;
 import org.apache.dolphinscheduler.dao.mapper.*;
 import org.apache.dolphinscheduler.dao.utils.DagHelper;
@@ -162,6 +160,31 @@ public class ProcessDefinitionService extends BaseDAGService {
         return result;
     }
 
+    /**
+     * get resource ids
+     * @param processData process data
+     * @return resource ids
+     */
+    private String getResourceIds(ProcessData processData) {
+        List<TaskNode> tasks = processData.getTasks();
+        Set<Integer> resourceIds = new HashSet<>();
+        for(TaskNode taskNode : tasks){
+            String taskParameter = taskNode.getParams();
+            AbstractParameters params = TaskParametersUtils.getParameters(taskNode.getType(),taskParameter);
+            Set<Integer> tempSet = params.getResourceFilesList().stream().map(t->t.getId()).collect(Collectors.toSet());
+            resourceIds.addAll(tempSet);
+        }
+
+        StringBuilder sb = new StringBuilder();
+        for(int i : resourceIds) {
+            if (sb.length() > 0) {
+                sb.append(",");
+            }
+            sb.append(i);
+        }
+        return sb.toString();
+    }
+
 
     /**
      * query proccess definition list
@@ -946,7 +969,9 @@ public class ProcessDefinitionService extends BaseDAGService {
             return result;
         }
 
+
         String processDefinitionJson = processDefinition.getProcessDefinitionJson();
+
         ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
 
         //process data check
@@ -1163,6 +1188,7 @@ public class ProcessDefinitionService extends BaseDAGService {
     private DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDefinition processDefinition) throws Exception {
 
         String processDefinitionJson = processDefinition.getProcessDefinitionJson();
+
         ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
 
         //check process data
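
The new getResourceIds helper collects the resource ids referenced by every task node and flattens them into a comma-separated string stored on the process definition, which is what later lets a delete check whether a resource is still in use. The joining step on its own, sketched with streams as an equivalent of the StringBuilder loop above:

import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.stream.Collectors;

public class ResourceIdsExample {
    public static void main(String[] args) {
        // ids collected from every task node's getResourceFilesList(), duplicates removed
        Set<Integer> resourceIds = new LinkedHashSet<>(Arrays.asList(3, 7, 11));

        // same output shape as the StringBuilder loop in getResourceIds: "3,7,11"
        // (element order depends on the set's iteration order)
        String resourceIdsColumn = resourceIds.stream()
                .map(String::valueOf)
                .collect(Collectors.joining(","));

        System.out.println(resourceIdsColumn);
    }
}
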
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
index f46eda7..ff87aad 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
@@ -16,9 +16,15 @@
  */
 package org.apache.dolphinscheduler.api.service;
 
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import org.apache.commons.collections.BeanMap;
+import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent;
+import org.apache.dolphinscheduler.api.dto.resources.filter.ResourceFilter;
+import org.apache.dolphinscheduler.api.dto.resources.visitor.ResourceTreeVisitor;
+import org.apache.dolphinscheduler.api.dto.resources.visitor.Visitor;
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.utils.PageInfo;
 import org.apache.dolphinscheduler.api.utils.Result;
@@ -39,6 +45,7 @@ import org.springframework.web.multipart.MultipartFile;
 
 import java.text.MessageFormat;
 import java.util.*;
+import java.util.stream.Collectors;
 
 import static org.apache.dolphinscheduler.common.Constants.*;
 
@@ -65,6 +72,82 @@ public class ResourcesService extends BaseService {
     @Autowired
     private ResourceUserMapper resourceUserMapper;
 
+    @Autowired
+    private ProcessDefinitionMapper processDefinitionMapper;
+
+    /**
+     * create directory
+     *
+     * @param loginUser login user
+     * @param name alias
+     * @param description description
+     * @param type type
+     * @param pid parent id
+     * @param currentDir current directory
+     * @return create directory result
+     */
+    @Transactional(rollbackFor = Exception.class)
+    public Result createDirectory(User loginUser,
+                                 String name,
+                                 String description,
+                                 ResourceType type,
+                                 int pid,
+                                 String currentDir) {
+        Result result = new Result();
+        // if hdfs not startup
+        if (!PropertyUtils.getResUploadStartupState()){
+            logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
+            putMsg(result, Status.HDFS_NOT_STARTUP);
+            return result;
+        }
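+        // build the full path of the new directory: "/name" when created under the root, otherwise "<currentDir>/name"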
+        String fullName = currentDir.equals("/") ? String.format("%s%s",currentDir,name):String.format("%s/%s",currentDir,name);
+
+        if (pid != -1) {
+            Resource parentResource = resourcesMapper.selectById(pid);
+
+            if (parentResource == null) {
+                putMsg(result, Status.PARENT_RESOURCE_NOT_EXIST);
+                return result;
+            }
+
+            if (!hasPerm(loginUser, parentResource.getUserId())) {
+                putMsg(result, Status.USER_NO_OPERATION_PERM);
+                return result;
+            }
+        }
+
+
+        if (checkResourceExists(fullName, 0, type.ordinal())) {
+            logger.error("resource directory {} already exists, can't recreate", fullName);
+            putMsg(result, Status.RESOURCE_EXIST);
+            return result;
+        }
+
+        Date now = new Date();
+
+        Resource resource = new Resource(pid,name,fullName,true,description,name,loginUser.getId(),type,0,now,now);
+
+        try {
+            resourcesMapper.insert(resource);
+
+            putMsg(result, Status.SUCCESS);
+            Map<Object, Object> dataMap = new BeanMap(resource);
+            Map<String, Object> resultMap = new HashMap<String, Object>();
+            for (Map.Entry<Object, Object> entry: dataMap.entrySet()) {
+                if (!"class".equalsIgnoreCase(entry.getKey().toString())) {
+                    resultMap.put(entry.getKey().toString(), entry.getValue());
+                }
+            }
+            result.setData(resultMap);
+        } catch (Exception e) {
+            logger.error("resource already exists, can't recreate ", e);
+            throw new RuntimeException("resource already exists, can't recreate");
+        }
+        //create directory in hdfs
+        createDirectory(loginUser,fullName,type,result);
+        return result;
+    }
+
     /**
      * create resource
      *
@@ -73,6 +156,8 @@ public class ResourcesService extends BaseService {
      * @param desc description
      * @param file file
      * @param type type
+     * @param pid parent id
+     * @param currentDir current directory
      * @return create result code
      */
     @Transactional(rollbackFor = Exception.class)
@@ -80,7 +165,9 @@ public class ResourcesService extends BaseService {
                                  String name,
                                  String desc,
                                  ResourceType type,
-                                 MultipartFile file) {
+                                 MultipartFile file,
+                                 int pid,
+                                 String currentDir) {
         Result result = new Result();
 
         // if hdfs not startup
@@ -123,7 +210,8 @@ public class ResourcesService extends BaseService {
         }
 
         // check resource name exists
-        if (checkResourceExists(name, 0, type.ordinal())) {
+        String fullName = currentDir.equals("/") ? String.format("%s%s",currentDir,name):String.format("%s/%s",currentDir,name);
+        if (checkResourceExists(fullName, 0, type.ordinal())) {
             logger.error("resource {} already exists, can't recreate", name);
             putMsg(result, Status.RESOURCE_EXIST);
             return result;
@@ -131,7 +219,9 @@ public class ResourcesService extends BaseService {
 
         Date now = new Date();
 
-        Resource resource = new Resource(name,file.getOriginalFilename(),desc,loginUser.getId(),type,file.getSize(),now,now);
+
+
+        Resource resource = new Resource(pid,name,fullName,false,desc,file.getOriginalFilename(),loginUser.getId(),type,file.getSize(),now,now);
 
         try {
             resourcesMapper.insert(resource);
@@ -151,7 +241,7 @@ public class ResourcesService extends BaseService {
         }
 
         // fail upload
-        if (!upload(loginUser, name, file, type)) {
+        if (!upload(loginUser, fullName, file, type)) {
             logger.error("upload resource: {} file: {} failed.", name, file.getOriginalFilename());
             putMsg(result, Status.HDFS_OPERATION_ERROR);
             throw new RuntimeException(String.format("upload resource: %s file: %s failed.", name, file.getOriginalFilename()));
@@ -162,27 +252,29 @@ public class ResourcesService extends BaseService {
     /**
      * check whether resource exists
      *
-     * @param alias     alias
+     * @param fullName  fullName
      * @param userId    user id
      * @param type      type
      * @return true if resource exists
      */
-    private boolean checkResourceExists(String alias, int userId, int type ){
-        List<Resource> resources = resourcesMapper.queryResourceList(alias, userId, type);
-        return CollectionUtils.isNotEmpty(resources);
-    }
+    private boolean checkResourceExists(String fullName, int userId, int type) {
+        List<Resource> resources = resourcesMapper.queryResourceList(fullName, userId, type);
+        return CollectionUtils.isNotEmpty(resources);
+    }
 
 
     /**
      * update resource
-     *
-     * @param loginUser login user
-     * @param name alias
-     * @param resourceId resource id
-     * @param type resource type
-     * @param desc description
-     * @return update result code
+     * @param loginUser     login user
+     * @param resourceId    resource id
+     * @param name          name
+     * @param desc          description
+     * @param type          resource type
+     * @return  update result code
      */
     @Transactional(rollbackFor = Exception.class)
     public Result updateResource(User loginUser,
@@ -216,7 +308,10 @@ public class ResourcesService extends BaseService {
         }
 
         //check resource already exists
-        if (!resource.getAlias().equals(name) && checkResourceExists(name, 0, type.ordinal())) {
+        String originFullName = resource.getFullName();
+
+        String fullName = String.format("%s%s",originFullName.substring(0,originFullName.lastIndexOf("/")+1),name);
+        if (!resource.getAlias().equals(name) && checkResourceExists(fullName, 0, type.ordinal())) {
             logger.error("resource {} already exists, can't recreate", name);
             putMsg(result, Status.RESOURCE_EXIST);
             return result;
@@ -227,25 +322,41 @@ public class ResourcesService extends BaseService {
         if (StringUtils.isEmpty(tenantCode)){
             return result;
         }
-
-        //get the file suffix
+        String nameWithSuffix = name;
         String originResourceName = resource.getAlias();
-        String suffix = originResourceName.substring(originResourceName.lastIndexOf('.'));
+        if (!resource.isDirectory()) {
+            //get the file suffix
 
-        //if the name without suffix then add it ,else use the origin name
-        String nameWithSuffix = name;
-        if(!name.endsWith(suffix)){
-            nameWithSuffix = nameWithSuffix + suffix;
+            String suffix = originResourceName.substring(originResourceName.lastIndexOf("."));
+
+            //if the name has no suffix then add it, else use the original name
+            if(!name.endsWith(suffix)){
+                nameWithSuffix = nameWithSuffix + suffix;
+            }
         }
 
         // updateResource data
+        List<Integer> childrenResource = listAllChildren(resource);
+        String oldFullName = resource.getFullName();
         Date now = new Date();
+
         resource.setAlias(nameWithSuffix);
+        resource.setFullName(fullName);
         resource.setDescription(desc);
         resource.setUpdateTime(now);
 
         try {
             resourcesMapper.updateById(resource);
+            if (resource.isDirectory() && CollectionUtils.isNotEmpty(childrenResource)) {
+                List<Resource> childResourceList = new ArrayList<>();
+                List<Resource> resourceList = resourcesMapper.listResourceByIds(childrenResource.toArray(new Integer[childrenResource.size()]));
+                childResourceList = resourceList.stream().map(t -> {
+                    t.setFullName(t.getFullName().replaceFirst(oldFullName, fullName));
+                    t.setUpdateTime(now);
+                    return t;
+                }).collect(Collectors.toList());
+                resourcesMapper.batchUpdateResource(childResourceList);
+            }
 
             putMsg(result, Status.SUCCESS);
             Map<Object, Object> dataMap = new BeanMap(resource);
@@ -267,15 +378,9 @@ public class ResourcesService extends BaseService {
 
         // get file hdfs path
         // delete hdfs file by type
-        String originHdfsFileName = "";
-        String destHdfsFileName = "";
-        if (resource.getType().equals(ResourceType.FILE)) {
-            originHdfsFileName = HadoopUtils.getHdfsFilename(tenantCode, originResourceName);
-            destHdfsFileName = HadoopUtils.getHdfsFilename(tenantCode, name);
-        } else if (resource.getType().equals(ResourceType.UDF)) {
-            originHdfsFileName = HadoopUtils.getHdfsUdfFilename(tenantCode, originResourceName);
-            destHdfsFileName = HadoopUtils.getHdfsUdfFilename(tenantCode, name);
-        }
+        String originHdfsFileName = HadoopUtils.getHdfsFileName(resource.getType(),tenantCode,originFullName);
+        String destHdfsFileName = HadoopUtils.getHdfsFileName(resource.getType(),tenantCode,fullName);
+
         try {
             if (HadoopUtils.getInstance().exists(originHdfsFileName)) {
                 logger.info("hdfs copy {} -> {}", originHdfsFileName, destHdfsFileName);
@@ -303,7 +408,7 @@ public class ResourcesService extends BaseService {
      * @param pageSize page size
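+     * @param directoryId directory id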
      * @return resource list page
      */
-    public Map<String, Object> queryResourceListPaging(User loginUser, ResourceType type, String searchVal, Integer pageNo, Integer pageSize) {
+    public Map<String, Object> queryResourceListPaging(User loginUser, int directoryId, ResourceType type, String searchVal, Integer pageNo, Integer pageSize) {
 
         HashMap<String, Object> result = new HashMap<>(5);
         Page<Resource> page = new Page(pageNo, pageSize);
@@ -312,7 +417,7 @@ public class ResourcesService extends BaseService {
             userId= 0;
         }
         IPage<Resource> resourceIPage = resourcesMapper.queryResourcePaging(page,
-                userId, type.ordinal(), searchVal);
+                userId, directoryId, type.ordinal(), searchVal);
         PageInfo pageInfo = new PageInfo<Resource>(pageNo, pageSize);
         pageInfo.setTotalCount((int)resourceIPage.getTotal());
         pageInfo.setLists(resourceIPage.getRecords());
@@ -322,16 +427,45 @@ public class ResourcesService extends BaseService {
     }
 
     /**
+     * create directory
+     * @param loginUser login user
+     * @param fullName  full name
+     * @param type      resource type
+     * @param result    Result
+     */
+    private void createDirectory(User loginUser,String fullName,ResourceType type,Result result){
+        // query tenant
+        String tenantCode = tenantMapper.queryById(loginUser.getTenantId()).getTenantCode();
+        String directoryName = HadoopUtils.getHdfsFileName(type,tenantCode,fullName);
+        String resourceRootPath = HadoopUtils.getHdfsDir(type,tenantCode);
+        try {
+            if (!HadoopUtils.getInstance().exists(resourceRootPath)) {
+                createTenantDirIfNotExists(tenantCode);
+            }
+
+            if (!HadoopUtils.getInstance().mkdir(directoryName)) {
+                logger.error("failed to create resource directory {} on hdfs", directoryName);
+                putMsg(result,Status.HDFS_OPERATION_ERROR);
+                throw new RuntimeException(String.format("create resource directory: %s failed.", directoryName));
+            }
+        } catch (Exception e) {
+            logger.error("failed to create resource directory {} on hdfs", directoryName, e);
+            putMsg(result,Status.HDFS_OPERATION_ERROR);
+            throw new RuntimeException(String.format("create resource directory: %s failed.", directoryName));
+        }
+    }
+
+    /**
      * upload file to hdfs
      *
-     * @param loginUser
-     * @param name
-     * @param file
+     * @param loginUser login user
+     * @param fullName  full name
+     * @param file      file
      */
-    private boolean upload(User loginUser, String name, MultipartFile file, ResourceType type) {
+    private boolean upload(User loginUser, String fullName, MultipartFile file, ResourceType type) {
         // save to local
         String fileSuffix = FileUtils.suffix(file.getOriginalFilename());
-        String nameSuffix = FileUtils.suffix(name);
+        String nameSuffix = FileUtils.suffix(fullName);
 
         // determine file suffix
         if (!(StringUtils.isNotEmpty(fileSuffix) && fileSuffix.equalsIgnoreCase(nameSuffix))) {
@@ -344,15 +478,8 @@ public class ResourcesService extends BaseService {
 
 
         // save file to hdfs, and delete original file
-        String hdfsFilename = "";
-        String resourcePath = "";
-        if (type.equals(ResourceType.FILE)) {
-            hdfsFilename = HadoopUtils.getHdfsFilename(tenantCode, name);
-            resourcePath = HadoopUtils.getHdfsResDir(tenantCode);
-        } else if (type.equals(ResourceType.UDF)) {
-            hdfsFilename = HadoopUtils.getHdfsUdfFilename(tenantCode, name);
-            resourcePath = HadoopUtils.getHdfsUdfDir(tenantCode);
-        }
+        String hdfsFilename = HadoopUtils.getHdfsFileName(type,tenantCode,fullName);
+        String resourcePath = HadoopUtils.getHdfsDir(type,tenantCode);
         try {
             // if tenant dir not exists
             if (!HadoopUtils.getInstance().exists(resourcePath)) {
@@ -377,13 +504,59 @@ public class ResourcesService extends BaseService {
     public Map<String, Object> queryResourceList(User loginUser, ResourceType type) {
 
         Map<String, Object> result = new HashMap<>(5);
-        List<Resource> resourceList;
+
+        Set<Resource> allResourceList = getAllResources(loginUser, type);
+        Visitor resourceTreeVisitor = new ResourceTreeVisitor(new ArrayList<>(allResourceList));
+        //JSONArray jsonArray = JSON.parseArray(JSON.toJSONString(resourceTreeVisitor.visit().getChildren(), SerializerFeature.SortField));
+        result.put(Constants.DATA_LIST, resourceTreeVisitor.visit().getChildren());
+        putMsg(result,Status.SUCCESS);
+
+        return result;
+    }
+
+    /**
+     * get all resources
+     * @param loginUser     login user
+     * @return all resource set
+     */
+    private Set<Resource> getAllResources(User loginUser, ResourceType type) {
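+        // admins (userId reset to 0) already see every resource; ordinary users also need the children of directories that other users authorized to them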
         int userId = loginUser.getId();
+        boolean listChildren = true;
         if(isAdmin(loginUser)){
             userId = 0;
+            listChildren = false;
+        }
+        List<Resource> resourceList = resourcesMapper.queryResourceListAuthored(userId, type.ordinal());
+        Set<Resource> allResourceList = new HashSet<>(resourceList);
+        if (listChildren) {
+            Set<Integer> authorizedIds = new HashSet<>();
+            List<Resource> authorizedDirectories = resourceList.stream().filter(t->t.getUserId() != loginUser.getId() && t.isDirectory()).collect(Collectors.toList());
+            if (CollectionUtils.isNotEmpty(authorizedDirectories)) {
+                for(Resource resource : authorizedDirectories){
+                    authorizedIds.addAll(listAllChildren(resource));
+                }
+                List<Resource> childrenResources = resourcesMapper.listResourceByIds(authorizedIds.toArray(new Integer[authorizedIds.size()]));
+                allResourceList.addAll(childrenResources);
+            }
         }
-        resourceList = resourcesMapper.queryResourceListAuthored(userId, type.ordinal());
-        result.put(Constants.DATA_LIST, resourceList);
+        return allResourceList;
+    }
+
+    /**
+     * query resource list
+     *
+     * @param loginUser login user
+     * @param type resource type
+     * @return resource list
+     */
+    public Map<String, Object> queryResourceJarList(User loginUser, ResourceType type) {
+
+        Map<String, Object> result = new HashMap<>(5);
+
+        Set<Resource> allResourceList = getAllResources(loginUser, type);
+        List<Resource> resources = new ResourceFilter(".jar",new ArrayList<>(allResourceList)).filter();
+        Visitor resourceTreeVisitor = new ResourceTreeVisitor(resources);
+        result.put(Constants.DATA_LIST, resourceTreeVisitor.visit().getChildren());
         putMsg(result,Status.SUCCESS);
 
         return result;
@@ -419,23 +592,51 @@ public class ResourcesService extends BaseService {
             putMsg(result, Status.USER_NO_OPERATION_PERM);
             return result;
         }
+        //if resource type is UDF, check whether it is bound by any UDF function
+        if (resource.getType() == (ResourceType.UDF)) {
+            List<UdfFunc> udfFuncs = udfFunctionMapper.listUdfByResourceId(new int[]{resourceId});
+            if (CollectionUtils.isNotEmpty(udfFuncs)) {
+                logger.error("can't be deleted, because it is bound by UDF functions: {}", udfFuncs);
+                putMsg(result,Status.UDF_RESOURCE_IS_BOUND,udfFuncs.get(0).getFuncName());
+                return result;
+            }
+        }
 
-        Tenant tenant = tenantMapper.queryById(loginUser.getTenantId());
-        if (tenant == null){
-            putMsg(result, Status.TENANT_NOT_EXIST);
+        String tenantCode = getTenantCode(resource.getUserId(),result);
+        if (StringUtils.isEmpty(tenantCode)){
+            return  result;
+        }
+
+        // get the ids of all resources referenced by released process definitions
+        Map<Integer, Set<Integer>> resourceProcessMap = getResourceProcessMap();
+        Set<Integer> resourceIdSet = resourceProcessMap.keySet();
+        // get all children of the resource
+        List<Integer> allChildren = listAllChildren(resource);
+
+        if (resourceIdSet.contains(resource.getPid())) {
+            logger.error("can't be deleted, because it is used by process definitions");
+            putMsg(result, Status.RESOURCE_IS_USED);
+            return result;
+        }
+        resourceIdSet.retainAll(allChildren);
+        if (CollectionUtils.isNotEmpty(resourceIdSet)) {
+            logger.error("can't be deleted, because it is used by process definitions");
+            for (Integer resId : resourceIdSet) {
+                logger.error("resource id:{} is used by process definition {}", resId, resourceProcessMap.get(resId));
+            }
+            putMsg(result, Status.RESOURCE_IS_USED);
             return result;
         }
-        String hdfsFilename = "";
 
-        // delete hdfs file by type
-        String tenantCode = tenant.getTenantCode();
-        hdfsFilename = getHdfsFileName(resource, tenantCode, hdfsFilename);
+        // get hdfs file by type
+        String hdfsFilename = HadoopUtils.getHdfsFileName(resource.getType(), tenantCode, resource.getFullName());
 
         //delete data in database
-        resourcesMapper.deleteById(resourceId);
+        resourcesMapper.deleteIds(allChildren.toArray(new Integer[allChildren.size()]));
         resourceUserMapper.deleteResourceUser(0, resourceId);
+
         //delete file on hdfs
-        HadoopUtils.getInstance().delete(hdfsFilename, false);
+        HadoopUtils.getInstance().delete(hdfsFilename, true);
         putMsg(result, Status.SUCCESS);
 
         return result;
@@ -444,15 +645,15 @@ public class ResourcesService extends BaseService {
     /**
      * verify resource by name and type
      * @param loginUser login user
-     * @param name resource alias
-     * @param type resource type
+     * @param fullName  resource full name
+     * @param type      resource type
      * @return true if the resource name not exists, otherwise return false
      */
-    public Result verifyResourceName(String name, ResourceType type,User loginUser) {
+    public Result verifyResourceName(String fullName, ResourceType type,User loginUser) {
         Result result = new Result();
         putMsg(result, Status.SUCCESS);
-        if (checkResourceExists(name, 0, type.ordinal())) {
-            logger.error("resource type:{} name:{} has exist, can't create again.", type, name);
+        if (checkResourceExists(fullName, 0, type.ordinal())) {
+            logger.error("resource type:{} name:{} already exists, can't create again.", type, fullName);
             putMsg(result, Status.RESOURCE_EXIST);
         } else {
             // query tenant
@@ -461,9 +662,9 @@ public class ResourcesService extends BaseService {
                 String tenantCode = tenant.getTenantCode();
 
                 try {
-                    String hdfsFilename = getHdfsFileName(type,tenantCode,name);
+                    String hdfsFilename = HadoopUtils.getHdfsFileName(type,tenantCode,fullName);
                     if(HadoopUtils.getInstance().exists(hdfsFilename)){
-                        logger.error("resource type:{} name:{} has exist in hdfs {}, can't create again.", type, name,hdfsFilename);
+                        logger.error("resource type:{} name:{} already exists in hdfs {}, can't create again.", type, fullName, hdfsFilename);
                         putMsg(result, Status.RESOURCE_FILE_EXIST,hdfsFilename);
                     }
 
@@ -481,6 +682,48 @@ public class ResourcesService extends BaseService {
     }
 
     /**
+     * verify resource by full name or pid and type
+     * @param fullName  resource full name
+     * @param id        resource id
+     * @param type      resource type
+     * @return the resource if queried by full name, otherwise the parent resource of the given id
+     */
+    public Result queryResource(String fullName,Integer id,ResourceType type) {
+        Result result = new Result();
+        if (StringUtils.isBlank(fullName) && id == null) {
+            logger.error("either fullName or pid must be provided");
+            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR);
+            return result;
+        }
+        if (StringUtils.isNotBlank(fullName)) {
+            List<Resource> resourceList = resourcesMapper.queryResource(fullName,type.ordinal());
+            if (CollectionUtils.isEmpty(resourceList)) {
+                logger.error("resource file not exist,  resource full name {} ", fullName);
+                putMsg(result, Status.RESOURCE_NOT_EXIST);
+                return result;
+            }
+            putMsg(result, Status.SUCCESS);
+            result.setData(resourceList.get(0));
+        } else {
+            Resource resource = resourcesMapper.selectById(id);
+            if (resource == null) {
+                logger.error("resource file not exist,  resource id {}", id);
+                putMsg(result, Status.RESOURCE_NOT_EXIST);
+                return result;
+            }
+            Resource parentResource = resourcesMapper.selectById(resource.getPid());
+            if (parentResource == null) {
+                logger.error("parent resource file not exist,  resource id {}", id);
+                putMsg(result, Status.RESOURCE_NOT_EXIST);
+                return result;
+            }
+            putMsg(result, Status.SUCCESS);
+            result.setData(parentResource);
+        }
+        return result;
+    }
+
+    /**
      * view resource file online
      *
      * @param resourceId resource id
@@ -501,7 +744,7 @@ public class ResourcesService extends BaseService {
         // get resource by id
         Resource resource = resourcesMapper.selectById(resourceId);
         if (resource == null) {
-            logger.error("resouce file not exist,  resource id {}", resourceId);
+            logger.error("resource file not exist,  resource id {}", resourceId);
             putMsg(result, Status.RESOURCE_NOT_EXIST);
             return result;
         }
@@ -511,7 +754,7 @@ public class ResourcesService extends BaseService {
         if (StringUtils.isNotEmpty(resourceViewSuffixs)) {
             List<String> strList = Arrays.asList(resourceViewSuffixs.split(","));
             if (!strList.contains(nameSuffix)) {
-                logger.error("resouce suffix {} not support view,  resource id {}", nameSuffix, resourceId);
+                logger.error("resource suffix {} not support view,  resource id {}", nameSuffix, resourceId);
                 putMsg(result, Status.RESOURCE_SUFFIX_NOT_SUPPORT_VIEW);
                 return result;
             }
@@ -523,7 +766,7 @@ public class ResourcesService extends BaseService {
         }
 
         // hdfs path
-        String hdfsFileName = HadoopUtils.getHdfsFilename(tenantCode, resource.getAlias());
+        String hdfsFileName = HadoopUtils.getHdfsResourceFileName(tenantCode, resource.getFullName());
         logger.info("resource hdfs path is {} ", hdfsFileName);
         try {
             if(HadoopUtils.getInstance().exists(hdfsFileName)){
@@ -559,7 +802,7 @@ public class ResourcesService extends BaseService {
      * @return create result code
      */
     @Transactional(rollbackFor = Exception.class)
-    public Result onlineCreateResource(User loginUser, ResourceType type, String fileName, String fileSuffix, String desc, String content) {
+    public Result onlineCreateResource(User loginUser, ResourceType type, String fileName, String fileSuffix, String desc, String content,int pid,String currentDirectory) {
         Result result = new Result();
         // if resource upload startup
         if (!PropertyUtils.getResUploadStartupState()){
@@ -581,15 +824,16 @@ public class ResourcesService extends BaseService {
         }
 
         String name = fileName.trim() + "." + nameSuffix;
+        String fullName = currentDirectory.equals("/") ? String.format("%s%s",currentDirectory,name):String.format("%s/%s",currentDirectory,name);
 
-        result = verifyResourceName(name,type,loginUser);
+        result = verifyResourceName(fullName,type,loginUser);
         if (!result.getCode().equals(Status.SUCCESS.getCode())) {
             return result;
         }
 
         // save data
         Date now = new Date();
-        Resource resource = new Resource(name,name,desc,loginUser.getId(),type,content.getBytes().length,now,now);
+        Resource resource = new Resource(pid,name,fullName,false,desc,name,loginUser.getId(),type,content.getBytes().length,now,now);
 
         resourcesMapper.insert(resource);
 
@@ -605,7 +849,7 @@ public class ResourcesService extends BaseService {
 
         String tenantCode = tenantMapper.queryById(loginUser.getTenantId()).getTenantCode();
 
-        result = uploadContentToHdfs(name, tenantCode, content);
+        result = uploadContentToHdfs(fullName, tenantCode, content);
         if (!result.getCode().equals(Status.SUCCESS.getCode())) {
             throw new RuntimeException(result.getMsg());
         }
@@ -657,7 +901,7 @@ public class ResourcesService extends BaseService {
         resourcesMapper.updateById(resource);
 
 
-        result = uploadContentToHdfs(resource.getAlias(), tenantCode, content);
+        result = uploadContentToHdfs(resource.getFullName(), tenantCode, content);
         if (!result.getCode().equals(Status.SUCCESS.getCode())) {
             throw new RuntimeException(result.getMsg());
         }
@@ -665,10 +909,10 @@ public class ResourcesService extends BaseService {
     }
 
     /**
-     * @param resourceName
-     * @param tenantCode
-     * @param content
-     * @return
+     * @param resourceName  resource name
+     * @param tenantCode    tenant code
+     * @param content       content
+     * @return result
      */
     private Result uploadContentToHdfs(String resourceName, String tenantCode, String content) {
         Result result = new Result();
@@ -684,8 +928,8 @@ public class ResourcesService extends BaseService {
                 return result;
             }
 
-            // get file hdfs path
-            hdfsFileName = HadoopUtils.getHdfsFilename(tenantCode, resourceName);
+            // get resource file hdfs path
+            hdfsFileName = HadoopUtils.getHdfsResourceFileName(tenantCode, resourceName);
             String resourcePath = HadoopUtils.getHdfsResDir(tenantCode);
             logger.info("resource hdfs path is {} ", hdfsFileName);
 
@@ -729,11 +973,14 @@ public class ResourcesService extends BaseService {
             logger.error("download file not exist,  resource id {}", resourceId);
             return null;
         }
+        if (resource.isDirectory()) {
+            logger.error("resource id {} is a directory, can't download it", resourceId);
+            throw new RuntimeException("can't download a directory");
+        }
         User user = userMapper.queryDetailsById(resource.getUserId());
         String tenantCode = tenantMapper.queryById(user.getTenantId()).getTenantCode();
 
-        String hdfsFileName = "";
-        hdfsFileName = getHdfsFileName(resource, tenantCode, hdfsFileName);
+        String hdfsFileName = HadoopUtils.getHdfsFileName(resource.getType(), tenantCode, resource.getFullName());
 
         String localFileName = FileUtils.getDownloadFilename(resource.getAlias());
         logger.info("resource hdfs path is {} ", hdfsFileName);
@@ -744,6 +991,33 @@ public class ResourcesService extends BaseService {
 
 
     /**
+     * list the resource tree used when authorizing resources to a user
+     *
+     * @param loginUser login user
+     * @param userId user id
+     * @return tree of resources not created by the given user
+     */
+    public Map<String, Object> authorizeResourceTree(User loginUser, Integer userId) {
+
+        Map<String, Object> result = new HashMap<>();
+        if (checkAdmin(loginUser, result)) {
+            return result;
+        }
+        List<Resource> resourceList = resourcesMapper.queryResourceExceptUserId(userId);
+        List<ResourceComponent> list ;
+        if (CollectionUtils.isNotEmpty(resourceList)) {
+            Visitor visitor = new ResourceTreeVisitor(resourceList);
+            list = visitor.visit().getChildren();
+        }else {
+            list = new ArrayList<>(0);
+        }
+
+        result.put(Constants.DATA_LIST, list);
+        putMsg(result,Status.SUCCESS);
+        return result;
+    }
+
+    /**
      * unauthorized file
      *
      * @param loginUser login user
@@ -757,8 +1031,8 @@ public class ResourcesService extends BaseService {
             return result;
         }
         List<Resource> resourceList = resourcesMapper.queryResourceExceptUserId(userId);
-        List<Object> list ;
-        if (CollectionUtils.isNotEmpty(resourceList)) {
+        List<Resource> list ;
+        if (CollectionUtils.isNotEmpty(resourceList)) {
             Set<Resource> resourceSet = new HashSet<>(resourceList);
             List<Resource> authedResourceList = resourcesMapper.queryAuthorizedResourceList(userId);
 
@@ -767,15 +1041,12 @@ public class ResourcesService extends BaseService {
         }else {
             list = new ArrayList<>(0);
         }
-
-        result.put(Constants.DATA_LIST, list);
+        Visitor visitor = new ResourceTreeVisitor(list);
+        result.put(Constants.DATA_LIST, visitor.visit().getChildren());
         putMsg(result,Status.SUCCESS);
         return result;
     }
 
-
-
-
     /**
      * unauthorized udf function
      *
@@ -841,47 +1112,16 @@ public class ResourcesService extends BaseService {
             return result;
         }
         List<Resource> authedResources = resourcesMapper.queryAuthorizedResourceList(userId);
-
-        result.put(Constants.DATA_LIST, authedResources);
+        Visitor visitor = new ResourceTreeVisitor(authedResources);
+        String jsonTreeStr = JSON.toJSONString(visitor.visit().getChildren(), SerializerFeature.SortField);
+        logger.info(jsonTreeStr);
+        result.put(Constants.DATA_LIST, visitor.visit().getChildren());
         putMsg(result,Status.SUCCESS);
         return result;
     }
 
     /**
-     * get hdfs file name
-     *
-     * @param resource resource
-     * @param tenantCode tenant code
-     * @param hdfsFileName hdfs file name
-     * @return hdfs file name
-     */
-    private String getHdfsFileName(Resource resource, String tenantCode, String hdfsFileName) {
-        if (resource.getType().equals(ResourceType.FILE)) {
-            hdfsFileName = HadoopUtils.getHdfsFilename(tenantCode, resource.getAlias());
-        } else if (resource.getType().equals(ResourceType.UDF)) {
-            hdfsFileName = HadoopUtils.getHdfsUdfFilename(tenantCode, resource.getAlias());
-        }
-        return hdfsFileName;
-    }
-
-    /**
-     * get hdfs file name
-     *
-     * @param resourceType resource type
-     * @param tenantCode tenant code
-     * @param hdfsFileName hdfs file name
-     * @return hdfs file name
-     */
-    private String getHdfsFileName(ResourceType resourceType, String tenantCode, String hdfsFileName) {
-        if (resourceType.equals(ResourceType.FILE)) {
-            hdfsFileName = HadoopUtils.getHdfsFilename(tenantCode, hdfsFileName);
-        } else if (resourceType.equals(ResourceType.UDF)) {
-            hdfsFileName = HadoopUtils.getHdfsUdfFilename(tenantCode, hdfsFileName);
-        }
-        return hdfsFileName;
-    }
-
-    /**
      * get authorized resource list
      *
      * @param resourceSet resource set
@@ -920,4 +1160,69 @@ public class ResourcesService extends BaseService {
         return tenant.getTenantCode();
     }
 
+    /**
+     * list all children id
+     * @param resource resource
+     * @return all children id
+     */
+    List<Integer> listAllChildren(Resource resource){
+        List<Integer> childList = new ArrayList<>();
+        if (resource.getId() != -1) {
+            childList.add(resource.getId());
+        }
+
+        if(resource.isDirectory()){
+            listAllChildren(resource.getId(),childList);
+        }
+        return childList;
+    }
+
+    /**
+     * list all children id
+     * @param resourceId    resource id
+     * @param childList     child list
+     */
+    void listAllChildren(int resourceId,List<Integer> childList){
+
+        List<Integer> children = resourcesMapper.listChildren(resourceId);
+        for(int childId : children){
+            childList.add(childId);
+            listAllChildren(childId, childList);
+        }
+    }
+
+    /**
+     * get the resource-process map: key is resource id, value is the set of ids of process definitions that reference it
+     * @return resource process definition map
+     */
+    private Map<Integer,Set<Integer>> getResourceProcessMap(){
+        Map<Integer, String> map = new HashMap<>();
+        Map<Integer, Set<Integer>> result = new HashMap<>();
+        List<Map<String, Object>> list = processDefinitionMapper.listResources();
+        if (CollectionUtils.isNotEmpty(list)) {
+            for (Map<String, Object> tempMap : list) {
+
+                map.put((Integer) tempMap.get("id"), (String)tempMap.get("resource_ids"));
+            }
+        }
+
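+        // invert the mapping: process definition id -> "id1,id2,..." becomes resource id -> set of process definition ids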
+        for (Map.Entry<Integer, String> entry : map.entrySet()) {
+            Integer mapKey = entry.getKey();
+            String[] arr = entry.getValue().split(",");
+            Set<Integer> mapValues = Arrays.stream(arr).map(Integer::parseInt).collect(Collectors.toSet());
+            for (Integer value : mapValues) {
+                result.computeIfAbsent(value, k -> new HashSet<>()).add(mapKey);
+            }
+        }
+        return result;
+    }
+
 }
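A minimal, self-contained sketch of the recursive child-flattening walk that the new delete and rename paths rely on; the in-memory childrenByParentId map stands in for resourcesMapper.listChildren and is purely illustrative:

    import java.util.*;

    class ListAllChildrenSketch {
        // parent resource id -> ids of its direct children (illustrative stand-in for resourcesMapper.listChildren)
        static final Map<Integer, List<Integer>> childrenByParentId = new HashMap<>();

        // collect the resource itself plus every descendant id, mirroring ResourcesService.listAllChildren
        static List<Integer> listAllChildren(int resourceId) {
            List<Integer> result = new ArrayList<>();
            result.add(resourceId);
            for (int childId : childrenByParentId.getOrDefault(resourceId, Collections.emptyList())) {
                result.addAll(listAllChildren(childId));
            }
            return result;
        }

        public static void main(String[] args) {
            childrenByParentId.put(1, Arrays.asList(2, 3));          // /dir -> /dir/a.jar, /dir/sub
            childrenByParentId.put(3, Collections.singletonList(4)); // /dir/sub -> /dir/sub/b.txt
            System.out.println(listAllChildren(1));                  // prints [1, 2, 3, 4]
        }
    }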
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UdfFuncService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UdfFuncService.java
index 249c7ec..8a0bf74 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UdfFuncService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UdfFuncService.java
@@ -118,7 +118,7 @@ public class UdfFuncService extends BaseService{
         }
         udf.setDescription(desc);
         udf.setResourceId(resourceId);
-        udf.setResourceName(resource.getAlias());
+        udf.setResourceName(resource.getFullName());
         udf.setType(type);
 
         udf.setCreateTime(now);
@@ -226,7 +226,7 @@ public class UdfFuncService extends BaseService{
         }
         udf.setDescription(desc);
         udf.setResourceId(resourceId);
-        udf.setResourceName(resource.getAlias());
+        udf.setResourceName(resource.getFullName());
         udf.setType(type);
 
         udf.setUpdateTime(now);
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/dto/resources/filter/ResourceFilterTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/dto/resources/filter/ResourceFilterTest.java
new file mode 100644
index 0000000..8a4a16c
--- /dev/null
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/dto/resources/filter/ResourceFilterTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.api.dto.resources.filter;
+
+import org.apache.dolphinscheduler.dao.entity.Resource;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * resource filter test
+ */
+public class ResourceFilterTest {
+    private static Logger logger = LoggerFactory.getLogger(ResourceFilterTest.class);
+    @Test
+    public void filterTest(){
+        List<Resource> allList = new ArrayList<>();
+
+        Resource resource1 = new Resource(3,-1,"b","/b",true);
+        Resource resource2 = new Resource(4,2,"a1.txt","/a/a1.txt",false);
+        Resource resource3 = new Resource(5,3,"b1.txt","/b/b1.txt",false);
+        Resource resource4 = new Resource(6,3,"b2.jar","/b/b2.jar",false);
+        Resource resource5 = new Resource(7,-1,"b2","/b2",true);
+        Resource resource6 = new Resource(8,-1,"b2","/b/b2",true);
+        Resource resource7 = new Resource(9,8,"c2.jar","/b/b2/c2.jar",false);
+        allList.add(resource1);
+        allList.add(resource2);
+        allList.add(resource3);
+        allList.add(resource4);
+        allList.add(resource5);
+        allList.add(resource6);
+        allList.add(resource7);
+
+
+        ResourceFilter resourceFilter = new ResourceFilter(".jar",allList);
+        List<Resource> resourceList = resourceFilter.filter();
+        Assert.assertNotNull(resourceList);
+        resourceList.stream().forEach(t-> logger.info(t.toString()));
+    }
+}
\ No newline at end of file
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/dto/resources/visitor/ResourceTreeVisitorTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/dto/resources/visitor/ResourceTreeVisitorTest.java
new file mode 100644
index 0000000..d1f8a12
--- /dev/null
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/dto/resources/visitor/ResourceTreeVisitorTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.api.dto.resources.visitor;
+
+import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent;
+import org.apache.dolphinscheduler.dao.entity.Resource;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * resource tree visitor test
+ */
+public class ResourceTreeVisitorTest {
+
+    @Test
+    public void visit() throws Exception {
+        List<Resource> resourceList = new ArrayList<>();
+
+        Resource resource1 = new Resource(3,-1,"b","/b",true);
+        Resource resource2 = new Resource(4,2,"a1.txt","/a/a1.txt",false);
+        Resource resource3 = new Resource(5,3,"b1.txt","/b/b1.txt",false);
+        Resource resource4 = new Resource(6,3,"b2.jar","/b/b2.jar",false);
+        Resource resource5 = new Resource(7,-1,"b2","/b2",true);
+        Resource resource6 = new Resource(8,-1,"b2","/b/b2",true);
+        Resource resource7 = new Resource(9,8,"c2.jar","/b/b2/c2.jar",false);
+        resourceList.add(resource1);
+        resourceList.add(resource2);
+        resourceList.add(resource3);
+        resourceList.add(resource4);
+        resourceList.add(resource5);
+        resourceList.add(resource6);
+        resourceList.add(resource7);
+
+        ResourceTreeVisitor resourceTreeVisitor = new ResourceTreeVisitor(resourceList);
+        ResourceComponent resourceComponent = resourceTreeVisitor.visit();
+        Assert.assertNotNull(resourceComponent.getChildren());
+    }
+
+    @Test
+    public void rootNode() throws Exception {
+        List<Resource> resourceList = new ArrayList<>();
+
+        Resource resource1 = new Resource(3,-1,"b","/b",true);
+        Resource resource2 = new Resource(4,2,"a1.txt","/a/a1.txt",false);
+        Resource resource3 = new Resource(5,3,"b1.txt","/b/b1.txt",false);
+        Resource resource4 = new Resource(6,3,"b2.jar","/b/b2.jar",false);
+        Resource resource5 = new Resource(7,-1,"b2","/b2",true);
+        Resource resource6 = new Resource(8,-1,"b2","/b/b2",true);
+        Resource resource7 = new Resource(9,8,"c2.jar","/b/b2/c2.jar",false);
+        resourceList.add(resource1);
+        resourceList.add(resource2);
+        resourceList.add(resource3);
+        resourceList.add(resource4);
+        resourceList.add(resource5);
+        resourceList.add(resource6);
+        resourceList.add(resource7);
+
+        ResourceTreeVisitor resourceTreeVisitor = new ResourceTreeVisitor(resourceList);
+        Assert.assertTrue(resourceTreeVisitor.rootNode(resource1));
+        Assert.assertTrue(resourceTreeVisitor.rootNode(resource2));
+        Assert.assertFalse(resourceTreeVisitor.rootNode(resource3));
+
+    }
+
+}
\ No newline at end of file
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java
index 6d07ebd..d73eba8 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java
@@ -24,10 +24,7 @@ import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ResourceType;
 import org.apache.dolphinscheduler.common.enums.UserType;
-import org.apache.dolphinscheduler.common.utils.CollectionUtils;
-import org.apache.dolphinscheduler.common.utils.FileUtils;
-import org.apache.dolphinscheduler.common.utils.HadoopUtils;
-import org.apache.dolphinscheduler.common.utils.PropertyUtils;
+import org.apache.dolphinscheduler.common.utils.*;
 import org.apache.dolphinscheduler.dao.entity.Resource;
 import org.apache.dolphinscheduler.dao.entity.Tenant;
 import org.apache.dolphinscheduler.dao.entity.UdfFunc;
@@ -40,6 +37,7 @@ import org.junit.runner.RunWith;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -73,6 +71,8 @@ public class ResourcesServiceTest {
     private UserMapper userMapper;
     @Mock
     private UdfFuncMapper udfFunctionMapper;
+    @Mock
+    private ProcessDefinitionMapper processDefinitionMapper;
 
     @Before
     public void setUp() {
@@ -96,14 +96,14 @@ public class ResourcesServiceTest {
         PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false);
         User user = new User();
         //HDFS_NOT_STARTUP
-        Result result = resourcesService.createResource(user,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE,null);
+        Result result = resourcesService.createResource(user,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE,null,-1,"/");
         logger.info(result.toString());
         Assert.assertEquals(Status.HDFS_NOT_STARTUP.getMsg(),result.getMsg());
 
         //RESOURCE_FILE_IS_EMPTY
         MockMultipartFile mockMultipartFile = new MockMultipartFile("test.pdf",new String().getBytes());
         PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true);
-        result = resourcesService.createResource(user,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE,mockMultipartFile);
+        result = resourcesService.createResource(user,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE,mockMultipartFile,-1,"/");
         logger.info(result.toString());
         Assert.assertEquals(Status.RESOURCE_FILE_IS_EMPTY.getMsg(),result.getMsg());
 
@@ -111,31 +111,42 @@ public class ResourcesServiceTest {
         mockMultipartFile = new MockMultipartFile("test.pdf","test.pdf","pdf",new String("test").getBytes());
         PowerMockito.when(FileUtils.suffix("test.pdf")).thenReturn("pdf");
         PowerMockito.when(FileUtils.suffix("ResourcesServiceTest.jar")).thenReturn("jar");
-        result = resourcesService.createResource(user,"ResourcesServiceTest.jar","ResourcesServiceTest",ResourceType.FILE,mockMultipartFile);
+        result = resourcesService.createResource(user,"ResourcesServiceTest.jar","ResourcesServiceTest",ResourceType.FILE,mockMultipartFile,-1,"/");
         logger.info(result.toString());
         Assert.assertEquals(Status.RESOURCE_SUFFIX_FORBID_CHANGE.getMsg(),result.getMsg());
 
         //UDF_RESOURCE_SUFFIX_NOT_JAR
         mockMultipartFile = new MockMultipartFile("ResourcesServiceTest.pdf","ResourcesServiceTest.pdf","pdf",new String("test").getBytes());
         PowerMockito.when(FileUtils.suffix("ResourcesServiceTest.pdf")).thenReturn("pdf");
-        result = resourcesService.createResource(user,"ResourcesServiceTest.pdf","ResourcesServiceTest",ResourceType.UDF,mockMultipartFile);
+        result = resourcesService.createResource(user,"ResourcesServiceTest.pdf","ResourcesServiceTest",ResourceType.UDF,mockMultipartFile,-1,"/");
         logger.info(result.toString());
         Assert.assertEquals(Status.UDF_RESOURCE_SUFFIX_NOT_JAR.getMsg(),result.getMsg());
 
 
-        //UDF_RESOURCE_SUFFIX_NOT_JAR
-        Mockito.when(tenantMapper.queryById(0)).thenReturn(getTenant());
-        Mockito.when(resourcesMapper.queryResourceList("ResourcesServiceTest.jar", 0, 1)).thenReturn(getResourceList());
-        mockMultipartFile = new MockMultipartFile("ResourcesServiceTest.jar","ResourcesServiceTest.jar","pdf",new String("test").getBytes());
-        result = resourcesService.createResource(user,"ResourcesServiceTest.jar","ResourcesServiceTest",ResourceType.UDF,mockMultipartFile);
+    }
+
+    @Test
+    public void testCreateDirectory(){
+
+        PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false);
+        User user = new User();
+        //HDFS_NOT_STARTUP
+        Result result = resourcesService.createDirectory(user,"directoryTest","directory test",ResourceType.FILE,-1,"/");
         logger.info(result.toString());
-        Assert.assertEquals(Status.RESOURCE_EXIST.getMsg(),result.getMsg());
+        Assert.assertEquals(Status.HDFS_NOT_STARTUP.getMsg(),result.getMsg());
 
-        //SUCCESS
-        Mockito.when(resourcesMapper.queryResourceList("ResourcesServiceTest.jar", 0, 1)).thenReturn(new ArrayList<>());
-        result = resourcesService.createResource(user,"ResourcesServiceTest.jar","ResourcesServiceTest",ResourceType.UDF,mockMultipartFile);
+        //PARENT_RESOURCE_NOT_EXIST
+        PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true);
+        Mockito.when(resourcesMapper.selectById(Mockito.anyInt())).thenReturn(null);
+        result = resourcesService.createDirectory(user,"directoryTest","directory test",ResourceType.FILE,1,"/");
         logger.info(result.toString());
-        Assert.assertEquals(Status.SUCCESS.getMsg(),result.getMsg());
+        Assert.assertEquals(Status.PARENT_RESOURCE_NOT_EXIST.getMsg(),result.getMsg());
+        //RESOURCE_EXIST
+        PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true);
+        Mockito.when(resourcesMapper.queryResourceList("/directoryTest", 0, 0)).thenReturn(getResourceList());
+        result = resourcesService.createDirectory(user,"directoryTest","directory test",ResourceType.FILE,-1,"/");
+        logger.info(result.toString());
+        Assert.assertEquals(Status.RESOURCE_EXIST.getMsg(),result.getMsg());
 
     }
 
@@ -163,41 +174,46 @@ public class ResourcesServiceTest {
 
         //SUCCESS
         user.setId(1);
-        result = resourcesService.updateResource(user,1,"ResourcesServiceTest.jar","ResourcesServiceTest.jar",ResourceType.FILE);
+        Mockito.when(userMapper.queryDetailsById(1)).thenReturn(getUser());
+        Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant());
+
+        result = resourcesService.updateResource(user,1,"ResourcesServiceTest.jar","ResourcesServiceTest",ResourceType.FILE);
         logger.info(result.toString());
         Assert.assertEquals(Status.SUCCESS.getMsg(),result.getMsg());
 
         //RESOURCE_EXIST
-        Mockito.when(resourcesMapper.queryResourceList("ResourcesServiceTest1.jar", 0, 0)).thenReturn(getResourceList());
-        result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest1.jar",ResourceType.FILE);
+        Mockito.when(resourcesMapper.queryResourceList("/ResourcesServiceTest1.jar", 0, 0)).thenReturn(getResourceList());
+        result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.FILE);
         logger.info(result.toString());
         Assert.assertEquals(Status.RESOURCE_EXIST.getMsg(),result.getMsg());
         //USER_NOT_EXIST
-        result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest1.jar",ResourceType.UDF);
+        Mockito.when(userMapper.queryDetailsById(Mockito.anyInt())).thenReturn(null);
+        result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.UDF);
         logger.info(result.toString());
         Assert.assertTrue(Status.USER_NOT_EXIST.getCode() == result.getCode());
 
         //TENANT_NOT_EXIST
         Mockito.when(userMapper.queryDetailsById(1)).thenReturn(getUser());
-        result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest1.jar",ResourceType.UDF);
+        Mockito.when(tenantMapper.queryById(Mockito.anyInt())).thenReturn(null);
+        result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.UDF);
         logger.info(result.toString());
         Assert.assertEquals(Status.TENANT_NOT_EXIST.getMsg(),result.getMsg());
 
         //RESOURCE_NOT_EXIST
         Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant());
-        PowerMockito.when(HadoopUtils.getHdfsFilename(Mockito.any(), Mockito.any())).thenReturn("test1");
+        PowerMockito.when(HadoopUtils.getHdfsResourceFileName(Mockito.any(), Mockito.any())).thenReturn("test1");
 
         try {
             Mockito.when(hadoopUtils.exists("test")).thenReturn(true);
         } catch (IOException e) {
             e.printStackTrace();
         }
-        result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest1.jar",ResourceType.UDF);
+        result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.UDF);
         logger.info(result.toString());
         Assert.assertEquals(Status.RESOURCE_NOT_EXIST.getMsg(),result.getMsg());
 
         //SUCCESS
-        PowerMockito.when(HadoopUtils.getHdfsFilename(Mockito.any(), Mockito.any())).thenReturn("test");
+        PowerMockito.when(HadoopUtils.getHdfsResourceFileName(Mockito.any(), Mockito.any())).thenReturn("test");
         result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest1.jar",ResourceType.UDF);
         logger.info(result.toString());
         Assert.assertEquals(Status.SUCCESS.getMsg(),result.getMsg());
@@ -212,8 +228,8 @@ public class ResourcesServiceTest {
         resourcePage.setTotal(1);
         resourcePage.setRecords(getResourceList());
         Mockito.when(resourcesMapper.queryResourcePaging(Mockito.any(Page.class),
-                Mockito.eq(0), Mockito.eq(0), Mockito.eq("test"))).thenReturn(resourcePage);
-        Map<String, Object> result = resourcesService.queryResourceListPaging(loginUser,ResourceType.FILE,"test",1,10);
+                Mockito.eq(0),Mockito.eq(-1), Mockito.eq(0), Mockito.eq("test"))).thenReturn(resourcePage);
+        Map<String, Object> result = resourcesService.queryResourceListPaging(loginUser,-1,ResourceType.FILE,"test",1,10);
         logger.info(result.toString());
         Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
         PageInfo pageInfo = (PageInfo) result.get(Constants.DATA_LIST);
@@ -263,6 +279,7 @@ public class ResourcesServiceTest {
             //TENANT_NOT_EXIST
             loginUser.setUserType(UserType.ADMIN_USER);
             loginUser.setTenantId(2);
+            Mockito.when(userMapper.queryDetailsById(Mockito.anyInt())).thenReturn(loginUser);
             result = resourcesService.delete(loginUser,1);
             logger.info(result.toString());
             Assert.assertEquals(Status.TENANT_NOT_EXIST.getMsg(), result.getMsg());
@@ -285,14 +302,20 @@ public class ResourcesServiceTest {
 
         User user = new User();
         user.setId(1);
-        Mockito.when(resourcesMapper.queryResourceList("test", 0, 0)).thenReturn(getResourceList());
-        Result result = resourcesService.verifyResourceName("test",ResourceType.FILE,user);
+        Mockito.when(resourcesMapper.queryResourceList("/ResourcesServiceTest.jar", 0, 0)).thenReturn(getResourceList());
+        Result result = resourcesService.verifyResourceName("/ResourcesServiceTest.jar",ResourceType.FILE,user);
         logger.info(result.toString());
         Assert.assertEquals(Status.RESOURCE_EXIST.getMsg(), result.getMsg());
 
         //TENANT_NOT_EXIST
         Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant());
-        result = resourcesService.verifyResourceName("test1",ResourceType.FILE,user);
+        String unExistFullName = "/test.jar";
+        try {
+            Mockito.when(hadoopUtils.exists(unExistFullName)).thenReturn(false);
+        } catch (IOException e) {
+            logger.error("hadoop error",e);
+        }
+        result = resourcesService.verifyResourceName("/test.jar",ResourceType.FILE,user);
         logger.info(result.toString());
         Assert.assertEquals(Status.TENANT_NOT_EXIST.getMsg(), result.getMsg());
 
@@ -304,10 +327,10 @@ public class ResourcesServiceTest {
         } catch (IOException e) {
             logger.error("hadoop error",e);
         }
-        PowerMockito.when(HadoopUtils.getHdfsFilename("123", "test1")).thenReturn("test");
-        result = resourcesService.verifyResourceName("test1",ResourceType.FILE,user);
+        PowerMockito.when(HadoopUtils.getHdfsResourceFileName("123", "test1")).thenReturn("test");
+        result = resourcesService.verifyResourceName("/ResourcesServiceTest.jar",ResourceType.FILE,user);
         logger.info(result.toString());
-        Assert.assertTrue(Status.RESOURCE_FILE_EXIST.getCode()==result.getCode());
+        Assert.assertTrue(Status.RESOURCE_EXIST.getCode()==result.getCode());
 
         //SUCCESS
         result = resourcesService.verifyResourceName("test2",ResourceType.FILE,user);
@@ -389,14 +412,14 @@ public class ResourcesServiceTest {
         PowerMockito.when(HadoopUtils.getHdfsUdfDir("udfDir")).thenReturn("udfDir");
         User user = getUser();
         //HDFS_NOT_STARTUP
-        Result result = resourcesService.onlineCreateResource(user,ResourceType.FILE,"test","jar","desc","content");
+        Result result = resourcesService.onlineCreateResource(user,ResourceType.FILE,"test","jar","desc","content",-1,"/");
         logger.info(result.toString());
         Assert.assertEquals(Status.HDFS_NOT_STARTUP.getMsg(),result.getMsg());
 
         //RESOURCE_SUFFIX_NOT_SUPPORT_VIEW
         PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true);
         PowerMockito.when(FileUtils.getResourceViewSuffixs()).thenReturn("class");
-        result =  resourcesService.onlineCreateResource(user,ResourceType.FILE,"test","jar","desc","content");
+        result =  resourcesService.onlineCreateResource(user,ResourceType.FILE,"test","jar","desc","content",-1,"/");
         logger.info(result.toString());
         Assert.assertEquals(Status.RESOURCE_SUFFIX_NOT_SUPPORT_VIEW.getMsg(),result.getMsg());
 
@@ -404,7 +427,7 @@ public class ResourcesServiceTest {
         try {
             PowerMockito.when(FileUtils.getResourceViewSuffixs()).thenReturn("jar");
             Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant());
-            result = resourcesService.onlineCreateResource(user, ResourceType.FILE, "test", "jar", "desc", "content");
+            result = resourcesService.onlineCreateResource(user, ResourceType.FILE, "test", "jar", "desc", "content",-1,"/");
         }catch (RuntimeException ex){
             logger.info(result.toString());
             Assert.assertEquals(Status.RESOURCE_NOT_EXIST.getMsg(), ex.getMessage());
@@ -413,7 +436,7 @@ public class ResourcesServiceTest {
         //SUCCESS
         Mockito.when(FileUtils.getUploadFilename(Mockito.anyString(), Mockito.anyString())).thenReturn("test");
         PowerMockito.when(FileUtils.writeContent2File(Mockito.anyString(), Mockito.anyString())).thenReturn(true);
-        result =  resourcesService.onlineCreateResource(user,ResourceType.FILE,"test","jar","desc","content");
+        result =  resourcesService.onlineCreateResource(user,ResourceType.FILE,"test","jar","desc","content",-1,"/");
         logger.info(result.toString());
         Assert.assertEquals(Status.SUCCESS.getMsg(),result.getMsg());
 
@@ -584,13 +607,26 @@ public class ResourcesServiceTest {
     private Resource getResource(){
 
         Resource resource = new Resource();
+        resource.setPid(-1);
         resource.setUserId(1);
         resource.setDescription("ResourcesServiceTest.jar");
         resource.setAlias("ResourcesServiceTest.jar");
+        resource.setFullName("/ResourcesServiceTest.jar");
         resource.setType(ResourceType.FILE);
         return resource;
     }
 
+    private Resource getUdfResource(){
+
+        Resource resource = new Resource();
+        resource.setUserId(1);
+        resource.setDescription("udfTest");
+        resource.setAlias("udfTest.jar");
+        resource.setFullName("/udfTest.jar");
+        resource.setType(ResourceType.UDF);
+        return resource;
+    }
+
     private UdfFunc getUdfFunc(){
 
         UdfFunc udfFunc = new UdfFunc();
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java
index 308ed8e..ccc231f 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java
@@ -43,6 +43,7 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Map;
 
 import static org.junit.Assert.*;
@@ -173,7 +174,11 @@ public class CheckUtilsTest {
         // MapreduceParameters
         MapreduceParameters mapreduceParameters = new MapreduceParameters();
         assertFalse(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(mapreduceParameters), TaskType.MR.toString()));
-        mapreduceParameters.setMainJar(new ResourceInfo());
+
+        ResourceInfo resourceInfoMapreduce = new ResourceInfo();
+        resourceInfoMapreduce.setId(1);
+        resourceInfoMapreduce.setRes("");
+        mapreduceParameters.setMainJar(resourceInfoMapreduce);
         mapreduceParameters.setProgramType(ProgramType.JAVA);
         assertTrue(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(mapreduceParameters), TaskType.MR.toString()));
 
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AuthorizationType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AuthorizationType.java
index 1c371e7..633f5f9 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AuthorizationType.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AuthorizationType.java
@@ -23,13 +23,17 @@ import com.baomidou.mybatisplus.annotation.EnumValue;
  */
 public enum AuthorizationType {
     /**
-     * 0 RESOURCE_FILE;
-     * 1 DATASOURCE;
-     * 2 UDF;
+     * 0 RESOURCE_FILE_ID;
+     * 1 RESOURCE_FILE_NAME;
+     * 2 UDF_FILE;
+     * 3 DATASOURCE;
+     * 4 UDF;
      */
-    RESOURCE_FILE(0, "resource file"),
-    DATASOURCE(1, "data source"),
-    UDF(2, "udf function");
+    RESOURCE_FILE_ID(0, "resource file id"),
+    RESOURCE_FILE_NAME(1, "resource file name"),
+    UDF_FILE(2, "udf file"),
+    DATASOURCE(3, "data source"),
+    UDF(4, "udf function");
 
     AuthorizationType(int code, String descp){
         this.code = code;
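
The enum above widens authorization from three categories to five, and the persisted codes shift accordingly (DATASOURCE moves from 1 to 3, UDF from 2 to 4). A minimal sketch of resolving a persisted code back to the enum, assuming the enum exposes a getCode() getter; the ofCode helper is hypothetical and not part of this patch:

    import org.apache.dolphinscheduler.common.enums.AuthorizationType;

    public class AuthorizationTypeLookup {

        // Resolve an AuthorizationType from its persisted code (hypothetical helper).
        public static AuthorizationType ofCode(int code) {
            for (AuthorizationType type : AuthorizationType.values()) {
                if (type.getCode() == code) {   // assumes the enum exposes getCode()
                    return type;
                }
            }
            throw new IllegalArgumentException("unknown authorization type code: " + code);
        }

        public static void main(String[] args) {
            // Code 2 now maps to UDF_FILE rather than the old UDF entry.
            System.out.println(ofCode(2));
        }
    }
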
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/ResourceInfo.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/ResourceInfo.java
index 3c95ac6..a7fc083 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/ResourceInfo.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/ResourceInfo.java
@@ -23,6 +23,16 @@ public class ResourceInfo {
   /**
-   * res the name of the resource that was uploaded
+   * id of the resource
    */
+  private int id;
+
+  public int getId() {
+    return id;
+  }
+
+  public void setId(int id) {
+    this.id = id;
+  }
+
+  /**
+   * res the name of the resource that was uploaded
+   */
   private String res;
 
   public String getRes() {
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/AbstractParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/AbstractParameters.java
index 2d0322a..ae78caf 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/AbstractParameters.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/AbstractParameters.java
@@ -17,6 +17,7 @@
 package org.apache.dolphinscheduler.common.task;
 
 import org.apache.dolphinscheduler.common.process.Property;
+import org.apache.dolphinscheduler.common.process.ResourceInfo;
 
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -31,7 +32,7 @@ public abstract class AbstractParameters implements IParameters {
   public abstract boolean checkParameters();
 
   @Override
-  public abstract List<String> getResourceFilesList();
+  public abstract List<ResourceInfo> getResourceFilesList();
 
   /**
    * local parameters
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/IParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/IParameters.java
index 8fb49eb..88a2b54 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/IParameters.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/IParameters.java
@@ -16,6 +16,8 @@
  */
 package org.apache.dolphinscheduler.common.task;
 
+import org.apache.dolphinscheduler.common.process.ResourceInfo;
+
 import java.util.List;
 
 /**
@@ -34,5 +36,5 @@ public interface IParameters {
      *
      * @return resource files list
      */
-    List<String> getResourceFilesList();
+    List<ResourceInfo> getResourceFilesList();
 }
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/conditions/ConditionsParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/conditions/ConditionsParameters.java
index 5714b5e..7f0f2c8 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/conditions/ConditionsParameters.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/conditions/ConditionsParameters.java
@@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.common.task.conditions;
 
 import org.apache.dolphinscheduler.common.enums.DependentRelation;
 import org.apache.dolphinscheduler.common.model.DependentTaskModel;
+import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
 
 import java.util.List;
@@ -41,7 +42,7 @@ public class ConditionsParameters extends AbstractParameters {
     }
 
     @Override
-    public List<String> getResourceFilesList() {
+    public List<ResourceInfo> getResourceFilesList() {
         return null;
     }
 
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java
index f153360..872b3aa 100755
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java
@@ -20,6 +20,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
 
 /**
@@ -198,7 +199,7 @@ public class DataxParameters extends AbstractParameters {
     }
 
     @Override
-    public List<String> getResourceFilesList() {
+    public List<ResourceInfo> getResourceFilesList() {
         return new ArrayList<>();
     }
 
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/dependent/DependentParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/dependent/DependentParameters.java
index 9ff1405..5f2e0e1 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/dependent/DependentParameters.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/dependent/DependentParameters.java
@@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.common.task.dependent;
 
 import org.apache.dolphinscheduler.common.enums.DependentRelation;
 import org.apache.dolphinscheduler.common.model.DependentTaskModel;
+import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
 
 import java.util.ArrayList;
@@ -36,7 +37,7 @@ public class DependentParameters extends AbstractParameters {
     }
 
     @Override
-    public List<String> getResourceFilesList() {
+    public List<ResourceInfo> getResourceFilesList() {
         return new ArrayList<>();
     }
 
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java
index 1fbd9ab..05cbb1d 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java
@@ -19,10 +19,10 @@ package org.apache.dolphinscheduler.common.task.flink;
 import org.apache.dolphinscheduler.common.enums.ProgramType;
 import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
+import org.apache.dolphinscheduler.common.utils.CollectionUtils;
 
-import java.util.Collections;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.stream.Collectors;
 
 /**
- * spark parameters
+ * flink parameters
@@ -50,35 +50,35 @@ public class FlinkParameters extends AbstractParameters {
   private String mainArgs;
 
   /**
-   * slot个数
+   * slot count
    */
   private int slot;
 
   /**
-   *Yarn application的名字
+   *Yarn application name
    */
 
   private String appName;
 
   /**
-   * taskManager 数量
+   * taskManager count
    */
   private int  taskManager;
 
   /**
-   * jobManagerMemory 内存大小
+   * job manager memory
    */
   private String  jobManagerMemory ;
 
   /**
-   * taskManagerMemory内存大小
+   * task manager memory
    */
   private String  taskManagerMemory;
 
   /**
    * resource list
    */
-  private List<ResourceInfo> resourceList;
+  private List<ResourceInfo> resourceList = new ArrayList<>();
 
   /**
    * The YARN queue to submit to
@@ -207,16 +207,11 @@ public class FlinkParameters extends AbstractParameters {
 
 
   @Override
-  public List<String> getResourceFilesList() {
-    if(resourceList != null ) {
-      List<String> resourceFiles = resourceList.stream()
-              .map(ResourceInfo::getRes).collect(Collectors.toList());
-      if(mainJar != null) {
-        resourceFiles.add(mainJar.getRes());
-      }
-      return resourceFiles;
+  public List<ResourceInfo> getResourceFilesList() {
+    if (mainJar != null && !resourceList.contains(mainJar)) {
+      resourceList.add(mainJar);
     }
-    return Collections.emptyList();
+    return resourceList;
   }
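
Since getResourceFilesList() now returns ResourceInfo objects rather than plain names, callers can resolve resources by id and fall back to the res name. A minimal sketch, assuming FlinkParameters exposes the usual setMainJar/setResourceList setters (setResourceList is exercised in FlinkParametersTest below; setMainJar is assumed):

    import org.apache.dolphinscheduler.common.process.ResourceInfo;
    import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;

    import java.util.ArrayList;
    import java.util.List;

    public class FlinkResourceListSketch {
        public static void main(String[] args) {
            ResourceInfo mainJar = new ResourceInfo();
            mainJar.setId(10);                        // id of the jar in t_ds_resources
            mainJar.setRes("wordcount-1.0.0.jar");    // name kept for backward compatibility

            ResourceInfo dependency = new ResourceInfo();
            dependency.setId(11);
            dependency.setRes("config.properties");

            List<ResourceInfo> resources = new ArrayList<>();
            resources.add(dependency);

            FlinkParameters parameters = new FlinkParameters();
            parameters.setMainJar(mainJar);
            parameters.setResourceList(resources);

            // The main jar is merged into the returned list exactly once.
            for (ResourceInfo info : parameters.getResourceFilesList()) {
                System.out.println(info.getId() + " -> " + info.getRes());
            }
        }
    }
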
 
 
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/http/HttpParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/http/HttpParameters.java
index 00b01af..54284bd 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/http/HttpParameters.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/http/HttpParameters.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.common.task.http;
 import org.apache.dolphinscheduler.common.enums.HttpCheckCondition;
 import org.apache.dolphinscheduler.common.enums.HttpMethod;
 import org.apache.dolphinscheduler.common.process.HttpProperty;
+import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
 import org.apache.commons.lang.StringUtils;
 
@@ -62,7 +63,7 @@ public class HttpParameters extends AbstractParameters {
     }
 
     @Override
-    public List<String> getResourceFilesList() {
+    public List<ResourceInfo> getResourceFilesList() {
         return new ArrayList<>();
     }
 
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/mr/MapreduceParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/mr/MapreduceParameters.java
index 31c9c72..5126e82 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/mr/MapreduceParameters.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/mr/MapreduceParameters.java
@@ -19,10 +19,10 @@ package org.apache.dolphinscheduler.common.task.mr;
 import org.apache.dolphinscheduler.common.enums.ProgramType;
 import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
+import org.apache.dolphinscheduler.common.utils.CollectionUtils;
 
-import java.util.Collections;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.stream.Collectors;
 
 public class MapreduceParameters extends AbstractParameters {
 
@@ -54,7 +54,7 @@ public class MapreduceParameters extends AbstractParameters {
     /**
      * resource list
      */
-    private List<ResourceInfo> resourceList;
+    private List<ResourceInfo> resourceList = new ArrayList<>();
 
     /**
      * program type
@@ -125,16 +125,12 @@ public class MapreduceParameters extends AbstractParameters {
     }
 
     @Override
-    public List<String> getResourceFilesList() {
-        if(resourceList != null ) {
-            List<String> resourceFiles = resourceList.stream()
-                    .map(ResourceInfo::getRes).collect(Collectors.toList());
-            if(mainJar != null) {
-                resourceFiles.add(mainJar.getRes());
-            }
-            return resourceFiles;
+    public List<ResourceInfo> getResourceFilesList() {
+        if (mainJar != null && !resourceList.contains(mainJar)) {
+            resourceList.add(mainJar);
         }
-        return Collections.emptyList();
+
+        return resourceList;
     }
 
     @Override
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/procedure/ProcedureParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/procedure/ProcedureParameters.java
index 56ae655..2811f10 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/procedure/ProcedureParameters.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/procedure/ProcedureParameters.java
@@ -16,6 +16,7 @@
  */
 package org.apache.dolphinscheduler.common.task.procedure;
 
+import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
 import org.apache.commons.lang.StringUtils;
 
@@ -74,7 +75,7 @@ public class ProcedureParameters extends AbstractParameters {
   }
 
   @Override
-  public List<String> getResourceFilesList() {
+  public List<ResourceInfo> getResourceFilesList() {
     return new ArrayList<>();
   }
 
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/python/PythonParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/python/PythonParameters.java
index ae9cb4c..35dbd8e 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/python/PythonParameters.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/python/PythonParameters.java
@@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
 
 import java.util.List;
-import java.util.stream.Collectors;
 
 public class PythonParameters extends AbstractParameters {
   /**
@@ -56,12 +55,7 @@ public class PythonParameters extends AbstractParameters {
   }
 
   @Override
-  public List<String> getResourceFilesList() {
-    if (resourceList != null) {
-      return resourceList.stream()
-              .map(p -> p.getRes()).collect(Collectors.toList());
-    }
-
-    return null;
+  public List<ResourceInfo> getResourceFilesList() {
+    return this.resourceList;
   }
 }
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/shell/ShellParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/shell/ShellParameters.java
index 85b8acb..e11e596 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/shell/ShellParameters.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/shell/ShellParameters.java
@@ -59,12 +59,7 @@ public class ShellParameters extends AbstractParameters {
   }
 
   @Override
-  public List<String> getResourceFilesList() {
-    if (resourceList != null) {
-      return resourceList.stream()
-              .map(p -> p.getRes()).collect(Collectors.toList());
-    }
-
-    return null;
+  public List<ResourceInfo> getResourceFilesList() {
+    return resourceList;
   }
 }
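
Note that the shell and python variants now return the backing list as-is, so the result may be null when no resources were ever set; the TaskScheduleThread change further below adds the matching null check on the worker side. A minimal defensive pattern for other callers, using the CollectionUtils helper from dolphinscheduler-common:

    import org.apache.dolphinscheduler.common.process.ResourceInfo;
    import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
    import org.apache.dolphinscheduler.common.utils.CollectionUtils;

    import java.util.Collections;
    import java.util.List;

    public class ShellResourceListSketch {
        public static void main(String[] args) {
            ShellParameters parameters = new ShellParameters();
            List<ResourceInfo> resources = parameters.getResourceFilesList();
            // Guard against a null or empty list before iterating.
            if (CollectionUtils.isEmpty(resources)) {
                resources = Collections.emptyList();
            }
            System.out.println("resources to download: " + resources.size());
        }
    }
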
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java
index 74982d5..4e58201 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java
@@ -19,10 +19,10 @@ package org.apache.dolphinscheduler.common.task.spark;
 import org.apache.dolphinscheduler.common.enums.ProgramType;
 import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
+import org.apache.dolphinscheduler.common.utils.CollectionUtils;
 
-import java.util.Collections;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.stream.Collectors;
 
 /**
  * spark parameters
@@ -78,7 +78,7 @@ public class SparkParameters extends AbstractParameters {
   /**
    * resource list
    */
-  private List<ResourceInfo> resourceList;
+  private List<ResourceInfo> resourceList = new ArrayList<>();
 
   /**
    * The YARN queue to submit to
@@ -219,18 +219,12 @@ public class SparkParameters extends AbstractParameters {
     return mainJar != null && programType != null && sparkVersion != null;
   }
 
-
   @Override
-  public List<String> getResourceFilesList() {
-    if(resourceList !=null ) {
-      List<String> resourceFilesList = resourceList.stream()
-              .map(ResourceInfo::getRes).collect(Collectors.toList());
-      if(mainJar != null){
-          resourceFilesList.add(mainJar.getRes());
-      }
-      return resourceFilesList;
+  public List<ResourceInfo> getResourceFilesList() {
+    if (mainJar != null && !resourceList.contains(mainJar)) {
+      resourceList.add(mainJar);
     }
-    return Collections.emptyList();
+    return resourceList;
   }
 
 
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java
index d65204a..4604234 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java
@@ -16,6 +16,7 @@
  */
 package org.apache.dolphinscheduler.common.task.sql;
 
+import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
 import org.apache.commons.lang.StringUtils;
 
@@ -189,7 +190,7 @@ public class SqlParameters extends AbstractParameters {
     }
 
     @Override
-    public List<String> getResourceFilesList() {
+    public List<ResourceInfo> getResourceFilesList() {
         return new ArrayList<>();
     }
 
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/SqoopParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/SqoopParameters.java
index fb65df6..7f02f42 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/SqoopParameters.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/SqoopParameters.java
@@ -16,6 +16,7 @@
  */
 package org.apache.dolphinscheduler.common.task.sqoop;
 
+import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
 
@@ -111,7 +112,7 @@ public class SqoopParameters  extends AbstractParameters {
     }
 
     @Override
-    public List<String> getResourceFilesList() {
+    public List<ResourceInfo> getResourceFilesList() {
        return new ArrayList<>();
     }
 }
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/subprocess/SubProcessParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/subprocess/SubProcessParameters.java
index c7784de..46f0e85 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/subprocess/SubProcessParameters.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/subprocess/SubProcessParameters.java
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 package org.apache.dolphinscheduler.common.task.subprocess;
+import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
 
 import java.util.ArrayList;
@@ -42,7 +43,7 @@ public class SubProcessParameters extends AbstractParameters {
     }
 
     @Override
-    public List<String> getResourceFilesList() {
+    public List<ResourceInfo> getResourceFilesList() {
         return new ArrayList<>();
     }
 }
\ No newline at end of file
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
index 6c42704..c89f3c8 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
@@ -26,6 +26,7 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONException;
 import com.alibaba.fastjson.JSONObject;
 import org.apache.commons.io.IOUtils;
+import org.apache.dolphinscheduler.common.enums.ResourceType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.FileSystem;
@@ -421,6 +422,22 @@ public class HadoopUtils implements Closeable {
+     * @param resourceType resource type
      * @param tenantCode tenant code
      * @return hdfs resource dir
      */
+    public static String getHdfsDir(ResourceType resourceType,String tenantCode) {
+        String hdfsDir = "";
+        if (resourceType.equals(ResourceType.FILE)) {
+            hdfsDir = getHdfsResDir(tenantCode);
+        } else if (resourceType.equals(ResourceType.UDF)) {
+            hdfsDir = getHdfsUdfDir(tenantCode);
+        }
+        return hdfsDir;
+    }
+
+    /**
+     * hdfs resource dir
+     *
+     * @param tenantCode tenant code
+     * @return hdfs resource dir
+     */
     public static String getHdfsResDir(String tenantCode) {
         return String.format("%s/resources", getHdfsTenantDir(tenantCode));
     }
@@ -450,22 +467,42 @@ public class HadoopUtils implements Closeable {
      * get absolute path and name for file on hdfs
      *
      * @param tenantCode tenant code
-     * @param filename   file name
+     * @param fileName     file name
+     * @param resourceType resource type
+     * @return absolute path and name for file on hdfs
+     */
+    public static String getHdfsFileName(ResourceType resourceType, String tenantCode, String fileName) {
+        return String.format("%s/%s", getHdfsDir(resourceType,tenantCode), fileName);
+    }
+
+    /**
+     * get absolute path and name for resource file on hdfs
+     *
+     * @param tenantCode tenant code
+     * @param fileName   file name
      * @return get absolute path and name for file on hdfs
      */
-    public static String getHdfsFilename(String tenantCode, String filename) {
-        return String.format("%s/%s", getHdfsResDir(tenantCode), filename);
+    public static String getHdfsResourceFileName(String tenantCode, String fileName) {
+        return String.format("%s/%s", getHdfsResDir(tenantCode), fileName);
     }
 
     /**
      * get absolute path and name for udf file on hdfs
      *
      * @param tenantCode tenant code
-     * @param filename   file name
+     * @param fileName   file name
      * @return get absolute path and name for udf file on hdfs
      */
-    public static String getHdfsUdfFilename(String tenantCode, String filename) {
-        return String.format("%s/%s", getHdfsUdfDir(tenantCode), filename);
+    public static String getHdfsUdfFileName(String tenantCode, String fileName) {
+        return String.format("%s/%s", getHdfsUdfDir(tenantCode), fileName);
     }
 
     /**
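
The renamed helpers compose HDFS paths per resource type: pick the per-tenant directory for FILE or UDF resources, then append the file name. A standalone sketch that mirrors that composition; the tenant base directory and the exact udf directory name are assumptions here, the real values come from HadoopUtils and its configured HDFS root:

    public class HdfsPathSketch {

        // Assumed tenant layout; HadoopUtils derives the real base dir from configuration.
        static String hdfsTenantDir(String tenantCode) {
            return "/dolphinscheduler/" + tenantCode;
        }

        static String hdfsResDir(String tenantCode) {
            return hdfsTenantDir(tenantCode) + "/resources";
        }

        static String hdfsUdfDir(String tenantCode) {
            return hdfsTenantDir(tenantCode) + "/udfs"; // directory name is an assumption
        }

        // Mirrors getHdfsFileName(resourceType, tenantCode, fileName): choose the dir by type, then append.
        static String hdfsFileName(boolean isUdf, String tenantCode, String fileName) {
            String dir = isUdf ? hdfsUdfDir(tenantCode) : hdfsResDir(tenantCode);
            return String.format("%s/%s", dir, fileName);
        }

        public static void main(String[] args) {
            System.out.println(hdfsFileName(false, "tenant1", "wordcount-1.0.0.jar"));
            System.out.println(hdfsFileName(true, "tenant1", "udfTest.jar"));
        }
    }
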
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/FlinkParametersTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/FlinkParametersTest.java
index 7ce00e8..cd7b4f2 100644
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/FlinkParametersTest.java
+++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/FlinkParametersTest.java
@@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.common.task;
 
 import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
+import org.apache.dolphinscheduler.common.utils.CollectionUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -28,8 +29,7 @@ public class FlinkParametersTest {
     @Test
     public void getResourceFilesList() {
         FlinkParameters flinkParameters = new FlinkParameters();
-        Assert.assertNotNull(flinkParameters.getResourceFilesList());
-        Assert.assertTrue(flinkParameters.getResourceFilesList().isEmpty());
+        Assert.assertTrue(CollectionUtils.isEmpty(flinkParameters.getResourceFilesList()));
 
         ResourceInfo mainResource = new ResourceInfo();
         mainResource.setRes("testFlinkMain-1.0.0-SNAPSHOT.jar");
@@ -41,15 +41,17 @@ public class FlinkParametersTest {
         resourceInfos.add(resourceInfo1);
 
         flinkParameters.setResourceList(resourceInfos);
-        Assert.assertNotNull(flinkParameters.getResourceFilesList());
-        Assert.assertEquals(2, flinkParameters.getResourceFilesList().size());
+        List<ResourceInfo> resourceFilesList = flinkParameters.getResourceFilesList();
+        Assert.assertNotNull(resourceFilesList);
+        Assert.assertEquals(2, resourceFilesList.size());
 
         ResourceInfo resourceInfo2 = new ResourceInfo();
         resourceInfo2.setRes("testFlinkParameters2.jar");
         resourceInfos.add(resourceInfo2);
 
         flinkParameters.setResourceList(resourceInfos);
-        Assert.assertNotNull(flinkParameters.getResourceFilesList());
-        Assert.assertEquals(3, flinkParameters.getResourceFilesList().size());
+        resourceFilesList = flinkParameters.getResourceFilesList();
+        Assert.assertNotNull(resourceFilesList);
+        Assert.assertEquals(3, resourceFilesList.size());
     }
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
index f59d11f..e29de89 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
@@ -163,6 +163,11 @@ public class ProcessDefinition {
      */
     private String modifyBy;
 
+    /**
+     * resource ids
+     */
+    private String resourceIds;
+
 
     public String getName() {
         return name;
@@ -334,6 +339,14 @@ public class ProcessDefinition {
         this.scheduleReleaseState = scheduleReleaseState;
     }
 
+    public String getResourceIds() {
+        return resourceIds;
+    }
+
+    public void setResourceIds(String resourceIds) {
+        this.resourceIds = resourceIds;
+    }
+
     public int getTimeout() {
         return timeout;
     }
@@ -393,6 +406,8 @@ public class ProcessDefinition {
                 ", timeout=" + timeout +
                 ", tenantId=" + tenantId +
                 ", modifyBy='" + modifyBy + '\'' +
+                ", resourceIds='" + resourceIds + '\'' +
                 '}';
     }
+
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Resource.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Resource.java
index 934be4b..16d9491 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Resource.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Resource.java
@@ -33,11 +33,26 @@ public class Resource {
   private int id;
 
   /**
+   * parent id
+   */
+  private int pid;
+
+  /**
    * resource alias
    */
   private String alias;
 
   /**
+   * full name
+   */
+  private String fullName;
+
+  /**
+   * is directory
+   */
+  private boolean isDirectory=false;
+
+  /**
    * description
    */
   private String description;
@@ -89,7 +104,15 @@ public class Resource {
     this.updateTime = updateTime;
   }
 
-  public Resource(String alias, String fileName, String description, int userId, ResourceType type, long size, Date createTime, Date updateTime) {
+  public Resource(int id, int pid, String alias, String fullName, boolean isDirectory) {
+    this.id = id;
+    this.pid = pid;
+    this.alias = alias;
+    this.fullName = fullName;
+    this.isDirectory = isDirectory;
+  }
+
+  /*public Resource(String alias, String fileName, String description, int userId, ResourceType type, long size, Date createTime, Date updateTime) {
     this.alias = alias;
     this.fileName = fileName;
     this.description = description;
@@ -98,6 +121,20 @@ public class Resource {
     this.size = size;
     this.createTime = createTime;
     this.updateTime = updateTime;
+  }*/
+
+  public Resource(int pid, String alias, String fullName, boolean isDirectory, String description, String fileName, int userId, ResourceType type, long size, Date createTime, Date updateTime) {
+    this.pid = pid;
+    this.alias = alias;
+    this.fullName = fullName;
+    this.isDirectory = isDirectory;
+    this.description = description;
+    this.fileName = fileName;
+    this.userId = userId;
+    this.type = type;
+    this.size = size;
+    this.createTime = createTime;
+    this.updateTime = updateTime;
   }
 
   public int getId() {
@@ -116,6 +153,30 @@ public class Resource {
     this.alias = alias;
   }
 
+  public int getPid() {
+    return pid;
+  }
+
+  public void setPid(int pid) {
+    this.pid = pid;
+  }
+
+  public String getFullName() {
+    return fullName;
+  }
+
+  public void setFullName(String fullName) {
+    this.fullName = fullName;
+  }
+
+  public boolean isDirectory() {
+    return isDirectory;
+  }
+
+  public void setDirectory(boolean directory) {
+    isDirectory = directory;
+  }
+
   public String getFileName() {
     return fileName;
   }
@@ -177,9 +238,12 @@ public class Resource {
   public String toString() {
     return "Resource{" +
             "id=" + id +
+            ", pid=" + pid +
             ", alias='" + alias + '\'' +
-            ", fileName='" + fileName + '\'' +
+            ", fullName='" + fullName + '\'' +
+            ", isDirectory=" + isDirectory +
             ", description='" + description + '\'' +
+            ", fileName='" + fileName + '\'' +
             ", userId=" + userId +
             ", type=" + type +
             ", size=" + size +
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
index 9f9225c..b75bb58 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
@@ -20,9 +20,11 @@ import org.apache.dolphinscheduler.dao.entity.DefinitionGroupByUser;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import com.baomidou.mybatisplus.core.mapper.BaseMapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
+import org.apache.ibatis.annotations.MapKey;
 import org.apache.ibatis.annotations.Param;
 
 import java.util.List;
+import java.util.Map;
 
 /**
  * process definition mapper interface
@@ -83,7 +85,7 @@ public interface ProcessDefinitionMapper extends BaseMapper<ProcessDefinition> {
     List<ProcessDefinition> queryDefinitionListByTenant(@Param("tenantId") int tenantId);
 
     /**
-     *  count process definition group by user
+     * count process definition group by user
      * @param userId userId
      * @param projectIds projectIds
      * @param isAdmin isAdmin
@@ -93,4 +95,11 @@ public interface ProcessDefinitionMapper extends BaseMapper<ProcessDefinition> {
             @Param("userId") Integer userId,
             @Param("projectIds") Integer[] projectIds,
             @Param("isAdmin") boolean isAdmin);
+
+    /**
+     * list all resource ids
+     * @return resource ids list
+     */
+    @MapKey("id")
+    List<Map<String, Object>> listResources();
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.java
index cf65e5d..f07a92c 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.java
@@ -30,12 +30,12 @@ public interface ResourceMapper extends BaseMapper<Resource> {
 
     /**
      * query resource list
-     * @param alias alias
+     * @param fullName full name
      * @param userId userId
      * @param type type
      * @return resource list
      */
-    List<Resource> queryResourceList(@Param("alias") String alias,
+    List<Resource> queryResourceList(@Param("fullName") String fullName,
                                      @Param("userId") int userId,
                                      @Param("type") int type);
 
@@ -59,6 +59,7 @@ public interface ResourceMapper extends BaseMapper<Resource> {
      */
     IPage<Resource> queryResourcePaging(IPage<Resource> page,
                                         @Param("userId") int userId,
+                                        @Param("id") int id,
                                         @Param("type") int type,
                                         @Param("searchVal") String searchVal);
 
@@ -76,13 +77,13 @@ public interface ResourceMapper extends BaseMapper<Resource> {
      */
     List<Resource> queryResourceExceptUserId(@Param("userId") int userId);
 
-
     /**
      * query tenant code by name
      * @param resName resource name
+     * @param resType resource type
      * @return tenant code
      */
-    String queryTenantCodeByResourceName(@Param("resName") String resName);
+    String queryTenantCodeByResourceName(@Param("resName") String resName,@Param("resType") int resType);
 
     /**
      * list authorized resource
@@ -91,4 +92,48 @@ public interface ResourceMapper extends BaseMapper<Resource> {
      * @return resource list
      */
     <T> List<Resource> listAuthorizedResource(@Param("userId") int userId,@Param("resNames")T[] resNames);
+
+    /**
+     * list authorized resource
+     * @param userId userId
+     * @param resIds resource ids
+     * @return resource list
+     */
+    <T> List<Resource> listAuthorizedResourceById(@Param("userId") int userId,@Param("resIds")T[] resIds);
+
+    /**
+     * delete resource by id array
+     * @param resIds resource id array
+     * @return delete num
+     */
+    int deleteIds(@Param("resIds")Integer[] resIds);
+
+    /**
+     * list children
+     * @param directoryId directory id
+     * @return resource id list
+     */
+    List<Integer> listChildren(@Param("directoryId") int directoryId);
+
+    /**
+     * query resource by full name or pid
+     * @param fullName  full name
+     * @param type      resource type
+     * @return resource
+     */
+    List<Resource> queryResource(@Param("fullName") String fullName,@Param("type") int type);
+
+    /**
+     * list resource by id array
+     * @param resIds resource id array
+     * @return resource list
+     */
+    List<Resource> listResourceByIds(@Param("resIds")Integer[] resIds);
+
+    /**
+     * update resource
+     * @param resourceList  resource list
+     * @return update num
+     */
+    int batchUpdateResource(@Param("resourceList") List<Resource> resourceList);
 }
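
The new mapper methods give the service layer what it needs to remove a directory together with its direct children in a single round of deletes. A minimal sketch of that flow, assuming a Spring-managed ResourceMapper is injected; transaction handling and HDFS cleanup are omitted:

    import org.apache.dolphinscheduler.dao.mapper.ResourceMapper;

    import java.util.ArrayList;
    import java.util.List;

    public class ResourceDirectoryDeleteSketch {

        private final ResourceMapper resourceMapper;

        public ResourceDirectoryDeleteSketch(ResourceMapper resourceMapper) {
            this.resourceMapper = resourceMapper;
        }

        // Collect the directory id plus its direct children and delete them in one statement.
        public int deleteDirectory(int directoryId) {
            List<Integer> ids = new ArrayList<>();
            ids.add(directoryId);
            ids.addAll(resourceMapper.listChildren(directoryId));
            return resourceMapper.deleteIds(ids.toArray(new Integer[0]));
        }
    }
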
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.java
index 5a87342..2411c9b 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.java
@@ -86,4 +86,19 @@ public interface UdfFuncMapper extends BaseMapper<UdfFunc> {
      */
     <T> List<UdfFunc> listAuthorizedUdfFunc (@Param("userId") int userId,@Param("udfIds")T[] udfIds);
 
+    /**
+     * list UDF by resource id
+     * @param   resourceIds  resource id array
+     * @return  UDF function list
+     */
+    List<UdfFunc> listUdfByResourceId(@Param("resourceIds") int[] resourceIds);
+
+    /**
+     * list authorized UDF by resource id
+     * @param   resourceIds  resource id array
+     * @return  UDF function list
+     */
+    List<UdfFunc> listAuthorizedUdfByResourceId(@Param("userId") int userId,@Param("resourceIds") int[] resourceIds);
+
+
 }
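
Deleting or re-authorizing a UDF resource should first check whether any UDF function still references it. A minimal sketch using the new mapper method, assuming the mapper is injected; the wrapper class is hypothetical:

    import org.apache.dolphinscheduler.dao.entity.UdfFunc;
    import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;

    import java.util.List;

    public class UdfResourceCheckSketch {

        private final UdfFuncMapper udfFuncMapper;

        public UdfResourceCheckSketch(UdfFuncMapper udfFuncMapper) {
            this.udfFuncMapper = udfFuncMapper;
        }

        // True if at least one UDF function still points at one of the given resource ids.
        public boolean isReferencedByUdf(int... resourceIds) {
            List<UdfFunc> udfs = udfFuncMapper.listUdfByResourceId(resourceIds);
            return udfs != null && !udfs.isEmpty();
        }
    }
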
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
index f215778..c9086b9 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
@@ -87,4 +87,11 @@
             pd.user_id = u.id AND pd.project_id = p.id
         AND pd.id = #{processDefineId}
     </select>
+
+
+    <select id="listResources" resultType="java.util.HashMap">
+        SELECT id,resource_ids
+        FROM t_ds_process_definition
+        WHERE release_state = 1 and resource_ids is not null and resource_ids != ''
+    </select>
 </mapper>
\ No newline at end of file
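
listResources returns one row per online process definition with its id and resource_ids columns. The column's encoding is not shown in this patch; a comma-separated id list is the likely format, and the sketch below scans the rows for a given resource id under that assumption:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;

    public class ResourceUsageScanSketch {

        // Return ids of process definitions whose resource_ids column mentions resourceId.
        // Assumes resource_ids is a comma-separated list of resource ids.
        public static List<Integer> definitionsUsing(List<Map<String, Object>> rows, int resourceId) {
            List<Integer> definitionIds = new ArrayList<>();
            String target = String.valueOf(resourceId);
            for (Map<String, Object> row : rows) {
                String resourceIds = String.valueOf(row.get("resource_ids"));
                boolean used = Arrays.stream(resourceIds.split(","))
                        .map(String::trim)
                        .anyMatch(target::equals);
                if (used) {
                    definitionIds.add(((Number) row.get("id")).intValue());
                }
            }
            return definitionIds;
        }
    }
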
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.xml
index 2146d1a..c1fe50f 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.xml
@@ -22,8 +22,8 @@
         select *
         from t_ds_resources
         where 1= 1
-        <if test="alias != null and alias != ''">
-            and alias = #{alias}
+        <if test="fullName != null and fullName != ''">
+            and full_name = #{fullName}
         </if>
         <if test="type != -1">
             and type = #{type}
@@ -47,8 +47,8 @@
     <select id="queryResourcePaging" resultType="org.apache.dolphinscheduler.dao.entity.Resource">
         select *
         from t_ds_resources
-        where type=#{type}
-        <if test="userId != 0">
+        where type=#{type} and pid=#{id}
+        <if test="userId != 0 and id == -1">
             and id in (select resources_id from  t_ds_relation_resources_user where user_id=#{userId}
             union select id as resources_id  from t_ds_resources where user_id=#{userId})
         </if>
@@ -70,7 +70,74 @@
     <select id="queryTenantCodeByResourceName" resultType="java.lang.String">
         select tenant_code
         from t_ds_tenant t, t_ds_user u, t_ds_resources res
-        where t.id = u.tenant_id and u.id = res.user_id and res.type=0
-        and res.alias= #{resName}
+        where t.id = u.tenant_id and u.id = res.user_id and res.type=#{resType}
+        and res.full_name= #{resName}
+    </select>
+    <select id="listAuthorizedResource" resultType="org.apache.dolphinscheduler.dao.entity.Resource">
+        select *
+        from t_ds_resources
+        where type=0
+        and id in (select resources_id from  t_ds_relation_resources_user where user_id=#{userId}
+        union select id as resources_id  from t_ds_resources where user_id=#{userId})
+        <if test="resNames != null and resNames != ''">
+            and full_name in
+            <foreach collection="resNames" item="i" open="(" close=")" separator=",">
+                #{i}
+            </foreach>
+        </if>
+    </select>
+    <select id="listAuthorizedResourceById" resultType="org.apache.dolphinscheduler.dao.entity.Resource">
+        select *
+        from t_ds_resources
+        where id in (select resources_id from  t_ds_relation_resources_user where user_id=#{userId}
+        union select id as resources_id  from t_ds_resources where user_id=#{userId})
+        <if test="resIds != null and resIds != ''">
+            and id in
+            <foreach collection="resIds" item="i" open="(" close=")" separator=",">
+                #{i}
+            </foreach>
+        </if>
+    </select>
+    
+    <delete id="deleteIds" parameterType="java.lang.Integer">
+        delete from t_ds_resources where id in
+        <foreach collection="resIds" item="i" open="(" close=")" separator=",">
+            #{i}
+        </foreach>
+    </delete>
+
+    <select id="listChildren" resultType="java.lang.Integer">
+        select id
+        from t_ds_resources
+        where pid = #{directoryId}
+    </select>
+
+    <select id="queryResource" resultType="org.apache.dolphinscheduler.dao.entity.Resource">
+        select *
+        from t_ds_resources
+        where type = #{type}
+        and full_name = #{fullName}
+    </select>
+
+    <update id="batchUpdateResource" parameterType="java.util.List">
+        <foreach collection="resourceList" item="resource" index="index" open="" close="" separator =";">
+            update t_ds_resources
+            <set>
+                full_name=#{resource.fullName},
+                update_time=#{resource.updateTime}
+            </set>
+            <where>
+                id=#{resource.id}
+            </where>
+        </foreach>
+    </update>
+
+    <select id="listResourceByIds" resultType="org.apache.dolphinscheduler.dao.entity.Resource">
+        select *
+        from t_ds_resources
+        where id in
+        <foreach collection="resIds" item="i" open="(" close=")" separator=",">
+            #{i}
+        </foreach>
     </select>
 </mapper>
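
Note that batchUpdateResource issues one UPDATE per resource, joined by semicolons, inside a single mapper call. With the MySQL Connector/J driver this multi-statement style typically only works when allowMultiQueries=true is set on the JDBC URL; the exact property name and file depend on the datasource configuration in use, for example:

    spring.datasource.url=jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true
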
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.xml
index 0aa1060..e38d163 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.xml
@@ -87,4 +87,28 @@
             </foreach>
         </if>
     </select>
+    <select id="listUdfByResourceId" resultType="org.apache.dolphinscheduler.dao.entity.UdfFunc">
+        select *
+        from t_ds_udfs
+        where 1=1
+        <if test="resourceIds != null and resourceIds != ''">
+            and resource_id in
+            <foreach collection="resourceIds" item="i" open="(" close=")" separator=",">
+                #{i}
+            </foreach>
+        </if>
+    </select>
+    <select id="listAuthorizedUdfByResourceId" resultType="org.apache.dolphinscheduler.dao.entity.UdfFunc">
+        select *
+        from t_ds_udfs
+        where
+        id in (select udf_id from t_ds_relation_udfs_user where user_id=#{userId}
+        union select id as udf_id  from t_ds_udfs where user_id=#{userId})
+        <if test="resourceIds != null and resourceIds != ''">
+            and resource_id in
+            <foreach collection="resourceIds" item="i" open="(" close=")" separator=",">
+                #{i}
+            </foreach>
+        </if>
+    </select>
 </mapper>
\ No newline at end of file
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java
index 0108241..6a2aea5 100644
--- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java
@@ -34,6 +34,7 @@ import org.springframework.test.annotation.Rollback;
 import org.springframework.test.context.junit4.SpringRunner;
 import org.springframework.transaction.annotation.Transactional;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
@@ -68,7 +69,10 @@ public class ResourceMapperTest {
     private Resource insertOne(){
         //insertOne
         Resource resource = new Resource();
-        resource.setAlias("ut resource");
+        resource.setAlias("ut-resource");
+        resource.setFullName("/ut-resource");
+        resource.setPid(-1);
+        resource.setDirectory(false);
         resource.setType(ResourceType.FILE);
         resource.setUserId(111);
         resourceMapper.insert(resource);
@@ -80,17 +84,33 @@ public class ResourceMapperTest {
      * @param user user
      * @return Resource
      */
-    private Resource createResource(User user){
+    private Resource createResource(User user,boolean isDirectory,ResourceType resourceType,int pid,String alias,String fullName){
         //insertOne
         Resource resource = new Resource();
-        resource.setAlias(String.format("ut resource %s",user.getUserName()));
-        resource.setType(ResourceType.FILE);
+        resource.setDirectory(isDirectory);
+        resource.setType(resourceType);
+        resource.setAlias(alias);
+        resource.setFullName(fullName);
+        resource.setPid(pid);
         resource.setUserId(user.getId());
         resourceMapper.insert(resource);
         return resource;
     }
 
     /**
+     * create resource by user
+     * @param user user
+     * @return Resource
+     */
+    private Resource createResource(User user){
+        //insertOne
+        String alias = String.format("ut-resource-%s",user.getUserName());
+        String fullName = String.format("/%s",alias);
+
+        Resource resource = createResource(user, false, ResourceType.FILE, -1, alias, fullName);
+        return resource;
+    }
+
+    /**
      * create user
      * @return User
      */
@@ -200,13 +220,15 @@ public class ResourceMapperTest {
 
         IPage<Resource> resourceIPage = resourceMapper.queryResourcePaging(
                 page,
-                resource.getUserId(),
+                0,
+                -1,
                 resource.getType().ordinal(),
                 ""
         );
         IPage<Resource> resourceIPage1 = resourceMapper.queryResourcePaging(
                 page,
                 1110,
+                -1,
                 resource.getType().ordinal(),
                 ""
         );
@@ -289,7 +311,7 @@ public class ResourceMapperTest {
         resourceMapper.updateById(resource);
 
         String resource1 = resourceMapper.queryTenantCodeByResourceName(
-                resource.getAlias()
+                resource.getFullName(),ResourceType.FILE.ordinal()
         );
 
 
@@ -305,22 +327,37 @@ public class ResourceMapperTest {
         User generalUser2 = createGeneralUser("user2");
         // create one resource
         Resource resource = createResource(generalUser2);
-        Resource unauthorizedResource = createResource(generalUser2);
+        Resource unauthorizedResource = createResource(generalUser1);
 
         // need download resources
-        String[] resNames = new String[]{resource.getAlias(), unauthorizedResource.getAlias()};
+        String[] resNames = new String[]{resource.getFullName(), unauthorizedResource.getFullName()};
 
         List<Resource> resources = resourceMapper.listAuthorizedResource(generalUser2.getId(), resNames);
 
         Assert.assertEquals(generalUser2.getId(),resource.getUserId());
-        Assert.assertFalse(resources.stream().map(t -> t.getAlias()).collect(toList()).containsAll(Arrays.asList(resNames)));
+        Assert.assertFalse(resources.stream().map(t -> t.getFullName()).collect(toList()).containsAll(Arrays.asList(resNames)));
 
 
 
         // authorize object unauthorizedResource to generalUser
         createResourcesUser(unauthorizedResource,generalUser2);
         List<Resource> authorizedResources = resourceMapper.listAuthorizedResource(generalUser2.getId(), resNames);
-        Assert.assertTrue(authorizedResources.stream().map(t -> t.getAlias()).collect(toList()).containsAll(Arrays.asList(resNames)));
+        Assert.assertTrue(authorizedResources.stream().map(t -> t.getFullName()).collect(toList()).containsAll(Arrays.asList(resNames)));
+
+    }
+
+    @Test
+    public void deleteIdsTest(){
+        // create a general user
+        User generalUser1 = createGeneralUser("user1");
+
+        Resource resource = createResource(generalUser1);
+        Resource resource1 = createResource(generalUser1);
 
+        List<Integer> resourceList = new ArrayList<>();
+        resourceList.add(resource.getId());
+        resourceList.add(resource1.getId());
+        int result = resourceMapper.deleteIds(resourceList.toArray(new Integer[resourceList.size()]));
+        Assert.assertEquals(2, result);
     }
 }
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
index 48048e7..c7806f1 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
@@ -23,15 +23,18 @@ import com.alibaba.fastjson.JSON;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.AuthorizationType;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.ResourceType;
 import org.apache.dolphinscheduler.common.enums.TaskType;
 import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.process.Property;
+import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
 import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
 import org.apache.dolphinscheduler.common.utils.CommonUtils;
 import org.apache.dolphinscheduler.common.utils.HadoopUtils;
 import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.Resource;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.common.log.TaskLogDiscriminator;
@@ -96,7 +99,7 @@ public class TaskScheduleThread implements Runnable {
             TaskNode taskNode = JSON.parseObject(taskInstance.getTaskJson(), TaskNode.class);
 
             // get resource files
-            List<String> resourceFiles = createProjectResFiles(taskNode);
+            List<ResourceInfo> resourceFiles = createProjectResFiles(taskNode);
             // copy hdfs/minio file to local
             downloadResource(
                     taskInstance.getExecutePath(),
@@ -165,6 +168,7 @@ public class TaskScheduleThread implements Runnable {
                 new Date(),
                 taskInstance.getId());
     }
+
     /**
      * get global paras map
      * @return
@@ -289,14 +293,16 @@ public class TaskScheduleThread implements Runnable {
     /**
      *  create project resource files
      */
-    private List<String> createProjectResFiles(TaskNode taskNode) throws Exception{
+    private List<ResourceInfo> createProjectResFiles(TaskNode taskNode) throws Exception{
 
-        Set<String> projectFiles = new HashSet<>();
+        Set<ResourceInfo> projectFiles = new HashSet<>();
         AbstractParameters baseParam = TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams());
 
         if (baseParam != null) {
-            List<String> projectResourceFiles = baseParam.getResourceFilesList();
-            projectFiles.addAll(projectResourceFiles);
+            List<ResourceInfo> projectResourceFiles = baseParam.getResourceFilesList();
+            if (projectResourceFiles != null) {
+                projectFiles.addAll(projectResourceFiles);
+            }
         }
 
         return new ArrayList<>(projectFiles);
@@ -309,18 +315,25 @@ public class TaskScheduleThread implements Runnable {
      * @param projectRes
      * @param logger
      */
-    private void downloadResource(String execLocalPath, List<String> projectRes, Logger logger) throws Exception {
+    private void downloadResource(String execLocalPath, List<ResourceInfo> projectRes, Logger logger) throws Exception {
         checkDownloadPermission(projectRes);
-        for (String res : projectRes) {
-            File resFile = new File(execLocalPath, res);
+        String resourceName;
+        for (ResourceInfo res : projectRes) {
+            if (res.getId() != 0) {
+                Resource resource = processService.getResourceById(res.getId());
+                resourceName = resource.getFullName();
+            }else{
+                resourceName = res.getRes();
+            }
+            File resFile = new File(execLocalPath, resourceName);
             if (!resFile.exists()) {
                 try {
                     // query the tenant code of the resource according to the name of the resource
-                    String tentnCode = processService.queryTenantCodeByResName(res);
-                    String resHdfsPath = HadoopUtils.getHdfsFilename(tentnCode, res);
+                    String tenantCode = processService.queryTenantCodeByResName(resourceName, ResourceType.FILE);
+                    String resHdfsPath = HadoopUtils.getHdfsResourceFileName(tenantCode, resourceName);
 
                     logger.info("get resource file from hdfs :{}", resHdfsPath);
-                    HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath, execLocalPath + File.separator + res, false, true);
+                    HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath, execLocalPath + File.separator + resourceName, false, true);
                 }catch (Exception e){
                     logger.error(e.getMessage(),e);
                     throw new RuntimeException(e.getMessage());
@@ -336,10 +349,17 @@ public class TaskScheduleThread implements Runnable {
      * @param projectRes resource name list
      * @throws Exception exception
      */
-    private void checkDownloadPermission(List<String> projectRes) throws Exception {
+    private void checkDownloadPermission(List<ResourceInfo> projectRes) throws Exception {
+
         int userId = taskInstance.getProcessInstance().getExecutorId();
-        String[] resNames = projectRes.toArray(new String[projectRes.size()]);
-        PermissionCheck<String> permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE, processService,resNames,userId,logger);
-        permissionCheck.checkPermission();
+        if (projectRes.stream().allMatch(t -> t.getId() == 0)) {
+            String[] resNames = projectRes.stream().map(t -> t.getRes()).collect(Collectors.toList()).toArray(new String[projectRes.size()]);
+            PermissionCheck<String> permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE_NAME, processService, resNames, userId, logger);
+            permissionCheck.checkPermission();
+        } else {
+            Integer[] resIds = projectRes.stream().map(t -> t.getId()).collect(Collectors.toList()).toArray(new Integer[projectRes.size()]);
+            PermissionCheck<Integer> permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE_ID, processService, resIds, userId, logger);
+            permissionCheck.checkPermission();
+        }
     }
 }
\ No newline at end of file
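
The download path above now handles two shapes of resource reference: legacy task definitions that carry only a name (id == 0, read via getRes()) and new definitions that carry a resource id, resolved through processService.getResourceById(...).getFullName(). The sketch below only illustrates that resolution rule; ResInfo and the lookup function are stand-ins, not the real ResourceInfo and ProcessService classes.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.function.IntFunction;

    public class ResourceNameResolution {

        /** Minimal stand-in for ResourceInfo: id == 0 marks a legacy, name-only reference. */
        static class ResInfo {
            final int id;
            final String res;
            ResInfo(int id, String res) { this.id = id; this.res = res; }
        }

        /** Prefer the id lookup; fall back to the stored name for legacy references. */
        static String resolveName(ResInfo info, IntFunction<String> fullNameById) {
            return info.id != 0 ? fullNameById.apply(info.id) : info.res;
        }

        public static void main(String[] args) {
            // stand-in for processService.getResourceById(id).getFullName()
            Map<Integer, String> fullNames = new HashMap<>();
            fullNames.put(7, "/jobs/wordcount.jar");

            List<ResInfo> projectRes = new ArrayList<>();
            projectRes.add(new ResInfo(0, "hello.sh"));  // legacy: only a name is stored
            projectRes.add(new ResInfo(7, null));        // new: the id is authoritative

            for (ResInfo res : projectRes) {
                System.out.println(resolveName(res, fullNames::get)); // hello.sh, /jobs/wordcount.jar
            }
        }
    }
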
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java
index 39f4dfb..cda12ca 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java
@@ -94,4 +94,9 @@ public abstract class AbstractYarnTask extends AbstractTask {
    * @throws Exception exception
    */
   protected abstract String buildCommand() throws Exception;
+
+  /**
+   * set main jar name
+   */
+  protected abstract void setMainJarName();
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
index c562fbe..0dc7c6a 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
@@ -17,12 +17,14 @@
 package org.apache.dolphinscheduler.server.worker.task.flink;
 
 import org.apache.dolphinscheduler.common.process.Property;
+import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
 import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.ParameterUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.Resource;
 import org.apache.dolphinscheduler.server.utils.FlinkArgsUtils;
 import org.apache.dolphinscheduler.server.utils.ParamUtils;
 import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
@@ -63,6 +65,7 @@ public class FlinkTask extends AbstractYarnTask {
     if (!flinkParameters.checkParameters()) {
       throw new RuntimeException("flink task params is not valid");
     }
+    setMainJarName();
     flinkParameters.setQueue(taskProps.getQueue());
 
     if (StringUtils.isNotEmpty(flinkParameters.getMainArgs())) {
@@ -112,6 +115,28 @@ public class FlinkTask extends AbstractYarnTask {
   }
 
   @Override
+  protected void setMainJarName() {
+    // main jar
+    ResourceInfo mainJar = flinkParameters.getMainJar();
+    if (mainJar != null) {
+      int resourceId = mainJar.getId();
+      String resourceName;
+      if (resourceId == 0) {
+        resourceName = mainJar.getRes();
+      } else {
+        Resource resource = processService.getResourceById(flinkParameters.getMainJar().getId());
+        if (resource == null) {
+          logger.error("resource id: {} does not exist", resourceId);
+          throw new RuntimeException(String.format("resource id: %d does not exist", resourceId));
+        }
+        resourceName = resource.getFullName().replaceFirst("/", "");
+      }
+      mainJar.setRes(resourceName);
+      flinkParameters.setMainJar(mainJar);
+    }
+  }
+
+  @Override
   public AbstractParameters getParameters() {
     return flinkParameters;
   }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
index 7f6baad..0909fbd 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
@@ -19,11 +19,13 @@ package org.apache.dolphinscheduler.server.worker.task.mr;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ProgramType;
 import org.apache.dolphinscheduler.common.process.Property;
+import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
 import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.ParameterUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.dao.entity.Resource;
 import org.apache.dolphinscheduler.server.utils.ParamUtils;
 import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
 import org.apache.dolphinscheduler.server.worker.task.TaskProps;
@@ -64,7 +66,7 @@ public class MapReduceTask extends AbstractYarnTask {
         if (!mapreduceParameters.checkParameters()) {
             throw new RuntimeException("mapreduce task params is not valid");
         }
-
+        setMainJarName();
         mapreduceParameters.setQueue(taskProps.getQueue());
 
         // replace placeholder
@@ -100,6 +102,28 @@ public class MapReduceTask extends AbstractYarnTask {
     }
 
     @Override
+    protected void setMainJarName() {
+        // main jar
+        ResourceInfo mainJar = mapreduceParameters.getMainJar();
+        if (mainJar != null) {
+            int resourceId = mainJar.getId();
+            String resourceName;
+            if (resourceId == 0) {
+                resourceName = mainJar.getRes();
+            } else {
+                Resource resource = processService.getResourceById(mapreduceParameters.getMainJar().getId());
+                if (resource == null) {
+                    logger.error("resource id: {} does not exist", resourceId);
+                    throw new RuntimeException(String.format("resource id: %d does not exist", resourceId));
+                }
+                resourceName = resource.getFullName().replaceFirst("/", "");
+            }
+            mainJar.setRes(resourceName);
+            mapreduceParameters.setMainJar(mainJar);
+        }
+    }
+
+    @Override
     public AbstractParameters getParameters() {
         return mapreduceParameters;
     }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
index 203c0fe..d2a8674 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
@@ -18,11 +18,13 @@ package org.apache.dolphinscheduler.server.worker.task.spark;
 
 import org.apache.dolphinscheduler.common.enums.SparkVersion;
 import org.apache.dolphinscheduler.common.process.Property;
+import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
 import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.ParameterUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.dao.entity.Resource;
 import org.apache.dolphinscheduler.server.utils.ParamUtils;
 import org.apache.dolphinscheduler.server.utils.SparkArgsUtils;
 import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
@@ -67,8 +69,8 @@ public class SparkTask extends AbstractYarnTask {
     if (!sparkParameters.checkParameters()) {
       throw new RuntimeException("spark task params is not valid");
     }
+    setMainJarName();
     sparkParameters.setQueue(taskProps.getQueue());
-
     if (StringUtils.isNotEmpty(sparkParameters.getMainArgs())) {
       String args = sparkParameters.getMainArgs();
 
@@ -116,6 +118,28 @@ public class SparkTask extends AbstractYarnTask {
   }
 
   @Override
+  protected void setMainJarName() {
+    // main jar
+    ResourceInfo mainJar = sparkParameters.getMainJar();
+    if (mainJar != null) {
+      int resourceId = mainJar.getId();
+      String resourceName;
+      if (resourceId == 0) {
+        resourceName = mainJar.getRes();
+      } else {
+        Resource resource = processService.getResourceById(sparkParameters.getMainJar().getId());
+        if (resource == null) {
+          logger.error("resource id: {} does not exist", resourceId);
+          throw new RuntimeException(String.format("resource id: %d does not exist", resourceId));
+        }
+        resourceName = resource.getFullName().replaceFirst("/", "");
+      }
+      mainJar.setRes(resourceName);
+      sparkParameters.setMainJar(mainJar);
+    }
+  }
+
+  @Override
   public AbstractParameters getParameters() {
     return sparkParameters;
   }
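
The setMainJarName() bodies added to FlinkTask, MapReduceTask and SparkTask above are identical: keep the stored name when the jar is referenced only by name (id == 0), otherwise look the resource up and strip the leading "/" from its full name so the jar is addressed relative to the task's execution directory. A possible follow-up, not part of this commit, would be to hoist that logic into one shared helper; the sketch below uses stand-in types rather than the real ResourceInfo, Resource and ProcessService classes.

    import java.util.function.IntFunction;

    public class MainJarNameSketch {

        /** Stand-in for the main-jar ResourceInfo. */
        static class JarRef {
            int id;       // 0 means "referenced by name only"
            String res;   // name stored in the task definition
        }

        /**
         * Resolve the main jar name the way each concrete YARN task does:
         * legacy references keep their stored name, id-based references are
         * looked up and turned into a path relative to the execution directory.
         */
        static String resolveMainJarName(JarRef mainJar, IntFunction<String> fullNameById) {
            if (mainJar.id == 0) {
                return mainJar.res;
            }
            String fullName = fullNameById.apply(mainJar.id);
            if (fullName == null) {
                throw new RuntimeException(String.format("resource id: %d does not exist", mainJar.id));
            }
            return fullName.replaceFirst("/", "");
        }

        public static void main(String[] args) {
            JarRef legacy = new JarRef();
            legacy.res = "wordcount.jar";
            System.out.println(resolveMainJarName(legacy, id -> null));                // wordcount.jar

            JarRef byId = new JarRef();
            byId.id = 7;
            System.out.println(resolveMainJarName(byId, id -> "/jobs/wordcount.jar")); // jobs/wordcount.jar
        }
    }
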
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java
index 64bc792..5c701dc 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java
@@ -72,6 +72,10 @@ public class SqoopTask extends AbstractYarnTask {
     }
 
     @Override
+    protected void setMainJarName() {
+    }
+
+    @Override
     public AbstractParameters getParameters() {
         return sqoopParameters;
     }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/permission/PermissionCheck.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/permission/PermissionCheck.java
index e53fae6..9f93f4c 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/permission/PermissionCheck.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/permission/PermissionCheck.java
@@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.service.permission;
 
 import org.apache.dolphinscheduler.common.enums.AuthorizationType;
 import org.apache.dolphinscheduler.common.enums.UserType;
+import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.utils.CollectionUtils;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -46,6 +47,11 @@ public class PermissionCheck<T> {
     private T[] needChecks;
 
     /**
+     * resource info
+     */
+    private List<ResourceInfo> resourceList;
+
+    /**
      * user id
      */
     private int userId;
@@ -90,6 +96,22 @@ public class PermissionCheck<T> {
         this.logger = logger;
     }
 
+    /**
+     * permission check
+     * @param authorizationType authorization type
+     * @param processService process service
+     * @param resourceList resource list
+     * @param userId user id
+     * @param logger logger
+     */
+    public PermissionCheck(AuthorizationType authorizationType, ProcessService processService, List<ResourceInfo> resourceList, int userId,Logger logger) {
+        this.authorizationType = authorizationType;
+        this.processService = processService;
+        this.resourceList = resourceList;
+        this.userId = userId;
+        this.logger = logger;
+    }
+
     public AuthorizationType getAuthorizationType() {
         return authorizationType;
     }
@@ -122,6 +144,14 @@ public class PermissionCheck<T> {
         this.userId = userId;
     }
 
+    public List<ResourceInfo> getResourceList() {
+        return resourceList;
+    }
+
+    public void setResourceList(List<ResourceInfo> resourceList) {
+        this.resourceList = resourceList;
+    }
+
     /**
      * has permission
      * @return true if has permission
@@ -141,6 +171,7 @@ public class PermissionCheck<T> {
      */
     public void checkPermission() throws Exception{
         if(this.needChecks.length > 0){
+
             // get user type in order to judge whether the user is admin
             User user = processService.getUserById(userId);
             if (user.getUserType() != UserType.ADMIN_USER){
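
PermissionCheck stays generic in the identifier type: the worker builds a PermissionCheck<String> over full names for legacy references and a PermissionCheck<Integer> over resource ids otherwise, and ProcessService (further below in this patch) answers both by diffing the needed identifiers against the authorized ones. The sketch below only illustrates that set-difference idea with plain collections; it is not the real PermissionCheck or ProcessService API.

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class UnauthorizedCheckSketch {

        /** Return the identifiers in needChecks that are not in the authorized set. */
        static <T> Set<T> listUnauthorized(T[] needChecks, Set<T> authorized) {
            Set<T> origin = new HashSet<>(Arrays.asList(needChecks));
            origin.removeAll(authorized);
            return origin;
        }

        /** Throw if anything is left over, mirroring the check applied to non-admin users. */
        static <T> void checkPermission(T[] needChecks, Set<T> authorized) {
            Set<T> unauthorized = listUnauthorized(needChecks, authorized);
            if (!unauthorized.isEmpty()) {
                throw new RuntimeException("permission denied: " + unauthorized);
            }
        }

        public static void main(String[] args) {
            // name-based check: every requested full name is authorized, so nothing happens
            checkPermission(new String[]{"/hello.sh"}, new HashSet<>(Arrays.asList("/hello.sh")));

            // id-based check: 8 is not authorized, so this throws
            try {
                checkPermission(new Integer[]{7, 8}, new HashSet<>(Arrays.asList(7)));
            } catch (RuntimeException e) {
                System.out.println(e.getMessage()); // permission denied: [8]
            }
        }
    }
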
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index c848ec5..3312c10 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -1556,10 +1556,11 @@ public class ProcessService {
     /**
      * find tenant code by resource name
      * @param resName resource name
+     * @param resourceType resource type
      * @return tenant code
      */
-    public String queryTenantCodeByResName(String resName){
-        return resourceMapper.queryTenantCodeByResourceName(resName);
+    public String queryTenantCodeByResName(String resName,ResourceType resourceType){
+        return resourceMapper.queryTenantCodeByResourceName(resName,resourceType.ordinal());
     }
 
     /**
@@ -1791,10 +1792,18 @@ public class ProcessService {
             Set<T> originResSet = new HashSet<T>(Arrays.asList(needChecks));
 
             switch (authorizationType){
-                case RESOURCE_FILE:
-                    Set<String> authorizedResources = resourceMapper.listAuthorizedResource(userId, needChecks).stream().map(t -> t.getAlias()).collect(toSet());
+                case RESOURCE_FILE_ID:
+                    Set<Integer> authorizedResourceFiles = resourceMapper.listAuthorizedResourceById(userId, needChecks).stream().map(t -> t.getId()).collect(toSet());
+                    originResSet.removeAll(authorizedResourceFiles);
+                    break;
+                case RESOURCE_FILE_NAME:
+                    Set<String> authorizedResources = resourceMapper.listAuthorizedResource(userId, needChecks).stream().map(t -> t.getFullName()).collect(toSet());
                     originResSet.removeAll(authorizedResources);
                     break;
+                case UDF_FILE:
+                    Set<Integer> authorizedUdfFiles = resourceMapper.listAuthorizedResourceById(userId, needChecks).stream().map(t -> t.getId()).collect(toSet());
+                    originResSet.removeAll(authorizedUdfFiles);
+                    break;
                 case DATASOURCE:
                     Set<Integer> authorizedDatasources = dataSourceMapper.listAuthorizedDataSource(userId,needChecks).stream().map(t -> t.getId()).collect(toSet());
                     originResSet.removeAll(authorizedDatasources);
@@ -1820,5 +1829,14 @@ public class ProcessService {
         return userMapper.queryDetailsById(userId);
     }
 
+    /**
+     * get resource by resource id
+     * @param resourceId resource id
+     * @return Resource
+     */
+    public Resource getResourceById(int resourceId){
+        return resourceMapper.selectById(resourceId);
+    }
+
 
 }
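
Two small ProcessService additions above are easy to miss: queryTenantCodeByResName now also takes the resource type, and the mapper receives resourceType.ordinal(), so the declaration order of the enum constants is significant; getResourceById is the lookup the worker and the YARN tasks use to turn a resource id into an entity. A minimal illustration of the ordinal mapping, with a stand-in enum (only FILE and UDF are assumed here):

    public class ResourceTypeOrdinalSketch {

        /** Stand-in for the real ResourceType enum; FILE and UDF are the assumed constants. */
        enum ResourceType { FILE, UDF }

        public static void main(String[] args) {
            // queryTenantCodeByResName(resName, resourceType) hands resourceType.ordinal()
            // to the mapper, so FILE maps to 0 and UDF maps to 1 in the resource type column.
            for (ResourceType type : ResourceType.values()) {
                System.out.println(type + " -> " + type.ordinal()); // FILE -> 0, UDF -> 1
            }
        }
    }
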
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/plugIn/jsPlumbHandle.js b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/plugIn/jsPlumbHandle.js
index 9412a8c..6a17239 100755
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/plugIn/jsPlumbHandle.js
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/plugIn/jsPlumbHandle.js
@@ -724,7 +724,7 @@ JSP.prototype.handleEvent = function () {
     } else {
       $(`#${sourceId}`).attr('data-nodenumber',Number($(`#${sourceId}`).attr('data-nodenumber'))+1)
     }
-    
+
     // Storage node dependency information
     saveTargetarr(sourceId, targetId)
 
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/_source/common.js b/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/_source/common.js
old mode 100644
new mode 100755
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/_source/list.vue b/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/_source/list.vue
old mode 100644
new mode 100755
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/_source/rename.vue b/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/_source/rename.vue
index b082f88..ad33503 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/_source/rename.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/_source/rename.vue
@@ -116,4 +116,4 @@
     },
     components: { mPopup, mListBoxF }
   }
-</script>
\ No newline at end of file
+</script>
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/index.vue b/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/index.vue
old mode 100644
new mode 100755
diff --git a/dolphinscheduler-ui/src/js/conf/home/store/resource/actions.js b/dolphinscheduler-ui/src/js/conf/home/store/resource/actions.js
old mode 100644
new mode 100755
diff --git a/dolphinscheduler-ui/src/js/module/components/fileUpdate/fileUpdate.vue b/dolphinscheduler-ui/src/js/module/components/fileUpdate/fileUpdate.vue
old mode 100644
new mode 100755
diff --git a/sql/soft_version b/sql/soft_version
index 867e524..d2d61a7 100644
--- a/sql/soft_version
+++ b/sql/soft_version
@@ -1 +1 @@
-1.2.0
\ No newline at end of file
+1.2.2
\ No newline at end of file
diff --git a/sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql b/sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql
index 049484c..f960d5c 100644
--- a/sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql
+++ b/sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql
@@ -74,4 +74,84 @@ d//
 
 delimiter ;
 CALL uc_dolphin_T_t_ds_task_instance_C_app_link;
-DROP PROCEDURE uc_dolphin_T_t_ds_task_instance_C_app_link;
\ No newline at end of file
+DROP PROCEDURE uc_dolphin_T_t_ds_task_instance_C_app_link;
+
+-- ac_dolphin_T_t_ds_resources_A_pid
+drop PROCEDURE if EXISTS ac_dolphin_T_t_ds_resources_A_pid;
+delimiter d//
+CREATE PROCEDURE ac_dolphin_T_t_ds_resources_A_pid()
+   BEGIN
+       IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
+           WHERE TABLE_NAME='t_ds_resources'
+           AND TABLE_SCHEMA=(SELECT DATABASE())
+           AND COLUMN_NAME ='pid')
+   THEN
+         ALTER TABLE t_ds_resources ADD `pid` int(11) DEFAULT -1 COMMENT 'parent id';
+       END IF;
+ END;
+
+d//
+
+delimiter ;
+CALL ac_dolphin_T_t_ds_resources_A_pid;
+DROP PROCEDURE ac_dolphin_T_t_ds_resources_A_pid;
+
+-- ac_dolphin_T_t_ds_resources_A_full_name
+drop PROCEDURE if EXISTS ac_dolphin_T_t_ds_resources_A_full_name;
+delimiter d//
+CREATE PROCEDURE ac_dolphin_T_t_ds_resources_A_full_name()
+   BEGIN
+       IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
+           WHERE TABLE_NAME='t_ds_resources'
+           AND TABLE_SCHEMA=(SELECT DATABASE())
+           AND COLUMN_NAME ='full_name')
+   THEN
+         ALTER TABLE t_ds_resources ADD `full_name` varchar(255) DEFAULT NULL COMMENT 'full name';
+       END IF;
+ END;
+
+d//
+
+delimiter ;
+CALL ac_dolphin_T_t_ds_resources_A_full_name;
+DROP PROCEDURE ac_dolphin_T_t_ds_resources_A_full_name;
+
+-- ac_dolphin_T_t_ds_resources_is_directory
+drop PROCEDURE if EXISTS ac_dolphin_T_t_ds_resources_is_directory;
+delimiter d//
+CREATE PROCEDURE ac_dolphin_T_t_ds_resources_is_directory()
+   BEGIN
+       IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
+           WHERE TABLE_NAME='t_ds_resources'
+           AND TABLE_SCHEMA=(SELECT DATABASE())
+           AND COLUMN_NAME ='is_directory')
+   THEN
+         ALTER TABLE t_ds_resources ADD `is_directory` tinyint(1) DEFAULT 0 COMMENT 'is directory';
+       END IF;
+ END;
+
+d//
+
+delimiter ;
+CALL ac_dolphin_T_t_ds_resources_is_directory;
+DROP PROCEDURE ac_dolphin_T_t_ds_resources_is_directory;
+
+-- ac_dolphin_T_t_ds_process_definition_A_resource_ids
+drop PROCEDURE if EXISTS ac_dolphin_T_t_ds_process_definition_A_resource_ids;
+delimiter d//
+CREATE PROCEDURE ac_dolphin_T_t_ds_process_definition_A_resource_ids()
+   BEGIN
+       IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
+           WHERE TABLE_NAME='t_ds_process_definition'
+           AND TABLE_SCHEMA=(SELECT DATABASE())
+           AND COLUMN_NAME ='resource_ids')
+   THEN
+         ALTER TABLE t_ds_process_definition ADD `resource_ids` varchar(255) DEFAULT NULL COMMENT 'resource ids';
+       END IF;
+ END;
+
+d//
+
+delimiter ;
+CALL ac_dolphin_T_t_ds_process_definition_A_resource_ids;
+DROP PROCEDURE ac_dolphin_T_t_ds_process_definition_A_resource_ids;
\ No newline at end of file
diff --git a/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql b/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql
index b1e0fd9..9b5f15b 100644
--- a/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql
+++ b/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql
@@ -66,4 +66,87 @@ d//
 
 delimiter ;
 SELECT uc_dolphin_T_t_ds_task_instance_C_app_link();
-DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_task_instance_C_app_link();
\ No newline at end of file
+DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_task_instance_C_app_link();
+
+
+-- ac_dolphin_T_t_ds_resources_A_pid
+delimiter ;
+DROP FUNCTION IF EXISTS ac_dolphin_T_t_ds_resources_A_pid();
+delimiter d//
+CREATE FUNCTION ac_dolphin_T_t_ds_resources_A_pid() RETURNS void AS $$
+BEGIN
+       IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
+          WHERE TABLE_CATALOG=current_database()
+          AND TABLE_SCHEMA=current_schema()
+          AND TABLE_NAME='t_ds_resources'
+          AND COLUMN_NAME ='pid')
+      THEN
+         ALTER TABLE t_ds_resources ADD COLUMN pid int DEFAULT -1;
+       END IF;
+END;
+$$ LANGUAGE plpgsql;
+d//
+delimiter ;
+select ac_dolphin_T_t_ds_resources_A_pid();
+DROP FUNCTION ac_dolphin_T_t_ds_resources_A_pid();
+
+-- ac_dolphin_T_t_ds_resources_A_full_name
+delimiter ;
+DROP FUNCTION IF EXISTS ac_dolphin_T_t_ds_resources_A_full_name();
+delimiter d//
+CREATE FUNCTION ac_dolphin_T_t_ds_resources_A_full_name() RETURNS void AS $$
+BEGIN
+       IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
+          WHERE TABLE_CATALOG=current_database()
+          AND TABLE_SCHEMA=current_schema()
+          AND TABLE_NAME='t_ds_resources'
+          AND COLUMN_NAME ='full_name')
+      THEN
+         ALTER TABLE t_ds_resources ADD COLUMN full_name varchar(255) DEFAULT null;
+       END IF;
+END;
+$$ LANGUAGE plpgsql;
+d//
+delimiter ;
+select ac_dolphin_T_t_ds_resources_A_full_name();
+DROP FUNCTION ac_dolphin_T_t_ds_resources_A_full_name();
+
+-- ac_dolphin_T_t_ds_resources_A_is_directory
+delimiter ;
+DROP FUNCTION IF EXISTS ac_dolphin_T_t_ds_resources_A_is_directory();
+delimiter d//
+CREATE FUNCTION ac_dolphin_T_t_ds_resources_A_is_directory() RETURNS void AS $$
+BEGIN
+       IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
+          WHERE TABLE_CATALOG=current_database()
+          AND TABLE_SCHEMA=current_schema()
+          AND TABLE_NAME='t_ds_resources'
+          AND COLUMN_NAME ='is_directory')
+      THEN
+         ALTER TABLE t_ds_resources ADD COLUMN is_directory boolean DEFAULT false;
+       END IF;
+END;
+$$ LANGUAGE plpgsql;
+d//
+delimiter ;
+select ac_dolphin_T_t_ds_resources_A_is_directory();
+DROP FUNCTION ac_dolphin_T_t_ds_resources_A_is_directory();
+
+-- ac_dolphin_T_t_ds_process_definition_A_resource_ids
+delimiter ;
+DROP FUNCTION IF EXISTS ac_dolphin_T_t_ds_process_definition_A_resource_ids();
+delimiter d//
+CREATE FUNCTION ac_dolphin_T_t_ds_process_definition_A_resource_ids() RETURNS void AS $$
+BEGIN
+       IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
+          WHERE TABLE_CATALOG=current_database()
+          AND TABLE_SCHEMA=current_schema()
+          AND TABLE_NAME='t_ds_process_definition'
+          AND COLUMN_NAME ='resource_ids')
+      THEN
+         ALTER TABLE t_ds_process_definition ADD COLUMN resource_ids varchar(255) DEFAULT null;
+       END IF;
+END;
+$$ LANGUAGE plpgsql;
+d//
+delimiter ;
+select ac_dolphin_T_t_ds_process_definition_A_resource_ids();
+DROP FUNCTION ac_dolphin_T_t_ds_process_definition_A_resource_ids();
diff --git a/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_dml.sql b/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_dml.sql
index 38964cc..f892fa8 100644
--- a/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_dml.sql
+++ b/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_dml.sql
@@ -13,4 +13,6 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
\ No newline at end of file
+*/
+UPDATE t_ds_resources SET pid=-1,is_directory=false WHERE pid IS NULL;
+UPDATE t_ds_resources SET full_name = concat('/',alias) WHERE pid=-1 and full_name IS NULL;
\ No newline at end of file