You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by GitBox <gi...@apache.org> on 2022/10/15 11:05:54 UTC

[GitHub] [dolphinscheduler] caishunfeng commented on a diff in pull request #12076: [Feature-10495][Resource Center] Resource Center Refactor

caishunfeng commented on code in PR #12076:
URL: https://github.com/apache/dolphinscheduler/pull/12076#discussion_r996289630


##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java:
##########
@@ -355,7 +326,13 @@ private void updateParentResourceSize(Resource resource, long size) {
      * @return true if resource exists
      */
     private boolean checkResourceExists(String fullName, int type) {
-        Boolean existResource = resourcesMapper.existResource(fullName, type);
+        // Boolean existResource = resourcesMapper.existResource(fullName, type);
+        Boolean existResource = false;
+        try {
+            existResource = storageOperate.exists(fullName);
+        } catch (IOException e) {
+            logger.error("error occurred when checking resource: " + fullName);

Review Comment:
   ```suggestion
               logger.error("error occurred when checking resource: " + fullName, e);
   ```



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java:
##########
@@ -1808,4 +1929,44 @@ private List<Resource> queryResourceList(Integer userId, int perm) {
     private AuthorizationType checkResourceType(ResourceType type) {
         return type.equals(ResourceType.FILE) ? AuthorizationType.RESOURCE_FILE_ID : AuthorizationType.UDF_FILE;
     }
+
+    /**
+     * check permission by comparing login user's tenantCode with tenantCode in the request
+     *
+     * @param loginUser user who currently logs in
+     * @param resTenantCode tenantCode in the request field "resTenantCode", can be different from the login user in the case of admin users.
+     * @param result  result Object containing different cases
+     * @return tenantCode
+     */
+    private String getTenantCodeIfuserValid(User loginUser, String resTenantCode, Result<Object> result) {

Review Comment:
   Please don't use the `Result<Object> result` as input param, we should avoid to change the input param.
   You can throw the exception like `throw new ServiceException(Status.USER_NOT_EXIST)` when some check error.



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java:
##########
@@ -272,46 +254,34 @@ public Result<Object> createResource(User loginUser,
         }
 
         // check resource name exists
-        String fullName = getFullName(currentDir, name);
-        if (checkResourceExists(fullName, type.ordinal())) {
-            logger.warn("Resource exists, can not create again, fullName:{}.", RegexUtils.escapeNRT(name));
-            putMsg(result, Status.RESOURCE_EXIST);
-            return result;
-        }
-        if (fullName.length() > Constants.RESOURCE_FULL_NAME_MAX_LENGTH) {
-            logger.warn("Resource file's name is longer than max full name length, fullName:{}, fullNameSize:{}, maxFullNameSize:{}",
-                    RegexUtils.escapeNRT(name), fullName.length(), Constants.RESOURCE_FULL_NAME_MAX_LENGTH);
-            putMsg(result, Status.RESOURCE_FULL_NAME_TOO_LONG_ERROR);
-            return result;
-        }
-
-        Date now = new Date();
-        Resource resource = new Resource(pid, name, fullName, false, desc, file.getOriginalFilename(),
-                loginUser.getId(), type, file.getSize(), now, now);
+        String userResRootPath = ResourceType.UDF.equals(type) ? storageOperate.getUdfDir(tenantCode)
+                : storageOperate.getResDir(tenantCode);
+        String currDirNFileName = !currentDir.contains(userResRootPath) ? userResRootPath + name : currentDir + name;
 
         try {
-            resourcesMapper.insert(resource);
-            updateParentResourceSize(resource, resource.getSize());
-            putMsg(result, Status.SUCCESS);
-            permissionPostHandle(resource.getType(), loginUser, resource.getId());
-            Map<String, Object> resultMap = new HashMap<>();
-            for (Map.Entry<Object, Object> entry : new BeanMap(resource).entrySet()) {
-                if (!"class".equalsIgnoreCase(entry.getKey().toString())) {
-                    resultMap.put(entry.getKey().toString(), entry.getValue());
-                }
+            if (checkResourceExists(currDirNFileName, type.ordinal())) {
+                logger.error("resource {} has exist, can't recreate", RegexUtils.escapeNRT(name));
+                putMsg(result, Status.RESOURCE_EXIST);
+                return result;
             }
-            result.setData(resultMap);
         } catch (Exception e) {
-            logger.warn("Resource exists, can not create again, fullName:{}.", fullName, e);
             throw new ServiceException("resource already exists, can't recreate");
         }
+        if (currDirNFileName.length() > Constants.RESOURCE_FULL_NAME_MAX_LENGTH) {
+            logger.warn(

Review Comment:
   ```suggestion
               logger.error(
   ```



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java:
##########
@@ -506,63 +441,90 @@ public Result<Object> updateResource(User loginUser,
             resource.setSize(file.getSize());
         }
 
-        try {
-            resourcesMapper.updateById(resource);
-            if (resource.isDirectory()) {
-                List<Integer> childrenResource = listAllChildren(resource, false);
-                if (CollectionUtils.isNotEmpty(childrenResource)) {
-                    String matcherFullName = Matcher.quoteReplacement(fullName);
-                    List<Resource> childResourceList;
-                    Integer[] childResIdArray = childrenResource.toArray(new Integer[childrenResource.size()]);
-                    List<Resource> resourceList = resourcesMapper.listResourceByIds(childResIdArray);
-                    childResourceList = resourceList.stream().map(t -> {
-                        t.setFullName(t.getFullName().replaceFirst(originFullName, matcherFullName));
-                        t.setUpdateTime(now);
-                        return t;
-                    }).collect(Collectors.toList());
-                    resourcesMapper.batchUpdateResource(childResourceList);
-
-                    if (ResourceType.UDF.equals(resource.getType())) {
-                        List<UdfFunc> udfFuncs = udfFunctionMapper.listUdfByResourceId(childResIdArray);
-                        if (CollectionUtils.isNotEmpty(udfFuncs)) {
-                            udfFuncs = udfFuncs.stream().map(t -> {
-                                t.setResourceName(t.getResourceName().replaceFirst(originFullName, matcherFullName));
-                                t.setUpdateTime(now);
-                                return t;
-                            }).collect(Collectors.toList());
-                            udfFunctionMapper.batchUpdateUdfFunc(udfFuncs);
-                        }
-                    }
-                }
-            } else if (ResourceType.UDF.equals(resource.getType())) {
-                List<UdfFunc> udfFuncs = udfFunctionMapper.listUdfByResourceId(new Integer[]{resourceId});
-                if (CollectionUtils.isNotEmpty(udfFuncs)) {
-                    udfFuncs = udfFuncs.stream().map(t -> {
-                        t.setResourceName(fullName);
-                        t.setUpdateTime(now);
-                        return t;
-                    }).collect(Collectors.toList());
-                    udfFunctionMapper.batchUpdateUdfFunc(udfFuncs);
-                }
+        // if name unchanged, return directly without moving on HDFS
+        if (originResourceName.equals(name) && file == null) {
+            return result;
+        }
 
+        List<ResourcesTask> existResourcesList;
+        if (resource.isDirectory()) {
+            existResourcesList = resourceTaskMapper.selectSubfoldersFullNames(originFullName + FOLDER_SEPARATOR);
+        } else {
+            existResourcesList = resourceTaskMapper.selectByMap(
+                    Collections.singletonMap("full_name", originFullName));
+        }
+
+        if (existResourcesList.size() > 0 && !fullName.equals(originFullName)) {
+            // check if any related task is online. If it is, it can not be updated.
+            for (ResourcesTask existResource : existResourcesList) {
+                int taskId = existResource.getTaskId();
+                if (processService.isTaskOnline(taskDefinitionMapper.selectById(taskId).getCode())) {
+                    logger.error("can't be updated,because it is used of process definition that's online");
+                    logger.error("resource task relation id:{} is used of task code {}", existResource.getId(),
+                            taskDefinitionMapper.selectById(taskId).getCode());
+                    putMsg(result, Status.RESOURCE_IS_USED);
+                    return result;
+                }
             }
 
-            putMsg(result, Status.SUCCESS);
-            Map<String, Object> resultMap = new HashMap<>();
-            for (Map.Entry<Object, Object> entry : new BeanMap(resource).entrySet()) {
-                if (!Constants.CLASS.equalsIgnoreCase(entry.getKey().toString())) {
-                    resultMap.put(entry.getKey().toString(), entry.getValue());
+            for (ResourcesTask existResource : existResourcesList) {
+                int taskId = existResource.getTaskId();
+                long taskCode = taskDefinitionMapper.selectById(taskId).getCode();
+
+                List<ProcessTaskRelation> processTaskRelation = processTaskRelationMapper.selectByMap(
+                        Collections.singletonMap("post_task_code", taskCode));
+                if (processTaskRelation.size() > 0) {
+                    long processDefinitionCode = processTaskRelation.get(0).getProcessDefinitionCode();
+                    int processDefinitionVersion = processTaskRelation.get(0).getProcessDefinitionVersion();
+                    List<ProcessTaskRelation> taskRelationList = processTaskRelationMapper.queryByProcessCode(
+                            processTaskRelation.get(0).getProjectCode(),
+                            processDefinitionCode);
+
+                    List<TaskDefinition> taskDefinitionLogList = new ArrayList<>();
+
+                    if (taskRelationList.size() > 0) {
+                        ProcessDefinitionLog processDefinition =
+                                processDefinitionLogMapper.queryByDefinitionCodeAndVersion(
+                                        processDefinitionCode, processDefinitionVersion);
+                        for (ProcessTaskRelation taskRelation : taskRelationList) {
+                            long taskCodeInProcess = taskRelation.getPostTaskCode();
+                            TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCodeInProcess);
+                            if (taskCodeInProcess == taskCode) {
+                                // originFullName is a prefix if isDirectory is true
+                                taskDefinition.setTaskParams(RemoveResourceFromResourceList(originFullName,
+                                        taskDefinition.getTaskParams(),
+                                        resource.isDirectory()));
+                                // if isDirectory is true, fullName is the new prefix. we replace old prefix
+                                // of resource fullname with the new prefix.
+                                // if isDirectory is false, fullName is the new path.
+                                taskDefinition.setTaskParams(AddResourceToResourceList(originFullName,
+                                        fullName,
+                                        existResource.getFullName(),
+                                        taskDefinition.getTaskParams(),
+                                        resource.isDirectory()));

Review Comment:
   Why call `taskDefinition.setTaskParams` twice?



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java:
##########
@@ -374,38 +351,38 @@ private boolean checkResourceExists(String fullName, int type) {
     @Transactional
     public Result<Object> updateResource(User loginUser,
                                          int resourceId,
+                                         String resourceFullName,
+                                         String resTenantCode,
                                          String name,
                                          String desc,
                                          ResourceType type,
                                          MultipartFile file) {
         Result<Object> result = new Result<>();
-        String funcPermissionKey = type.equals(ResourceType.FILE) ? ApiFuncIdentificationConstant.FILE_UPDATE
-                : ApiFuncIdentificationConstant.UDF_UPDATE;
-        boolean canOperatorPermissions =
-                canOperatorPermissions(loginUser, new Object[]{resourceId}, checkResourceType(type), funcPermissionKey);
-        if (!canOperatorPermissions) {
-            putMsg(result, Status.NO_CURRENT_OPERATING_PERMISSION);
-            return result;
-        }
+
         result = checkResourceUploadStartupState();
         if (!result.getCode().equals(Status.SUCCESS.getCode())) {
             return result;
         }
 
-        Resource resource = resourcesMapper.selectById(resourceId);
-        if (resource == null) {
-            logger.error("Resource does not exist, resourceId:{}.", resourceId);
-            putMsg(result, Status.RESOURCE_NOT_EXIST);
+        String tenantCode = getTenantCodeIfuserValid(loginUser, resTenantCode, result);
+        if (tenantCode == null) {
             return result;
         }
-        if (checkDescriptionLength(desc)) {
-            logger.warn("Parameter description is too long.");
-            putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
-            return result;
+
+        String defaultPath = storageOperate.getResDir(tenantCode);
+
+        StorageEntity resource;
+        try {
+            resource = storageOperate.getFileStatus(resourceFullName, defaultPath, resTenantCode, type);
+        } catch (Exception e) {
+            logger.error(e.getMessage() + " Resource path: {}", resourceFullName, e);

Review Comment:
   ```suggestion
               logger.error("Get file status fail, resource path: {}", resourceFullName, e);
   ```



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java:
##########
@@ -374,38 +351,38 @@ private boolean checkResourceExists(String fullName, int type) {
     @Transactional
     public Result<Object> updateResource(User loginUser,
                                          int resourceId,
+                                         String resourceFullName,
+                                         String resTenantCode,
                                          String name,
                                          String desc,
                                          ResourceType type,
                                          MultipartFile file) {
         Result<Object> result = new Result<>();
-        String funcPermissionKey = type.equals(ResourceType.FILE) ? ApiFuncIdentificationConstant.FILE_UPDATE
-                : ApiFuncIdentificationConstant.UDF_UPDATE;
-        boolean canOperatorPermissions =
-                canOperatorPermissions(loginUser, new Object[]{resourceId}, checkResourceType(type), funcPermissionKey);
-        if (!canOperatorPermissions) {
-            putMsg(result, Status.NO_CURRENT_OPERATING_PERMISSION);
-            return result;
-        }
+
         result = checkResourceUploadStartupState();
         if (!result.getCode().equals(Status.SUCCESS.getCode())) {
             return result;
         }
 
-        Resource resource = resourcesMapper.selectById(resourceId);
-        if (resource == null) {
-            logger.error("Resource does not exist, resourceId:{}.", resourceId);
-            putMsg(result, Status.RESOURCE_NOT_EXIST);
+        String tenantCode = getTenantCodeIfuserValid(loginUser, resTenantCode, result);
+        if (tenantCode == null) {
             return result;
         }
-        if (checkDescriptionLength(desc)) {
-            logger.warn("Parameter description is too long.");
-            putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
-            return result;
+
+        String defaultPath = storageOperate.getResDir(tenantCode);
+
+        StorageEntity resource;
+        try {
+            resource = storageOperate.getFileStatus(resourceFullName, defaultPath, resTenantCode, type);
+        } catch (Exception e) {
+            logger.error(e.getMessage() + " Resource path: {}", resourceFullName, e);
+            putMsg(result, Status.RESOURCE_NOT_EXIST);
+            throw new ServiceException(String.format(e.getMessage() + " Resource path: %s", resourceFullName));

Review Comment:
   ```suggestion
               throw new ServiceException(String.format ("Get file status fail, resource path: %s", resourceFullName));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org