Posted to commits@dolphinscheduler.apache.org by GitBox <gi...@apache.org> on 2022/10/26 11:38:18 UTC

[GitHub] [dolphinscheduler] jieguangzhou opened a new pull request, #12552: [Feature][Task] Transfer files between tasks #12479

jieguangzhou opened a new pull request, #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552

   <!--Thanks very much for contributing to Apache DolphinScheduler. Please review https://dolphinscheduler.apache.org/en-us/community/development/pull-request.html before opening a pull request.-->
   
   ## Purpose of the pull request
   
   ### FILE Parameter
   
   Use the file parameter to pass files (or folders, hereinafter referred to as **files**) from the working directory of an upstream task to a downstream task in the same workflow instance. Typical scenarios include:
   
   - In the ETL task, pass the data files processed by multiple upstream tasks to a specific downstream task.
   - In the machine learning scenario, pass the data set file of the upstream data preparation task to the downstream model training task.
   
   #### Usage
   
   ##### Configure file parameter
   
   File parameter configuration method: click the plus sign on the right side of "Custom Parameters" on the task definition page to configure.
   
   ##### Output file to downstream task
   
   **Four options of custom parameters are:**
   
   - Parameter name: the identifier used when passing the file between tasks, such as `KEY1` and `KEY2` in the figure below
   - Direction: OUT, which means outputting the file to the downstream task
   - Parameter type: FILE, indicating file parameter
   - Parameter value: output file path, such as `data` and `data/test2/text.txt` in the figure below
   
   The configuration in the figure below indicates that the `output` task passes two files to the downstream task:
   
   - Pass out the folder `data`, and mark it as `dir-data`. The downstream task can get this folder through `output.dir-data`
   - Pass out the file `data/test2/text.txt`, and mark it as `file-text`. The downstream task can get this file through `output.file-text`
   
   
   ![](https://raw.githubusercontent.com/apache/dolphinscheduler/d721497f1ac7c13b16b79dc7e9c6a8fc6fbb9980/docs/img/new_ui/dev/parameter/file_parameter_output.png)
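   
   As a rough illustration of the four options, the two OUT parameters described above could be modelled as plain data. This is only a sketch: the `FileParam` and `OutputTaskParams` classes and the field names (`prop`, `direct`, `type`, `value`) are hypothetical and are not taken from the code in this pull request.
   
   ```java
   import java.util.Arrays;
   import java.util.List;
   
   // Hypothetical model of one "Custom Parameters" row; not the project's actual class.
   final class FileParam {
       final String prop;   // parameter name, e.g. "dir-data"
       final String direct; // direction: "IN" or "OUT"
       final String type;   // parameter type: "FILE" for file parameters
       final String value;  // for OUT: path relative to the task's working directory
   
       FileParam(String prop, String direct, String type, String value) {
           this.prop = prop;
           this.direct = direct;
           this.type = type;
           this.value = value;
       }
   
       // True when this row publishes a file or folder to downstream tasks.
       boolean isFileOutput() {
           return "OUT".equals(direct) && "FILE".equals(type);
       }
   }
   
   final class OutputTaskParams {
       // The two parameters configured on the `output` task in the description above.
       static List<FileParam> example() {
           return Arrays.asList(
                   new FileParam("dir-data", "OUT", "FILE", "data"),
                   new FileParam("file-text", "OUT", "FILE", "data/test2/text.txt"));
       }
   
       public static void main(String[] args) {
           for (FileParam p : example()) {
               // Downstream tasks refer to each output as taskName.KEY.
               System.out.println("output." + p.prop + " -> " + p.value);
           }
       }
   }
   ```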
   
   
   ##### Get the file from the upstream task
   
   **Four options of custom parameters are:**
   
   - Parameter name: the path under which the upstream file is saved in the working directory of the current task, such as `input_dir` used in the figure below
   - Direction: IN, which means to get the file from the upstream task
   - Parameter type: FILE, indicating file parameter
   - Parameter value: the identifier of the upstream file, in the format of `taskName.KEY`. For example, `output.dir-data` in the figure below, where `output` is the name of the upstream task, and `dir-data` is the file identifier output by the upstream task
   
   The configuration in the figure below indicates that the task gets the folder identified by `dir-data` from the upstream task `output` and saves it as `input_dir`
   
   
   ![](https://raw.githubusercontent.com/apache/dolphinscheduler/d721497f1ac7c13b16b79dc7e9c6a8fc6fbb9980/docs/img/new_ui/dev/parameter/file_parameter_input_dir.png)
   
   
   The configuration in the figure below indicates that the task gets the file identified by `file-text` from the upstream task `output` and saves it as `input.txt`
   
   
   ![](https://raw.githubusercontent.com/apache/dolphinscheduler/d721497f1ac7c13b16b79dc7e9c6a8fc6fbb9980/docs/img/new_ui/dev/parameter/file_parameter_input_file.png)
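   
   The `taskName.KEY` format of the IN parameter value can be sketched as a small parser. The `UpstreamFileRef` class is hypothetical, and the sketch assumes that the task name itself contains no `.`, so the first dot is treated as the separator; the actual implementation may resolve the reference differently.
   
   ```java
   // Hypothetical helper: splits an IN parameter value of the form taskName.KEY.
   final class UpstreamFileRef {
       final String taskName; // name of the upstream task, e.g. "output"
       final String key;      // file identifier published by that task, e.g. "dir-data"
   
       private UpstreamFileRef(String taskName, String key) {
           this.taskName = taskName;
           this.key = key;
       }
   
       // Assumption: the task name contains no '.', so the first dot is the separator.
       static UpstreamFileRef parse(String value) {
           int dot = value.indexOf('.');
           if (dot <= 0 || dot == value.length() - 1) {
               throw new IllegalArgumentException("expected taskName.KEY but got: " + value);
           }
           return new UpstreamFileRef(value.substring(0, dot), value.substring(dot + 1));
       }
   
       public static void main(String[] args) {
           UpstreamFileRef ref = parse("output.dir-data");
           System.out.println("task=" + ref.taskName + ", key=" + ref.key);
       }
   }
   ```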
   
   
   close: #12479
   
   ## Brief change log
   
   <!--*(for example:)*
   - *Add maven-checkstyle-plugin to root pom.xml*
   -->
   
   ## Verify this pull request
   
   <!--*(Please pick either of the following options)*-->
   
   This pull request is code cleanup without any test coverage.
   
   *(or)*
   
   This pull request is already covered by existing tests, such as *(please describe tests)*.
   
   (or)
   
   This change added tests and can be verified as follows:
   
   <!--*(example:)*
   - *Added dolphinscheduler-dao tests for end-to-end.*
   - *Added CronUtilsTest to verify the change.*
   - *Manually verified the change by testing locally.* -->
   
   (or)
   
   If your pull request contains incompatible changes, you should also add them to `docs/docs/en/guide/upgrede/incompatible.md`
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [dolphinscheduler] jieguangzhou commented on a diff in pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
jieguangzhou commented on code in PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552#discussion_r1006353575


##########
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java:
##########
@@ -216,15 +217,17 @@ protected void beforeExecute() {
         TaskExecutionCheckerUtils.downloadResourcesIfNeeded(storageOperate, taskExecutionContext, logger);
         logger.info("Resources:{} check success", taskExecutionContext.getResources());
 
+        TaskFilesTransferUtils.downloadUpstreamFiles(taskExecutionContext, storageOperate);

Review Comment:
   Good idea, but I think this should be implemented in the resource center interface, because the resource center can connect to HDFS, S3, and OSS. Maybe we can create a new issue for this, so that a unified interface can be provided later. WDYT?





[GitHub] [dolphinscheduler] jieguangzhou commented on pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
jieguangzhou commented on PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552#issuecomment-1292908211

   I have marked this PR as a draft because I want to add a cleanup mechanism to clear data in the resource center.




[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552#issuecomment-1299781739

   Kudos, SonarCloud Quality Gate passed!&nbsp; &nbsp; [![Quality Gate passed](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/QualityGateBadge/passed-16px.png 'Quality Gate passed')](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=12552)
   
   [![Bug](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug-16px.png 'Bug')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=BUG) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=BUG)  
   [![Vulnerability](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability-16px.png 'Vulnerability')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=VULNERABILITY) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=VULNERABILITY)  
   [![Security Hotspot](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot-16px.png 'Security Hotspot')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=SECURITY_HOTSPOT) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=SECURITY_HOTSPOT)  
   [![Code Smell](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell-16px.png 'Code Smell')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=CODE_SMELL) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=CODE_SMELL) [13 Code Smells](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=CODE_SMELL)
   
   [![78.6%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/60-16px.png '78.6%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_coverage&view=list) [78.6% Coverage](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_coverage&view=list)  
   [![2.1%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3-16px.png '2.1%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_duplicated_lines_density&view=list) [2.1% Duplication](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_duplicated_lines_density&view=list)
   
   




[GitHub] [dolphinscheduler] jieguangzhou commented on pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
jieguangzhou commented on PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552#issuecomment-1299920976

   @caishunfeng @Tianqi-Dotes  PTAL, thanks.




[GitHub] [dolphinscheduler] jieguangzhou merged pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
jieguangzhou merged PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552




[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552#issuecomment-1293023910

   SonarCloud Quality Gate failed.&nbsp; &nbsp; [![Quality Gate failed](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/QualityGateBadge/failed-16px.png 'Quality Gate failed')](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=12552)
   
   [![Bug](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug-16px.png 'Bug')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=BUG) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=BUG)  
   [![Vulnerability](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability-16px.png 'Vulnerability')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=VULNERABILITY) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=VULNERABILITY)  
   [![Security Hotspot](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot-16px.png 'Security Hotspot')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=SECURITY_HOTSPOT) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=SECURITY_HOTSPOT)  
   [![Code Smell](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell-16px.png 'Code Smell')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=CODE_SMELL) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=CODE_SMELL) [14 Code Smells](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=CODE_SMELL)
   
   [![33.3%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/25-16px.png '33.3%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_coverage&view=list) [33.3% Coverage](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_coverage&view=list)  
   [![0.0%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3-16px.png '0.0%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_duplicated_lines_density&view=list) [0.0% Duplication](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_duplicated_lines_density&view=list)
   
   




[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552#issuecomment-1301593256

   Kudos, SonarCloud Quality Gate passed!&nbsp; &nbsp; [![Quality Gate passed](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/QualityGateBadge/passed-16px.png 'Quality Gate passed')](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=12552)
   
   [![Bug](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug-16px.png 'Bug')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=BUG) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=BUG)  
   [![Vulnerability](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability-16px.png 'Vulnerability')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=VULNERABILITY) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=VULNERABILITY)  
   [![Security Hotspot](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot-16px.png 'Security Hotspot')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=SECURITY_HOTSPOT) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=SECURITY_HOTSPOT)  
   [![Code Smell](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell-16px.png 'Code Smell')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=CODE_SMELL) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=CODE_SMELL) [13 Code Smells](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=CODE_SMELL)
   
   [![78.6%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/60-16px.png '78.6%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_coverage&view=list) [78.6% Coverage](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_coverage&view=list)  
   [![2.1%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3-16px.png '2.1%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_duplicated_lines_density&view=list) [2.1% Duplication](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_duplicated_lines_density&view=list)
   
   




[GitHub] [dolphinscheduler] caishunfeng commented on a diff in pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
caishunfeng commented on code in PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552#discussion_r1011243391


##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java:
##########
@@ -1871,6 +1874,61 @@ public Resource queryResourcesFileInfo(String userName, String fileName) {
         return (Resource) resourceResponse.getData();
     }
 
+    @Override
+    public Map<String, Object> deleteDataTransferData(User loginUser, Integer days) {

Review Comment:
   > I have changed it to `Result<Object>`, the same as other methods.
   
   `Result<Object>` is also not clear, because it still does something like
   ```
   Map<String, Object> data = new HashMap<>();
   data.put("successList", successList);
   data.put("failList", failList);
   putMsg(result, Status.SUCCESS);
   result.setData(data);
   ```
   please see https://github.com/apache/dolphinscheduler/blob/9e0c9af1a5070b6a55750737aff2026233b26952/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java#L229
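   
   A minimal sketch of what a more explicit return type could look like, for illustration only. The `DeleteTransferResult` class and its accessor names are hypothetical; they are not the type used in this pull request or in the linked code.
   
   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;
   
   // Hypothetical typed response: callers see the two lists in the signature
   // instead of looking them up by string key in a Map<String, Object>.
   final class DeleteTransferResult {
       private final List<String> successList;
       private final List<String> failList;
   
       DeleteTransferResult(List<String> successList, List<String> failList) {
           // Defensive copies keep the response immutable after construction.
           this.successList = Collections.unmodifiableList(new ArrayList<>(successList));
           this.failList = Collections.unmodifiableList(new ArrayList<>(failList));
       }
   
       List<String> getSuccessList() {
           return successList;
       }
   
       List<String> getFailList() {
           return failList;
       }
   
       // Convenience check: nothing failed to delete.
       boolean isAllDeleted() {
           return failList.isEmpty();
       }
   }
   ```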
   





[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552#issuecomment-1300056081

   Kudos, SonarCloud Quality Gate passed!&nbsp; &nbsp; [![Quality Gate passed](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/QualityGateBadge/passed-16px.png 'Quality Gate passed')](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=12552)
   
   [![Bug](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug-16px.png 'Bug')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=BUG) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=BUG)  
   [![Vulnerability](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability-16px.png 'Vulnerability')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=VULNERABILITY) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=VULNERABILITY)  
   [![Security Hotspot](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot-16px.png 'Security Hotspot')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=SECURITY_HOTSPOT) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=SECURITY_HOTSPOT)  
   [![Code Smell](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell-16px.png 'Code Smell')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=CODE_SMELL) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=CODE_SMELL) [13 Code Smells](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=CODE_SMELL)
   
   [![78.6%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/60-16px.png '78.6%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_coverage&view=list) [78.6% Coverage](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_coverage&view=list)  
   [![2.1%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3-16px.png '2.1%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_duplicated_lines_density&view=list) [2.1% Duplication](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_duplicated_lines_density&view=list)
   
   




[GitHub] [dolphinscheduler] github-code-scanning[bot] commented on a diff in pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
github-code-scanning[bot] commented on code in PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552#discussion_r1007134335


##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java:
##########
@@ -270,6 +270,26 @@
         return resourceService.delete(loginUser, fullName, tenantCode);
     }
 
+    /**
+     * delete DATA_TRANSFER data
+     *
+     * @param loginUser login user
+     * @return delete result code
+     */
+    @Operation(summary = "deleteDataTransferData", description = "Delete the N days ago data of DATA_TRANSFER ")
+    @Parameters({
+            @Parameter(name = "days", description = "N days ago", required = true, schema = @Schema(implementation = String.class, example = "test/"))
+    })
+    @DeleteMapping(value = "/data-transfer-delete")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(DELETE_RESOURCE_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Map<String, Object> deleteDataTransferData(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                                                      @RequestParam(value = "days") Integer days,
+                                                      @RequestParam(value = "tenantCode", required = false) String tenantCode) throws Exception {

Review Comment:
   ## Useless parameter
   
   The parameter 'tenantCode' is never used.
   
   [Show more details](https://github.com/apache/dolphinscheduler/security/code-scanning/2144)





[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552#issuecomment-1294312334

   Kudos, SonarCloud Quality Gate passed!&nbsp; &nbsp; [![Quality Gate passed](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/QualityGateBadge/passed-16px.png 'Quality Gate passed')](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=12552)
   
   [![Bug](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug-16px.png 'Bug')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=BUG) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=BUG)  
   [![Vulnerability](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability-16px.png 'Vulnerability')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=VULNERABILITY) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=VULNERABILITY)  
   [![Security Hotspot](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot-16px.png 'Security Hotspot')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=SECURITY_HOTSPOT) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=SECURITY_HOTSPOT)  
   [![Code Smell](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell-16px.png 'Code Smell')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=CODE_SMELL) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=CODE_SMELL) [7 Code Smells](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=CODE_SMELL)
   
   [![60.7%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/60-16px.png '60.7%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_coverage&view=list) [60.7% Coverage](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_coverage&view=list)  
   [![2.2%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3-16px.png '2.2%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_duplicated_lines_density&view=list) [2.2% Duplication](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_duplicated_lines_density&view=list)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [dolphinscheduler] Radeity commented on a diff in pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
Radeity commented on code in PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552#discussion_r1005758921


##########
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java:
##########
@@ -216,15 +217,17 @@ protected void beforeExecute() {
         TaskExecutionCheckerUtils.downloadResourcesIfNeeded(storageOperate, taskExecutionContext, logger);
         logger.info("Resources:{} check success", taskExecutionContext.getResources());
 
+        TaskFilesTransferUtils.downloadUpstreamFiles(taskExecutionContext, storageOperate);

Review Comment:
   I don't think it is elegant to simply download a zip file before execution and upload the files after execution, because the intermediate data may be large. Slicing the data and writing it through a buffer would be better, and it would also help with prefetching for the downstream task.
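
To illustrate the slice-and-buffer idea, here is a minimal Java sketch. It is not the PR's implementation: `ChunkedCopySketch` and `copyInChunks` are hypothetical names, and plain `InputStream`/`OutputStream` objects stand in for whatever streams a storage layer might expose. The sketch does not assume that `StorageOperate` offers a streaming API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical helper, not part of the PR: copies a stream in fixed-size slices.
class ChunkedCopySketch {

    // Copies `in` to `out` through a buffer of `chunkSize` bytes and returns the number of bytes copied.
    static long copyInChunks(InputStream in, OutputStream out, int chunkSize) throws IOException {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        byte[] buffer = new byte[chunkSize];
        long total = 0;
        int read;
        // Each iteration moves at most one slice, so memory use is bounded by chunkSize.
        while ((read = in.read(buffer)) != -1) {
            out.write(buffer, 0, read);
            total += read;
        }
        out.flush();
        return total;
    }

    public static void main(String[] args) throws IOException {
        // In-memory streams keep the demo self-contained; real code would use file or network streams.
        byte[] payload = new byte[10_000];
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        long copied = copyInChunks(new ByteArrayInputStream(payload), sink, 4096);
        System.out.println("copied " + copied + " bytes");
    }
}
```

Because only one slice is held in memory at a time, the footprint stays constant however large the intermediate data is. A downstream reader could in principle start consuming slices before the whole transfer finishes.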





[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552#issuecomment-1301592352

   Kudos, SonarCloud Quality Gate passed! [Quality Gate passed](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=12552)
   
   [0 Bugs](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=BUG) (rating: A)  
   [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=VULNERABILITY) (rating: A)  
   [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=SECURITY_HOTSPOT) (rating: A)  
   [13 Code Smells](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=CODE_SMELL) (rating: A)
   
   [78.6% Coverage](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_coverage&view=list)  
   [2.1% Duplication](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_duplicated_lines_density&view=list)
   
   




[GitHub] [dolphinscheduler] jieguangzhou commented on pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
jieguangzhou commented on PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552#issuecomment-1292052320

   > The bandwidth pressure should be shown in the documentation
   
   I have added a related note to the documentation.




[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552#issuecomment-1293879354

   Kudos, SonarCloud Quality Gate passed! [Quality Gate passed](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=12552)
   
   [0 Bugs](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=BUG) (rating: A)  
   [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=VULNERABILITY) (rating: A)  
   [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=SECURITY_HOTSPOT) (rating: A)  
   [17 Code Smells](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=CODE_SMELL) (rating: A)
   
   [60.7% Coverage](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_coverage&view=list)  
   [2.5% Duplication](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_duplicated_lines_density&view=list)
   
   




[GitHub] [dolphinscheduler] github-code-scanning[bot] commented on a diff in pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
github-code-scanning[bot] commented on code in PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552#discussion_r1006412971


##########
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.utils;
+
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
+import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.service.storage.StorageOperate;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeroturnaround.zip.ZipUtil;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+public class TaskFilesTransferUtils {
+
+    protected final static Logger logger = LoggerFactory
+            .getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, TaskFilesTransferUtils.class));
+
+    // root path in resource storage
+    final static String RESOURCE_TAG = "DATA_TRANSFER";
+
+    // tmp path in local path for transfer
+    final static String DOWNLOAD_TMP = ".DT_TMP";
+
+    // suffix of the package file
+    final static String PACK_SUFFIX = "_ds_pack.zip";
+
+    /**
+     * upload output files to resource storage
+     *
+     * @param taskExecutionContext taskExecutionContext
+     * @param storageOperate       storageOperate
+     * @throws TaskException TaskException
+     */
+    public static void uploadOutputFiles(StorageOperate storageOperate,
+                                         TaskExecutionContext taskExecutionContext) throws TaskException {
+        List<Property> varPools = getVarPools(taskExecutionContext);
+        // get map of varPools for quick search
+        Map<String, Property> varPoolsMap = varPools.stream().collect(Collectors.toMap(Property::getProp, x -> x));
+
+        // get OUTPUT FILE parameters
+        List<Property> localParamsProperty = getFileLocalParams(taskExecutionContext, Direct.OUT);
+
+        if (localParamsProperty.isEmpty()) {
+            return;
+        }
+
+        logger.info("Upload output files ...");
+        for (Property property : localParamsProperty) {
+            // get local file path
+            String srcPath =
+                    packIfDir(String.format("%s/%s", taskExecutionContext.getExecutePath(), property.getValue()));
+            // get remote file path
+            String resourcePath = getResourcePath(taskExecutionContext, new File(srcPath).getName());
+            try {
+                // upload file to storage
+                String resourceWholePath =
+                        storageOperate.getResourceFileName(taskExecutionContext.getTenantCode(), resourcePath);
+                logger.info("{} --- Local:{} to Remote:{}", property, srcPath, resourceWholePath);
+                storageOperate.upload(taskExecutionContext.getTenantCode(), srcPath, resourceWholePath, false, true);
+            } catch (IOException ex) {
+                throw new TaskException(ex.getMessage(), ex);
+            }
+
+            // update varPool
+            Property oriProperty;
+            // if the property is not in varPool, add it
+            if (varPoolsMap.containsKey(property.getProp())) {
+                oriProperty = varPoolsMap.get(property.getProp());
+            } else {
+                oriProperty = new Property(property.getProp(), Direct.OUT, DataType.FILE, property.getValue());
+                varPools.add(oriProperty);
+            }
+            oriProperty.setProp(String.format("%s.%s", taskExecutionContext.getTaskName(), oriProperty.getProp()));
+            oriProperty.setValue(resourcePath);
+        }
+        taskExecutionContext.setVarPool(JSONUtils.toJsonString(varPools));
+    }
+
+    /**
+     * download upstream files from storage
+     * only download files which are defined in the task parameters
+     *
+     * @param storageOperate       storage operate
+     * @param taskExecutionContext taskExecutionContext
+     * @throws TaskException task exception
+     */
+    public static void downloadUpstreamFiles(TaskExecutionContext taskExecutionContext, StorageOperate storageOperate) {
+        List<Property> varPools = getVarPools(taskExecutionContext);
+        // get map of varPools for quick search
+        Map<String, Property> varPoolsMap = varPools.stream().collect(Collectors.toMap(Property::getProp, x -> x));
+
+        // get "IN FILE" parameters
+        List<Property> localParamsProperty = getFileLocalParams(taskExecutionContext, Direct.IN);
+
+        if (localParamsProperty.isEmpty()) {
+            return;
+        }
+
+        String executePath = taskExecutionContext.getExecutePath();
+        // data path to download packaged data
+        String DownloadTmpPath = String.format("%s/%s", executePath, DOWNLOAD_TMP);
+
+        logger.info("Download upstream files...");
+        for (Property property : localParamsProperty) {
+            Property inVarPool = varPoolsMap.get(property.getValue());
+            if (inVarPool == null) {
+                logger.error(String.format("%s not in  %s", property.getValue(), varPoolsMap.keySet()));
+                throw new TaskException(String.format("Can not find upstream file using %s, please check the key",
+                        property.getValue()));
+            }
+
+            String resourcePath = inVarPool.getValue();
+            String targetPath = String.format("%s/%s", executePath, property.getProp());
+
+            String downloadPath;
+            // If the data is packaged, download it to a special directory (DOWNLOAD_TMP) and unpack it to the
+            // targetPath
+            boolean isPack = resourcePath.endsWith(PACK_SUFFIX);
+            if (isPack) {
+                downloadPath = String.format("%s/%s", DownloadTmpPath, new File(resourcePath).getName());
+            } else {
+                downloadPath = targetPath;
+            }
+
+            try {
+                String resourceWholePath =
+                        storageOperate.getResourceFileName(taskExecutionContext.getTenantCode(), resourcePath);
+                logger.info("{} --- Remote:{} to Local:{}", property, resourceWholePath, downloadPath);
+                storageOperate.download(taskExecutionContext.getTenantCode(), resourceWholePath, downloadPath, false,
+                        true);
+            } catch (IOException ex) {
+                throw new TaskException(ex.getMessage(), ex);
+            }
+
+            // unpack if the data is packaged
+            if (isPack) {
+                File downloadFile = new File(downloadPath);
+                logger.info("Unpack {} to {}", downloadPath, targetPath);
+                ZipUtil.unpack(downloadFile, new File(targetPath));
+            }
+        }
+
+        // delete DownloadTmp Folder if DownloadTmpPath exists
+        try {
+            org.apache.commons.io.FileUtils.deleteDirectory(new File(DownloadTmpPath));
+        } catch (IOException e) {
+            logger.error(
+                    "Delete DownloadTmpPath {} failed, this will not affect the task status", DownloadTmpPath, e);
+        }
+    }
+
+    /**
+     * get local parameters property which type is FILE and direction is equal to direct
+     *
+     * @param taskExecutionContext, TaskExecutionContext
+     * @param direct,               Direct, may be Direct.IN or Direct.OUT.
+     * @return List<Property>
+     */
+    public static List<Property> getFileLocalParams(TaskExecutionContext taskExecutionContext, Direct direct) {
+        List<Property> localParamsProperty = new ArrayList<>();
+        JsonNode taskParams = JSONUtils.parseObject(taskExecutionContext.getTaskParams());
+        for (JsonNode localParam : taskParams.get("localParams")) {
+            Property property = JSONUtils.parseObject(localParam.toString(), Property.class);
+
+            if (property.getDirect().equals(direct) & property.getType().equals(DataType.FILE)) {

Review Comment:
   ## Dangerous non-short-circuit logic
   
   Possibly dangerous use of non-short circuit logic.
   
   [Show more details](https://github.com/apache/dolphinscheduler/security/code-scanning/2131)
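
For context on this alert, the sketch below shows how `&` and `&&` differ on `boolean` operands in Java. It is a minimal, self-contained illustration: `ShortCircuitSketch`, `Param`, and the two local enums are hypothetical stand-ins for the PR's `Property`, `Direct`, and `DataType`, not the real classes.

```java
class ShortCircuitSketch {

    enum Direct { IN, OUT }

    enum DataType { VARCHAR, FILE }

    // Stand-in for the PR's Property class, reduced to the two fields the check reads.
    static final class Param {
        final Direct direct;
        final DataType type;

        Param(Direct direct, DataType type) {
            this.direct = direct;
            this.type = type;
        }
    }

    // Non-short-circuit form: both operands are always evaluated.
    static boolean isFileParamEager(Param p, Direct wanted) {
        return p.direct.equals(wanted) & p.type.equals(DataType.FILE);
    }

    // Short-circuit form: the type check is skipped when the direction does not match.
    static boolean isFileParamLazy(Param p, Direct wanted) {
        return p.direct.equals(wanted) && p.type.equals(DataType.FILE);
    }

    public static void main(String[] args) {
        // A parameter with the wrong direction and no type set.
        Param missingType = new Param(Direct.IN, null);
        System.out.println("lazy:  " + isFileParamLazy(missingType, Direct.OUT));
        try {
            isFileParamEager(missingType, Direct.OUT);
        } catch (NullPointerException e) {
            System.out.println("eager: NullPointerException");
        }
    }
}
```

Note that `&&` only skips the right-hand side when the left-hand side is `false`. If the direction matches and the type is `null`, both forms still throw, so a separate null check would be needed for that case.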



##########
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.utils;
+
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
+import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.service.storage.StorageOperate;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeroturnaround.zip.ZipUtil;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+public class TaskFilesTransferUtils {
+
+    protected final static Logger logger = LoggerFactory
+            .getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, TaskFilesTransferUtils.class));
+
+    // root path in resource storage
+    final static String RESOURCE_TAG = "DATA_TRANSFER";
+
+    // tmp path in local path for transfer
+    final static String DOWNLOAD_TMP = ".DT_TMP";
+
+    // suffix of the package file
+    final static String PACK_SUFFIX = "_ds_pack.zip";
+
+    /**
+     * upload output files to resource storage
+     *
+     * @param taskExecutionContext taskExecutionContext
+     * @param storageOperate       storageOperate
+     * @throws TaskException TaskException
+     */
+    public static void uploadOutputFiles(StorageOperate storageOperate,
+                                         TaskExecutionContext taskExecutionContext) throws TaskException {
+        List<Property> varPools = getVarPools(taskExecutionContext);
+        // get map of varPools for quick search
+        Map<String, Property> varPoolsMap = varPools.stream().collect(Collectors.toMap(Property::getProp, x -> x));
+
+        // get OUTPUT FILE parameters
+        List<Property> localParamsProperty = getFileLocalParams(taskExecutionContext, Direct.OUT);
+
+        if (localParamsProperty.isEmpty()) {
+            return;
+        }
+
+        logger.info("Upload output files ...");
+        for (Property property : localParamsProperty) {
+            // get local file path
+            String srcPath =
+                    packIfDir(String.format("%s/%s", taskExecutionContext.getExecutePath(), property.getValue()));
+            // get remote file path
+            String resourcePath = getResourcePath(taskExecutionContext, new File(srcPath).getName());
+            try {
+                // upload file to storage
+                String resourceWholePath =
+                        storageOperate.getResourceFileName(taskExecutionContext.getTenantCode(), resourcePath);
+                logger.info("{} --- Local:{} to Remote:{}", property, srcPath, resourceWholePath);
+                storageOperate.upload(taskExecutionContext.getTenantCode(), srcPath, resourceWholePath, false, true);
+            } catch (IOException ex) {
+                throw new TaskException(ex.getMessage(), ex);
+            }
+
+            // update varPool
+            Property oriProperty;
+            // if the property is not in varPool, add it
+            if (varPoolsMap.containsKey(property.getProp())) {
+                oriProperty = varPoolsMap.get(property.getProp());
+            } else {
+                oriProperty = new Property(property.getProp(), Direct.OUT, DataType.FILE, property.getValue());
+                varPools.add(oriProperty);
+            }
+            oriProperty.setProp(String.format("%s.%s", taskExecutionContext.getTaskName(), oriProperty.getProp()));
+            oriProperty.setValue(resourcePath);
+        }
+        taskExecutionContext.setVarPool(JSONUtils.toJsonString(varPools));
+    }
+
+    /**
+     * download upstream files from storage
+     * only download files which are defined in the task parameters
+     *
+     * @param storageOperate       storage operate
+     * @param taskExecutionContext taskExecutionContext
+     * @throws TaskException task exception
+     */
+    public static void downloadUpstreamFiles(TaskExecutionContext taskExecutionContext, StorageOperate storageOperate) {
+        List<Property> varPools = getVarPools(taskExecutionContext);
+        // get map of varPools for quick search
+        Map<String, Property> varPoolsMap = varPools.stream().collect(Collectors.toMap(Property::getProp, x -> x));
+
+        // get "IN FILE" parameters
+        List<Property> localParamsProperty = getFileLocalParams(taskExecutionContext, Direct.IN);
+
+        if (localParamsProperty.isEmpty()) {
+            return;
+        }
+
+        String executePath = taskExecutionContext.getExecutePath();
+        // data path to download packaged data
+        String DownloadTmpPath = String.format("%s/%s", executePath, DOWNLOAD_TMP);
+
+        logger.info("Download upstream files...");
+        for (Property property : localParamsProperty) {
+            Property inVarPool = varPoolsMap.get(property.getValue());
+            if (inVarPool == null) {
+                logger.error(String.format("%s not in  %s", property.getValue(), varPoolsMap.keySet()));
+                throw new TaskException(String.format("Can not find upstream file using %s, please check the key",
+                        property.getValue()));
+            }
+
+            String resourcePath = inVarPool.getValue();
+            String targetPath = String.format("%s/%s", executePath, property.getProp());
+
+            String downloadPath;
+            // If the data is packaged, download it to a special directory (DOWNLOAD_TMP) and unpack it to the
+            // targetPath
+            boolean isPack = resourcePath.endsWith(PACK_SUFFIX);
+            if (isPack) {
+                downloadPath = String.format("%s/%s", DownloadTmpPath, new File(resourcePath).getName());
+            } else {
+                downloadPath = targetPath;
+            }
+
+            try {
+                String resourceWholePath =
+                        storageOperate.getResourceFileName(taskExecutionContext.getTenantCode(), resourcePath);
+                logger.info("{} --- Remote:{} to Local:{}", property, resourceWholePath, downloadPath);
+                storageOperate.download(taskExecutionContext.getTenantCode(), resourceWholePath, downloadPath, false,
+                        true);
+            } catch (IOException ex) {
+                throw new TaskException(ex.getMessage(), ex);
+            }
+
+            // unpack if the data is packaged
+            if (isPack) {
+                File downloadFile = new File(downloadPath);
+                logger.info("Unpack {} to {}", downloadPath, targetPath);
+                ZipUtil.unpack(downloadFile, new File(targetPath));
+            }
+        }
+
+        // delete DownloadTmp Folder if DownloadTmpPath exists
+        try {
+            org.apache.commons.io.FileUtils.deleteDirectory(new File(DownloadTmpPath));
+        } catch (IOException e) {
+            logger.error(
+                    "Delete DownloadTmpPath {} failed, this will not affect the task status", DownloadTmpPath, e);
+        }
+    }
+
+    /**
+     * get local parameters property which type is FILE and direction is equal to direct
+     *
+     * @param taskExecutionContext, TaskExecutionContext
+     * @param direct,               Direct, may be Direct.IN or Direct.OUT.

Review Comment:
   ## Spurious Javadoc @param tags
   
   @param tag "direct," does not match any actual parameter of method "getFileLocalParams()".
   
   [Show more details](https://github.com/apache/dolphinscheduler/security/code-scanning/2125)
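
The alert is triggered by the trailing comma: Javadoc takes the first token after `@param` as the parameter name, so `direct,` does not match `direct`. Below is a minimal sketch of the conventional tag layout. `JavadocParamSketch` and `fileParamNames` are hypothetical, and string rows stand in for the PR's `Property` objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class JavadocParamSketch {

    /**
     * Collect the names of local parameters whose type is FILE and whose direction equals the given one.
     *
     * @param localParams rows of (name, direction, type); a hypothetical stand-in for the parsed local parameters
     * @param direct      expected direction, either "IN" or "OUT"
     * @return names of the matching parameters, in input order
     */
    static List<String> fileParamNames(List<String[]> localParams, String direct) {
        List<String> names = new ArrayList<>();
        for (String[] row : localParams) {
            if (direct.equals(row[1]) && "FILE".equals(row[2])) {
                names.add(row[0]);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        List<String[]> params = Arrays.asList(
                new String[] {"dir-data", "OUT", "FILE"},
                new String[] {"count", "OUT", "VARCHAR"},
                new String[] {"input", "IN", "FILE"});
        System.out.println(fileParamNames(params, "OUT"));
    }
}
```

Each `@param` tag is followed by the bare parameter name, then whitespace, then the description, with no punctuation attached to the name.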



##########
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.utils;
+
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
+import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.service.storage.StorageOperate;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeroturnaround.zip.ZipUtil;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+public class TaskFilesTransferUtils {
+
+    protected final static Logger logger = LoggerFactory
+            .getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, TaskFilesTransferUtils.class));
+
+    // root path in resource storage
+    final static String RESOURCE_TAG = "DATA_TRANSFER";
+
+    // tmp path in local path for transfer
+    final static String DOWNLOAD_TMP = ".DT_TMP";
+
+    // suffix of the package file
+    final static String PACK_SUFFIX = "_ds_pack.zip";
+
+    /**
+     * upload output files to resource storage
+     *
+     * @param taskExecutionContext taskExecutionContext
+     * @param storageOperate       storageOperate
+     * @throws TaskException TaskException
+     */
+    public static void uploadOutputFiles(StorageOperate storageOperate,
+                                         TaskExecutionContext taskExecutionContext) throws TaskException {
+        List<Property> varPools = getVarPools(taskExecutionContext);
+        // get map of varPools for quick search
+        Map<String, Property> varPoolsMap = varPools.stream().collect(Collectors.toMap(Property::getProp, x -> x));
+
+        // get OUTPUT FILE parameters
+        List<Property> localParamsProperty = getFileLocalParams(taskExecutionContext, Direct.OUT);
+
+        if (localParamsProperty.isEmpty()) {
+            return;
+        }
+
+        logger.info("Upload output files ...");
+        for (Property property : localParamsProperty) {
+            // get local file path
+            String srcPath =
+                    packIfDir(String.format("%s/%s", taskExecutionContext.getExecutePath(), property.getValue()));
+            // get remote file path
+            String resourcePath = getResourcePath(taskExecutionContext, new File(srcPath).getName());
+            try {
+                // upload file to storage
+                String resourceWholePath =
+                        storageOperate.getResourceFileName(taskExecutionContext.getTenantCode(), resourcePath);
+                logger.info("{} --- Local:{} to Remote:{}", property, srcPath, resourceWholePath);
+                storageOperate.upload(taskExecutionContext.getTenantCode(), srcPath, resourceWholePath, false, true);
+            } catch (IOException ex) {
+                throw new TaskException(ex.getMessage(), ex);
+            }
+
+            // update varPool
+            Property oriProperty;
+            // if the property is not in varPool, add it
+            if (varPoolsMap.containsKey(property.getProp())) {
+                oriProperty = varPoolsMap.get(property.getProp());
+            } else {
+                oriProperty = new Property(property.getProp(), Direct.OUT, DataType.FILE, property.getValue());
+                varPools.add(oriProperty);
+            }
+            oriProperty.setProp(String.format("%s.%s", taskExecutionContext.getTaskName(), oriProperty.getProp()));
+            oriProperty.setValue(resourcePath);
+        }
+        taskExecutionContext.setVarPool(JSONUtils.toJsonString(varPools));
+    }
+
+    /**
+     * download upstream files from storage
+     * only download files which are defined in the task parameters
+     *
+     * @param storageOperate       storage operate
+     * @param taskExecutionContext taskExecutionContext
+     * @throws TaskException task exception
+     */
+    public static void downloadUpstreamFiles(TaskExecutionContext taskExecutionContext, StorageOperate storageOperate) {
+        List<Property> varPools = getVarPools(taskExecutionContext);
+        // get map of varPools for quick search
+        Map<String, Property> varPoolsMap = varPools.stream().collect(Collectors.toMap(Property::getProp, x -> x));
+
+        // get "IN FILE" parameters
+        List<Property> localParamsProperty = getFileLocalParams(taskExecutionContext, Direct.IN);
+
+        if (localParamsProperty.isEmpty()) {
+            return;
+        }
+
+        String executePath = taskExecutionContext.getExecutePath();
+        // data path to download packaged data
+        String DownloadTmpPath = String.format("%s/%s", executePath, DOWNLOAD_TMP);
+
+        logger.info("Download upstream files...");
+        for (Property property : localParamsProperty) {
+            Property inVarPool = varPoolsMap.get(property.getValue());
+            if (inVarPool == null) {
+                logger.error("{} not in {}", property.getValue(), varPoolsMap.keySet());
+                throw new TaskException(String.format("Can not find upstream file using %s, please check the key",
+                        property.getValue()));
+            }
+
+            String resourcePath = inVarPool.getValue();
+            String targetPath = String.format("%s/%s", executePath, property.getProp());
+
+            String downloadPath;
+            // If the data is packaged, download it to a special directory (DOWNLOAD_TMP) and unpack it to the
+            // targetPath
+            boolean isPack = resourcePath.endsWith(PACK_SUFFIX);
+            if (isPack) {
+                downloadPath = String.format("%s/%s", DownloadTmpPath, new File(resourcePath).getName());
+            } else {
+                downloadPath = targetPath;
+            }
+
+            try {
+                String resourceWholePath =
+                        storageOperate.getResourceFileName(taskExecutionContext.getTenantCode(), resourcePath);
+                logger.info("{} --- Remote:{} to Local:{}", property, resourceWholePath, downloadPath);
+                storageOperate.download(taskExecutionContext.getTenantCode(), resourceWholePath, downloadPath, false,
+                        true);
+            } catch (IOException ex) {
+                throw new TaskException(ex.getMessage(), ex);
+            }
+
+            // unpack if the data is packaged
+            if (isPack) {
+                File downloadFile = new File(downloadPath);
+                logger.info("Unpack {} to {}", downloadPath, targetPath);
+                ZipUtil.unpack(downloadFile, new File(targetPath));
+            }
+        }
+
+        // delete DownloadTmp Folder if DownloadTmpPath exists
+        try {
+            org.apache.commons.io.FileUtils.deleteDirectory(new File(DownloadTmpPath));
+        } catch (IOException e) {
+            logger.error(
+                    "Delete DownloadTmpPath {} failed, this will not affect the task status", DownloadTmpPath, e);
+        }
+    }
+
+    /**
+     * get local parameters property which type is FILE and direction is equal to direct
+     *
+     * @param taskExecutionContext, TaskExecutionContext

Review Comment:
   ## Spurious Javadoc @param tags
   
   @param tag "taskExecutionContext," does not match any actual parameter of method "getFileLocalParams()".
   
   [Show more details](https://github.com/apache/dolphinscheduler/security/code-scanning/2124)
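
The warning is triggered by the comma after the parameter name: Javadoc takes the first whitespace-delimited token after `@param` as the name, so `taskExecutionContext,` is not recognised as `taskExecutionContext`. A minimal sketch of the corrected tag style follows. The `Param`, `Direct` and `DataType` types here are simplified, hypothetical stand-ins for the PR's classes, not the real ones.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class JavadocParamSketch {

    enum Direct { IN, OUT }

    enum DataType { FILE, VARCHAR }

    /** Hypothetical, simplified stand-in for the PR's Property class. */
    static final class Param {
        final String prop;
        final Direct direct;
        final DataType type;

        Param(String prop, Direct direct, DataType type) {
            this.prop = prop;
            this.direct = direct;
            this.type = type;
        }
    }

    /**
     * Returns the parameters whose type is FILE and whose direction equals {@code direct}.
     *
     * @param params candidate parameters
     * @param direct required direction, either Direct.IN or Direct.OUT
     * @return the matching parameters, in input order
     */
    static List<Param> fileParams(List<Param> params, Direct direct) {
        List<Param> result = new ArrayList<>();
        for (Param p : params) {
            // Enum constants can be compared with ==; && short-circuits on the first mismatch.
            if (p.direct == direct && p.type == DataType.FILE) {
                result.add(p);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Param> params = Arrays.asList(
                new Param("inputFile", Direct.IN, DataType.FILE),
                new Param("outputFile", Direct.OUT, DataType.FILE),
                new Param("a", Direct.IN, DataType.VARCHAR));
        System.out.println(fileParams(params, Direct.IN).get(0).prop); // prints "inputFile"
    }
}
```

The only change the alert asks for is dropping the comma: the name after `@param` has to match the parameter exactly, and the description follows after whitespace.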



##########
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.utils;
+
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
+import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.service.storage.StorageOperate;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeroturnaround.zip.ZipUtil;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+public class TaskFilesTransferUtils {
+
+    protected final static Logger logger = LoggerFactory
+            .getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, TaskFilesTransferUtils.class));
+
+    // root path in resource storage
+    final static String RESOURCE_TAG = "DATA_TRANSFER";
+
+    // tmp path in local path for transfer
+    final static String DOWNLOAD_TMP = ".DT_TMP";
+
+    // suffix of the package file
+    final static String PACK_SUFFIX = "_ds_pack.zip";
+
+    /**
+     * upload output files to resource storage
+     *
+     * @param taskExecutionContext taskExecutionContext
+     * @param storageOperate       storageOperate
+     * @throws TaskException TaskException
+     */
+    public static void uploadOutputFiles(StorageOperate storageOperate,
+                                         TaskExecutionContext taskExecutionContext) throws TaskException {
+        List<Property> varPools = getVarPools(taskExecutionContext);
+        // get map of varPools for quick search
+        Map<String, Property> varPoolsMap = varPools.stream().collect(Collectors.toMap(Property::getProp, x -> x));
+
+        // get OUTPUT FILE parameters
+        List<Property> localParamsProperty = getFileLocalParams(taskExecutionContext, Direct.OUT);
+
+        if (localParamsProperty.isEmpty()) {
+            return;
+        }
+
+        logger.info("Upload output files ...");
+        for (Property property : localParamsProperty) {
+            // get local file path
+            String srcPath =
+                    packIfDir(String.format("%s/%s", taskExecutionContext.getExecutePath(), property.getValue()));
+            // get remote file path
+            String resourcePath = getResourcePath(taskExecutionContext, new File(srcPath).getName());
+            try {
+                // upload file to storage
+                String resourceWholePath =
+                        storageOperate.getResourceFileName(taskExecutionContext.getTenantCode(), resourcePath);
+                logger.info("{} --- Local:{} to Remote:{}", property, srcPath, resourceWholePath);
+                storageOperate.upload(taskExecutionContext.getTenantCode(), srcPath, resourceWholePath, false, true);
+            } catch (IOException ex) {
+                throw new TaskException(ex.getMessage(), ex);
+            }
+
+            // update varPool
+            Property oriProperty;
+            // if the property is not in varPool, add it
+            if (varPoolsMap.containsKey(property.getProp())) {
+                oriProperty = varPoolsMap.get(property.getProp());
+            } else {
+                oriProperty = new Property(property.getProp(), Direct.OUT, DataType.FILE, property.getValue());
+                varPools.add(oriProperty);
+            }
+            oriProperty.setProp(String.format("%s.%s", taskExecutionContext.getTaskName(), oriProperty.getProp()));
+            oriProperty.setValue(resourcePath);
+        }
+        taskExecutionContext.setVarPool(JSONUtils.toJsonString(varPools));
+    }
+
+    /**
+     * download upstream files from storage
+     * only download files which are defined in the task parameters
+     *
+     * @param storageOperate       storage operate
+     * @param taskExecutionContext taskExecutionContext
+     * @throws TaskException task exception
+     */
+    public static void downloadUpstreamFiles(TaskExecutionContext taskExecutionContext, StorageOperate storageOperate) {
+        List<Property> varPools = getVarPools(taskExecutionContext);
+        // get map of varPools for quick search
+        Map<String, Property> varPoolsMap = varPools.stream().collect(Collectors.toMap(Property::getProp, x -> x));
+
+        // get "IN FILE" parameters
+        List<Property> localParamsProperty = getFileLocalParams(taskExecutionContext, Direct.IN);
+
+        if (localParamsProperty.isEmpty()) {
+            return;
+        }
+
+        String executePath = taskExecutionContext.getExecutePath();
+        // data path to download packaged data
+        String DownloadTmpPath = String.format("%s/%s", executePath, DOWNLOAD_TMP);
+
+        logger.info("Download upstream files...");
+        for (Property property : localParamsProperty) {
+            Property inVarPool = varPoolsMap.get(property.getValue());
+            if (inVarPool == null) {
+                logger.error("{} not in {}", property.getValue(), varPoolsMap.keySet());
+                throw new TaskException(String.format("Can not find upstream file using %s, please check the key",
+                        property.getValue()));
+            }
+
+            String resourcePath = inVarPool.getValue();
+            String targetPath = String.format("%s/%s", executePath, property.getProp());
+
+            String downloadPath;
+            // If the data is packaged, download it to a special directory (DOWNLOAD_TMP) and unpack it to the
+            // targetPath
+            boolean isPack = resourcePath.endsWith(PACK_SUFFIX);
+            if (isPack) {
+                downloadPath = String.format("%s/%s", DownloadTmpPath, new File(resourcePath).getName());
+            } else {
+                downloadPath = targetPath;
+            }
+
+            try {
+                String resourceWholePath =
+                        storageOperate.getResourceFileName(taskExecutionContext.getTenantCode(), resourcePath);
+                logger.info("{} --- Remote:{} to Local:{}", property, resourceWholePath, downloadPath);
+                storageOperate.download(taskExecutionContext.getTenantCode(), resourceWholePath, downloadPath, false,
+                        true);
+            } catch (IOException ex) {
+                throw new TaskException(ex.getMessage(), ex);
+            }
+
+            // unpack if the data is packaged
+            if (isPack) {
+                File downloadFile = new File(downloadPath);
+                logger.info("Unpack {} to {}", downloadPath, targetPath);
+                ZipUtil.unpack(downloadFile, new File(targetPath));
+            }
+        }
+
+        // delete DownloadTmp Folder if DownloadTmpPath exists
+        try {
+            org.apache.commons.io.FileUtils.deleteDirectory(new File(DownloadTmpPath));
+        } catch (IOException e) {
+            logger.error(
+                    "Delete DownloadTmpPath {} failed, this will not affect the task status", DownloadTmpPath, e);
+        }
+    }
+
+    /**
+     * get local parameters property which type is FILE and direction is equal to direct
+     *
+     * @param taskExecutionContext, TaskExecutionContext
+     * @param direct,               Direct, may be Direct.IN or Direct.OUT.
+     * @return List<Property>
+     */
+    public static List<Property> getFileLocalParams(TaskExecutionContext taskExecutionContext, Direct direct) {
+        List<Property> localParamsProperty = new ArrayList<>();
+        JsonNode taskParams = JSONUtils.parseObject(taskExecutionContext.getTaskParams());
+        for (JsonNode localParam : taskParams.get("localParams")) {
+            Property property = JSONUtils.parseObject(localParam.toString(), Property.class);
+
+            if (property.getDirect().equals(direct) && property.getType().equals(DataType.FILE)) {
+                localParamsProperty.add(property);
+            }
+        }
+        return localParamsProperty;
+    }
+
+    /**
+     * get Resource path for manage files in storage
+     *
+     * @param taskExecutionContext, TaskExecutionContext
+     * @param fileName,             String, file name
+     * @return resource path, RESOURCE_TAG/DATE/ProcessDefineCode/ProcessDefineVersion_ProcessInstanceID/TaskName_TaskInstanceID_FileName
+     */
+    public static String getResourcePath(TaskExecutionContext taskExecutionContext, String fileName) {
+        String date =
+                DateUtils.formatTimeStamp(taskExecutionContext.getEndTime(), DateTimeFormatter.ofPattern("yyyyMMdd"));
+        // get resource Folder: RESOURCE_TAG/DATE/ProcessDefineCode/ProcessDefineVersion_ProcessInstanceID
+        String resourceFolder =
+                String.format("%s/%s/%d/%d_%d", RESOURCE_TAG, date, taskExecutionContext.getProcessDefineCode(),
+                        taskExecutionContext.getProcessDefineVersion(), taskExecutionContext.getProcessInstanceId());
+        // get resource file: resourceFolder/TaskName_TaskInstanceID_FileName
+        return String.format("%s/%s_%s_%s", resourceFolder, taskExecutionContext.getTaskName().replace(" ", "_"),
+                taskExecutionContext.getTaskInstanceId(), fileName);
+    }
+
+    /**
+     * get varPool from taskExecutionContext
+     *
+     * @param taskExecutionContext, TaskExecutionContext
+     * @return List<Property>
+     */
+    public static List<Property> getVarPools(TaskExecutionContext taskExecutionContext) {
+        List<Property> varPools = new ArrayList<>();
+
+        // get varPool
+        String varPoolString = taskExecutionContext.getVarPool();
+        if (StringUtils.isEmpty(varPoolString)) {
+            return varPools;
+        }
+        // parse varPool
+        for (JsonNode varPoolData : JSONUtils.parseArray(varPoolString)) {
+            Property property = JSONUtils.parseObject(varPoolData.toString(), Property.class);
+            varPools.add(property);
+        }
+        return varPools;
+    }
+
+    /**
+     * If the path is a directory, pack it and return the path of the package
+     *
+     * @param path, input path, may be a file or a directory

Review Comment:
   ## Spurious Javadoc @param tags
   
   @param tag "path," does not match any actual parameter of method "packIfDir()".
   
   [Show more details](https://github.com/apache/dolphinscheduler/security/code-scanning/2129)
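
The body of `packIfDir` is not visible in this hunk, so the following is only a sketch of what a helper with that contract might look like, with the `@param` tag written without the trailing comma. It uses `java.util.zip` from the JDK instead of the `ZipUtil` class the PR imports, and the class name `PackIfDirSketch` is hypothetical. `PACK_SUFFIX` is copied from the constant in the diff above.

```java
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

class PackIfDirSketch {

    // Same suffix as the constant in the diff; reused here for illustration.
    static final String PACK_SUFFIX = "_ds_pack.zip";

    /**
     * If the path is a directory, packs it into a sibling zip file and returns the archive path.
     *
     * @param path input path, may be a file or a directory
     * @return the archive path if {@code path} is a directory, otherwise {@code path} unchanged
     * @throws IOException if the archive cannot be written
     */
    static String packIfDir(String path) throws IOException {
        File file = new File(path);
        if (!file.isDirectory()) {
            return path;
        }
        Path root = file.toPath();
        // getPath() drops a trailing separator, so the archive is created next to the directory.
        String zipPath = file.getPath() + PACK_SUFFIX;

        // Collect the regular files first, then write them into the archive.
        List<Path> files;
        try (Stream<Path> walk = Files.walk(root)) {
            files = walk.filter(Files::isRegularFile).collect(Collectors.toList());
        }
        try (OutputStream out = Files.newOutputStream(new File(zipPath).toPath());
                ZipOutputStream zip = new ZipOutputStream(out)) {
            for (Path p : files) {
                // Zip entry names use '/' regardless of the platform separator.
                String name = root.relativize(p).toString().replace(File.separatorChar, '/');
                zip.putNextEntry(new ZipEntry(name));
                Files.copy(p, zip);
                zip.closeEntry();
            }
        }
        return zipPath;
    }
}
```

With this shape, a caller can pass the returned path straight to the upload step, and the download side can recognise packaged directories by checking for `PACK_SUFFIX`, as `downloadUpstreamFiles` does in the diff.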



##########
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.utils;
+
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
+import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.service.storage.StorageOperate;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeroturnaround.zip.ZipUtil;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+public class TaskFilesTransferUtils {
+
+    protected final static Logger logger = LoggerFactory
+            .getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, TaskFilesTransferUtils.class));
+
+    // root path in resource storage
+    final static String RESOURCE_TAG = "DATA_TRANSFER";
+
+    // tmp path in local path for transfer
+    final static String DOWNLOAD_TMP = ".DT_TMP";
+
+    // suffix of the package file
+    final static String PACK_SUFFIX = "_ds_pack.zip";
+
+    /**
+     * upload output files to resource storage
+     *
+     * @param taskExecutionContext taskExecutionContext
+     * @param storageOperate       storageOperate
+     * @throws TaskException TaskException
+     */
+    public static void uploadOutputFiles(StorageOperate storageOperate,
+                                         TaskExecutionContext taskExecutionContext) throws TaskException {
+        List<Property> varPools = getVarPools(taskExecutionContext);
+        // get map of varPools for quick search
+        Map<String, Property> varPoolsMap = varPools.stream().collect(Collectors.toMap(Property::getProp, x -> x));
+
+        // get OUTPUT FILE parameters
+        List<Property> localParamsProperty = getFileLocalParams(taskExecutionContext, Direct.OUT);
+
+        if (localParamsProperty.isEmpty()) {
+            return;
+        }
+
+        logger.info("Upload output files ...");
+        for (Property property : localParamsProperty) {
+            // get local file path
+            String srcPath =
+                    packIfDir(String.format("%s/%s", taskExecutionContext.getExecutePath(), property.getValue()));
+            // get remote file path
+            String resourcePath = getResourcePath(taskExecutionContext, new File(srcPath).getName());
+            try {
+                // upload file to storage
+                String resourceWholePath =
+                        storageOperate.getResourceFileName(taskExecutionContext.getTenantCode(), resourcePath);
+                logger.info("{} --- Local:{} to Remote:{}", property, srcPath, resourceWholePath);
+                storageOperate.upload(taskExecutionContext.getTenantCode(), srcPath, resourceWholePath, false, true);
+            } catch (IOException ex) {
+                throw new TaskException(ex.getMessage(), ex);
+            }
+
+            // update varPool
+            Property oriProperty;
+            // if the property is not in varPool, add it
+            if (varPoolsMap.containsKey(property.getProp())) {
+                oriProperty = varPoolsMap.get(property.getProp());
+            } else {
+                oriProperty = new Property(property.getProp(), Direct.OUT, DataType.FILE, property.getValue());
+                varPools.add(oriProperty);
+            }
+            oriProperty.setProp(String.format("%s.%s", taskExecutionContext.getTaskName(), oriProperty.getProp()));
+            oriProperty.setValue(resourcePath);
+        }
+        taskExecutionContext.setVarPool(JSONUtils.toJsonString(varPools));
+    }
+
+    /**
+     * download upstream files from storage
+     * only download files which are defined in the task parameters
+     *
+     * @param storageOperate       storage operate
+     * @param taskExecutionContext taskExecutionContext
+     * @throws TaskException task exception
+     */
+    public static void downloadUpstreamFiles(TaskExecutionContext taskExecutionContext, StorageOperate storageOperate) {
+        List<Property> varPools = getVarPools(taskExecutionContext);
+        // get map of varPools for quick search
+        Map<String, Property> varPoolsMap = varPools.stream().collect(Collectors.toMap(Property::getProp, x -> x));
+
+        // get "IN FILE" parameters
+        List<Property> localParamsProperty = getFileLocalParams(taskExecutionContext, Direct.IN);
+
+        if (localParamsProperty.isEmpty()) {
+            return;
+        }
+
+        String executePath = taskExecutionContext.getExecutePath();
+        // data path to download packaged data
+        String DownloadTmpPath = String.format("%s/%s", executePath, DOWNLOAD_TMP);
+
+        logger.info("Download upstream files...");
+        for (Property property : localParamsProperty) {
+            Property inVarPool = varPoolsMap.get(property.getValue());
+            if (inVarPool == null) {
+                logger.error("{} not in {}", property.getValue(), varPoolsMap.keySet());
+                throw new TaskException(String.format("Can not find upstream file using %s, please check the key",
+                        property.getValue()));
+            }
+
+            String resourcePath = inVarPool.getValue();
+            String targetPath = String.format("%s/%s", executePath, property.getProp());
+
+            String downloadPath;
+            // If the data is packaged, download it to a special directory (DOWNLOAD_TMP) and unpack it to the
+            // targetPath
+            boolean isPack = resourcePath.endsWith(PACK_SUFFIX);
+            if (isPack) {
+                downloadPath = String.format("%s/%s", DownloadTmpPath, new File(resourcePath).getName());
+            } else {
+                downloadPath = targetPath;
+            }
+
+            try {
+                String resourceWholePath =
+                        storageOperate.getResourceFileName(taskExecutionContext.getTenantCode(), resourcePath);
+                logger.info("{} --- Remote:{} to Local:{}", property, resourceWholePath, downloadPath);
+                storageOperate.download(taskExecutionContext.getTenantCode(), resourceWholePath, downloadPath, false,
+                        true);
+            } catch (IOException ex) {
+                throw new TaskException(ex.getMessage(), ex);
+            }
+
+            // unpack if the data is packaged
+            if (isPack) {
+                File downloadFile = new File(downloadPath);
+                logger.info("Unpack {} to {}", downloadPath, targetPath);
+                ZipUtil.unpack(downloadFile, new File(targetPath));
+            }
+        }
+
+        // delete DownloadTmp Folder if DownloadTmpPath exists
+        try {
+            org.apache.commons.io.FileUtils.deleteDirectory(new File(DownloadTmpPath));
+        } catch (IOException e) {
+            logger.error(
+                    "Delete DownloadTmpPath {} failed, this will not affect the task status", DownloadTmpPath, e);
+        }
+    }
+
+    /**
+     * get local parameters property which type is FILE and direction is equal to direct
+     *
+     * @param taskExecutionContext, TaskExecutionContext
+     * @param direct,               Direct, may be Direct.IN or Direct.OUT.
+     * @return List<Property>
+     */
+    public static List<Property> getFileLocalParams(TaskExecutionContext taskExecutionContext, Direct direct) {
+        List<Property> localParamsProperty = new ArrayList<>();
+        JsonNode taskParams = JSONUtils.parseObject(taskExecutionContext.getTaskParams());
+        for (JsonNode localParam : taskParams.get("localParams")) {
+            Property property = JSONUtils.parseObject(localParam.toString(), Property.class);
+
+            if (property.getDirect().equals(direct) && property.getType().equals(DataType.FILE)) {
+                localParamsProperty.add(property);
+            }
+        }
+        return localParamsProperty;
+    }
+
+    /**
+     * get Resource path for manage files in storage
+     *
+     * @param taskExecutionContext, TaskExecutionContext
+     * @param fileName,             String, file name
+     * @return resource path, RESOURCE_TAG/DATE/ProcessDefineCode/ProcessDefineVersion_ProcessInstanceID/TaskName_TaskInstanceID_FileName
+     */
+    public static String getResourcePath(TaskExecutionContext taskExecutionContext, String fileName) {
+        String date =
+                DateUtils.formatTimeStamp(taskExecutionContext.getEndTime(), DateTimeFormatter.ofPattern("yyyyMMdd"));
+        // get resource Folder: RESOURCE_TAG/DATE/ProcessDefineCode/ProcessDefineVersion_ProcessInstanceID
+        String resourceFolder =
+                String.format("%s/%s/%d/%d_%d", RESOURCE_TAG, date, taskExecutionContext.getProcessDefineCode(),
+                        taskExecutionContext.getProcessDefineVersion(), taskExecutionContext.getProcessInstanceId());
+        // get resource file: resourceFolder/TaskName_TaskInstanceID_FileName
+        return String.format("%s/%s_%s_%s", resourceFolder, taskExecutionContext.getTaskName().replace(" ", "_"),
+                taskExecutionContext.getTaskInstanceId(), fileName);
+    }
+
+    /**
+     * get varPool from taskExecutionContext
+     *
+     * @param taskExecutionContext, TaskExecutionContext

Review Comment:
   ## Spurious Javadoc @param tags
   
   @param tag "taskExecutionContext," does not match any actual parameter of method "getVarPools()".
   
   [Show more details](https://github.com/apache/dolphinscheduler/security/code-scanning/2128)



##########
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtilsTest.java:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.utils;
+
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
+import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+
+import org.apache.curator.shaded.com.google.common.io.Files;
+
+import java.io.File;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+public class TaskFilesTransferUtilsTest {
+
+    @Test
+    public void testGetFileLocalParams() {
+        String taskParams = "{\"localParams\":[" +
+                "{\"prop\":\"inputFile\",\"direct\":\"IN\",\"type\":\"FILE\",\"value\":\"task1.data\"}," +
+                "{\"prop\":\"outputFile\",\"direct\":\"OUT\",\"type\":\"FILE\",\"value\":\"data\"}," +
+                "{\"prop\":\"a\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"a\"}," +
+                "{\"prop\":\"b\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"b\"}" +
+                "]}";
+        TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class);
+        Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(taskParams);
+
+        List<Property> fileLocalParamsIn = TaskFilesTransferUtils.getFileLocalParams(taskExecutionContext, Direct.IN);
+        Assertions.assertEquals(1, fileLocalParamsIn.size());
+        Assertions.assertEquals("inputFile", fileLocalParamsIn.get(0).getProp());
+        Assertions.assertEquals("task1.data", fileLocalParamsIn.get(0).getValue());
+
+        List<Property> fileLocalParamsOut = TaskFilesTransferUtils.getFileLocalParams(taskExecutionContext, Direct.OUT);
+        Assertions.assertEquals(1, fileLocalParamsOut.size());
+        Assertions.assertEquals("outputFile", fileLocalParamsOut.get(0).getProp());
+        Assertions.assertEquals("data", fileLocalParamsOut.get(0).getValue());
+
+    }
+
+    @Test
+    public void testGetResourcePath() {
+        String fileName = "test.txt";
+        TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class);
+
+        long endTime = System.currentTimeMillis();
+        String date = DateUtils.formatTimeStamp(endTime, DateTimeFormatter.ofPattern("yyyyMMdd"));
+        Mockito.when(taskExecutionContext.getEndTime()).thenReturn(endTime);
+
+        Long processDefineCode = 123L;

Review Comment:
   ## Boxed variable is never null
   
   The variable 'processDefineCode' is only assigned values of primitive type and is never 'null', but it is declared with the boxed type 'Long'.
   
   [Show more details](https://github.com/apache/dolphinscheduler/security/code-scanning/2130)
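
As a minimal sketch of what this finding is asking for, the local can be declared with the primitive type `long` instead of the boxed `Long`. The class `BoxedVsPrimitive` and the method `describe` below are hypothetical and are not part of the PR; they only illustrate that both declarations work at the same call site, while the primitive one involves no wrapper object and cannot be `null`.

```java
class BoxedVsPrimitive {

    // Hypothetical stand-in for any method that accepts a primitive long.
    static String describe(long code) {
        return "code=" + code;
    }

    public static void main(String[] args) {
        // Boxed declaration: the literal is wrapped in a Long and
        // unboxed again when it is passed to describe(long).
        Long boxed = 123L;

        // Primitive declaration: no wrapper object is involved.
        long primitive = 123L;

        System.out.println(describe(boxed));
        System.out.println(describe(primitive));
    }
}
```

Both calls produce the same string, so switching the declaration should not change behaviour in a case like this one, where the value is only ever assigned from a literal.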



##########
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.utils;
+
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
+import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.service.storage.StorageOperate;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeroturnaround.zip.ZipUtil;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+public class TaskFilesTransferUtils {
+
+    protected final static Logger logger = LoggerFactory
+            .getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, TaskFilesTransferUtils.class));
+
+    // root path in resource storage
+    final static String RESOURCE_TAG = "DATA_TRANSFER";
+
+    // tmp path in local path for transfer
+    final static String DOWNLOAD_TMP = ".DT_TMP";
+
+    // suffix of the package file
+    final static String PACK_SUFFIX = "_ds_pack.zip";
+
+    /**
+     * upload output files to resource storage
+     *
+     * @param taskExecutionContext taskExecutionContext
+     * @param storageOperate       storageOperate
+     * @throws TaskException TaskException
+     */
+    public static void uploadOutputFiles(StorageOperate storageOperate,
+                                         TaskExecutionContext taskExecutionContext) throws TaskException {
+        List<Property> varPools = getVarPools(taskExecutionContext);
+        // get map of varPools for quick search
+        Map<String, Property> varPoolsMap = varPools.stream().collect(Collectors.toMap(Property::getProp, x -> x));
+
+        // get OUTPUT FILE parameters
+        List<Property> localParamsProperty = getFileLocalParams(taskExecutionContext, Direct.OUT);
+
+        if (localParamsProperty.isEmpty()) {
+            return;
+        }
+
+        logger.info("Upload output files ...");
+        for (Property property : localParamsProperty) {
+            // get local file path
+            String srcPath =
+                    packIfDir(String.format("%s/%s", taskExecutionContext.getExecutePath(), property.getValue()));
+            // get remote file path
+            String resourcePath = getResourcePath(taskExecutionContext, new File(srcPath).getName());
+            try {
+                // upload file to storage
+                String resourceWholePath =
+                        storageOperate.getResourceFileName(taskExecutionContext.getTenantCode(), resourcePath);
+                logger.info("{} --- Local:{} to Remote:{}", property, srcPath, resourceWholePath);
+                storageOperate.upload(taskExecutionContext.getTenantCode(), srcPath, resourceWholePath, false, true);
+            } catch (IOException ex) {
+                throw new TaskException(ex.getMessage(), ex);
+            }
+
+            // update varPool
+            Property oriProperty;
+            // if the property is not in varPool, add it
+            if (varPoolsMap.containsKey(property.getProp())) {
+                oriProperty = varPoolsMap.get(property.getProp());
+            } else {
+                oriProperty = new Property(property.getProp(), Direct.OUT, DataType.FILE, property.getValue());
+                varPools.add(oriProperty);
+            }
+            oriProperty.setProp(String.format("%s.%s", taskExecutionContext.getTaskName(), oriProperty.getProp()));
+            oriProperty.setValue(resourcePath);
+        }
+        taskExecutionContext.setVarPool(JSONUtils.toJsonString(varPools));
+    }
+
+    /**
+     * download upstream files from storage
+     * only download files which are defined in the task parameters
+     *
+     * @param storageOperate       storage operate
+     * @param taskExecutionContext taskExecutionContext
+     * @throws TaskException task exception
+     */
+    public static void downloadUpstreamFiles(TaskExecutionContext taskExecutionContext, StorageOperate storageOperate) {
+        List<Property> varPools = getVarPools(taskExecutionContext);
+        // get map of varPools for quick search
+        Map<String, Property> varPoolsMap = varPools.stream().collect(Collectors.toMap(Property::getProp, x -> x));
+
+        // get "IN FILE" parameters
+        List<Property> localParamsProperty = getFileLocalParams(taskExecutionContext, Direct.IN);
+
+        if (localParamsProperty.isEmpty()) {
+            return;
+        }
+
+        String executePath = taskExecutionContext.getExecutePath();
+        // data path to download packaged data
+        String DownloadTmpPath = String.format("%s/%s", executePath, DOWNLOAD_TMP);
+
+        logger.info("Download upstream files...");
+        for (Property property : localParamsProperty) {
+            Property inVarPool = varPoolsMap.get(property.getValue());
+            if (inVarPool == null) {
+                logger.error("{} not in {}", property.getValue(), varPoolsMap.keySet());
+                throw new TaskException(String.format("Can not find upstream file using %s, please check the key",
+                        property.getValue()));
+            }
+
+            String resourcePath = inVarPool.getValue();
+            String targetPath = String.format("%s/%s", executePath, property.getProp());
+
+            String downloadPath;
+            // If the data is packaged, download it to a special directory (DOWNLOAD_TMP) and unpack it to the
+            // targetPath
+            boolean isPack = resourcePath.endsWith(PACK_SUFFIX);
+            if (isPack) {
+                downloadPath = String.format("%s/%s", DownloadTmpPath, new File(resourcePath).getName());
+            } else {
+                downloadPath = targetPath;
+            }
+
+            try {
+                String resourceWholePath =
+                        storageOperate.getResourceFileName(taskExecutionContext.getTenantCode(), resourcePath);
+                logger.info("{} --- Remote:{} to Local:{}", property, resourceWholePath, downloadPath);
+                storageOperate.download(taskExecutionContext.getTenantCode(), resourceWholePath, downloadPath, false,
+                        true);
+            } catch (IOException ex) {
+                throw new TaskException(ex.getMessage(), ex);
+            }
+
+            // unpack if the data is packaged
+            if (isPack) {
+                File downloadFile = new File(downloadPath);
+                logger.info("Unpack {} to {}", downloadPath, targetPath);
+                ZipUtil.unpack(downloadFile, new File(targetPath));
+            }
+        }
+
+        // delete DownloadTmp Folder if DownloadTmpPath exists
+        try {
+            org.apache.commons.io.FileUtils.deleteDirectory(new File(DownloadTmpPath));
+        } catch (IOException e) {
+            logger.error(
+                    "Delete DownloadTmpPath {} failed, this will not affect the task status", DownloadTmpPath, e);
+        }
+    }
+
+    /**
+     * get local parameters property which type is FILE and direction is equal to direct
+     *
+     * @param taskExecutionContext, TaskExecutionContext
+     * @param direct,               Direct, may be Direct.IN or Direct.OUT.
+     * @return List<Property>
+     */
+    public static List<Property> getFileLocalParams(TaskExecutionContext taskExecutionContext, Direct direct) {
+        List<Property> localParamsProperty = new ArrayList<>();
+        JsonNode taskParams = JSONUtils.parseObject(taskExecutionContext.getTaskParams());
+        for (JsonNode localParam : taskParams.get("localParams")) {
+            Property property = JSONUtils.parseObject(localParam.toString(), Property.class);
+
+            if (property.getDirect().equals(direct) && property.getType().equals(DataType.FILE)) {
+                localParamsProperty.add(property);
+            }
+        }
+        return localParamsProperty;
+    }
+
+    /**
+     * get Resource path for manage files in storage
+     *
+     * @param taskExecutionContext, TaskExecutionContext

Review Comment:
   ## Spurious Javadoc @param tags
   
   @param tag "taskExecutionContext," does not match any actual parameter of method "getResourcePath()".
   
   [Show more details](https://github.com/apache/dolphinscheduler/security/code-scanning/2126)
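
The likely cause is the trailing comma: the Javadoc tool reads the first token after `@param` as the parameter name, so `taskExecutionContext,` does not match `taskExecutionContext`. A minimal sketch of the corrected tag form follows. The class `JavadocParamExample` and the method `resourcePath` are hypothetical illustrations, not the PR's `getResourcePath()`.

```java
class JavadocParamExample {

    /**
     * Join a storage folder and a file name into one path.
     *
     * @param folder   parent folder in storage (no trailing comma after the name)
     * @param fileName name of the file placed under the folder
     * @return the joined path
     */
    static String resourcePath(String folder, String fileName) {
        return folder + "/" + fileName;
    }

    public static void main(String[] args) {
        System.out.println(resourcePath("DATA_TRANSFER", "test.txt"));
    }
}
```

With the tag written as `@param name description`, each tag name matches a declared parameter and this warning should no longer be raised.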



##########
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java:
##########
@@ -0,0 +1,267 @@
+
+    /**
+     * get Resource path for manage files in storage
+     *
+     * @param taskExecutionContext, TaskExecutionContext
+     * @param fileName,             String, file name

Review Comment:
   ## Spurious Javadoc @param tags
   
   @param tag "fileName," does not match any actual parameter of method "getResourcePath()".
   
   [Show more details](https://github.com/apache/dolphinscheduler/security/code-scanning/2127)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [dolphinscheduler] jieguangzhou commented on a diff in pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
jieguangzhou commented on code in PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552#discussion_r1011117609


##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java:
##########
@@ -1871,6 +1874,61 @@ public Resource queryResourcesFileInfo(String userName, String fileName) {
         return (Resource) resourceResponse.getData();
     }
 
+    @Override
+    public Map<String, Object> deleteDataTransferData(User loginUser, Integer days) {

Review Comment:
   I have changed it to `Result<Object>`, the same as other methods.





[GitHub] [dolphinscheduler] zhongjiajie commented on a diff in pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
zhongjiajie commented on code in PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552#discussion_r1012447851


##########
docs/docs/en/guide/parameter/file-parameter.md:
##########
@@ -0,0 +1,102 @@
+# FILE Parameter

Review Comment:
   Need to add the new document to `https://github.com/apache/dolphinscheduler/blob/dev/docs/configs/docsdev.js`





[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552#issuecomment-1293534530

   Kudos, SonarCloud Quality Gate passed! [Quality Gate passed](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=12552)
   
   [0 Bugs](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=BUG) (rating A)  
   [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=VULNERABILITY) (rating A)  
   [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=SECURITY_HOTSPOT) (rating A)  
   [15 Code Smells](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=CODE_SMELL) (rating A)
   
   [83.5% Coverage](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_coverage&view=list)  
   [0.0% Duplication](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_duplicated_lines_density&view=list)





[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552#issuecomment-1294313045

   Kudos, SonarCloud Quality Gate passed! [![Quality Gate passed](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/QualityGateBadge/passed-16px.png 'Quality Gate passed')](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=12552)
   
   [![Bug](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug-16px.png 'Bug')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=BUG) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=BUG)  
   [![Vulnerability](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability-16px.png 'Vulnerability')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=VULNERABILITY) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=VULNERABILITY)  
   [![Security Hotspot](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot-16px.png 'Security Hotspot')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=SECURITY_HOTSPOT) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=SECURITY_HOTSPOT)  
   [![Code Smell](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell-16px.png 'Code Smell')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=CODE_SMELL) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=CODE_SMELL) [7 Code Smells](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=CODE_SMELL)
   
   [![60.7%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/60-16px.png '60.7%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_coverage&view=list) [60.7% Coverage](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_coverage&view=list)  
   [![2.2%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3-16px.png '2.2%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_duplicated_lines_density&view=list) [2.2% Duplication](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_duplicated_lines_density&view=list)
   
   




[GitHub] [dolphinscheduler] caishunfeng commented on a diff in pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
caishunfeng commented on code in PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552#discussion_r1010240224


##########
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.utils;
+
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
+import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.service.storage.StorageOperate;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeroturnaround.zip.ZipUtil;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+public class TaskFilesTransferUtils {
+
+    protected final static Logger logger = LoggerFactory
+            .getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, TaskFilesTransferUtils.class));
+
+    // tmp path in local path for transfer
+    final static String DOWNLOAD_TMP = ".DT_TMP";
+
+    // suffix of the package file
+    final static String PACK_SUFFIX = "_ds_pack.zip";
+
+    // root path in resource storage
+    final static String RESOURCE_TAG = "DATA_TRANSFER";
+
+    private TaskFilesTransferUtils() {
+        throw new IllegalStateException("Utility class");
+    }
+
+    /**
+     * upload output files to resource storage
+     *
+     * @param taskExecutionContext is the context of task
+     * @param storageOperate       is the storage operate
+     * @throws TaskException TaskException
+     */
+    public static void uploadOutputFiles(TaskExecutionContext taskExecutionContext,
+                                         StorageOperate storageOperate) throws TaskException {
+        List<Property> varPools = getVarPools(taskExecutionContext);
+        // get map of varPools for quick search
+        Map<String, Property> varPoolsMap = varPools.stream().collect(Collectors.toMap(Property::getProp, x -> x));
+
+        // get OUTPUT FILE parameters
+        List<Property> localParamsProperty = getFileLocalParams(taskExecutionContext, Direct.OUT);
+
+        if (localParamsProperty.isEmpty()) {
+            return;
+        }
+
+        logger.info("Upload output files ...");
+        for (Property property : localParamsProperty) {
+            // get local file path
+            String srcPath =
+                    packIfDir(String.format("%s/%s", taskExecutionContext.getExecutePath(), property.getValue()));
+            // get remote file path
+            String resourcePath = getResourcePath(taskExecutionContext, new File(srcPath).getName());
+            try {
+                // upload file to storage
+                String resourceWholePath =
+                        storageOperate.getResourceFileName(taskExecutionContext.getTenantCode(), resourcePath);
+                logger.info("{} --- Local:{} to Remote:{}", property, srcPath, resourceWholePath);
+                storageOperate.upload(taskExecutionContext.getTenantCode(), srcPath, resourceWholePath, false, true);
+            } catch (IOException ex) {
+                throw new TaskException(ex.getMessage(), ex);
+            }
+
+            // update varPool
+            Property oriProperty;
+            // if the property is not in varPool, add it
+            if (varPoolsMap.containsKey(property.getProp())) {
+                oriProperty = varPoolsMap.get(property.getProp());
+            } else {
+                oriProperty = new Property(property.getProp(), Direct.OUT, DataType.FILE, property.getValue());
+                varPools.add(oriProperty);
+            }
+            oriProperty.setProp(String.format("%s.%s", taskExecutionContext.getTaskName(), oriProperty.getProp()));
+            oriProperty.setValue(resourcePath);
+        }
+        taskExecutionContext.setVarPool(JSONUtils.toJsonString(varPools));
+    }
+
+    /**
+     * download upstream files from storage
+     * only download files which are defined in the task parameters
+     *
+     * @param taskExecutionContext is the context of task
+     * @param storageOperate       is the storage operate
+     * @throws TaskException task exception
+     */
+    public static void downloadUpstreamFiles(TaskExecutionContext taskExecutionContext, StorageOperate storageOperate) {
+        List<Property> varPools = getVarPools(taskExecutionContext);
+        // get map of varPools for quick search
+        Map<String, Property> varPoolsMap = varPools.stream().collect(Collectors.toMap(Property::getProp, x -> x));
+
+        // get "IN FILE" parameters
+        List<Property> localParamsProperty = getFileLocalParams(taskExecutionContext, Direct.IN);
+
+        if (localParamsProperty.isEmpty()) {
+            return;
+        }
+
+        String executePath = taskExecutionContext.getExecutePath();
+        // data path to download packaged data
+        String downloadTmpPath = String.format("%s/%s", executePath, DOWNLOAD_TMP);
+
+        logger.info("Download upstream files...");
+        for (Property property : localParamsProperty) {
+            Property inVarPool = varPoolsMap.get(property.getValue());
+            if (inVarPool == null) {
+                logger.error("{} not in  {}", property.getValue(), varPoolsMap.keySet());
+                throw new TaskException(String.format("Can not find upstream file using %s, please check the key",
+                        property.getValue()));
+            }
+
+            String resourcePath = inVarPool.getValue();
+            String targetPath = String.format("%s/%s", executePath, property.getProp());
+
+            String downloadPath;
+            // If the data is packaged, download it to a special directory (DOWNLOAD_TMP) and unpack it to the
+            // targetPath
+            boolean isPack = resourcePath.endsWith(PACK_SUFFIX);
+            if (isPack) {
+                downloadPath = String.format("%s/%s", downloadTmpPath, new File(resourcePath).getName());
+            } else {
+                downloadPath = targetPath;
+            }
+
+            try {
+                String resourceWholePath =
+                        storageOperate.getResourceFileName(taskExecutionContext.getTenantCode(), resourcePath);
+                logger.info("{} --- Remote:{} to Local:{}", property, resourceWholePath, downloadPath);
+                storageOperate.download(taskExecutionContext.getTenantCode(), resourceWholePath, downloadPath, false,
+                        true);
+            } catch (IOException ex) {
+                throw new TaskException(ex.getMessage(), ex);

Review Comment:
   Same here as for the upload path: use a descriptive error message instead of `ex.getMessage()`.



##########
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.utils;
+
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
+import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.service.storage.StorageOperate;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeroturnaround.zip.ZipUtil;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+public class TaskFilesTransferUtils {
+
+    protected final static Logger logger = LoggerFactory
+            .getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, TaskFilesTransferUtils.class));
+
+    // tmp path in local path for transfer
+    final static String DOWNLOAD_TMP = ".DT_TMP";
+
+    // suffix of the package file
+    final static String PACK_SUFFIX = "_ds_pack.zip";
+
+    // root path in resource storage
+    final static String RESOURCE_TAG = "DATA_TRANSFER";
+
+    private TaskFilesTransferUtils() {
+        throw new IllegalStateException("Utility class");
+    }
+
+    /**
+     * upload output files to resource storage
+     *
+     * @param taskExecutionContext is the context of task
+     * @param storageOperate       is the storage operate
+     * @throws TaskException TaskException
+     */
+    public static void uploadOutputFiles(TaskExecutionContext taskExecutionContext,
+                                         StorageOperate storageOperate) throws TaskException {
+        List<Property> varPools = getVarPools(taskExecutionContext);
+        // get map of varPools for quick search
+        Map<String, Property> varPoolsMap = varPools.stream().collect(Collectors.toMap(Property::getProp, x -> x));
+
+        // get OUTPUT FILE parameters
+        List<Property> localParamsProperty = getFileLocalParams(taskExecutionContext, Direct.OUT);
+
+        if (localParamsProperty.isEmpty()) {
+            return;
+        }
+
+        logger.info("Upload output files ...");
+        for (Property property : localParamsProperty) {
+            // get local file path
+            String srcPath =
+                    packIfDir(String.format("%s/%s", taskExecutionContext.getExecutePath(), property.getValue()));
+            // get remote file path
+            String resourcePath = getResourcePath(taskExecutionContext, new File(srcPath).getName());
+            try {
+                // upload file to storage
+                String resourceWholePath =
+                        storageOperate.getResourceFileName(taskExecutionContext.getTenantCode(), resourcePath);
+                logger.info("{} --- Local:{} to Remote:{}", property, srcPath, resourceWholePath);
+                storageOperate.upload(taskExecutionContext.getTenantCode(), srcPath, resourceWholePath, false, true);
+            } catch (IOException ex) {
+                throw new TaskException(ex.getMessage(), ex);

Review Comment:
   ```suggestion
                   throw new TaskException("Upload file to storage error", ex);
   ```
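
The suggestion above can be sketched in isolation as follows. This is a minimal illustration, not the PR's code: `SketchTaskException`, `UploadErrorSketch`, and `failingUpload` are hypothetical stand-ins, and the only thing assumed about the real `TaskException` is the `(message, cause)` constructor already used in the diff.

```java
import java.io.IOException;

// Hypothetical stand-in for the project's TaskException; only the
// (message, cause) constructor seen in the diff is assumed.
class SketchTaskException extends RuntimeException {
    SketchTaskException(String message, Throwable cause) {
        super(message, cause);
    }
}

class UploadErrorSketch {

    // Hypothetical upload that always fails, so the catch block is exercised.
    static void failingUpload(String srcPath) throws IOException {
        throw new IOException("connection reset while writing " + srcPath);
    }

    static void uploadOrWrap(String srcPath) {
        try {
            failingUpload(srcPath);
        } catch (IOException ex) {
            // The fixed message names the step that failed; the original
            // exception is kept as the cause, so no low-level detail is lost.
            throw new SketchTaskException("Upload file to storage error", ex);
        }
    }

    public static void main(String[] args) {
        try {
            uploadOrWrap("data/test.txt");
        } catch (SketchTaskException ex) {
            System.out.println(ex.getMessage());
            System.out.println(ex.getCause().getMessage());
        }
    }
}
```

With this shape the task log would lead with the failed step, while the underlying `IOException` text stays reachable through `getCause()`.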



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java:
##########
@@ -270,6 +270,25 @@ public Result<Object> deleteResource(@Parameter(hidden = true) @RequestAttribute
         return resourceService.delete(loginUser, fullName, tenantCode);
     }
 
+    /**
+     * delete DATA_TRANSFER data
+     *
+     * @param loginUser login user
+     * @return delete result code
+     */
+    @Operation(summary = "deleteDataTransferData", description = "Delete the N days ago data of DATA_TRANSFER ")
+    @Parameters({
+            @Parameter(name = "days", description = "N days ago", required = true, schema = @Schema(implementation = Integer.class))
+    })
+    @DeleteMapping(value = "/data-transfer-delete")

Review Comment:
   The `delete` suffix in the path is unnecessary because the request method is already `DeleteMapping`.
   ```suggestion
       @DeleteMapping(value = "/data-transfer")
   ```



##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java:
##########
@@ -1871,6 +1874,61 @@ public Resource queryResourcesFileInfo(String userName, String fileName) {
         return (Resource) resourceResponse.getData();
     }
 
+    @Override
+    public Map<String, Object> deleteDataTransferData(User loginUser, Integer days) {

Review Comment:
   We should avoid using `Map<String, Object>` as the result object, because it is not clear what it contains.
   ```suggestion
       public DeleteDataTransferResponse deleteDataTransferData(User loginUser, Integer days) {
   ```
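
One possible shape for such a typed result is sketched below. The class name follows the suggestion, with a `Sketch` suffix to mark it as hypothetical; the two list fields are an assumption, loosely based on the `successList` and `failList` variables that appear in the PR's service method, and nothing here is taken from the merged code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical typed response; the field names are assumptions.
final class DeleteDataTransferResponseSketch {

    private final List<String> successList;
    private final List<String> failedList;

    DeleteDataTransferResponseSketch(List<String> successList, List<String> failedList) {
        // Defensive copies keep the response immutable after construction.
        this.successList = Collections.unmodifiableList(new ArrayList<>(successList));
        this.failedList = Collections.unmodifiableList(new ArrayList<>(failedList));
    }

    List<String> getSuccessList() {
        return successList;
    }

    List<String> getFailedList() {
        return failedList;
    }

    // True when no path failed to delete.
    boolean isAllDeleted() {
        return failedList.isEmpty();
    }
}

class TypedResponseDemo {
    public static void main(String[] args) {
        DeleteDataTransferResponseSketch response = new DeleteDataTransferResponseSketch(
                Arrays.asList("DATA_TRANSFER/20221001"),
                Arrays.asList("DATA_TRANSFER/20221002"));
        System.out.println("deleted: " + response.getSuccessList());
        System.out.println("failed: " + response.getFailedList());
        System.out.println("all deleted: " + response.isAllDeleted());
    }
}
```

Compared with a `Map<String, Object>`, callers get compile-time checked accessors instead of string keys and casts.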





[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552#issuecomment-1293024584

   SonarCloud Quality Gate failed. [![Quality Gate failed](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/QualityGateBadge/failed-16px.png 'Quality Gate failed')](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=12552)
   
   [![Bug](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug-16px.png 'Bug')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=BUG) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=BUG)  
   [![Vulnerability](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability-16px.png 'Vulnerability')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=VULNERABILITY) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=VULNERABILITY)  
   [![Security Hotspot](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot-16px.png 'Security Hotspot')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=SECURITY_HOTSPOT) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=SECURITY_HOTSPOT)  
   [![Code Smell](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell-16px.png 'Code Smell')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=CODE_SMELL) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=CODE_SMELL) [14 Code Smells](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=CODE_SMELL)
   
   [![33.3%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/25-16px.png '33.3%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_coverage&view=list) [33.3% Coverage](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_coverage&view=list)  
   [![0.0%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3-16px.png '0.0%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_duplicated_lines_density&view=list) [0.0% Duplication](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_duplicated_lines_density&view=list)
   
   




[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552#issuecomment-1295275374

   Kudos, SonarCloud Quality Gate passed! [![Quality Gate passed](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/QualityGateBadge/passed-16px.png 'Quality Gate passed')](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=12552)
   
   [![Bug](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug-16px.png 'Bug')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=BUG) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=BUG)  
   [![Vulnerability](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability-16px.png 'Vulnerability')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=VULNERABILITY) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=VULNERABILITY)  
   [![Security Hotspot](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot-16px.png 'Security Hotspot')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=SECURITY_HOTSPOT) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=SECURITY_HOTSPOT)  
   [![Code Smell](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell-16px.png 'Code Smell')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=CODE_SMELL) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=CODE_SMELL) [7 Code Smells](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=CODE_SMELL)
   
   [![60.4%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/60-16px.png '60.4%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_coverage&view=list) [60.4% Coverage](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_coverage&view=list)  
   [![2.2%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3-16px.png '2.2%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_duplicated_lines_density&view=list) [2.2% Duplication](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_duplicated_lines_density&view=list)
   
   




[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552#issuecomment-1295280547

   Kudos, SonarCloud Quality Gate passed! [![Quality Gate passed](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/QualityGateBadge/passed-16px.png 'Quality Gate passed')](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=12552)
   
   [![Bug](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug-16px.png 'Bug')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=BUG) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=BUG)  
   [![Vulnerability](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability-16px.png 'Vulnerability')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=VULNERABILITY) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=VULNERABILITY)  
   [![Security Hotspot](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot-16px.png 'Security Hotspot')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=SECURITY_HOTSPOT) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=SECURITY_HOTSPOT)  
   [![Code Smell](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell-16px.png 'Code Smell')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=CODE_SMELL) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=CODE_SMELL) [7 Code Smells](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=CODE_SMELL)
   
   [![60.4%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/60-16px.png '60.4%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_coverage&view=list) [60.4% Coverage](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_coverage&view=list)  
   [![2.2%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3-16px.png '2.2%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_duplicated_lines_density&view=list) [2.2% Duplication](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_duplicated_lines_density&view=list)
   
   




[GitHub] [dolphinscheduler] Tianqi-Dotes commented on a diff in pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
Tianqi-Dotes commented on code in PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552#discussion_r1010236311


##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java:
##########
@@ -1871,6 +1874,61 @@ public Resource queryResourcesFileInfo(String userName, String fileName) {
         return (Resource) resourceResponse.getData();
     }
 
+    @Override
+    public Map<String, Object> deleteDataTransferData(User loginUser, Integer days) {
+        Map<String, Object> result = new HashMap<>();
+
+        User user = userMapper.selectById(loginUser.getId());
+        if (user == null) {
+            logger.error("user {} not exists", loginUser.getId());
+            putMsg(result, Status.USER_NOT_EXIST, loginUser.getId());
+            return result;
+        }
+
+        Tenant tenant = tenantMapper.queryById(user.getTenantId());
+        if (tenant == null) {
+            logger.error("tenant not exists");
+            putMsg(result, Status.CURRENT_LOGIN_USER_TENANT_NOT_EXIST);
+            return result;
+        }
+        String tenantCode = tenant.getTenantCode();
+
+        String baseFolder = storageOperate.getResourceFileName(tenantCode, "DATA_TRANSFER");
+
+        LocalDateTime now = LocalDateTime.now();
+        now = now.minus(days, ChronoUnit.DAYS);
+        String deleteDate = now.toLocalDate().toString().replace("-", "");
+        List<StorageEntity> storageEntities;
+        try {
+            storageEntities = new ArrayList<>(
+                    storageOperate.listFilesStatus(baseFolder, baseFolder, tenantCode, ResourceType.FILE));
+        } catch (Exception e) {
+            logger.error("delete data transfer data error", e);
+            putMsg(result, Status.DELETE_RESOURCE_ERROR);
+            return result;
+        }
+
+        List<String> successList = new ArrayList<>();
+        List<String> failList = new ArrayList<>();
+        for (StorageEntity storageEntity : storageEntities) {

Review Comment:
   Consider using a transaction here.
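
   For context, the retention cutoff computed in the diff above can be sketched in isolation. This is a minimal sketch, not the PR's code: the class and method names (`DataTransferCleanupSketch`, `cutoffFolder`, `expiredFolders`) are hypothetical, and it assumes that the folders directly under `DATA_TRANSFER` are named by date as `yyyyMMdd` and that the cutoff day itself counts as expired.

   ```java
   import java.time.LocalDate;
   import java.time.format.DateTimeFormatter;
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical helper, for illustration only; not part of DolphinScheduler.
   final class DataTransferCleanupSketch {

       // Assumption: date folders are named yyyyMMdd, matching the string built in the diff.
       static final DateTimeFormatter FOLDER_FORMAT = DateTimeFormatter.BASIC_ISO_DATE;

       // Folder name for the cutoff day: the given day minus the retention period.
       static String cutoffFolder(LocalDate today, int days) {
           return today.minusDays(days).format(FOLDER_FORMAT);
       }

       // Keeps the folder names dated on or before the cutoff.
       // yyyyMMdd strings sort chronologically, so a plain string comparison is enough.
       static List<String> expiredFolders(List<String> folderNames, String cutoff) {
           List<String> expired = new ArrayList<>();
           for (String name : folderNames) {
               if (name.compareTo(cutoff) <= 0) {
                   expired.add(name);
               }
           }
           return expired;
       }

       public static void main(String[] args) {
           String cutoff = cutoffFolder(LocalDate.now(), 7);
           System.out.println("Folders dated on or before " + cutoff + " would be deleted");
       }
   }
   ```

   Because the diff already collects a `successList` and a `failList`, one possible reading is that deletion is meant to be best-effort per folder. Whether a transaction can apply probably depends on the storage backend behind `storageOperate`.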




[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552#issuecomment-1300051070

   Kudos, SonarCloud Quality Gate passed! [![Quality Gate passed](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/QualityGateBadge/passed-16px.png 'Quality Gate passed')](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=12552)
   
   [![Bug](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug-16px.png 'Bug')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=BUG) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=BUG)  
   [![Vulnerability](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability-16px.png 'Vulnerability')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=VULNERABILITY) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=VULNERABILITY)  
   [![Security Hotspot](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot-16px.png 'Security Hotspot')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=SECURITY_HOTSPOT) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=SECURITY_HOTSPOT)  
   [![Code Smell](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell-16px.png 'Code Smell')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=CODE_SMELL) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=CODE_SMELL) [13 Code Smells](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=CODE_SMELL)
   
   [![78.6%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/60-16px.png '78.6%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_coverage&view=list) [78.6% Coverage](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_coverage&view=list)  
   [![2.1%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3-16px.png '2.1%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_duplicated_lines_density&view=list) [2.1% Duplication](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_duplicated_lines_density&view=list)
   
   



[GitHub] [dolphinscheduler] Radeity commented on a diff in pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
Radeity commented on code in PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552#discussion_r1006416205


##########
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java:
##########
@@ -216,15 +217,17 @@ protected void beforeExecute() {
         TaskExecutionCheckerUtils.downloadResourcesIfNeeded(storageOperate, taskExecutionContext, logger);
         logger.info("Resources:{} check success", taskExecutionContext.getResources());
 
+        TaskFilesTransferUtils.downloadUpstreamFiles(taskExecutionContext, storageOperate);

Review Comment:
   Sounds good. Maybe we can design it together; some mechanisms in distributed filesystems are worth referring to.




[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552#issuecomment-1299655002

   SonarCloud Quality Gate failed. [![Quality Gate failed](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/QualityGateBadge/failed-16px.png 'Quality Gate failed')](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=12552)
   
   [![Bug](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug-16px.png 'Bug')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=BUG) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=BUG)  
   [![Vulnerability](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability-16px.png 'Vulnerability')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=VULNERABILITY) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=VULNERABILITY)  
   [![Security Hotspot](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot-16px.png 'Security Hotspot')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=SECURITY_HOTSPOT) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=SECURITY_HOTSPOT)  
   [![Code Smell](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell-16px.png 'Code Smell')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=CODE_SMELL) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=CODE_SMELL) [7 Code Smells](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=CODE_SMELL)
   
   [![59.8%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/50-16px.png '59.8%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_coverage&view=list) [59.8% Coverage](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_coverage&view=list)  
   [![2.5%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3-16px.png '2.5%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_duplicated_lines_density&view=list) [2.5% Duplication](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_duplicated_lines_density&view=list)
   
   



[GitHub] [dolphinscheduler] codecov-commenter commented on pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552#issuecomment-1293873989

   # [Codecov](https://codecov.io/gh/apache/dolphinscheduler/pull/12552?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#12552](https://codecov.io/gh/apache/dolphinscheduler/pull/12552?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (7f87349) into [dev](https://codecov.io/gh/apache/dolphinscheduler/commit/547aa437ab424250dac6b4df3de7a4b1b8af7d98?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (547aa43) will **increase** coverage by `0.21%`.
   > The diff coverage is `58.99%`.
   
   > :exclamation: Current head 7f87349 differs from pull request most recent head b27f0a1. Consider uploading reports for the commit b27f0a1 to get more accurate results
   
   ```diff
   @@             Coverage Diff              @@
   ##                dev   #12552      +/-   ##
   ============================================
   + Coverage     38.94%   39.15%   +0.21%     
   - Complexity     4159     4210      +51     
   ============================================
     Files          1044     1045       +1     
     Lines         39292    39591     +299     
     Branches       4501     4544      +43     
   ============================================
   + Hits          15301    15503     +202     
   - Misses        22250    22333      +83     
   - Partials       1741     1755      +14     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/dolphinscheduler/pull/12552?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...hinscheduler/api/controller/FavTaskController.java](https://codecov.io/gh/apache/dolphinscheduler/pull/12552/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZG9scGhpbnNjaGVkdWxlci1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2RvbHBoaW5zY2hlZHVsZXIvYXBpL2NvbnRyb2xsZXIvRmF2VGFza0NvbnRyb2xsZXIuamF2YQ==) | `14.28% <0.00%> (ø)` | |
   | [...hinscheduler/api/controller/ProjectController.java](https://codecov.io/gh/apache/dolphinscheduler/pull/12552/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZG9scGhpbnNjaGVkdWxlci1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2RvbHBoaW5zY2hlZHVsZXIvYXBpL2NvbnRyb2xsZXIvUHJvamVjdENvbnRyb2xsZXIuamF2YQ==) | `50.00% <0.00%> (-18.43%)` | :arrow_down: |
   | [...nscheduler/api/controller/ResourcesController.java](https://codecov.io/gh/apache/dolphinscheduler/pull/12552/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZG9scGhpbnNjaGVkdWxlci1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2RvbHBoaW5zY2hlZHVsZXIvYXBpL2NvbnRyb2xsZXIvUmVzb3VyY2VzQ29udHJvbGxlci5qYXZh) | `54.71% <0.00%> (-1.06%)` | :arrow_down: |
   | [...lphinscheduler/api/controller/UsersController.java](https://codecov.io/gh/apache/dolphinscheduler/pull/12552/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZG9scGhpbnNjaGVkdWxlci1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2RvbHBoaW5zY2hlZHVsZXIvYXBpL2NvbnRyb2xsZXIvVXNlcnNDb250cm9sbGVyLmphdmE=) | `55.00% <0.00%> (-3.93%)` | :arrow_down: |
   | [...rg/apache/dolphinscheduler/api/dto/FavTaskDto.java](https://codecov.io/gh/apache/dolphinscheduler/pull/12552/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZG9scGhpbnNjaGVkdWxlci1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2RvbHBoaW5zY2hlZHVsZXIvYXBpL2R0by9GYXZUYXNrRHRvLmphdmE=) | `0.00% <ø> (ø)` | |
   | [...permission/ResourcePermissionCheckServiceImpl.java](https://codecov.io/gh/apache/dolphinscheduler/pull/12552/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZG9scGhpbnNjaGVkdWxlci1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2RvbHBoaW5zY2hlZHVsZXIvYXBpL3Blcm1pc3Npb24vUmVzb3VyY2VQZXJtaXNzaW9uQ2hlY2tTZXJ2aWNlSW1wbC5qYXZh) | `66.92% <0.00%> (ø)` | |
   | [...scheduler/api/service/impl/FavTaskServiceImpl.java](https://codecov.io/gh/apache/dolphinscheduler/pull/12552/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZG9scGhpbnNjaGVkdWxlci1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2RvbHBoaW5zY2hlZHVsZXIvYXBpL3NlcnZpY2UvaW1wbC9GYXZUYXNrU2VydmljZUltcGwuamF2YQ==) | `5.88% <0.00%> (ø)` | |
   | [...api/service/impl/ProcessDefinitionServiceImpl.java](https://codecov.io/gh/apache/dolphinscheduler/pull/12552/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZG9scGhpbnNjaGVkdWxlci1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2RvbHBoaW5zY2hlZHVsZXIvYXBpL3NlcnZpY2UvaW1wbC9Qcm9jZXNzRGVmaW5pdGlvblNlcnZpY2VJbXBsLmphdmE=) | `34.74% <0.00%> (-0.12%)` | :arrow_down: |
   | [...heduler/api/service/impl/ResourcesServiceImpl.java](https://codecov.io/gh/apache/dolphinscheduler/pull/12552/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZG9scGhpbnNjaGVkdWxlci1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2RvbHBoaW5zY2hlZHVsZXIvYXBpL3NlcnZpY2UvaW1wbC9SZXNvdXJjZXNTZXJ2aWNlSW1wbC5qYXZh) | `36.28% <0.00%> (-1.41%)` | :arrow_down: |
   | [...nscheduler/service/process/ProcessServiceImpl.java](https://codecov.io/gh/apache/dolphinscheduler/pull/12552/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZG9scGhpbnNjaGVkdWxlci1zZXJ2aWNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9kb2xwaGluc2NoZWR1bGVyL3NlcnZpY2UvcHJvY2Vzcy9Qcm9jZXNzU2VydmljZUltcGwuamF2YQ==) | `29.44% <ø> (+0.29%)` | :arrow_up: |
   | ... and [14 more](https://codecov.io/gh/apache/dolphinscheduler/pull/12552/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   



[GitHub] [dolphinscheduler] Radeity commented on a diff in pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
Radeity commented on code in PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552#discussion_r1008200036


##########
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.utils;
+
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
+import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.service.storage.StorageOperate;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeroturnaround.zip.ZipUtil;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+public class TaskFilesTransferUtils {
+
+    protected final static Logger logger = LoggerFactory
+            .getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, TaskFilesTransferUtils.class));
+
+    // tmp path in local path for transfer
+    final static String DOWNLOAD_TMP = ".DT_TMP";
+
+    // suffix of the package file
+    final static String PACK_SUFFIX = "_ds_pack.zip";
+
+    // root path in resource storage
+    final static String RESOURCE_TAG = "DATA_TRANSFER";
+
+    private TaskFilesTransferUtils() {
+        throw new IllegalStateException("Utility class");
+    }
+
+    /**
+     * upload output files to resource storage
+     *
+     * @param taskExecutionContext is the context of task
+     * @param storageOperate       is the storage operate
+     * @throws TaskException TaskException
+     */
+    public static void uploadOutputFiles(TaskExecutionContext taskExecutionContext,
+                                         StorageOperate storageOperate) throws TaskException {
+        List<Property> varPools = getVarPools(taskExecutionContext);
+        // get map of varPools for quick search
+        Map<String, Property> varPoolsMap = varPools.stream().collect(Collectors.toMap(Property::getProp, x -> x));
+
+        // get OUTPUT FILE parameters
+        List<Property> localParamsProperty = getFileLocalParams(taskExecutionContext, Direct.OUT);
+
+        if (localParamsProperty.isEmpty()) {
+            return;
+        }
+
+        logger.info("Upload output files ...");
+        for (Property property : localParamsProperty) {
+            // get local file path
+            String srcPath =
+                    packIfDir(String.format("%s/%s", taskExecutionContext.getExecutePath(), property.getValue()));
+            // get remote file path
+            String resourcePath = getResourcePath(taskExecutionContext, new File(srcPath).getName());
+            try {
+                // upload file to storage
+                String resourceWholePath =
+                        storageOperate.getResourceFileName(taskExecutionContext.getTenantCode(), resourcePath);
+                logger.info("{} --- Local:{} to Remote:{}", property, srcPath, resourceWholePath);
+                logger.info("123123123123");

Review Comment:
   ```suggestion
   ```




[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552#issuecomment-1299797424

   Kudos, SonarCloud Quality Gate passed! [![Quality Gate passed](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/QualityGateBadge/passed-16px.png 'Quality Gate passed')](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=12552)
   
   [![Bug](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug-16px.png 'Bug')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=BUG) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=BUG)  
   [![Vulnerability](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability-16px.png 'Vulnerability')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=VULNERABILITY) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=VULNERABILITY)  
   [![Security Hotspot](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot-16px.png 'Security Hotspot')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=SECURITY_HOTSPOT) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=SECURITY_HOTSPOT)  
   [![Code Smell](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell-16px.png 'Code Smell')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=CODE_SMELL) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=CODE_SMELL) [13 Code Smells](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=12552&resolved=false&types=CODE_SMELL)
   
   [![78.6%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/60-16px.png '78.6%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_coverage&view=list) [78.6% Coverage](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_coverage&view=list)  
   [![2.1%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3-16px.png '2.1%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_duplicated_lines_density&view=list) [2.1% Duplication](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=12552&metric=new_duplicated_lines_density&view=list)
   
   



[GitHub] [dolphinscheduler] jieguangzhou commented on a diff in pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
jieguangzhou commented on code in PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552#discussion_r1012450032


##########
docs/docs/en/guide/parameter/file-parameter.md:
##########
@@ -0,0 +1,102 @@
+# FILE Parameter

Review Comment:
   Done




[GitHub] [dolphinscheduler] zhongjiajie commented on pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
zhongjiajie commented on PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552#issuecomment-1292023757

   The bandwidth pressure should be shown in the documentation.



[GitHub] [dolphinscheduler] github-code-scanning[bot] commented on a diff in pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
github-code-scanning[bot] commented on code in PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552#discussion_r1006799664


##########
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtilsTest.java:
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.utils;
+
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
+import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.service.storage.StorageOperate;
+
+import org.apache.curator.shaded.com.google.common.io.Files;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.zeroturnaround.zip.ZipUtil;
+
+public class TaskFilesTransferUtilsTest {
+
+    final private long processDefineCode = 123;
+    final private int processDefineVersion = 456;
+    final private int processInstanceId = 678;
+    final private int taskInstanceId = 789;
+    final private String taskName = "test";
+
+    final private String tenantCode = "ubuntu";
+
+    private String date;
+
+    private long endTime;
+
+    private String exceptTemplate;
+
+    @BeforeEach
+    public void Tas() {
+        endTime = System.currentTimeMillis();
+        date = DateUtils.formatTimeStamp(endTime, DateTimeFormatter.ofPattern("yyyyMMdd"));
+        exceptTemplate = String.format("%s/%s/%d/%d_%d/%s_%d",
+                TaskFilesTransferUtils.RESOURCE_TAG,
+                date,
+                processDefineCode,
+                processDefineVersion,
+                processInstanceId,
+                taskName,
+                taskInstanceId);
+    }
+
+    @Test
+    public void testUploadOutputFiles() throws IOException {
+        File executePath = Files.createTempDir();
+        File folderPath = new File(executePath, "data");
+        folderPath.mkdirs();
+        File file = new File(folderPath.getPath() + "/test.txt");
+        file.createNewFile();

Review Comment:
   ## Ignored error status of call
   
   Method testUploadOutputFiles ignores exceptional return value of File.createNewFile.
   
   [Show more details](https://github.com/apache/dolphinscheduler/security/code-scanning/2133)
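
One way to address this finding, sketched under assumptions: `File.createNewFile()` returns `false` when a file with that name already exists, and `File.mkdirs()` returns `false` when it creates nothing, so a test can check both results instead of discarding them. The class name `CreateFileCheck` and the helper `createFileOrFail` are hypothetical and not part of the PR.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class CreateFileCheck {

    // Hypothetical helper: creates the file and fails loudly if it already exists.
    public static File createFileOrFail(File dir, String name) throws IOException {
        File file = new File(dir, name);
        // createNewFile() returns false when a file with this name already exists.
        if (!file.createNewFile()) {
            throw new IOException("File already exists: " + file.getPath());
        }
        return file;
    }

    public static void main(String[] args) throws IOException {
        File executePath = Files.createTempDirectory("transfer-test").toFile();
        File folderPath = new File(executePath, "data");
        // mkdirs() also reports failure through its return value.
        if (!folderPath.mkdirs()) {
            throw new IOException("Could not create directory: " + folderPath.getPath());
        }
        File file = createFileOrFail(folderPath, "test.txt");
        System.out.println("Created " + file.getName());
    }
}
```

In a JUnit test the same check could be written as `Assertions.assertTrue(file.createNewFile())`, which keeps the setup short while still surfacing a failed creation.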



##########
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java:
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.utils;
+
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
+import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.service.storage.StorageOperate;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeroturnaround.zip.ZipUtil;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+public class TaskFilesTransferUtils {
+
+    protected final static Logger logger = LoggerFactory
+            .getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, TaskFilesTransferUtils.class));
+
+    // root path in resource storage
+    final static String RESOURCE_TAG = "DATA_TRANSFER";
+
+    // tmp path in local path for transfer
+    final static String DOWNLOAD_TMP = ".DT_TMP";
+
+    // suffix of the package file
+    final static String PACK_SUFFIX = "_ds_pack.zip";
+
+    /**
+     * upload output files to resource storage
+     *
+     * @param taskExecutionContext is the context of task
+     * @param storageOperate       is the storage operate
+     * @throws TaskException TaskException
+     */
+    public static void uploadOutputFiles(TaskExecutionContext taskExecutionContext,
+                                         StorageOperate storageOperate) throws TaskException {
+        List<Property> varPools = getVarPools(taskExecutionContext);
+        // get map of varPools for quick search
+        Map<String, Property> varPoolsMap = varPools.stream().collect(Collectors.toMap(Property::getProp, x -> x));
+
+        // get OUTPUT FILE parameters
+        List<Property> localParamsProperty = getFileLocalParams(taskExecutionContext, Direct.OUT);
+
+        if (localParamsProperty.isEmpty()) {
+            return;
+        }
+
+        logger.info("Upload output files ...");
+        for (Property property : localParamsProperty) {
+            // get local file path
+            String srcPath =
+                    packIfDir(String.format("%s/%s", taskExecutionContext.getExecutePath(), property.getValue()));
+            // get remote file path
+            String resourcePath = getResourcePath(taskExecutionContext, new File(srcPath).getName());
+            try {
+                // upload file to storage
+                String resourceWholePath =
+                        storageOperate.getResourceFileName(taskExecutionContext.getTenantCode(), resourcePath);
+                logger.info("{} --- Local:{} to Remote:{}", property, srcPath, resourceWholePath);
+                logger.info("123123123123");
+                storageOperate.upload(taskExecutionContext.getTenantCode(), srcPath, resourceWholePath, false, true);
+            } catch (IOException ex) {
+                throw new TaskException(ex.getMessage(), ex);
+            }
+
+            // update varPool
+            Property oriProperty;
+            // if the property is not in varPool, add it
+            if (varPoolsMap.containsKey(property.getProp())) {
+                oriProperty = varPoolsMap.get(property.getProp());
+            } else {
+                oriProperty = new Property(property.getProp(), Direct.OUT, DataType.FILE, property.getValue());
+                varPools.add(oriProperty);
+            }
+            oriProperty.setProp(String.format("%s.%s", taskExecutionContext.getTaskName(), oriProperty.getProp()));
+            oriProperty.setValue(resourcePath);
+        }
+        taskExecutionContext.setVarPool(JSONUtils.toJsonString(varPools));
+    }
+
+    /**
+     * download upstream files from storage
+     * only download files which are defined in the task parameters
+     *
+     * @param taskExecutionContext is the context of task
+     * @param storageOperate       is the storage operate
+     * @throws TaskException task exception
+     */
+    public static void downloadUpstreamFiles(TaskExecutionContext taskExecutionContext, StorageOperate storageOperate) {
+        List<Property> varPools = getVarPools(taskExecutionContext);
+        // get map of varPools for quick search
+        Map<String, Property> varPoolsMap = varPools.stream().collect(Collectors.toMap(Property::getProp, x -> x));
+
+        // get "IN FILE" parameters
+        List<Property> localParamsProperty = getFileLocalParams(taskExecutionContext, Direct.IN);
+
+        if (localParamsProperty.isEmpty()) {
+            return;
+        }
+
+        String executePath = taskExecutionContext.getExecutePath();
+        // data path to download packaged data
+        String DownloadTmpPath = String.format("%s/%s", executePath, DOWNLOAD_TMP);
+
+        logger.info("Download upstream files...");
+        for (Property property : localParamsProperty) {
+            Property inVarPool = varPoolsMap.get(property.getValue());
+            if (inVarPool == null) {
+                logger.error(String.format("%s not in  %s", property.getValue(), varPoolsMap.keySet()));
+                throw new TaskException(String.format("Can not find upstream file using %s, please check the key",
+                        property.getValue()));
+            }
+
+            String resourcePath = inVarPool.getValue();
+            String targetPath = String.format("%s/%s", executePath, property.getProp());
+
+            String downloadPath;
+            // If the data is packaged, download it to a special directory (DOWNLOAD_TMP) and unpack it to the
+            // targetPath
+            boolean isPack = resourcePath.endsWith(PACK_SUFFIX);
+            if (isPack) {
+                downloadPath = String.format("%s/%s", DownloadTmpPath, new File(resourcePath).getName());
+            } else {
+                downloadPath = targetPath;
+            }
+
+            try {
+                String resourceWholePath =
+                        storageOperate.getResourceFileName(taskExecutionContext.getTenantCode(), resourcePath);
+                logger.info("{} --- Remote:{} to Local:{}", property, resourceWholePath, downloadPath);
+                storageOperate.download(taskExecutionContext.getTenantCode(), resourceWholePath, downloadPath, false,
+                        true);
+            } catch (IOException ex) {
+                throw new TaskException(ex.getMessage(), ex);
+            }
+
+            // unpack if the data is packaged
+            if (isPack) {
+                File downloadFile = new File(downloadPath);
+                logger.info("Unpack {} to {}", downloadPath, targetPath);
+                ZipUtil.unpack(downloadFile, new File(targetPath));
+            }
+        }
+
+        // delete DownloadTmp Folder if DownloadTmpPath exists
+        try {
+            org.apache.commons.io.FileUtils.deleteDirectory(new File(DownloadTmpPath));
+        } catch (IOException e) {
+            logger.error(
+                    "Delete DownloadTmpPath {} failed, this will not affect the task status", DownloadTmpPath, e);
+        }
+    }
+
+    /**
+     * get local parameters property which type is FILE and direction is equal to direct
+     *
+     * @param taskExecutionContext is the context of task
+     * @param direct,              may be Direct.IN or Direct.OUT.

Review Comment:
   ## Spurious Javadoc @param tags
   
   @param tag "direct," does not match any actual parameter of method "getFileLocalParams()".
   
   [Show more details](https://github.com/apache/dolphinscheduler/security/code-scanning/2132)
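
The finding comes from the trailing comma in `@param direct,`: the tag name must match the parameter name exactly. A minimal sketch of a corrected Javadoc block follows. To stay self-contained it uses simplified stand-ins for `Direct`, `DataType` and `Property`, and the method takes a plain list rather than a `TaskExecutionContext`; those simplifications, and the class name `JavadocParamExample`, are assumptions and not the PR's code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class JavadocParamExample {

    // Simplified stand-ins for the project's Direct, DataType and Property types.
    public enum Direct { IN, OUT }

    public enum DataType { VARCHAR, FILE }

    public static final class Property {
        final String prop;
        final Direct direct;
        final DataType type;
        final String value;

        public Property(String prop, Direct direct, DataType type, String value) {
            this.prop = prop;
            this.direct = direct;
            this.type = type;
            this.value = value;
        }
    }

    /**
     * Get the local parameters whose type is FILE and whose direction equals direct.
     *
     * @param localParams the local parameters of the task (stand-in for the task context)
     * @param direct      the direction to keep, either Direct.IN or Direct.OUT
     * @return the matching FILE parameters, possibly empty
     */
    public static List<Property> getFileLocalParams(List<Property> localParams, Direct direct) {
        List<Property> result = new ArrayList<>();
        for (Property property : localParams) {
            if (property.type == DataType.FILE && property.direct == direct) {
                result.add(property);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Property> params = Arrays.asList(
                new Property("dir-data", Direct.OUT, DataType.FILE, "data"),
                new Property("name", Direct.OUT, DataType.VARCHAR, "x"));
        System.out.println(getFileLocalParams(params, Direct.OUT).size());
    }
}
```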



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [dolphinscheduler] jieguangzhou commented on a diff in pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
jieguangzhou commented on code in PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552#discussion_r1007534859


##########
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java:
##########
@@ -216,15 +217,17 @@ protected void beforeExecute() {
         TaskExecutionCheckerUtils.downloadResourcesIfNeeded(storageOperate, taskExecutionContext, logger);
         logger.info("Resources:{} check success", taskExecutionContext.getResources());
 
+        TaskFilesTransferUtils.downloadUpstreamFiles(taskExecutionContext, storageOperate);

Review Comment:
   I have created a new issue for that: #12578





[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552#issuecomment-1293880087

   Kudos, SonarCloud Quality Gate passed!
   Dashboard: https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=12552
   
   - 0 Bugs (rating A)
   - 0 Vulnerabilities (rating A)
   - 0 Security Hotspots (rating A)
   - 17 Code Smells (rating A)
   - 60.7% Coverage
   - 2.5% Duplication
   
   




[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552#issuecomment-1297861591

   Kudos, SonarCloud Quality Gate passed!
   Dashboard: https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=12552
   
   - 0 Bugs (rating A)
   - 0 Vulnerabilities (rating A)
   - 0 Security Hotspots (rating A)
   - 7 Code Smells (rating A)
   - 60.4% Coverage
   - 2.2% Duplication
   
   




[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552#issuecomment-1297857477

   Kudos, SonarCloud Quality Gate passed!
   Dashboard: https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=12552
   
   - 0 Bugs (rating A)
   - 0 Vulnerabilities (rating A)
   - 0 Security Hotspots (rating A)
   - 7 Code Smells (rating A)
   - 60.4% Coverage
   - 2.2% Duplication
   
   




[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552#issuecomment-1299652726

   SonarCloud Quality Gate failed.
   Dashboard: https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=12552
   
   - 0 Bugs (rating A)
   - 0 Vulnerabilities (rating A)
   - 0 Security Hotspots (rating A)
   - 7 Code Smells (rating A)
   - 59.8% Coverage
   - 2.5% Duplication
   
   




[GitHub] [dolphinscheduler] jieguangzhou commented on a diff in pull request #12552: [Feature][Task] Transfer files between tasks

Posted by GitBox <gi...@apache.org>.
jieguangzhou commented on code in PR #12552:
URL: https://github.com/apache/dolphinscheduler/pull/12552#discussion_r1011328547


##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java:
##########
@@ -1871,6 +1874,61 @@ public Resource queryResourcesFileInfo(String userName, String fileName) {
         return (Resource) resourceResponse.getData();
     }
 
+    @Override
+    public Map<String, Object> deleteDataTransferData(User loginUser, Integer days) {

Review Comment:
   I have added a new class `DeleteDataTransferResponse` to handle the method


