You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by GitBox <gi...@apache.org> on 2022/09/26 01:42:40 UTC

[GitHub] [dolphinscheduler] jieguangzhou opened a new pull request, #11868: [Feature][TASK-PLUGIN] AWS Database Migration Service (DMS)

jieguangzhou opened a new pull request, #11868:
URL: https://github.com/apache/dolphinscheduler/pull/11868

   <!--Thanks very much for contributing to Apache DolphinScheduler. Please review https://dolphinscheduler.apache.org/en-us/community/development/pull-request.html before opening a pull request.-->
   
   
   ## Purpose of the pull request
   
   close: #11672
   
   ## Brief change log
   
   - [x] Create and start DMS task by interface
   - [x] Create and start DMS task by json data
   - [x] Restart DMS task by interface
   - [x] Restart DMS task by json data
   
   **Create and start DMS task by interface**
   
   <img width="607" alt="image" src="https://user-images.githubusercontent.com/31528124/189254325-66f66bd6-28bd-4a99-8e6e-3bf26589119e.png">
   
   **Create and start DMS task by json data**
   
   
   <img width="571" alt="image" src="https://user-images.githubusercontent.com/31528124/189254351-3a24d278-f5b3-4c12-899e-ce5f4f8a5166.png">
   
   **Restart DMS task by interface**
   
   <img width="579" alt="image" src="https://user-images.githubusercontent.com/31528124/189254368-5a2b7afa-9575-445a-8b0e-2dce702ff4a4.png">
   
   
   **Restart DMS task by json data**
   
   <img width="578" alt="image" src="https://user-images.githubusercontent.com/31528124/189254389-70f0a129-f468-4774-9167-af7f11a68c80.png">
   
   ## Verify this pull request
   
   <!--*(Please pick either of the following options)*-->
   
   This pull request is code cleanup without any test coverage.
   
   *(or)*
   
   This pull request is already covered by existing tests, such as *(please describe tests)*.
   
   (or)
   
   This change added tests and can be verified as follows:
   
   <!--*(example:)*
     - *Added dolphinscheduler-dao tests for end-to-end.*
     - *Added CronUtilsTest to verify the change.*
     - *Manually verified the change by testing locally.* -->
   
   (or)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #11868: [Feature][TASK-PLUGIN] AWS Database Migration Service (DMS)

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #11868:
URL: https://github.com/apache/dolphinscheduler/pull/11868#issuecomment-1260772179

   Kudos, SonarCloud Quality Gate passed!&nbsp; &nbsp; [![Quality Gate passed](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/QualityGateBadge/passed-16px.png 'Quality Gate passed')](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=11868)
   
   [![Bug](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug-16px.png 'Bug')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG)  
   [![Vulnerability](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability-16px.png 'Vulnerability')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY)  
   [![Security Hotspot](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot-16px.png 'Security Hotspot')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT)  
   [![Code Smell](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell-16px.png 'Code Smell')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL) [22 Code Smells](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL)
   
   [![54.9%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/50-16px.png '54.9%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_coverage&view=list) [54.9% Coverage](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_coverage&view=list)  
   [![2.0%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3-16px.png '2.0%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_duplicated_lines_density&view=list) [2.0% Duplication](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_duplicated_lines_density&view=list)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #11868: [Feature][TASK-PLUGIN] AWS Database Migration Service (DMS)

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #11868:
URL: https://github.com/apache/dolphinscheduler/pull/11868#issuecomment-1245244662

   Kudos, SonarCloud Quality Gate passed!&nbsp; &nbsp; [![Quality Gate passed](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/QualityGateBadge/passed-16px.png 'Quality Gate passed')](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=11868)
   
   [![Bug](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug-16px.png 'Bug')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG)  
   [![Vulnerability](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability-16px.png 'Vulnerability')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY)  
   [![Security Hotspot](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot-16px.png 'Security Hotspot')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT)  
   [![Code Smell](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell-16px.png 'Code Smell')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL) [22 Code Smells](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL)
   
   [![59.9%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/50-16px.png '59.9%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_coverage&view=list) [59.9% Coverage](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_coverage&view=list)  
   [![2.0%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3-16px.png '2.0%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_duplicated_lines_density&view=list) [2.0% Duplication](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_duplicated_lines_density&view=list)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [dolphinscheduler] jieguangzhou merged pull request #11868: [Feature][TASK-PLUGIN] AWS Database Migration Service (DMS)

Posted by GitBox <gi...@apache.org>.
jieguangzhou merged PR #11868:
URL: https://github.com/apache/dolphinscheduler/pull/11868


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #11868: [Feature][TASK-PLUGIN] AWS Database Migration Service (DMS)

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #11868:
URL: https://github.com/apache/dolphinscheduler/pull/11868#issuecomment-1261882296

   Kudos, SonarCloud Quality Gate passed!&nbsp; &nbsp; [![Quality Gate passed](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/QualityGateBadge/passed-16px.png 'Quality Gate passed')](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=11868)
   
   [![Bug](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug-16px.png 'Bug')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG)  
   [![Vulnerability](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability-16px.png 'Vulnerability')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY)  
   [![Security Hotspot](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot-16px.png 'Security Hotspot')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT)  
   [![Code Smell](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell-16px.png 'Code Smell')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL) [22 Code Smells](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL)
   
   [![54.9%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/50-16px.png '54.9%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_coverage&view=list) [54.9% Coverage](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_coverage&view=list)  
   [![2.0%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3-16px.png '2.0%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_duplicated_lines_density&view=list) [2.0% Duplication](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_duplicated_lines_density&view=list)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #11868: [Feature][TASK-PLUGIN] AWS Database Migration Service (DMS)

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #11868:
URL: https://github.com/apache/dolphinscheduler/pull/11868#issuecomment-1242153898

   Kudos, SonarCloud Quality Gate passed!&nbsp; &nbsp; [![Quality Gate passed](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/QualityGateBadge/passed-16px.png 'Quality Gate passed')](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=11868)
   
   [![Bug](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug-16px.png 'Bug')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG)  
   [![Vulnerability](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability-16px.png 'Vulnerability')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY)  
   [![Security Hotspot](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot-16px.png 'Security Hotspot')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT)  
   [![Code Smell](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell-16px.png 'Code Smell')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL) [22 Code Smells](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL)
   
   [![59.9%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/50-16px.png '59.9%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_coverage&view=list) [59.9% Coverage](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_coverage&view=list)  
   [![2.0%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3-16px.png '2.0%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_duplicated_lines_density&view=list) [2.0% Duplication](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_duplicated_lines_density&view=list)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #11868: [Feature][TASK-PLUGIN] AWS Database Migration Service (DMS)

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #11868:
URL: https://github.com/apache/dolphinscheduler/pull/11868#issuecomment-1260363398

   Kudos, SonarCloud Quality Gate passed!&nbsp; &nbsp; [![Quality Gate passed](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/QualityGateBadge/passed-16px.png 'Quality Gate passed')](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=11868)
   
   [![Bug](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug-16px.png 'Bug')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG)  
   [![Vulnerability](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability-16px.png 'Vulnerability')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY)  
   [![Security Hotspot](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot-16px.png 'Security Hotspot')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT)  
   [![Code Smell](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell-16px.png 'Code Smell')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL) [22 Code Smells](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL)
   
   [![54.9%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/50-16px.png '54.9%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_coverage&view=list) [54.9% Coverage](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_coverage&view=list)  
   [![2.0%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3-16px.png '2.0%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_duplicated_lines_density&view=list) [2.0% Duplication](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_duplicated_lines_density&view=list)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #11868: [Feature][TASK-PLUGIN] AWS Database Migration Service (DMS)

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #11868:
URL: https://github.com/apache/dolphinscheduler/pull/11868#issuecomment-1260362909

   Kudos, SonarCloud Quality Gate passed!&nbsp; &nbsp; [![Quality Gate passed](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/QualityGateBadge/passed-16px.png 'Quality Gate passed')](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=11868)
   
   [![Bug](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug-16px.png 'Bug')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG)  
   [![Vulnerability](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability-16px.png 'Vulnerability')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY)  
   [![Security Hotspot](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot-16px.png 'Security Hotspot')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT)  
   [![Code Smell](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell-16px.png 'Code Smell')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL) [22 Code Smells](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL)
   
   [![54.9%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/50-16px.png '54.9%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_coverage&view=list) [54.9% Coverage](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_coverage&view=list)  
   [![2.0%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3-16px.png '2.0%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_duplicated_lines_density&view=list) [2.0% Duplication](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_duplicated_lines_density&view=list)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [dolphinscheduler] jieguangzhou commented on pull request #11868: [Feature][TASK-PLUGIN] AWS Database Migration Service (DMS)

Posted by GitBox <gi...@apache.org>.
jieguangzhou commented on PR #11868:
URL: https://github.com/apache/dolphinscheduler/pull/11868#issuecomment-1261897331

   @Amy0104 PTAL, thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [dolphinscheduler] EricGao888 commented on a diff in pull request #11868: [Feature][TASK-PLUGIN] AWS Database Migration Service (DMS)

Posted by GitBox <gi...@apache.org>.
EricGao888 commented on code in PR #11868:
URL: https://github.com/apache/dolphinscheduler/pull/11868#discussion_r983128037


##########
docs/docs/en/guide/task/dms.md:
##########
@@ -0,0 +1,107 @@
+# DMS Node
+
+## Overview
+
+[Pytorch](https://pytorch.org) is a mainstream Python machine learning library.

Review Comment:
   Why there is a link to `pytorch` here? Maybe we could remove it.



##########
docs/docs/en/guide/task/dms.md:
##########
@@ -0,0 +1,107 @@
+# DMS Node
+
+## Overview
+
+[Pytorch](https://pytorch.org) is a mainstream Python machine learning library.
+
+[AWS Database Migration Service (AWS DMS)](https://aws.amazon.com/cn/dms) helps you migrate databases to AWS quickly and securely. 
+The source database remains fully operational during the migration, minimizing downtime to applications that rely on the database. 
+The AWS Database Migration Service can migrate your data to and from the most widely used commercial and open-source databases.
+
+DMS task plugin can help users to create and start DMS tasks in DolphinScheduler more conveniently.
+
+Contains two features:
+- Create DMS task and start DMS task
+- Restart DMS task
+
+We can create DMS task and start DMS task in two ways:
+- Use interface
+- Use JSON data
+
+DolphinScheduler will track the status of the DMS task and set the status to successfully completed when the DMS task is completed. Except for the CDC task without end time.
+
+So, if the `migrationType` is `cdc` or `full-load-and-cdc`, `cdcStopPosition` not be set, DolphinScheduler will set the status to successfully after the DMS task starts successfully.
+
+## Create Task
+
+- Click `Project Management -> Project Name -> Workflow Definition`, and click the `Create Workflow` button to enter the DAG editing page.
+- Drag <img src="../../../../img/tasks/icons/dms.png" width="15"/> from the toolbar to the canvas.
+
+## Task Example
+
+The task plugin picture is as follows
+
+**Create and start DMS task by interface**
+
+![dms](../../../../img/tasks/demo/dms_create_and_start.png)
+
+
+**Restart DMS task by interface**
+
+![dms](../../../../img/tasks/demo/dms_restart.png)
+
+
+**Create and start DMS task by JSON data**
+
+![dms](../../../../img/tasks/demo/dms_create_and_start_json.png)
+
+**Restart DMS task by JSON data**
+
+![dms](../../../../img/tasks/demo/dms_restart_json.png)
+
+
+
+### First, introduce some general parameters of DolphinScheduler

Review Comment:
   We have put those default parameters in a separate doc for better maintainability. See: https://github.com/apache/dolphinscheduler/pull/11776



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #11868: [Feature][TASK-PLUGIN] AWS Database Migration Service (DMS)

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #11868:
URL: https://github.com/apache/dolphinscheduler/pull/11868#issuecomment-1257385702

   Kudos, SonarCloud Quality Gate passed!&nbsp; &nbsp; [![Quality Gate passed](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/QualityGateBadge/passed-16px.png 'Quality Gate passed')](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=11868)
   
   [![Bug](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug-16px.png 'Bug')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG)  
   [![Vulnerability](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability-16px.png 'Vulnerability')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY)  
   [![Security Hotspot](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot-16px.png 'Security Hotspot')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT)  
   [![Code Smell](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell-16px.png 'Code Smell')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL) [23 Code Smells](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL)
   
   [![56.9%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/50-16px.png '56.9%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_coverage&view=list) [56.9% Coverage](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_coverage&view=list)  
   [![2.0%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3-16px.png '2.0%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_duplicated_lines_density&view=list) [2.0% Duplication](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_duplicated_lines_density&view=list)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [dolphinscheduler] jieguangzhou commented on a diff in pull request #11868: [Feature][TASK-PLUGIN] AWS Database Migration Service (DMS)

Posted by GitBox <gi...@apache.org>.
jieguangzhou commented on code in PR #11868:
URL: https://github.com/apache/dolphinscheduler/pull/11868#discussion_r980688264


##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsHook.java:
##########
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dms;
+
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
+
+import org.apache.commons.io.IOUtils;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.databasemigrationservice.AWSDatabaseMigrationService;
+import com.amazonaws.services.databasemigrationservice.AWSDatabaseMigrationServiceClientBuilder;
+import com.amazonaws.services.databasemigrationservice.model.CreateReplicationTaskRequest;
+import com.amazonaws.services.databasemigrationservice.model.CreateReplicationTaskResult;
+import com.amazonaws.services.databasemigrationservice.model.DeleteReplicationTaskRequest;
+import com.amazonaws.services.databasemigrationservice.model.DescribeConnectionsRequest;
+import com.amazonaws.services.databasemigrationservice.model.DescribeConnectionsResult;
+import com.amazonaws.services.databasemigrationservice.model.DescribeReplicationTasksRequest;
+import com.amazonaws.services.databasemigrationservice.model.DescribeReplicationTasksResult;
+import com.amazonaws.services.databasemigrationservice.model.Filter;
+import com.amazonaws.services.databasemigrationservice.model.InvalidResourceStateException;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTask;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTaskStats;
+import com.amazonaws.services.databasemigrationservice.model.ResourceNotFoundException;
+import com.amazonaws.services.databasemigrationservice.model.StartReplicationTaskRequest;
+import com.amazonaws.services.databasemigrationservice.model.StartReplicationTaskResult;
+import com.amazonaws.services.databasemigrationservice.model.StopReplicationTaskRequest;
+import com.amazonaws.services.databasemigrationservice.model.Tag;
+import com.amazonaws.services.databasemigrationservice.model.TestConnectionRequest;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+public class DmsHook {
+    protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass()));
+    private AWSDatabaseMigrationService client;
+    private String replicationTaskIdentifier;
+    private String sourceEndpointArn;
+    private String targetEndpointArn;
+    private String replicationInstanceArn;
+    private String migrationType;
+    private String tableMappings;
+    private String replicationTaskSettings;
+    private Date cdcStartTime;
+    private String cdcStartPosition;
+    private String cdcStopPosition;
+    private List<Tag> tags;
+    private String taskData;
+    private String resourceIdentifier;
+    private String replicationTaskArn;
+    private String startReplicationTaskType;
+
+    public DmsHook() {
+        this.client = createClient();
+    }
+
+    public static AWSDatabaseMigrationService createClient() {
+        final String awsAccessKeyId = PropertyUtils.getString(TaskConstants.AWS_ACCESS_KEY_ID);
+        final String awsSecretAccessKey = PropertyUtils.getString(TaskConstants.AWS_SECRET_ACCESS_KEY);
+        final String awsRegion = PropertyUtils.getString(TaskConstants.AWS_REGION);
+        final BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials(awsAccessKeyId, awsSecretAccessKey);
+        final AWSCredentialsProvider awsCredentialsProvider = new AWSStaticCredentialsProvider(basicAWSCredentials);
+
+        // create a DMS client
+        return AWSDatabaseMigrationServiceClientBuilder.standard()
+            .withCredentials(awsCredentialsProvider)
+            .withRegion(awsRegion)
+            .build();
+    }
+
+    public Boolean createReplicationTask() throws Exception {
+        logger.info("createReplicationTask ......");
+        CreateReplicationTaskRequest request = new CreateReplicationTaskRequest()
+            .withReplicationTaskIdentifier(replicationTaskIdentifier)
+            .withSourceEndpointArn(sourceEndpointArn)
+            .withTargetEndpointArn(targetEndpointArn)
+            .withReplicationInstanceArn(replicationInstanceArn)
+            .withMigrationType(migrationType)
+            .withTableMappings(tableMappings)
+            .withReplicationTaskSettings(replicationTaskSettings)
+            .withCdcStartTime(cdcStartTime)
+            .withCdcStartPosition(cdcStartPosition)
+            .withCdcStopPosition(cdcStopPosition)
+            .withTags(tags)
+            .withTaskData(taskData)
+            .withResourceIdentifier(resourceIdentifier);
+
+        request.setTableMappings(replaceFileParameters(request.getTableMappings()));
+        request.setReplicationTaskSettings(replaceFileParameters(request.getReplicationTaskSettings()));
+
+        CreateReplicationTaskResult result = client.createReplicationTask(request);
+        replicationTaskIdentifier = result.getReplicationTask().getReplicationTaskIdentifier();
+        replicationTaskArn = result.getReplicationTask().getReplicationTaskArn();
+        logger.info("replicationTaskIdentifier: {}, replicationTaskArn: {}", replicationTaskIdentifier, replicationTaskArn);
+        return awaitReplicationTaskStatus(STATUS.READY);
+    }
+
+
+    public Boolean startReplicationTask() {
+        logger.info("startReplicationTask ......");
+        StartReplicationTaskRequest request = new StartReplicationTaskRequest()
+            .withReplicationTaskArn(replicationTaskArn)
+            .withStartReplicationTaskType(startReplicationTaskType)
+            .withCdcStartTime(cdcStartTime)
+            .withCdcStartPosition(cdcStartPosition)
+            .withCdcStopPosition(cdcStopPosition);
+        StartReplicationTaskResult result = client.startReplicationTask(request);
+        replicationTaskArn = result.getReplicationTask().getReplicationTaskArn();
+        return awaitReplicationTaskStatus(STATUS.RUNNING);
+    }
+
+    public Boolean checkFinishedReplicationTask() {
+        logger.info("checkFinishedReplicationTask ......");
+        awaitReplicationTaskStatus(STATUS.STOPPED);
+        String stopReason = describeReplicationTasks().getStopReason();
+        return stopReason.endsWith(STATUS.FINISH_END_TOKEN);
+    }
+
+    public void stopReplicationTask() {
+        logger.info("stopReplicationTask ......");
+        if (replicationTaskArn == null) {
+            return;
+        }
+        StopReplicationTaskRequest request = new StopReplicationTaskRequest()
+            .withReplicationTaskArn(replicationTaskArn);
+        client.stopReplicationTask(request);
+        awaitReplicationTaskStatus(STATUS.STOPPED);
+    }
+
+    public Boolean deleteReplicationTask() {
+        logger.info("deleteReplicationTask ......");
+        DeleteReplicationTaskRequest request = new DeleteReplicationTaskRequest()
+            .withReplicationTaskArn(replicationTaskArn);
+        client.deleteReplicationTask(request);
+        Boolean isDeleteSuccessfully;
+        try {
+            isDeleteSuccessfully = awaitReplicationTaskStatus(STATUS.DELETE);
+        } catch (ResourceNotFoundException e) {
+            isDeleteSuccessfully = true;
+        }
+        return isDeleteSuccessfully;
+    }
+
+    public Boolean testConnectionEndpoint() {
+        return (testConnection(replicationInstanceArn, sourceEndpointArn) && testConnection(replicationInstanceArn, targetEndpointArn));
+    }
+
+    public Boolean testConnection(String replicationInstanceArn, String endpointArn) {
+        logger.info("Test connect replication instance: {} and endpoint: {}", replicationInstanceArn, endpointArn);
+        TestConnectionRequest request = new TestConnectionRequest().
+            withReplicationInstanceArn(replicationInstanceArn)
+            .withEndpointArn(endpointArn);
+        try {
+            client.testConnection(request);
+        } catch (InvalidResourceStateException e) {
+            logger.info(e.getErrorMessage());
+        }
+
+        return awaitConnectSuccess(replicationInstanceArn, endpointArn);
+    }
+
+    public Boolean awaitConnectSuccess(String replicationInstanceArn, String endpointArn) {
+        Filter instanceFilters = new Filter().withName(AWS_KEY.REPLICATION_INSTANCE_ARN).withValues(replicationInstanceArn);
+        Filter endpointFilters = new Filter().withName(AWS_KEY.ENDPOINT_ARN).withValues(endpointArn);
+        DescribeConnectionsRequest request = new DescribeConnectionsRequest().withFilters(endpointFilters, instanceFilters)
+            .withMarker("");
+        while (true) {
+            ThreadUtils.sleep(CONSTANTS.CHECK_INTERVAL);
+            DescribeConnectionsResult response = client.describeConnections(request);
+            String status = response.getConnections().get(0).getStatus();
+            if (status.equals(STATUS.SUCCESSFUL)) {
+                logger.info("Connect successful");
+                return true;
+            } else if (!status.equals(STATUS.TESTING)) {
+                break;
+            }
+        }
+        logger.info("Connect error");
+        return false;
+    }
+
+    public ReplicationTask describeReplicationTasks() {
+        Filter replicationTaskFilter = new Filter().withName(AWS_KEY.REPLICATION_TASK_ARN).withValues(replicationTaskArn);
+        DescribeReplicationTasksRequest request = new DescribeReplicationTasksRequest().withFilters(replicationTaskFilter).withMaxRecords(20).withMarker("");
+        DescribeReplicationTasksResult result = client.describeReplicationTasks(request);
+        ReplicationTask replicationTask = result.getReplicationTasks().get(0);
+
+        if (sourceEndpointArn == null) {
+            sourceEndpointArn = replicationTask.getSourceEndpointArn();
+        }
+
+        if (targetEndpointArn == null) {
+            targetEndpointArn = replicationTask.getTargetEndpointArn();
+        }
+
+        if (replicationInstanceArn == null) {
+            replicationInstanceArn = replicationTask.getReplicationInstanceArn();
+        }
+
+        if (replicationTaskArn == null) {
+            replicationTaskArn = replicationTask.getReplicationTaskArn();
+        }
+
+        return replicationTask;
+    }
+
+    public Boolean awaitReplicationTaskStatus(String exceptStatus, String... stopStatus) {
+        List<String> stopStatusSet = Arrays.asList(stopStatus);
+        Integer lastPercent = 0;
+        while (true) {
+            ThreadUtils.sleep(CONSTANTS.CHECK_INTERVAL);
+            ReplicationTask replicationTask = describeReplicationTasks();
+            String status = replicationTask.getStatus();
+
+            if (status.equals(STATUS.RUNNING) || status.equals(STATUS.STOPPED)) {
+                ReplicationTaskStats taskStats = replicationTask.getReplicationTaskStats();
+                Integer percent;
+                if (taskStats != null) {
+                    percent = taskStats.getFullLoadProgressPercent();
+                } else {
+                    percent = 0;
+                }
+                if (!lastPercent.equals(percent)) {
+                    String runningMessage = String.format("fullLoadProgressPercent: %s ", percent);
+                    logger.info(runningMessage);
+                }
+                lastPercent = percent;
+            }
+
+            if (exceptStatus.equals(status)) {
+                logger.info("success");
+                return true;
+            } else if (stopStatusSet.contains(status)) {
+                break;
+            }
+        }
+        logger.info("error");

Review Comment:
   I think It is necessary to log messages about the DMS task connect status for every step. Especially when it's wrong.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [dolphinscheduler] Tianqi-Dotes commented on a diff in pull request #11868: [Feature][TASK-PLUGIN] AWS Database Migration Service (DMS)

Posted by GitBox <gi...@apache.org>.
Tianqi-Dotes commented on code in PR #11868:
URL: https://github.com/apache/dolphinscheduler/pull/11868#discussion_r982020166


##########
docs/docs/en/guide/task/dms.md:
##########
@@ -0,0 +1,107 @@
+# DMS Node
+
+## Overview
+
+[Pytorch](https://pytorch.org) is a mainstream Python machine learning library.
+
+[AWS Database Migration Service (AWS DMS)](https://aws.amazon.com/cn/dms) helps you migrate databases to AWS quickly and securely. 
+The source database remains fully operational during the migration, minimizing downtime to applications that rely on the database. 
+The AWS Database Migration Service can migrate your data to and from the most widely used commercial and open-source databases.
+
+DMS task plugin can help users to create and start DMS tasks in DolphinScheduler more conveniently.
+
+Contains two features:
+- Create DMS task and start DMS task
+- Restart DMS task
+
+We can create DMS task and start DMS task in two ways:
+- Use interface
+- Use json data
+
+DolphinScheduler will track the status of the DMS task and set the status to successfully completed when the DMS task is completed. Except for the CDC task without end time.
+
+So, if the `migrationType` is `cdc` or `full-load-and-cdc`, `cdcStopPosition` not be set, DolphinScheduler will set the status to successfully after the DMS task start successfully.

Review Comment:
   after the DMS task starts successfully.



##########
docs/docs/en/guide/task/dms.md:
##########
@@ -0,0 +1,107 @@
+# DMS Node
+
+## Overview
+
+[Pytorch](https://pytorch.org) is a mainstream Python machine learning library.
+
+[AWS Database Migration Service (AWS DMS)](https://aws.amazon.com/cn/dms) helps you migrate databases to AWS quickly and securely. 
+The source database remains fully operational during the migration, minimizing downtime to applications that rely on the database. 
+The AWS Database Migration Service can migrate your data to and from the most widely used commercial and open-source databases.
+
+DMS task plugin can help users to create and start DMS tasks in DolphinScheduler more conveniently.
+
+Contains two features:
+- Create DMS task and start DMS task
+- Restart DMS task
+
+We can create DMS task and start DMS task in two ways:
+- Use interface
+- Use json data

Review Comment:
   JSON



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsTask.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dms;
+
+import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT;
+import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
+import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
+import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
+
+import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import org.apache.commons.beanutils.BeanUtils;
+
+import java.util.Collections;
+import java.util.List;
+
+import com.amazonaws.services.databasemigrationservice.model.InvalidResourceStateException;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTask;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.PropertyNamingStrategy;
+
+public class DmsTask extends AbstractRemoteTask {
+
+    private static final ObjectMapper objectMapper =
+        new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
+            .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
+            .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
+            .configure(REQUIRE_SETTERS_FOR_GETTERS, true)
+            .setPropertyNamingStrategy(new PropertyNamingStrategy.UpperCamelCaseStrategy());
+    /**
+     * taskExecutionContext
+     */
+    private final TaskExecutionContext taskExecutionContext;
+    public DmsHook dmsHook;
+    /**
+     * Dms parameters
+     */
+    private DmsParameters parameters;
+    private DmsHook.ApplicationIds appId;
+
+    public DmsTask(TaskExecutionContext taskExecutionContext) {
+        super(taskExecutionContext);
+        this.taskExecutionContext = taskExecutionContext;
+
+    }
+
+    @Override
+    public void init() throws TaskException {
+        logger.info("Dms task params {}", taskExecutionContext.getTaskParams());
+        parameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), DmsParameters.class);
+        initDmsHook();
+    }
+
+    @Override
+    public List<String> getApplicationIds() throws TaskException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void submitApplication() throws TaskException {
+        exitStatusCode = checkCreateReplicationTask();
+        if (exitStatusCode == TaskConstants.EXIT_CODE_SUCCESS) {
+            exitStatusCode = startReplicationTask();
+        } else {
+            throw new TaskException("DMS task failed to start");
+        }
+
+        // if the task is not running, the task will be deleted
+        if (exitStatusCode == TaskConstants.EXIT_CODE_FAILURE && !parameters.getIsRestartTask()) {
+            dmsHook.deleteReplicationTask();
+        }else {
+            appId = dmsHook.getApplicationIds();
+            setAppIds(JSONUtils.toJsonString(appId));
+        }
+    }
+
+    @Override
+    public void trackApplicationStatus() {
+        initAppId();
+        dmsHook.setReplicationTaskArn(appId.getReplicationTaskArn());
+        // if CdcStopPosition is not set, the task will not continue to check the running status
+        if (isStopTaskWhenCdc()) {
+            logger.info("This is a cdc task and cdcStopPosition is not set, the task will not continue to check the running status");
+            exitStatusCode = TaskConstants.EXIT_CODE_SUCCESS;
+            return;
+        }
+
+        Boolean isFinishedSuccessfully = dmsHook.checkFinishedReplicationTask();
+        if (isFinishedSuccessfully) {
+            exitStatusCode = TaskConstants.EXIT_CODE_SUCCESS;
+        } else {
+            throw new TaskException("DMS task failed to track");
+        }
+    }
+
+    /**
+     * init DMS remote AppId if null
+     */
+    private void initAppId() {
+        if (appId == null) {
+            if (StringUtils.isNotEmpty(getAppIds())) {
+                appId = JSONUtils.parseObject(getAppIds(), DmsHook.ApplicationIds.class);
+            }
+        }
+        if (appId == null) {
+            throw new TaskException("sagemaker applicationID is null");

Review Comment:
   sagemaker



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsTask.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dms;
+
+import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT;
+import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
+import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
+import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
+
+import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import org.apache.commons.beanutils.BeanUtils;
+
+import java.util.Collections;
+import java.util.List;
+
+import com.amazonaws.services.databasemigrationservice.model.InvalidResourceStateException;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTask;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.PropertyNamingStrategy;
+
+public class DmsTask extends AbstractRemoteTask {
+
+    private static final ObjectMapper objectMapper =
+        new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
+            .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
+            .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
+            .configure(REQUIRE_SETTERS_FOR_GETTERS, true)
+            .setPropertyNamingStrategy(new PropertyNamingStrategy.UpperCamelCaseStrategy());
+    /**
+     * taskExecutionContext
+     */
+    private final TaskExecutionContext taskExecutionContext;
+    public DmsHook dmsHook;
+    /**
+     * Dms parameters
+     */
+    private DmsParameters parameters;
+    private DmsHook.ApplicationIds appId;
+
+    public DmsTask(TaskExecutionContext taskExecutionContext) {
+        super(taskExecutionContext);
+        this.taskExecutionContext = taskExecutionContext;
+
+    }
+
+    @Override
+    public void init() throws TaskException {
+        logger.info("Dms task params {}", taskExecutionContext.getTaskParams());
+        parameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), DmsParameters.class);
+        initDmsHook();
+    }
+
+    @Override
+    public List<String> getApplicationIds() throws TaskException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void submitApplication() throws TaskException {
+        exitStatusCode = checkCreateReplicationTask();
+        if (exitStatusCode == TaskConstants.EXIT_CODE_SUCCESS) {
+            exitStatusCode = startReplicationTask();
+        } else {
+            throw new TaskException("DMS task failed to start");
+        }
+
+        // if the task is not running, the task will be deleted
+        if (exitStatusCode == TaskConstants.EXIT_CODE_FAILURE && !parameters.getIsRestartTask()) {
+            dmsHook.deleteReplicationTask();
+        }else {
+            appId = dmsHook.getApplicationIds();
+            setAppIds(JSONUtils.toJsonString(appId));
+        }
+    }
+
+    @Override
+    public void trackApplicationStatus() {
+        initAppId();
+        dmsHook.setReplicationTaskArn(appId.getReplicationTaskArn());
+        // if CdcStopPosition is not set, the task will not continue to check the running status
+        if (isStopTaskWhenCdc()) {
+            logger.info("This is a cdc task and cdcStopPosition is not set, the task will not continue to check the running status");
+            exitStatusCode = TaskConstants.EXIT_CODE_SUCCESS;
+            return;
+        }
+
+        Boolean isFinishedSuccessfully = dmsHook.checkFinishedReplicationTask();
+        if (isFinishedSuccessfully) {
+            exitStatusCode = TaskConstants.EXIT_CODE_SUCCESS;
+        } else {
+            throw new TaskException("DMS task failed to track");
+        }
+    }
+
+    /**
+     * init DMS remote AppId if null
+     */
+    private void initAppId() {
+        if (appId == null) {
+            if (StringUtils.isNotEmpty(getAppIds())) {
+                appId = JSONUtils.parseObject(getAppIds(), DmsHook.ApplicationIds.class);
+            }
+        }
+        if (appId == null) {
+            throw new TaskException("sagemaker applicationID is null");
+        }
+    }
+
+    public int checkCreateReplicationTask() throws TaskException {
+
+        // if IsRestartTask, return success, do not create replication task
+        if (parameters.getIsRestartTask()) {
+            return TaskConstants.EXIT_CODE_SUCCESS;
+        }
+
+        // if not IsRestartTask, create replication task
+        Boolean isCreateSuccessfully;
+        try {
+            isCreateSuccessfully = dmsHook.createReplicationTask();
+        } catch (Exception e) {
+            throw new TaskException("DMS task create replication task error", e);
+        }
+
+        // if create replication task successfully, return EXIT_CODE_SUCCESS, else return EXIT_CODE_FAILURE
+        if (isCreateSuccessfully) {
+            return TaskConstants.EXIT_CODE_SUCCESS;
+        } else {
+            return TaskConstants.EXIT_CODE_FAILURE;
+        }
+    }
+
+    /**
+     * start replication task
+     *
+     * @return
+     * @throws TaskException
+     */
+    public int startReplicationTask() {
+
+        Boolean isStartSuccessfully = false;
+        try {
+            isStartSuccessfully = dmsHook.startReplicationTask();
+        } catch (InvalidResourceStateException e) {
+            logger.error("Failed to start a task, error message: {}", e.getErrorMessage());
+
+            // Only restart task when the error contains "Test connection", means instance can not connect to source or target
+            if (!e.getErrorMessage().contains("Test connection")) {
+                return TaskConstants.EXIT_CODE_FAILURE;
+            }
+
+            logger.info("restart replication task");
+            // if only restart task, run dmsHook.describeReplicationTasks to get replication task arn
+            if (parameters.getIsRestartTask()) {
+                dmsHook.describeReplicationTasks();
+            }
+
+            // test connection endpoint again and restart task if connection is ok
+            if (dmsHook.testConnectionEndpoint()) {
+                isStartSuccessfully = dmsHook.startReplicationTask();
+            }
+        }
+
+        // if start replication task failed, return EXIT_CODE_FAILURE
+        if (!isStartSuccessfully) {
+            return TaskConstants.EXIT_CODE_FAILURE;
+        }
+
+        return TaskConstants.EXIT_CODE_SUCCESS;
+    }
+
+    /**
+     * check if stop task when cdc
+     *
+     * @return true if stop task when cdc type and cdcStopPosition is not set, else return false
+     */
+    public Boolean isStopTaskWhenCdc() {
+        ReplicationTask replicationTask = dmsHook.describeReplicationTasks();
+        String migrationType = replicationTask.getMigrationType();
+        return migrationType.contains("cdc") && parameters.getCdcStopPosition() == null;
+    }
+
+    /**
+     * init dms hook
+     */
+    public void initDmsHook() throws TaskException {
+        convertJsonParameters();
+
+        dmsHook = new DmsHook();
+        try {
+            BeanUtils.copyProperties(dmsHook, parameters);
+        } catch (Exception e) {
+            throw new TaskException("DMS task init error", e);
+        }
+

Review Comment:
   format



##########
docs/docs/en/guide/task/dms.md:
##########
@@ -0,0 +1,107 @@
+# DMS Node
+
+## Overview
+
+[Pytorch](https://pytorch.org) is a mainstream Python machine learning library.
+
+[AWS Database Migration Service (AWS DMS)](https://aws.amazon.com/cn/dms) helps you migrate databases to AWS quickly and securely. 
+The source database remains fully operational during the migration, minimizing downtime to applications that rely on the database. 
+The AWS Database Migration Service can migrate your data to and from the most widely used commercial and open-source databases.
+
+DMS task plugin can help users to create and start DMS tasks in DolphinScheduler more conveniently.
+
+Contains two features:
+- Create DMS task and start DMS task
+- Restart DMS task
+
+We can create DMS task and start DMS task in two ways:
+- Use interface
+- Use json data
+
+DolphinScheduler will track the status of the DMS task and set the status to successfully completed when the DMS task is completed. Except for the CDC task without end time.
+
+So, if the `migrationType` is `cdc` or `full-load-and-cdc`, `cdcStopPosition` not be set, DolphinScheduler will set the status to successfully after the DMS task start successfully.
+
+## Create Task
+
+- Click `Project Management -> Project Name -> Workflow Definition`, and click the `Create Workflow` button to enter the DAG editing page.
+- Drag <img src="../../../../img/tasks/icons/dms.png" width="15"/> from the toolbar to the canvas.
+
+## Task Example
+
+The task plugin picture is as follows
+
+**Create and start DMS task by interface**
+
+![dms](../../../../img/tasks/demo/dms_create_and_start.png)
+
+
+**Restart DMS task by interface**
+
+![dms](../../../../img/tasks/demo/dms_restart.png)
+
+
+**Create and start DMS task by json data**
+
+![dms](../../../../img/tasks/demo/dms_create_and_start_json.png)
+
+**Restart DMS task by json data**
+
+![dms](../../../../img/tasks/demo/dms_restart_json.png)
+
+
+
+### First, introduce some general parameters of DolphinScheduler
+
+- **Node name**: The node name in a workflow definition is unique.
+- **Run flag**: Identifies whether this node schedules normally, if it does not need to execute, select
+  the `prohibition execution`.
+- **Descriptive information**: Describe the function of the node.
+- **Task priority**: When the number of worker threads is insufficient, execute in the order of priority from high
+  to low, and tasks with the same priority will execute in a first-in first-out order.

Review Comment:
   first-in-first-out



##########
docs/docs/en/guide/task/dms.md:
##########
@@ -0,0 +1,107 @@
+# DMS Node
+
+## Overview
+
+[Pytorch](https://pytorch.org) is a mainstream Python machine learning library.
+
+[AWS Database Migration Service (AWS DMS)](https://aws.amazon.com/cn/dms) helps you migrate databases to AWS quickly and securely. 
+The source database remains fully operational during the migration, minimizing downtime to applications that rely on the database. 
+The AWS Database Migration Service can migrate your data to and from the most widely used commercial and open-source databases.
+
+DMS task plugin can help users to create and start DMS tasks in DolphinScheduler more conveniently.
+
+Contains two features:
+- Create DMS task and start DMS task
+- Restart DMS task
+
+We can create DMS task and start DMS task in two ways:
+- Use interface
+- Use json data

Review Comment:
   and the following json



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsTask.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dms;
+
+import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT;
+import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
+import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
+import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
+
+import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import org.apache.commons.beanutils.BeanUtils;
+
+import java.util.Collections;
+import java.util.List;
+
+import com.amazonaws.services.databasemigrationservice.model.InvalidResourceStateException;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTask;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.PropertyNamingStrategy;
+
+public class DmsTask extends AbstractRemoteTask {
+
+    private static final ObjectMapper objectMapper =
+        new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
+            .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
+            .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
+            .configure(REQUIRE_SETTERS_FOR_GETTERS, true)
+            .setPropertyNamingStrategy(new PropertyNamingStrategy.UpperCamelCaseStrategy());
+    /**
+     * taskExecutionContext
+     */
+    private final TaskExecutionContext taskExecutionContext;
+    public DmsHook dmsHook;
+    /**
+     * Dms parameters
+     */
+    private DmsParameters parameters;
+    private DmsHook.ApplicationIds appId;
+
+    public DmsTask(TaskExecutionContext taskExecutionContext) {
+        super(taskExecutionContext);
+        this.taskExecutionContext = taskExecutionContext;
+
+    }
+
+    @Override
+    public void init() throws TaskException {
+        logger.info("Dms task params {}", taskExecutionContext.getTaskParams());
+        parameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), DmsParameters.class);
+        initDmsHook();
+    }
+
+    @Override
+    public List<String> getApplicationIds() throws TaskException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void submitApplication() throws TaskException {
+        exitStatusCode = checkCreateReplicationTask();
+        if (exitStatusCode == TaskConstants.EXIT_CODE_SUCCESS) {
+            exitStatusCode = startReplicationTask();
+        } else {
+            throw new TaskException("DMS task failed to start");

Review Comment:
   this else haven't start task yet



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsTask.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dms;
+
+import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT;
+import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
+import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
+import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
+
+import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import org.apache.commons.beanutils.BeanUtils;
+
+import java.util.Collections;
+import java.util.List;
+
+import com.amazonaws.services.databasemigrationservice.model.InvalidResourceStateException;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTask;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.PropertyNamingStrategy;
+
+public class DmsTask extends AbstractRemoteTask {
+
+    private static final ObjectMapper objectMapper =
+        new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
+            .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
+            .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
+            .configure(REQUIRE_SETTERS_FOR_GETTERS, true)
+            .setPropertyNamingStrategy(new PropertyNamingStrategy.UpperCamelCaseStrategy());
+    /**
+     * taskExecutionContext
+     */
+    private final TaskExecutionContext taskExecutionContext;
+    public DmsHook dmsHook;
+    /**
+     * Dms parameters
+     */
+    private DmsParameters parameters;
+    private DmsHook.ApplicationIds appId;
+
+    public DmsTask(TaskExecutionContext taskExecutionContext) {
+        super(taskExecutionContext);
+        this.taskExecutionContext = taskExecutionContext;
+
+    }
+
+    @Override
+    public void init() throws TaskException {
+        logger.info("Dms task params {}", taskExecutionContext.getTaskParams());
+        parameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), DmsParameters.class);
+        initDmsHook();
+    }
+
+    @Override
+    public List<String> getApplicationIds() throws TaskException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void submitApplication() throws TaskException {
+        exitStatusCode = checkCreateReplicationTask();
+        if (exitStatusCode == TaskConstants.EXIT_CODE_SUCCESS) {
+            exitStatusCode = startReplicationTask();
+        } else {
+            throw new TaskException("DMS task failed to start");
+        }
+
+        // if the task is not running, the task will be deleted
+        if (exitStatusCode == TaskConstants.EXIT_CODE_FAILURE && !parameters.getIsRestartTask()) {
+            dmsHook.deleteReplicationTask();
+        }else {

Review Comment:
   format



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [dolphinscheduler] jieguangzhou commented on pull request #11868: [Feature][TASK-PLUGIN] AWS Database Migration Service (DMS)

Posted by GitBox <gi...@apache.org>.
jieguangzhou commented on PR #11868:
URL: https://github.com/apache/dolphinscheduler/pull/11868#issuecomment-1260444384

   @caishunfeng @EricGao888 PTAL, thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #11868: [Feature][TASK-PLUGIN] AWS Database Migration Service (DMS)

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #11868:
URL: https://github.com/apache/dolphinscheduler/pull/11868#issuecomment-1256257149

   Kudos, SonarCloud Quality Gate passed!&nbsp; &nbsp; [![Quality Gate passed](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/QualityGateBadge/passed-16px.png 'Quality Gate passed')](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=11868)
   
   [![Bug](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug-16px.png 'Bug')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG)  
   [![Vulnerability](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability-16px.png 'Vulnerability')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY)  
   [![Security Hotspot](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot-16px.png 'Security Hotspot')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT)  
   [![Code Smell](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell-16px.png 'Code Smell')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL) [23 Code Smells](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL)
   
   [![56.9%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/50-16px.png '56.9%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_coverage&view=list) [56.9% Coverage](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_coverage&view=list)  
   [![2.0%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3-16px.png '2.0%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_duplicated_lines_density&view=list) [2.0% Duplication](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_duplicated_lines_density&view=list)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [dolphinscheduler] jieguangzhou commented on a diff in pull request #11868: [Feature][TASK-PLUGIN] AWS Database Migration Service (DMS)

Posted by GitBox <gi...@apache.org>.
jieguangzhou commented on code in PR #11868:
URL: https://github.com/apache/dolphinscheduler/pull/11868#discussion_r980687014


##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/test/java/org/apache/dolphinscheduler/plugin/task/dms/DmsTaskTest.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dms;
+
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.powermock.api.support.membermodification.MemberModifier;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import com.amazonaws.services.databasemigrationservice.model.InvalidResourceStateException;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTask;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({
+    JSONUtils.class,
+    PropertyUtils.class,
+    DmsHook.class
+})
+@PowerMockIgnore({"javax.*"})
+public class DmsTaskTest {
+
+    @Mock
+    DmsHook dmsHook;
+
+    DmsTask dmsTask;
+
+    @Before
+    public void before() throws Exception {
+        whenNew(DmsHook.class).withAnyArguments().thenReturn(dmsHook);
+        DmsParameters dmsParameters = new DmsParameters();
+        dmsTask = initTask(dmsParameters);
+        dmsTask.initDmsHook();
+        MemberModifier.field(DmsTask.class, "dmsHook").set(dmsTask, dmsHook);
+    }
+
+    @Test
+    public void testCreateTaskJson() {

Review Comment:
   This case will make sure that the parameter can read value correctly from JSON format data



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [dolphinscheduler] jieguangzhou commented on pull request #11868: [Feature][TASK-PLUGIN] AWS Database Migration Service (DMS)

Posted by GitBox <gi...@apache.org>.
jieguangzhou commented on PR #11868:
URL: https://github.com/apache/dolphinscheduler/pull/11868#issuecomment-1260502727

   > PTAL
   
   Done, PTAL, thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [dolphinscheduler] jieguangzhou commented on a diff in pull request #11868: [Feature][TASK-PLUGIN] AWS Database Migration Service (DMS)

Posted by GitBox <gi...@apache.org>.
jieguangzhou commented on code in PR #11868:
URL: https://github.com/apache/dolphinscheduler/pull/11868#discussion_r983141931


##########
docs/docs/en/guide/task/dms.md:
##########
@@ -0,0 +1,107 @@
+# DMS Node
+
+## Overview
+
+[Pytorch](https://pytorch.org) is a mainstream Python machine learning library.

Review Comment:
   done



##########
docs/docs/en/guide/task/dms.md:
##########
@@ -0,0 +1,107 @@
+# DMS Node
+
+## Overview
+
+[Pytorch](https://pytorch.org) is a mainstream Python machine learning library.
+
+[AWS Database Migration Service (AWS DMS)](https://aws.amazon.com/cn/dms) helps you migrate databases to AWS quickly and securely. 
+The source database remains fully operational during the migration, minimizing downtime to applications that rely on the database. 
+The AWS Database Migration Service can migrate your data to and from the most widely used commercial and open-source databases.
+
+DMS task plugin can help users to create and start DMS tasks in DolphinScheduler more conveniently.
+
+Contains two features:
+- Create DMS task and start DMS task
+- Restart DMS task
+
+We can create DMS task and start DMS task in two ways:
+- Use interface
+- Use JSON data
+
+DolphinScheduler will track the status of the DMS task and set the status to successfully completed when the DMS task is completed. Except for the CDC task without end time.
+
+So, if the `migrationType` is `cdc` or `full-load-and-cdc`, `cdcStopPosition` not be set, DolphinScheduler will set the status to successfully after the DMS task starts successfully.
+
+## Create Task
+
+- Click `Project Management -> Project Name -> Workflow Definition`, and click the `Create Workflow` button to enter the DAG editing page.
+- Drag <img src="../../../../img/tasks/icons/dms.png" width="15"/> from the toolbar to the canvas.
+
+## Task Example
+
+The task plugin picture is as follows
+
+**Create and start DMS task by interface**
+
+![dms](../../../../img/tasks/demo/dms_create_and_start.png)
+
+
+**Restart DMS task by interface**
+
+![dms](../../../../img/tasks/demo/dms_restart.png)
+
+
+**Create and start DMS task by JSON data**
+
+![dms](../../../../img/tasks/demo/dms_create_and_start_json.png)
+
+**Restart DMS task by JSON data**
+
+![dms](../../../../img/tasks/demo/dms_restart_json.png)
+
+
+
+### First, introduce some general parameters of DolphinScheduler

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [dolphinscheduler] jieguangzhou commented on a diff in pull request #11868: [Feature][TASK-PLUGIN] AWS Database Migration Service (DMS)

Posted by GitBox <gi...@apache.org>.
jieguangzhou commented on code in PR #11868:
URL: https://github.com/apache/dolphinscheduler/pull/11868#discussion_r980996665


##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/test/java/org/apache/dolphinscheduler/plugin/task/dms/DmsHookTest.java:
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dms;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.powermock.api.mockito.PowerMockito.mock;

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [dolphinscheduler] codecov-commenter commented on pull request #11868: [Feature][TASK-PLUGIN] AWS Database Migration Service (DMS)

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #11868:
URL: https://github.com/apache/dolphinscheduler/pull/11868#issuecomment-1242146077

   # [Codecov](https://codecov.io/gh/apache/dolphinscheduler/pull/11868?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#11868](https://codecov.io/gh/apache/dolphinscheduler/pull/11868?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (8351dda) into [dev](https://codecov.io/gh/apache/dolphinscheduler/commit/3664d85143a319e3319cf9bd09108e87492c75bf?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (3664d85) will **increase** coverage by `0.10%`.
   > The diff coverage is `60.25%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##                dev   #11868      +/-   ##
   ============================================
   + Coverage     38.55%   38.65%   +0.10%     
   - Complexity     4037     4066      +29     
   ============================================
     Files           993      998       +5     
     Lines         36698    36904     +206     
     Branches       4273     4295      +22     
   ============================================
   + Hits          14148    14267     +119     
   - Misses        20923    21003      +80     
   - Partials       1627     1634       +7     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/dolphinscheduler/pull/11868?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...olphinscheduler/plugin/task/dms/DmsParameters.java](https://codecov.io/gh/apache/dolphinscheduler/pull/11868/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZG9scGhpbnNjaGVkdWxlci10YXNrLXBsdWdpbi9kb2xwaGluc2NoZWR1bGVyLXRhc2stZG1zL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9kb2xwaGluc2NoZWR1bGVyL3BsdWdpbi90YXNrL2Rtcy9EbXNQYXJhbWV0ZXJzLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...lphinscheduler/plugin/task/dms/DmsTaskChannel.java](https://codecov.io/gh/apache/dolphinscheduler/pull/11868/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZG9scGhpbnNjaGVkdWxlci10YXNrLXBsdWdpbi9kb2xwaGluc2NoZWR1bGVyLXRhc2stZG1zL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9kb2xwaGluc2NoZWR1bGVyL3BsdWdpbi90YXNrL2Rtcy9EbXNUYXNrQ2hhbm5lbC5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...heduler/plugin/task/dms/DmsTaskChannelFactory.java](https://codecov.io/gh/apache/dolphinscheduler/pull/11868/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZG9scGhpbnNjaGVkdWxlci10YXNrLXBsdWdpbi9kb2xwaGluc2NoZWR1bGVyLXRhc2stZG1zL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9kb2xwaGluc2NoZWR1bGVyL3BsdWdpbi90YXNrL2Rtcy9EbXNUYXNrQ2hhbm5lbEZhY3RvcnkuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...ache/dolphinscheduler/plugin/task/dms/DmsTask.java](https://codecov.io/gh/apache/dolphinscheduler/pull/11868/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZG9scGhpbnNjaGVkdWxlci10YXNrLXBsdWdpbi9kb2xwaGluc2NoZWR1bGVyLXRhc2stZG1zL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9kb2xwaGluc2NoZWR1bGVyL3BsdWdpbi90YXNrL2Rtcy9EbXNUYXNrLmphdmE=) | `59.25% <59.25%> (ø)` | |
   | [...ache/dolphinscheduler/plugin/task/dms/DmsHook.java](https://codecov.io/gh/apache/dolphinscheduler/pull/11868/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZG9scGhpbnNjaGVkdWxlci10YXNrLXBsdWdpbi9kb2xwaGluc2NoZWR1bGVyLXRhc2stZG1zL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9kb2xwaGluc2NoZWR1bGVyL3BsdWdpbi90YXNrL2Rtcy9EbXNIb29rLmphdmE=) | `67.39% <67.39%> (ø)` | |
   | [...eduler/server/worker/task/WorkerHeartBeatTask.java](https://codecov.io/gh/apache/dolphinscheduler/pull/11868/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZG9scGhpbnNjaGVkdWxlci13b3JrZXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2RvbHBoaW5zY2hlZHVsZXIvc2VydmVyL3dvcmtlci90YXNrL1dvcmtlckhlYXJ0QmVhdFRhc2suamF2YQ==) | `68.08% <0.00%> (-8.52%)` | :arrow_down: |
   | [...org/apache/dolphinscheduler/remote/utils/Host.java](https://codecov.io/gh/apache/dolphinscheduler/pull/11868/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZG9scGhpbnNjaGVkdWxlci1yZW1vdGUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2RvbHBoaW5zY2hlZHVsZXIvcmVtb3RlL3V0aWxzL0hvc3QuamF2YQ==) | `42.55% <0.00%> (-2.13%)` | :arrow_down: |
   | [...dolphinscheduler/remote/future/ResponseFuture.java](https://codecov.io/gh/apache/dolphinscheduler/pull/11868/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZG9scGhpbnNjaGVkdWxlci1yZW1vdGUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2RvbHBoaW5zY2hlZHVsZXIvcmVtb3RlL2Z1dHVyZS9SZXNwb25zZUZ1dHVyZS5qYXZh) | `81.96% <0.00%> (-1.64%)` | :arrow_down: |
   | [...e/dolphinscheduler/remote/NettyRemotingClient.java](https://codecov.io/gh/apache/dolphinscheduler/pull/11868/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZG9scGhpbnNjaGVkdWxlci1yZW1vdGUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2RvbHBoaW5zY2hlZHVsZXIvcmVtb3RlL05ldHR5UmVtb3RpbmdDbGllbnQuamF2YQ==) | `51.38% <0.00%> (-1.39%)` | :arrow_down: |
   | [...che/dolphinscheduler/common/utils/HadoopUtils.java](https://codecov.io/gh/apache/dolphinscheduler/pull/11868/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZG9scGhpbnNjaGVkdWxlci1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2RvbHBoaW5zY2hlZHVsZXIvY29tbW9uL3V0aWxzL0hhZG9vcFV0aWxzLmphdmE=) | `15.94% <0.00%> (-0.73%)` | :arrow_down: |
   | ... and [15 more](https://codecov.io/gh/apache/dolphinscheduler/pull/11868/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   :mega: We’re building smart automated test selection to slash your CI/CD build times. [Learn more](https://about.codecov.io/iterative-testing/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [dolphinscheduler] caishunfeng commented on pull request #11868: [Feature][TASK-PLUGIN] AWS Database Migration Service (DMS)

Posted by GitBox <gi...@apache.org>.
caishunfeng commented on PR #11868:
URL: https://github.com/apache/dolphinscheduler/pull/11868#issuecomment-1256253848

   Hi @jieguangzhou is this pr ready for review?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [dolphinscheduler] jieguangzhou closed pull request #11868: [Feature][TASK-PLUGIN] AWS Database Migration Service (DMS)

Posted by GitBox <gi...@apache.org>.
jieguangzhou closed pull request #11868: [Feature][TASK-PLUGIN] AWS Database Migration Service (DMS)
URL: https://github.com/apache/dolphinscheduler/pull/11868


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #11868: [Feature][TASK-PLUGIN] AWS Database Migration Service (DMS)

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #11868:
URL: https://github.com/apache/dolphinscheduler/pull/11868#issuecomment-1256246173

   Kudos, SonarCloud Quality Gate passed!&nbsp; &nbsp; [![Quality Gate passed](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/QualityGateBadge/passed-16px.png 'Quality Gate passed')](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=11868)
   
   [![Bug](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug-16px.png 'Bug')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG)  
   [![Vulnerability](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability-16px.png 'Vulnerability')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY)  
   [![Security Hotspot](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot-16px.png 'Security Hotspot')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT)  
   [![Code Smell](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell-16px.png 'Code Smell')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL) [23 Code Smells](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL)
   
   [![56.9%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/50-16px.png '56.9%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_coverage&view=list) [56.9% Coverage](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_coverage&view=list)  
   [![2.0%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3-16px.png '2.0%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_duplicated_lines_density&view=list) [2.0% Duplication](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_duplicated_lines_density&view=list)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [dolphinscheduler] caishunfeng commented on a diff in pull request #11868: [Feature][TASK-PLUGIN] AWS Database Migration Service (DMS)

Posted by GitBox <gi...@apache.org>.
caishunfeng commented on code in PR #11868:
URL: https://github.com/apache/dolphinscheduler/pull/11868#discussion_r978709244


##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsHook.java:
##########
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dms;
+
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
+
+import org.apache.commons.io.IOUtils;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.databasemigrationservice.AWSDatabaseMigrationService;
+import com.amazonaws.services.databasemigrationservice.AWSDatabaseMigrationServiceClientBuilder;
+import com.amazonaws.services.databasemigrationservice.model.CreateReplicationTaskRequest;
+import com.amazonaws.services.databasemigrationservice.model.CreateReplicationTaskResult;
+import com.amazonaws.services.databasemigrationservice.model.DeleteReplicationTaskRequest;
+import com.amazonaws.services.databasemigrationservice.model.DescribeConnectionsRequest;
+import com.amazonaws.services.databasemigrationservice.model.DescribeConnectionsResult;
+import com.amazonaws.services.databasemigrationservice.model.DescribeReplicationTasksRequest;
+import com.amazonaws.services.databasemigrationservice.model.DescribeReplicationTasksResult;
+import com.amazonaws.services.databasemigrationservice.model.Filter;
+import com.amazonaws.services.databasemigrationservice.model.InvalidResourceStateException;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTask;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTaskStats;
+import com.amazonaws.services.databasemigrationservice.model.ResourceNotFoundException;
+import com.amazonaws.services.databasemigrationservice.model.StartReplicationTaskRequest;
+import com.amazonaws.services.databasemigrationservice.model.StartReplicationTaskResult;
+import com.amazonaws.services.databasemigrationservice.model.StopReplicationTaskRequest;
+import com.amazonaws.services.databasemigrationservice.model.Tag;
+import com.amazonaws.services.databasemigrationservice.model.TestConnectionRequest;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+public class DmsHook {
+    protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass()));
+    private AWSDatabaseMigrationService client;
+    private String replicationTaskIdentifier;
+    private String sourceEndpointArn;
+    private String targetEndpointArn;
+    private String replicationInstanceArn;
+    private String migrationType;
+    private String tableMappings;
+    private String replicationTaskSettings;
+    private Date cdcStartTime;
+    private String cdcStartPosition;
+    private String cdcStopPosition;
+    private List<Tag> tags;
+    private String taskData;
+    private String resourceIdentifier;
+    private String replicationTaskArn;
+    private String startReplicationTaskType;
+
+    public DmsHook() {
+        this.client = createClient();
+    }
+
+    public static AWSDatabaseMigrationService createClient() {
+        final String awsAccessKeyId = PropertyUtils.getString(TaskConstants.AWS_ACCESS_KEY_ID);
+        final String awsSecretAccessKey = PropertyUtils.getString(TaskConstants.AWS_SECRET_ACCESS_KEY);
+        final String awsRegion = PropertyUtils.getString(TaskConstants.AWS_REGION);
+        final BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials(awsAccessKeyId, awsSecretAccessKey);
+        final AWSCredentialsProvider awsCredentialsProvider = new AWSStaticCredentialsProvider(basicAWSCredentials);
+
+        // create a DMS client
+        return AWSDatabaseMigrationServiceClientBuilder.standard()
+            .withCredentials(awsCredentialsProvider)
+            .withRegion(awsRegion)
+            .build();
+    }
+
+    public Boolean createReplicationTask() throws Exception {
+        logger.info("createReplicationTask ......");
+        CreateReplicationTaskRequest request = new CreateReplicationTaskRequest()
+            .withReplicationTaskIdentifier(replicationTaskIdentifier)
+            .withSourceEndpointArn(sourceEndpointArn)
+            .withTargetEndpointArn(targetEndpointArn)
+            .withReplicationInstanceArn(replicationInstanceArn)
+            .withMigrationType(migrationType)
+            .withTableMappings(tableMappings)
+            .withReplicationTaskSettings(replicationTaskSettings)
+            .withCdcStartTime(cdcStartTime)
+            .withCdcStartPosition(cdcStartPosition)
+            .withCdcStopPosition(cdcStopPosition)
+            .withTags(tags)
+            .withTaskData(taskData)
+            .withResourceIdentifier(resourceIdentifier);
+
+        request.setTableMappings(replaceFileParameters(request.getTableMappings()));
+        request.setReplicationTaskSettings(replaceFileParameters(request.getReplicationTaskSettings()));
+
+        CreateReplicationTaskResult result = client.createReplicationTask(request);
+        replicationTaskIdentifier = result.getReplicationTask().getReplicationTaskIdentifier();
+        replicationTaskArn = result.getReplicationTask().getReplicationTaskArn();
+        logger.info("replicationTaskIdentifier: {}, replicationTaskArn: {}", replicationTaskIdentifier, replicationTaskArn);
+        return awaitReplicationTaskStatus(STATUS.READY);
+    }
+
+
+    public Boolean startReplicationTask() {
+        logger.info("startReplicationTask ......");
+        StartReplicationTaskRequest request = new StartReplicationTaskRequest()
+            .withReplicationTaskArn(replicationTaskArn)
+            .withStartReplicationTaskType(startReplicationTaskType)
+            .withCdcStartTime(cdcStartTime)
+            .withCdcStartPosition(cdcStartPosition)
+            .withCdcStopPosition(cdcStopPosition);
+        StartReplicationTaskResult result = client.startReplicationTask(request);
+        replicationTaskArn = result.getReplicationTask().getReplicationTaskArn();
+        return awaitReplicationTaskStatus(STATUS.RUNNING);
+    }
+
+    public Boolean checkFinishedReplicationTask() {
+        logger.info("checkFinishedReplicationTask ......");
+        awaitReplicationTaskStatus(STATUS.STOPPED);
+        String stopReason = describeReplicationTasks().getStopReason();
+        return stopReason.endsWith(STATUS.FINISH_END_TOKEN);
+    }
+
+    public void stopReplicationTask() {
+        logger.info("stopReplicationTask ......");
+        if (replicationTaskArn == null) {
+            return;
+        }
+        StopReplicationTaskRequest request = new StopReplicationTaskRequest()
+            .withReplicationTaskArn(replicationTaskArn);
+        client.stopReplicationTask(request);
+        awaitReplicationTaskStatus(STATUS.STOPPED);
+    }
+
+    public Boolean deleteReplicationTask() {
+        logger.info("deleteReplicationTask ......");
+        DeleteReplicationTaskRequest request = new DeleteReplicationTaskRequest()
+            .withReplicationTaskArn(replicationTaskArn);
+        client.deleteReplicationTask(request);
+        Boolean isDeleteSuccessfully;
+        try {
+            isDeleteSuccessfully = awaitReplicationTaskStatus(STATUS.DELETE);
+        } catch (ResourceNotFoundException e) {
+            isDeleteSuccessfully = true;
+        }
+        return isDeleteSuccessfully;
+    }
+
+    public Boolean testConnectionEndpoint() {
+        return (testConnection(replicationInstanceArn, sourceEndpointArn) && testConnection(replicationInstanceArn, targetEndpointArn));
+    }
+
+    public Boolean testConnection(String replicationInstanceArn, String endpointArn) {
+        logger.info("Test connect replication instance: {} and endpoint: {}", replicationInstanceArn, endpointArn);
+        TestConnectionRequest request = new TestConnectionRequest().
+            withReplicationInstanceArn(replicationInstanceArn)
+            .withEndpointArn(endpointArn);
+        try {
+            client.testConnection(request);
+        } catch (InvalidResourceStateException e) {
+            logger.info(e.getErrorMessage());
+        }
+
+        return awaitConnectSuccess(replicationInstanceArn, endpointArn);
+    }
+
+    public Boolean awaitConnectSuccess(String replicationInstanceArn, String endpointArn) {
+        Filter instanceFilters = new Filter().withName(AWS_KEY.REPLICATION_INSTANCE_ARN).withValues(replicationInstanceArn);
+        Filter endpointFilters = new Filter().withName(AWS_KEY.ENDPOINT_ARN).withValues(endpointArn);
+        DescribeConnectionsRequest request = new DescribeConnectionsRequest().withFilters(endpointFilters, instanceFilters)
+            .withMarker("");
+        while (true) {
+            ThreadUtils.sleep(CONSTANTS.CHECK_INTERVAL);
+            DescribeConnectionsResult response = client.describeConnections(request);
+            String status = response.getConnections().get(0).getStatus();
+            if (status.equals(STATUS.SUCCESSFUL)) {
+                logger.info("Connect successful");
+                return true;
+            } else if (!status.equals(STATUS.TESTING)) {
+                break;
+            }
+        }
+        logger.info("Connect error");
+        return false;
+    }
+
+    public ReplicationTask describeReplicationTasks() {
+        Filter replicationTaskFilter = new Filter().withName(AWS_KEY.REPLICATION_TASK_ARN).withValues(replicationTaskArn);
+        DescribeReplicationTasksRequest request = new DescribeReplicationTasksRequest().withFilters(replicationTaskFilter).withMaxRecords(20).withMarker("");
+        DescribeReplicationTasksResult result = client.describeReplicationTasks(request);
+        ReplicationTask replicationTask = result.getReplicationTasks().get(0);
+
+        if (sourceEndpointArn == null) {
+            sourceEndpointArn = replicationTask.getSourceEndpointArn();
+        }
+
+        if (targetEndpointArn == null) {
+            targetEndpointArn = replicationTask.getTargetEndpointArn();
+        }
+
+        if (replicationInstanceArn == null) {
+            replicationInstanceArn = replicationTask.getReplicationInstanceArn();
+        }
+
+        if (replicationTaskArn == null) {
+            replicationTaskArn = replicationTask.getReplicationTaskArn();
+        }
+
+        return replicationTask;
+    }
+
+    public Boolean awaitReplicationTaskStatus(String exceptStatus, String... stopStatus) {
+        List<String> stopStatusSet = Arrays.asList(stopStatus);
+        Integer lastPercent = 0;
+        while (true) {
+            ThreadUtils.sleep(CONSTANTS.CHECK_INTERVAL);
+            ReplicationTask replicationTask = describeReplicationTasks();
+            String status = replicationTask.getStatus();
+
+            if (status.equals(STATUS.RUNNING) || status.equals(STATUS.STOPPED)) {
+                ReplicationTaskStats taskStats = replicationTask.getReplicationTaskStats();
+                Integer percent;
+                if (taskStats != null) {
+                    percent = taskStats.getFullLoadProgressPercent();
+                } else {
+                    percent = 0;
+                }
+                if (!lastPercent.equals(percent)) {
+                    String runningMessage = String.format("fullLoadProgressPercent: %s ", percent);
+                    logger.info(runningMessage);
+                }
+                lastPercent = percent;
+            }
+
+            if (exceptStatus.equals(status)) {
+                logger.info("success");

Review Comment:
   remove it if useless.



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/pom.xml:
##########
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>dolphinscheduler-task-plugin</artifactId>
+        <groupId>org.apache.dolphinscheduler</groupId>
+        <version>dev-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>dolphinscheduler-task-dms</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-spi</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-task-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-common</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-dms</artifactId>
+            <version>1.12.297</version>
+        </dependency>

Review Comment:
   Move the version to bom.



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/test/java/org/apache/dolphinscheduler/plugin/task/dms/DmsHookTest.java:
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dms;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+
+import org.apache.commons.io.IOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import com.amazonaws.services.databasemigrationservice.AWSDatabaseMigrationService;
+import com.amazonaws.services.databasemigrationservice.model.CreateReplicationTaskResult;
+import com.amazonaws.services.databasemigrationservice.model.DescribeReplicationTasksResult;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTask;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTaskStats;
+import com.amazonaws.services.databasemigrationservice.model.StartReplicationTaskResult;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({
+    JSONUtils.class,
+    DmsHook.class
+})
+@PowerMockIgnore({"javax.*"})
+public class DmsHookTest {
+
+    AWSDatabaseMigrationService client;
+
+    @Before
+    public void before() {
+        mockStatic(DmsHook.class);
+        client = mock(AWSDatabaseMigrationService.class);
+        when(DmsHook.createClient()).thenAnswer(invocation -> client);
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateReplicationTask() throws Exception {
+
+        DmsHook dmsHook = spy(new DmsHook());
+        CreateReplicationTaskResult createReplicationTaskResult = mock(CreateReplicationTaskResult.class);
+        when(client.createReplicationTask(any())).thenReturn(createReplicationTaskResult);
+
+        ReplicationTask replicationTask = mock(ReplicationTask.class);
+        when(replicationTask.getReplicationTaskArn()).thenReturn("arn:aws:dms:ap-southeast-1:123456789012:task:task");

Review Comment:
   use constant string `arn:aws:dms:ap-southeast-1:123456789012:task:task`



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsTask.java:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dms;
+
+import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT;
+import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
+import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
+import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
+
+import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import org.apache.commons.beanutils.BeanUtils;
+
+import java.util.Collections;
+import java.util.List;
+
+import com.amazonaws.services.databasemigrationservice.model.InvalidResourceStateException;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTask;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.PropertyNamingStrategy;
+
+public class DmsTask extends AbstractRemoteTask {
+
+    private static final ObjectMapper objectMapper =
+        new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
+            .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
+            .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
+            .configure(REQUIRE_SETTERS_FOR_GETTERS, true)
+            .setPropertyNamingStrategy(new PropertyNamingStrategy.UpperCamelCaseStrategy());
+    /**
+     * taskExecutionContext
+     */
+    private final TaskExecutionContext taskExecutionContext;
+    public DmsHook dmsHook;
+    /**
+     * Dms parameters
+     */
+    private DmsParameters parameters;
+    private DmsHook.ApplicationIds appId;
+
+    public DmsTask(TaskExecutionContext taskExecutionContext) {
+        super(taskExecutionContext);
+        this.taskExecutionContext = taskExecutionContext;
+
+    }
+
+    @Override
+    public void init() throws TaskException {
+        logger.info("Dms task params {}", taskExecutionContext.getTaskParams());
+        parameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), DmsParameters.class);
+        initDmsHook();
+    }
+
+    @Override
+    public List<String> getApplicationIds() throws TaskException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void submitApplication() throws TaskException {
+        exitStatusCode = checkCreateReplicationTask();
+        if (exitStatusCode == TaskConstants.EXIT_CODE_SUCCESS) {
+            exitStatusCode = startReplicationTask();
+        } else {
+            throw new TaskException("DMS task failed to start");
+        }
+
+        // if the task is not running, the task will be deleted
+        if (exitStatusCode == TaskConstants.EXIT_CODE_FAILURE && !parameters.getIsRestartTask()) {
+            dmsHook.deleteReplicationTask();
+        }else {
+            appId = dmsHook.getApplicationIds();
+            setAppIds(JSONUtils.toJsonString(appId));
+        }
+    }
+
+    @Override
+    public void trackApplicationStatus() {
+        initAppId();
+        dmsHook.setReplicationTaskArn(appId.getReplicationTaskArn());
+        // if CdcStopPosition is not set, the task will not continue to check the running status
+        if (isStopTaskWhenCdc()) {
+            logger.info("This is a cdc task and cdcStopPosition is not set, the task will not continue to check the running status");
+            exitStatusCode = TaskConstants.EXIT_CODE_SUCCESS;
+            return;
+        }
+
+        Boolean isFinishedSuccessfully = dmsHook.checkFinishedReplicationTask();
+        if (isFinishedSuccessfully) {
+            exitStatusCode = TaskConstants.EXIT_CODE_SUCCESS;
+        } else {
+            throw new TaskException("DMS task failed to track");
+        }
+    }
+
+    /**
+     * init DMS remote AppId if null
+     */
+    private void initAppId() {
+        if (appId == null) {
+            if (StringUtils.isNotEmpty(getAppIds())) {
+                appId = JSONUtils.parseObject(getAppIds(), DmsHook.ApplicationIds.class);
+            }
+        }
+        if (appId == null) {
+            throw new TaskException("sagemaker applicationID is null");
+        }
+    }
+
+    public int checkCreateReplicationTask() throws TaskException {
+
+        // if IsRestartTask, return success, do not create replication task
+        if (parameters.getIsRestartTask()) {
+            return TaskConstants.EXIT_CODE_SUCCESS;
+        }
+
+        // if not IsRestartTask, create replication task
+        Boolean isCreateSuccessfully;
+        try {
+            isCreateSuccessfully = dmsHook.createReplicationTask();
+        } catch (Exception e) {
+            throw new TaskException("DMS task create replication task error", e);
+        }
+
+        // if create replication task successfully, return EXIT_CODE_SUCCESS, else return EXIT_CODE_FAILURE
+        if (isCreateSuccessfully) {
+            return TaskConstants.EXIT_CODE_SUCCESS;
+        } else {
+            return TaskConstants.EXIT_CODE_FAILURE;
+        }
+    }
+
+    /**
+     * start replication task
+     *
+     * @return
+     * @throws TaskException
+     */
+    public int startReplicationTask() {
+
+        Boolean isStartSuccessfully = false;
+        try {
+            isStartSuccessfully = dmsHook.startReplicationTask();
+        } catch (InvalidResourceStateException e) {
+            logger.error("Failed to start a task, error message: {}", e.getErrorMessage());
+
+            // Only restart task when the error contains "Test connection", means instance can not connect to source or target
+            if (!e.getErrorMessage().contains("Test connection")) {
+                return TaskConstants.EXIT_CODE_FAILURE;
+            }
+
+            logger.info("restart replication task");
+            // if only restart task, run dmsHook.describeReplicationTasks to get replication task arn
+            if (parameters.getIsRestartTask()) {
+                dmsHook.describeReplicationTasks();
+            }
+
+            // test connection endpoint again and restart task if connection is ok
+            if (dmsHook.testConnectionEndpoint()) {
+                isStartSuccessfully = dmsHook.startReplicationTask();
+            }
+        }
+
+        // if start replication task failed, return EXIT_CODE_FAILURE
+        if (!isStartSuccessfully) {
+            return TaskConstants.EXIT_CODE_FAILURE;
+        }
+
+        return TaskConstants.EXIT_CODE_SUCCESS;
+    }
+
+    /**
+     * check if stop task when cdc
+     *
+     * @return true if stop task when cdc type and cdcStopPosition is not set, else return false
+     */
+    public Boolean isStopTaskWhenCdc() {
+        ReplicationTask replicationTask = dmsHook.describeReplicationTasks();
+        String migrationType = replicationTask.getMigrationType();
+        return migrationType.contains("cdc") && parameters.getCdcStopPosition() == null;
+    }
+
+    /**
+     * init dms hook
+     */
+    public void initDmsHook() throws TaskException {
+        convertJsonParameters();
+
+        dmsHook = new DmsHook();
+        try {
+            BeanUtils.copyProperties(dmsHook, parameters);
+        } catch (Exception e) {
+            throw new TaskException("DMS task init error", e);
+        }
+
+
+        if (!StringUtils.isNotEmpty(parameters.getStartReplicationTaskType())) {
+            if (parameters.getIsRestartTask()) {
+                dmsHook.setStartReplicationTaskType(DmsHook.START_TYPE.RELOAD_TARGET);
+            } else {
+                dmsHook.setStartReplicationTaskType(DmsHook.START_TYPE.START_REPLICATION);
+            }
+        }
+    }
+
+    /**
+     * convert json parameters to dms parameters
+     */
+    public void convertJsonParameters() throws TaskException {
+        // create a new parameter object using the json data if the json data is not empty
+        if (parameters.getIsJsonFormat() && parameters.getJsonData() != null) {
+            // combining local and global parameters
+            String jsonData = ParameterUtils.convertParameterPlaceholders(parameters.getJsonData(), ParamUtils.convert(taskExecutionContext.getPrepareParamsMap()));
+
+            boolean isRestartTask = parameters.getIsRestartTask();
+            try {
+                parameters = objectMapper.readValue(jsonData, DmsParameters.class);
+                parameters.setIsRestartTask(isRestartTask);
+            } catch (Exception e) {
+                logger.error("Failed to convert json data to DmsParameters object, error message: {}", e.getMessage());

Review Comment:
   ```suggestion
                   logger.error("Failed to convert json data to DmsParameters object", e);
   ```



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsParameters.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dms;
+
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+
+import java.util.Date;
+import java.util.List;
+
+import com.amazonaws.services.databasemigrationservice.model.Tag;
+
+import lombok.Data;
+
+@Data
+public class DmsParameters extends AbstractParameters {
+
+    private Boolean isRestartTask = false;
+    private Boolean isJsonFormat = false;
+    private String jsonData;
+    private String replicationTaskIdentifier;
+    private String sourceEndpointArn;
+    private String targetEndpointArn;
+    private String replicationInstanceArn;
+    private String migrationType;
+    private String tableMappings;
+    private String replicationTaskSettings;
+    private Date cdcStartTime;
+    private String cdcStartPosition;
+    private String cdcStopPosition;
+    private List<Tag> tags;
+    private String taskData;
+    private String resourceIdentifier;
+    private String replicationTaskArn;
+    private String startReplicationTaskType;
+
+    @Override
+    public boolean checkParameters() {
+        boolean flag;
+        if (isJsonFormat) {
+            flag = jsonData != null;
+        } else if (isRestartTask) {
+            flag = (replicationTaskArn != null);

Review Comment:
   ```suggestion
               flag = replicationTaskArn != null;
   ```



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/test/java/org/apache/dolphinscheduler/plugin/task/dms/DmsTaskTest.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dms;
+
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.powermock.api.support.membermodification.MemberModifier;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import com.amazonaws.services.databasemigrationservice.model.InvalidResourceStateException;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTask;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({
+    JSONUtils.class,
+    PropertyUtils.class,
+    DmsHook.class
+})
+@PowerMockIgnore({"javax.*"})
+public class DmsTaskTest {
+
+    @Mock
+    DmsHook dmsHook;
+
+    DmsTask dmsTask;
+
+    @Before
+    public void before() throws Exception {
+        whenNew(DmsHook.class).withAnyArguments().thenReturn(dmsHook);
+        DmsParameters dmsParameters = new DmsParameters();
+        dmsTask = initTask(dmsParameters);
+        dmsTask.initDmsHook();
+        MemberModifier.field(DmsTask.class, "dmsHook").set(dmsTask, dmsHook);
+    }
+
+    @Test
+    public void testCreateTaskJson() {

Review Comment:
   It seems a meanless test case.



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsTask.java:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dms;
+
+import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT;
+import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
+import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
+import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
+
+import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import org.apache.commons.beanutils.BeanUtils;
+
+import java.util.Collections;
+import java.util.List;
+
+import com.amazonaws.services.databasemigrationservice.model.InvalidResourceStateException;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTask;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.PropertyNamingStrategy;
+
+public class DmsTask extends AbstractRemoteTask {
+
+    private static final ObjectMapper objectMapper =
+        new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
+            .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
+            .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
+            .configure(REQUIRE_SETTERS_FOR_GETTERS, true)
+            .setPropertyNamingStrategy(new PropertyNamingStrategy.UpperCamelCaseStrategy());
+    /**
+     * taskExecutionContext
+     */
+    private final TaskExecutionContext taskExecutionContext;
+    public DmsHook dmsHook;
+    /**
+     * Dms parameters
+     */
+    private DmsParameters parameters;
+    private DmsHook.ApplicationIds appId;
+
+    public DmsTask(TaskExecutionContext taskExecutionContext) {
+        super(taskExecutionContext);
+        this.taskExecutionContext = taskExecutionContext;
+
+    }
+
+    @Override
+    public void init() throws TaskException {
+        logger.info("Dms task params {}", taskExecutionContext.getTaskParams());
+        parameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), DmsParameters.class);
+        initDmsHook();
+    }
+
+    @Override
+    public List<String> getApplicationIds() throws TaskException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void submitApplication() throws TaskException {
+        exitStatusCode = checkCreateReplicationTask();
+        if (exitStatusCode == TaskConstants.EXIT_CODE_SUCCESS) {
+            exitStatusCode = startReplicationTask();
+        } else {
+            throw new TaskException("DMS task failed to start");
+        }
+
+        // if the task is not running, the task will be deleted
+        if (exitStatusCode == TaskConstants.EXIT_CODE_FAILURE && !parameters.getIsRestartTask()) {
+            dmsHook.deleteReplicationTask();
+        }else {
+            appId = dmsHook.getApplicationIds();
+            setAppIds(JSONUtils.toJsonString(appId));
+        }
+    }
+
+    @Override
+    public void trackApplicationStatus() {
+        initAppId();
+        dmsHook.setReplicationTaskArn(appId.getReplicationTaskArn());
+        // if CdcStopPosition is not set, the task will not continue to check the running status
+        if (isStopTaskWhenCdc()) {
+            logger.info("This is a cdc task and cdcStopPosition is not set, the task will not continue to check the running status");
+            exitStatusCode = TaskConstants.EXIT_CODE_SUCCESS;
+            return;
+        }
+
+        Boolean isFinishedSuccessfully = dmsHook.checkFinishedReplicationTask();
+        if (isFinishedSuccessfully) {
+            exitStatusCode = TaskConstants.EXIT_CODE_SUCCESS;
+        } else {
+            throw new TaskException("DMS task failed to track");
+        }
+    }
+
+    /**
+     * init DMS remote AppId if null
+     */
+    private void initAppId() {
+        if (appId == null) {
+            if (StringUtils.isNotEmpty(getAppIds())) {
+                appId = JSONUtils.parseObject(getAppIds(), DmsHook.ApplicationIds.class);
+            }
+        }
+        if (appId == null) {
+            throw new TaskException("sagemaker applicationID is null");
+        }
+    }
+
+    public int checkCreateReplicationTask() throws TaskException {
+
+        // if IsRestartTask, return success, do not create replication task
+        if (parameters.getIsRestartTask()) {
+            return TaskConstants.EXIT_CODE_SUCCESS;
+        }
+
+        // if not IsRestartTask, create replication task
+        Boolean isCreateSuccessfully;
+        try {
+            isCreateSuccessfully = dmsHook.createReplicationTask();
+        } catch (Exception e) {
+            throw new TaskException("DMS task create replication task error", e);
+        }
+
+        // if create replication task successfully, return EXIT_CODE_SUCCESS, else return EXIT_CODE_FAILURE
+        if (isCreateSuccessfully) {
+            return TaskConstants.EXIT_CODE_SUCCESS;
+        } else {
+            return TaskConstants.EXIT_CODE_FAILURE;
+        }
+    }
+
+    /**
+     * start replication task
+     *
+     * @return
+     * @throws TaskException
+     */
+    public int startReplicationTask() {
+
+        Boolean isStartSuccessfully = false;
+        try {
+            isStartSuccessfully = dmsHook.startReplicationTask();
+        } catch (InvalidResourceStateException e) {
+            logger.error("Failed to start a task, error message: {}", e.getErrorMessage());
+
+            // Only restart task when the error contains "Test connection", means instance can not connect to source or target
+            if (!e.getErrorMessage().contains("Test connection")) {
+                return TaskConstants.EXIT_CODE_FAILURE;
+            }
+
+            logger.info("restart replication task");
+            // if only restart task, run dmsHook.describeReplicationTasks to get replication task arn
+            if (parameters.getIsRestartTask()) {
+                dmsHook.describeReplicationTasks();
+            }
+
+            // test connection endpoint again and restart task if connection is ok
+            if (dmsHook.testConnectionEndpoint()) {
+                isStartSuccessfully = dmsHook.startReplicationTask();
+            }
+        }
+
+        // if start replication task failed, return EXIT_CODE_FAILURE
+        if (!isStartSuccessfully) {
+            return TaskConstants.EXIT_CODE_FAILURE;
+        }
+
+        return TaskConstants.EXIT_CODE_SUCCESS;
+    }
+
+    /**
+     * check if stop task when cdc
+     *
+     * @return true if stop task when cdc type and cdcStopPosition is not set, else return false
+     */
+    public Boolean isStopTaskWhenCdc() {
+        ReplicationTask replicationTask = dmsHook.describeReplicationTasks();
+        String migrationType = replicationTask.getMigrationType();
+        return migrationType.contains("cdc") && parameters.getCdcStopPosition() == null;
+    }
+
+    /**
+     * init dms hook
+     */
+    public void initDmsHook() throws TaskException {
+        convertJsonParameters();
+
+        dmsHook = new DmsHook();
+        try {
+            BeanUtils.copyProperties(dmsHook, parameters);
+        } catch (Exception e) {
+            throw new TaskException("DMS task init error", e);
+        }
+
+
+        if (!StringUtils.isNotEmpty(parameters.getStartReplicationTaskType())) {
+            if (parameters.getIsRestartTask()) {
+                dmsHook.setStartReplicationTaskType(DmsHook.START_TYPE.RELOAD_TARGET);
+            } else {
+                dmsHook.setStartReplicationTaskType(DmsHook.START_TYPE.START_REPLICATION);
+            }
+        }
+    }
+
+    /**
+     * convert json parameters to dms parameters
+     */
+    public void convertJsonParameters() throws TaskException {
+        // create a new parameter object using the json data if the json data is not empty
+        if (parameters.getIsJsonFormat() && parameters.getJsonData() != null) {
+            // combining local and global parameters
+            String jsonData = ParameterUtils.convertParameterPlaceholders(parameters.getJsonData(), ParamUtils.convert(taskExecutionContext.getPrepareParamsMap()));
+
+            boolean isRestartTask = parameters.getIsRestartTask();
+            try {
+                parameters = objectMapper.readValue(jsonData, DmsParameters.class);
+                parameters.setIsRestartTask(isRestartTask);
+            } catch (Exception e) {
+                logger.error("Failed to convert json data to DmsParameters object, error message: {}", e.getMessage());
+                throw new TaskException(e.getMessage());
+            }
+        }
+    }
+
+    @Override
+    public DmsParameters getParameters() {
+        return parameters;
+    }
+
+    @Override
+    public void cancelApplication() {
+        dmsHook.stopReplicationTask();
+//        dmsHook.deleteReplicationTask();

Review Comment:
   ```suggestion
   ```



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsHook.java:
##########
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dms;
+
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
+
+import org.apache.commons.io.IOUtils;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.databasemigrationservice.AWSDatabaseMigrationService;
+import com.amazonaws.services.databasemigrationservice.AWSDatabaseMigrationServiceClientBuilder;
+import com.amazonaws.services.databasemigrationservice.model.CreateReplicationTaskRequest;
+import com.amazonaws.services.databasemigrationservice.model.CreateReplicationTaskResult;
+import com.amazonaws.services.databasemigrationservice.model.DeleteReplicationTaskRequest;
+import com.amazonaws.services.databasemigrationservice.model.DescribeConnectionsRequest;
+import com.amazonaws.services.databasemigrationservice.model.DescribeConnectionsResult;
+import com.amazonaws.services.databasemigrationservice.model.DescribeReplicationTasksRequest;
+import com.amazonaws.services.databasemigrationservice.model.DescribeReplicationTasksResult;
+import com.amazonaws.services.databasemigrationservice.model.Filter;
+import com.amazonaws.services.databasemigrationservice.model.InvalidResourceStateException;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTask;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTaskStats;
+import com.amazonaws.services.databasemigrationservice.model.ResourceNotFoundException;
+import com.amazonaws.services.databasemigrationservice.model.StartReplicationTaskRequest;
+import com.amazonaws.services.databasemigrationservice.model.StartReplicationTaskResult;
+import com.amazonaws.services.databasemigrationservice.model.StopReplicationTaskRequest;
+import com.amazonaws.services.databasemigrationservice.model.Tag;
+import com.amazonaws.services.databasemigrationservice.model.TestConnectionRequest;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+public class DmsHook {
+    protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass()));
+    private AWSDatabaseMigrationService client;
+    private String replicationTaskIdentifier;
+    private String sourceEndpointArn;
+    private String targetEndpointArn;
+    private String replicationInstanceArn;
+    private String migrationType;
+    private String tableMappings;
+    private String replicationTaskSettings;
+    private Date cdcStartTime;
+    private String cdcStartPosition;
+    private String cdcStopPosition;
+    private List<Tag> tags;
+    private String taskData;
+    private String resourceIdentifier;
+    private String replicationTaskArn;
+    private String startReplicationTaskType;
+
+    public DmsHook() {
+        this.client = createClient();
+    }
+
+    public static AWSDatabaseMigrationService createClient() {
+        final String awsAccessKeyId = PropertyUtils.getString(TaskConstants.AWS_ACCESS_KEY_ID);
+        final String awsSecretAccessKey = PropertyUtils.getString(TaskConstants.AWS_SECRET_ACCESS_KEY);
+        final String awsRegion = PropertyUtils.getString(TaskConstants.AWS_REGION);
+        final BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials(awsAccessKeyId, awsSecretAccessKey);
+        final AWSCredentialsProvider awsCredentialsProvider = new AWSStaticCredentialsProvider(basicAWSCredentials);
+
+        // create a DMS client
+        return AWSDatabaseMigrationServiceClientBuilder.standard()
+            .withCredentials(awsCredentialsProvider)
+            .withRegion(awsRegion)
+            .build();
+    }
+
+    public Boolean createReplicationTask() throws Exception {
+        logger.info("createReplicationTask ......");
+        CreateReplicationTaskRequest request = new CreateReplicationTaskRequest()
+            .withReplicationTaskIdentifier(replicationTaskIdentifier)
+            .withSourceEndpointArn(sourceEndpointArn)
+            .withTargetEndpointArn(targetEndpointArn)
+            .withReplicationInstanceArn(replicationInstanceArn)
+            .withMigrationType(migrationType)
+            .withTableMappings(tableMappings)
+            .withReplicationTaskSettings(replicationTaskSettings)
+            .withCdcStartTime(cdcStartTime)
+            .withCdcStartPosition(cdcStartPosition)
+            .withCdcStopPosition(cdcStopPosition)
+            .withTags(tags)
+            .withTaskData(taskData)
+            .withResourceIdentifier(resourceIdentifier);
+
+        request.setTableMappings(replaceFileParameters(request.getTableMappings()));
+        request.setReplicationTaskSettings(replaceFileParameters(request.getReplicationTaskSettings()));
+
+        CreateReplicationTaskResult result = client.createReplicationTask(request);
+        replicationTaskIdentifier = result.getReplicationTask().getReplicationTaskIdentifier();
+        replicationTaskArn = result.getReplicationTask().getReplicationTaskArn();
+        logger.info("replicationTaskIdentifier: {}, replicationTaskArn: {}", replicationTaskIdentifier, replicationTaskArn);
+        return awaitReplicationTaskStatus(STATUS.READY);
+    }
+
+
+    public Boolean startReplicationTask() {
+        logger.info("startReplicationTask ......");
+        StartReplicationTaskRequest request = new StartReplicationTaskRequest()
+            .withReplicationTaskArn(replicationTaskArn)
+            .withStartReplicationTaskType(startReplicationTaskType)
+            .withCdcStartTime(cdcStartTime)
+            .withCdcStartPosition(cdcStartPosition)
+            .withCdcStopPosition(cdcStopPosition);
+        StartReplicationTaskResult result = client.startReplicationTask(request);
+        replicationTaskArn = result.getReplicationTask().getReplicationTaskArn();
+        return awaitReplicationTaskStatus(STATUS.RUNNING);
+    }
+
+    public Boolean checkFinishedReplicationTask() {
+        logger.info("checkFinishedReplicationTask ......");
+        awaitReplicationTaskStatus(STATUS.STOPPED);
+        String stopReason = describeReplicationTasks().getStopReason();
+        return stopReason.endsWith(STATUS.FINISH_END_TOKEN);
+    }
+
+    public void stopReplicationTask() {
+        logger.info("stopReplicationTask ......");
+        if (replicationTaskArn == null) {
+            return;
+        }
+        StopReplicationTaskRequest request = new StopReplicationTaskRequest()
+            .withReplicationTaskArn(replicationTaskArn);
+        client.stopReplicationTask(request);
+        awaitReplicationTaskStatus(STATUS.STOPPED);
+    }
+
+    public Boolean deleteReplicationTask() {
+        logger.info("deleteReplicationTask ......");
+        DeleteReplicationTaskRequest request = new DeleteReplicationTaskRequest()
+            .withReplicationTaskArn(replicationTaskArn);
+        client.deleteReplicationTask(request);
+        Boolean isDeleteSuccessfully;
+        try {
+            isDeleteSuccessfully = awaitReplicationTaskStatus(STATUS.DELETE);
+        } catch (ResourceNotFoundException e) {
+            isDeleteSuccessfully = true;
+        }
+        return isDeleteSuccessfully;
+    }
+
+    public Boolean testConnectionEndpoint() {
+        return (testConnection(replicationInstanceArn, sourceEndpointArn) && testConnection(replicationInstanceArn, targetEndpointArn));
+    }
+
+    public Boolean testConnection(String replicationInstanceArn, String endpointArn) {
+        logger.info("Test connect replication instance: {} and endpoint: {}", replicationInstanceArn, endpointArn);
+        TestConnectionRequest request = new TestConnectionRequest().
+            withReplicationInstanceArn(replicationInstanceArn)
+            .withEndpointArn(endpointArn);
+        try {
+            client.testConnection(request);
+        } catch (InvalidResourceStateException e) {
+            logger.info(e.getErrorMessage());
+        }
+
+        return awaitConnectSuccess(replicationInstanceArn, endpointArn);
+    }
+
+    public Boolean awaitConnectSuccess(String replicationInstanceArn, String endpointArn) {
+        Filter instanceFilters = new Filter().withName(AWS_KEY.REPLICATION_INSTANCE_ARN).withValues(replicationInstanceArn);
+        Filter endpointFilters = new Filter().withName(AWS_KEY.ENDPOINT_ARN).withValues(endpointArn);
+        DescribeConnectionsRequest request = new DescribeConnectionsRequest().withFilters(endpointFilters, instanceFilters)
+            .withMarker("");
+        while (true) {
+            ThreadUtils.sleep(CONSTANTS.CHECK_INTERVAL);
+            DescribeConnectionsResult response = client.describeConnections(request);
+            String status = response.getConnections().get(0).getStatus();
+            if (status.equals(STATUS.SUCCESSFUL)) {
+                logger.info("Connect successful");
+                return true;
+            } else if (!status.equals(STATUS.TESTING)) {
+                break;
+            }
+        }
+        logger.info("Connect error");
+        return false;
+    }
+
+    public ReplicationTask describeReplicationTasks() {
+        Filter replicationTaskFilter = new Filter().withName(AWS_KEY.REPLICATION_TASK_ARN).withValues(replicationTaskArn);
+        DescribeReplicationTasksRequest request = new DescribeReplicationTasksRequest().withFilters(replicationTaskFilter).withMaxRecords(20).withMarker("");
+        DescribeReplicationTasksResult result = client.describeReplicationTasks(request);
+        ReplicationTask replicationTask = result.getReplicationTasks().get(0);
+
+        if (sourceEndpointArn == null) {
+            sourceEndpointArn = replicationTask.getSourceEndpointArn();
+        }
+
+        if (targetEndpointArn == null) {
+            targetEndpointArn = replicationTask.getTargetEndpointArn();
+        }
+
+        if (replicationInstanceArn == null) {
+            replicationInstanceArn = replicationTask.getReplicationInstanceArn();
+        }
+
+        if (replicationTaskArn == null) {
+            replicationTaskArn = replicationTask.getReplicationTaskArn();
+        }
+
+        return replicationTask;
+    }
+
+    public Boolean awaitReplicationTaskStatus(String exceptStatus, String... stopStatus) {
+        List<String> stopStatusSet = Arrays.asList(stopStatus);
+        Integer lastPercent = 0;
+        while (true) {
+            ThreadUtils.sleep(CONSTANTS.CHECK_INTERVAL);
+            ReplicationTask replicationTask = describeReplicationTasks();
+            String status = replicationTask.getStatus();
+
+            if (status.equals(STATUS.RUNNING) || status.equals(STATUS.STOPPED)) {
+                ReplicationTaskStats taskStats = replicationTask.getReplicationTaskStats();
+                Integer percent;
+                if (taskStats != null) {
+                    percent = taskStats.getFullLoadProgressPercent();
+                } else {
+                    percent = 0;
+                }
+                if (!lastPercent.equals(percent)) {
+                    String runningMessage = String.format("fullLoadProgressPercent: %s ", percent);
+                    logger.info(runningMessage);
+                }
+                lastPercent = percent;
+            }
+
+            if (exceptStatus.equals(status)) {
+                logger.info("success");
+                return true;
+            } else if (stopStatusSet.contains(status)) {
+                break;
+            }
+        }
+        logger.info("error");

Review Comment:
   remove it if useless.



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/test/java/org/apache/dolphinscheduler/plugin/task/dms/DmsHookTest.java:
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dms;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+
+import org.apache.commons.io.IOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import com.amazonaws.services.databasemigrationservice.AWSDatabaseMigrationService;
+import com.amazonaws.services.databasemigrationservice.model.CreateReplicationTaskResult;
+import com.amazonaws.services.databasemigrationservice.model.DescribeReplicationTasksResult;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTask;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTaskStats;
+import com.amazonaws.services.databasemigrationservice.model.StartReplicationTaskResult;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({
+    JSONUtils.class,
+    DmsHook.class
+})
+@PowerMockIgnore({"javax.*"})
+public class DmsHookTest {
+
+    AWSDatabaseMigrationService client;
+
+    @Before
+    public void before() {
+        mockStatic(DmsHook.class);
+        client = mock(AWSDatabaseMigrationService.class);
+        when(DmsHook.createClient()).thenAnswer(invocation -> client);
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateReplicationTask() throws Exception {
+
+        DmsHook dmsHook = spy(new DmsHook());
+        CreateReplicationTaskResult createReplicationTaskResult = mock(CreateReplicationTaskResult.class);
+        when(client.createReplicationTask(any())).thenReturn(createReplicationTaskResult);
+
+        ReplicationTask replicationTask = mock(ReplicationTask.class);
+        when(replicationTask.getReplicationTaskArn()).thenReturn("arn:aws:dms:ap-southeast-1:123456789012:task:task");
+        when(replicationTask.getReplicationTaskIdentifier()).thenReturn("task");
+        when(replicationTask.getStatus()).thenReturn(DmsHook.STATUS.READY);
+        when(createReplicationTaskResult.getReplicationTask()).thenReturn(replicationTask);
+
+        doReturn(replicationTask).when(dmsHook).describeReplicationTasks();
+        Assert.assertTrue(dmsHook.createReplicationTask());
+        Assert.assertEquals("arn:aws:dms:ap-southeast-1:123456789012:task:task", dmsHook.getReplicationTaskArn());
+        Assert.assertEquals("task", dmsHook.getReplicationTaskIdentifier());
+    }
+
+    @Test(timeout = 60000)
+    public void testStartReplicationTask() {
+        DmsHook dmsHook = spy(new DmsHook());
+        StartReplicationTaskResult startReplicationTaskResult = mock(StartReplicationTaskResult.class);
+        when(client.startReplicationTask(any())).thenReturn(startReplicationTaskResult);
+
+        ReplicationTask replicationTask = mock(ReplicationTask.class);
+        when(replicationTask.getReplicationTaskArn()).thenReturn("arn:aws:dms:ap-southeast-1:123456789012:task:task");
+        when(replicationTask.getStatus()).thenReturn(DmsHook.STATUS.RUNNING);
+        when(startReplicationTaskResult.getReplicationTask()).thenReturn(replicationTask);
+
+        doReturn(replicationTask).when(dmsHook).describeReplicationTasks();
+        Assert.assertTrue(dmsHook.startReplicationTask());
+        Assert.assertEquals("arn:aws:dms:ap-southeast-1:123456789012:task:task", dmsHook.getReplicationTaskArn());
+    }
+
+    @Test(timeout = 60000)
+    public void testCheckFinishedReplicationTask() {
+        DmsHook dmsHook = spy(new DmsHook());
+
+        ReplicationTask replicationTask = mock(ReplicationTask.class);
+        when(replicationTask.getStatus()).thenReturn(DmsHook.STATUS.STOPPED);
+
+        doReturn(replicationTask).when(dmsHook).describeReplicationTasks();
+
+        when(replicationTask.getStopReason()).thenReturn("*_FINISHED");
+        Assert.assertTrue(dmsHook.checkFinishedReplicationTask());
+
+        when(replicationTask.getStopReason()).thenReturn("*_ERROR");
+        Assert.assertFalse(dmsHook.checkFinishedReplicationTask());
+    }
+
+    @Test(timeout = 60000)
+    public void testDeleteReplicationTask() {
+        DmsHook dmsHook = spy(new DmsHook());
+
+        ReplicationTask replicationTask = mock(ReplicationTask.class);
+        when(replicationTask.getStatus()).thenReturn(DmsHook.STATUS.DELETE);
+        doReturn(replicationTask).when(dmsHook).describeReplicationTasks();
+        Assert.assertTrue(dmsHook.deleteReplicationTask());
+
+    }
+
+    @Test
+    public void testTestConnectionEndpoint() {
+        DmsHook dmsHook = spy(new DmsHook());
+
+        String replicationInstanceArn = "replicationInstanceArn";
+        String trueSourceEndpointArn = "trueSourceEndpointArn";
+        String trueTargetEndpointArn = "trueTargetEndpointArn";
+        String falseSourceEndpointArn = "falseSourceEndpointArn";
+        String falseTargetEndpointArn = "falseTargetEndpointArn";
+
+        doReturn(true).when(dmsHook).testConnection(replicationInstanceArn, trueSourceEndpointArn);
+        doReturn(true).when(dmsHook).testConnection(replicationInstanceArn, trueTargetEndpointArn);
+        doReturn(false).when(dmsHook).testConnection(replicationInstanceArn, falseSourceEndpointArn);
+        doReturn(false).when(dmsHook).testConnection(replicationInstanceArn, falseTargetEndpointArn);
+
+
+        dmsHook.setReplicationInstanceArn(replicationInstanceArn);
+
+        dmsHook.setSourceEndpointArn(trueSourceEndpointArn);
+        dmsHook.setTargetEndpointArn(trueTargetEndpointArn);
+        Assert.assertTrue(dmsHook.testConnectionEndpoint());
+
+        dmsHook.setSourceEndpointArn(falseSourceEndpointArn);
+        dmsHook.setTargetEndpointArn(falseTargetEndpointArn);
+        Assert.assertFalse(dmsHook.testConnectionEndpoint());
+
+        dmsHook.setSourceEndpointArn(trueSourceEndpointArn);
+        dmsHook.setTargetEndpointArn(falseTargetEndpointArn);
+        Assert.assertFalse(dmsHook.testConnectionEndpoint());
+
+        dmsHook.setSourceEndpointArn(falseSourceEndpointArn);
+        dmsHook.setTargetEndpointArn(trueTargetEndpointArn);
+        Assert.assertFalse(dmsHook.testConnectionEndpoint());
+
+    }
+
+
+    @Test
+    public void testDescribeReplicationTasks() {
+
+        DmsHook dmsHook = new DmsHook();
+        dmsHook.setReplicationInstanceArn("arn:aws:dms:ap-southeast-1:123456789012:task:task_exist");
+
+        DescribeReplicationTasksResult describeReplicationTasksResult = mock(DescribeReplicationTasksResult.class);
+        when(client.describeReplicationTasks(any())).thenReturn(describeReplicationTasksResult);
+
+        ReplicationTask replicationTask = mock(ReplicationTask.class);
+        when(replicationTask.getReplicationTaskArn()).thenReturn("arn:aws:dms:ap-southeast-1:123456789012:task:task");
+        when(replicationTask.getReplicationTaskIdentifier()).thenReturn("task");
+        when(replicationTask.getSourceEndpointArn()).thenReturn("arn:aws:dms:ap-southeast-1:123456789012:endpoint:source");
+        when(replicationTask.getTargetEndpointArn()).thenReturn("arn:aws:dms:ap-southeast-1:123456789012:endpoint:target");
+
+        when(describeReplicationTasksResult.getReplicationTasks()).thenReturn(Arrays.asList(replicationTask));
+
+        ReplicationTask replicationTaskOut = dmsHook.describeReplicationTasks();
+        Assert.assertNotEquals(dmsHook.getReplicationInstanceArn(), replicationTaskOut.getReplicationTaskArn());
+        Assert.assertEquals("task", replicationTaskOut.getReplicationTaskIdentifier());
+        Assert.assertEquals("arn:aws:dms:ap-southeast-1:123456789012:endpoint:source", replicationTaskOut.getSourceEndpointArn());
+        Assert.assertEquals("arn:aws:dms:ap-southeast-1:123456789012:endpoint:target", replicationTaskOut.getTargetEndpointArn());
+
+    }
+
+
+    @Test(timeout = 60000)
+    public void testAwaitReplicationTaskStatus() {
+
+        DmsHook dmsHook = spy(new DmsHook());
+
+        ReplicationTask replicationTask = mock(ReplicationTask.class);
+        doReturn(replicationTask).when(dmsHook).describeReplicationTasks();
+
+        ReplicationTaskStats taskStats = mock(ReplicationTaskStats.class);
+        when(replicationTask.getReplicationTaskStats()).thenReturn(taskStats);
+        when(taskStats.getFullLoadProgressPercent()).thenReturn(100);
+
+        when(replicationTask.getStatus()).thenReturn(
+            DmsHook.STATUS.STOPPED
+        );
+        Assert.assertTrue(dmsHook.awaitReplicationTaskStatus(DmsHook.STATUS.STOPPED));
+
+        when(replicationTask.getStatus()).thenReturn(
+            DmsHook.STATUS.RUNNING,
+            DmsHook.STATUS.STOPPED
+        );
+        Assert.assertTrue(dmsHook.awaitReplicationTaskStatus(DmsHook.STATUS.STOPPED));
+
+        when(replicationTask.getStatus()).thenReturn(
+            DmsHook.STATUS.RUNNING,
+            DmsHook.STATUS.STOPPED
+        );
+        Assert.assertFalse(dmsHook.awaitReplicationTaskStatus(DmsHook.STATUS.STOPPED, DmsHook.STATUS.RUNNING));
+    }
+
+    @Test
+    public void testReplaceFileParameters() throws IOException {
+        String path = this.getClass().getResource("table_mapping.json").getPath();
+
+        String jsonData = loadJson("table_mapping.json");
+
+        DmsHook dmsHook = new DmsHook();
+
+        String pathParameter = "file://" + path;
+        Assert.assertEquals(jsonData, dmsHook.replaceFileParameters(pathParameter));
+
+//        String pathParameter2 = "file://" + "not_exist.json";
+//
+//        try {
+//            Assert.assertEquals(pathParameter2, dmsHook.replaceFileParameters(pathParameter2));
+//        }catch (Exception e) {
+//            Assert.assertTrue(e instanceof IOException);
+//        }
+
+        String pathParameter3 = "{}";
+        Assert.assertEquals(pathParameter3, dmsHook.replaceFileParameters(pathParameter3));
+
+    }
+
+//    this.getClass().getResourceAsStream("SagemakerRequestJson.json"))

Review Comment:
   ```suggestion
   ```



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/test/java/org/apache/dolphinscheduler/plugin/task/dms/DmsHookTest.java:
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dms;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+
+import org.apache.commons.io.IOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import com.amazonaws.services.databasemigrationservice.AWSDatabaseMigrationService;
+import com.amazonaws.services.databasemigrationservice.model.CreateReplicationTaskResult;
+import com.amazonaws.services.databasemigrationservice.model.DescribeReplicationTasksResult;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTask;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTaskStats;
+import com.amazonaws.services.databasemigrationservice.model.StartReplicationTaskResult;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({
+    JSONUtils.class,
+    DmsHook.class
+})
+@PowerMockIgnore({"javax.*"})
+public class DmsHookTest {
+
+    AWSDatabaseMigrationService client;
+
+    @Before
+    public void before() {
+        mockStatic(DmsHook.class);
+        client = mock(AWSDatabaseMigrationService.class);
+        when(DmsHook.createClient()).thenAnswer(invocation -> client);
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateReplicationTask() throws Exception {
+
+        DmsHook dmsHook = spy(new DmsHook());
+        CreateReplicationTaskResult createReplicationTaskResult = mock(CreateReplicationTaskResult.class);
+        when(client.createReplicationTask(any())).thenReturn(createReplicationTaskResult);
+
+        ReplicationTask replicationTask = mock(ReplicationTask.class);
+        when(replicationTask.getReplicationTaskArn()).thenReturn("arn:aws:dms:ap-southeast-1:123456789012:task:task");
+        when(replicationTask.getReplicationTaskIdentifier()).thenReturn("task");
+        when(replicationTask.getStatus()).thenReturn(DmsHook.STATUS.READY);
+        when(createReplicationTaskResult.getReplicationTask()).thenReturn(replicationTask);
+
+        doReturn(replicationTask).when(dmsHook).describeReplicationTasks();
+        Assert.assertTrue(dmsHook.createReplicationTask());
+        Assert.assertEquals("arn:aws:dms:ap-southeast-1:123456789012:task:task", dmsHook.getReplicationTaskArn());
+        Assert.assertEquals("task", dmsHook.getReplicationTaskIdentifier());
+    }
+
+    @Test(timeout = 60000)
+    public void testStartReplicationTask() {
+        DmsHook dmsHook = spy(new DmsHook());
+        StartReplicationTaskResult startReplicationTaskResult = mock(StartReplicationTaskResult.class);
+        when(client.startReplicationTask(any())).thenReturn(startReplicationTaskResult);
+
+        ReplicationTask replicationTask = mock(ReplicationTask.class);
+        when(replicationTask.getReplicationTaskArn()).thenReturn("arn:aws:dms:ap-southeast-1:123456789012:task:task");
+        when(replicationTask.getStatus()).thenReturn(DmsHook.STATUS.RUNNING);
+        when(startReplicationTaskResult.getReplicationTask()).thenReturn(replicationTask);
+
+        doReturn(replicationTask).when(dmsHook).describeReplicationTasks();
+        Assert.assertTrue(dmsHook.startReplicationTask());
+        Assert.assertEquals("arn:aws:dms:ap-southeast-1:123456789012:task:task", dmsHook.getReplicationTaskArn());
+    }
+
+    @Test(timeout = 60000)
+    public void testCheckFinishedReplicationTask() {
+        DmsHook dmsHook = spy(new DmsHook());
+
+        ReplicationTask replicationTask = mock(ReplicationTask.class);
+        when(replicationTask.getStatus()).thenReturn(DmsHook.STATUS.STOPPED);
+
+        doReturn(replicationTask).when(dmsHook).describeReplicationTasks();
+
+        when(replicationTask.getStopReason()).thenReturn("*_FINISHED");
+        Assert.assertTrue(dmsHook.checkFinishedReplicationTask());
+
+        when(replicationTask.getStopReason()).thenReturn("*_ERROR");
+        Assert.assertFalse(dmsHook.checkFinishedReplicationTask());
+    }
+
+    @Test(timeout = 60000)
+    public void testDeleteReplicationTask() {
+        DmsHook dmsHook = spy(new DmsHook());
+
+        ReplicationTask replicationTask = mock(ReplicationTask.class);
+        when(replicationTask.getStatus()).thenReturn(DmsHook.STATUS.DELETE);
+        doReturn(replicationTask).when(dmsHook).describeReplicationTasks();
+        Assert.assertTrue(dmsHook.deleteReplicationTask());
+
+    }
+
+    @Test
+    public void testTestConnectionEndpoint() {
+        DmsHook dmsHook = spy(new DmsHook());
+
+        String replicationInstanceArn = "replicationInstanceArn";
+        String trueSourceEndpointArn = "trueSourceEndpointArn";
+        String trueTargetEndpointArn = "trueTargetEndpointArn";
+        String falseSourceEndpointArn = "falseSourceEndpointArn";
+        String falseTargetEndpointArn = "falseTargetEndpointArn";
+
+        doReturn(true).when(dmsHook).testConnection(replicationInstanceArn, trueSourceEndpointArn);
+        doReturn(true).when(dmsHook).testConnection(replicationInstanceArn, trueTargetEndpointArn);
+        doReturn(false).when(dmsHook).testConnection(replicationInstanceArn, falseSourceEndpointArn);
+        doReturn(false).when(dmsHook).testConnection(replicationInstanceArn, falseTargetEndpointArn);
+
+
+        dmsHook.setReplicationInstanceArn(replicationInstanceArn);
+
+        dmsHook.setSourceEndpointArn(trueSourceEndpointArn);
+        dmsHook.setTargetEndpointArn(trueTargetEndpointArn);
+        Assert.assertTrue(dmsHook.testConnectionEndpoint());
+
+        dmsHook.setSourceEndpointArn(falseSourceEndpointArn);
+        dmsHook.setTargetEndpointArn(falseTargetEndpointArn);
+        Assert.assertFalse(dmsHook.testConnectionEndpoint());
+
+        dmsHook.setSourceEndpointArn(trueSourceEndpointArn);
+        dmsHook.setTargetEndpointArn(falseTargetEndpointArn);
+        Assert.assertFalse(dmsHook.testConnectionEndpoint());
+
+        dmsHook.setSourceEndpointArn(falseSourceEndpointArn);
+        dmsHook.setTargetEndpointArn(trueTargetEndpointArn);
+        Assert.assertFalse(dmsHook.testConnectionEndpoint());
+
+    }
+
+
+    @Test
+    public void testDescribeReplicationTasks() {
+
+        DmsHook dmsHook = new DmsHook();
+        dmsHook.setReplicationInstanceArn("arn:aws:dms:ap-southeast-1:123456789012:task:task_exist");
+
+        DescribeReplicationTasksResult describeReplicationTasksResult = mock(DescribeReplicationTasksResult.class);
+        when(client.describeReplicationTasks(any())).thenReturn(describeReplicationTasksResult);
+
+        ReplicationTask replicationTask = mock(ReplicationTask.class);
+        when(replicationTask.getReplicationTaskArn()).thenReturn("arn:aws:dms:ap-southeast-1:123456789012:task:task");
+        when(replicationTask.getReplicationTaskIdentifier()).thenReturn("task");
+        when(replicationTask.getSourceEndpointArn()).thenReturn("arn:aws:dms:ap-southeast-1:123456789012:endpoint:source");
+        when(replicationTask.getTargetEndpointArn()).thenReturn("arn:aws:dms:ap-southeast-1:123456789012:endpoint:target");
+
+        when(describeReplicationTasksResult.getReplicationTasks()).thenReturn(Arrays.asList(replicationTask));
+
+        ReplicationTask replicationTaskOut = dmsHook.describeReplicationTasks();
+        Assert.assertNotEquals(dmsHook.getReplicationInstanceArn(), replicationTaskOut.getReplicationTaskArn());
+        Assert.assertEquals("task", replicationTaskOut.getReplicationTaskIdentifier());
+        Assert.assertEquals("arn:aws:dms:ap-southeast-1:123456789012:endpoint:source", replicationTaskOut.getSourceEndpointArn());
+        Assert.assertEquals("arn:aws:dms:ap-southeast-1:123456789012:endpoint:target", replicationTaskOut.getTargetEndpointArn());
+
+    }
+
+
+    @Test(timeout = 60000)
+    public void testAwaitReplicationTaskStatus() {
+
+        DmsHook dmsHook = spy(new DmsHook());
+
+        ReplicationTask replicationTask = mock(ReplicationTask.class);
+        doReturn(replicationTask).when(dmsHook).describeReplicationTasks();
+
+        ReplicationTaskStats taskStats = mock(ReplicationTaskStats.class);
+        when(replicationTask.getReplicationTaskStats()).thenReturn(taskStats);
+        when(taskStats.getFullLoadProgressPercent()).thenReturn(100);
+
+        when(replicationTask.getStatus()).thenReturn(
+            DmsHook.STATUS.STOPPED
+        );
+        Assert.assertTrue(dmsHook.awaitReplicationTaskStatus(DmsHook.STATUS.STOPPED));
+
+        when(replicationTask.getStatus()).thenReturn(
+            DmsHook.STATUS.RUNNING,
+            DmsHook.STATUS.STOPPED
+        );
+        Assert.assertTrue(dmsHook.awaitReplicationTaskStatus(DmsHook.STATUS.STOPPED));
+
+        when(replicationTask.getStatus()).thenReturn(
+            DmsHook.STATUS.RUNNING,
+            DmsHook.STATUS.STOPPED
+        );
+        Assert.assertFalse(dmsHook.awaitReplicationTaskStatus(DmsHook.STATUS.STOPPED, DmsHook.STATUS.RUNNING));
+    }
+
+    @Test
+    public void testReplaceFileParameters() throws IOException {
+        String path = this.getClass().getResource("table_mapping.json").getPath();
+
+        String jsonData = loadJson("table_mapping.json");
+
+        DmsHook dmsHook = new DmsHook();
+
+        String pathParameter = "file://" + path;
+        Assert.assertEquals(jsonData, dmsHook.replaceFileParameters(pathParameter));
+
+//        String pathParameter2 = "file://" + "not_exist.json";
+//
+//        try {
+//            Assert.assertEquals(pathParameter2, dmsHook.replaceFileParameters(pathParameter2));
+//        }catch (Exception e) {
+//            Assert.assertTrue(e instanceof IOException);
+//        }

Review Comment:
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [dolphinscheduler] jieguangzhou commented on a diff in pull request #11868: [Feature][TASK-PLUGIN] AWS Database Migration Service (DMS)

Posted by GitBox <gi...@apache.org>.
jieguangzhou commented on code in PR #11868:
URL: https://github.com/apache/dolphinscheduler/pull/11868#discussion_r980687944


##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsHook.java:
##########
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dms;
+
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
+
+import org.apache.commons.io.IOUtils;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.databasemigrationservice.AWSDatabaseMigrationService;
+import com.amazonaws.services.databasemigrationservice.AWSDatabaseMigrationServiceClientBuilder;
+import com.amazonaws.services.databasemigrationservice.model.CreateReplicationTaskRequest;
+import com.amazonaws.services.databasemigrationservice.model.CreateReplicationTaskResult;
+import com.amazonaws.services.databasemigrationservice.model.DeleteReplicationTaskRequest;
+import com.amazonaws.services.databasemigrationservice.model.DescribeConnectionsRequest;
+import com.amazonaws.services.databasemigrationservice.model.DescribeConnectionsResult;
+import com.amazonaws.services.databasemigrationservice.model.DescribeReplicationTasksRequest;
+import com.amazonaws.services.databasemigrationservice.model.DescribeReplicationTasksResult;
+import com.amazonaws.services.databasemigrationservice.model.Filter;
+import com.amazonaws.services.databasemigrationservice.model.InvalidResourceStateException;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTask;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTaskStats;
+import com.amazonaws.services.databasemigrationservice.model.ResourceNotFoundException;
+import com.amazonaws.services.databasemigrationservice.model.StartReplicationTaskRequest;
+import com.amazonaws.services.databasemigrationservice.model.StartReplicationTaskResult;
+import com.amazonaws.services.databasemigrationservice.model.StopReplicationTaskRequest;
+import com.amazonaws.services.databasemigrationservice.model.Tag;
+import com.amazonaws.services.databasemigrationservice.model.TestConnectionRequest;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+public class DmsHook {
+    protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass()));
+    private AWSDatabaseMigrationService client;
+    private String replicationTaskIdentifier;
+    private String sourceEndpointArn;
+    private String targetEndpointArn;
+    private String replicationInstanceArn;
+    private String migrationType;
+    private String tableMappings;
+    private String replicationTaskSettings;
+    private Date cdcStartTime;
+    private String cdcStartPosition;
+    private String cdcStopPosition;
+    private List<Tag> tags;
+    private String taskData;
+    private String resourceIdentifier;
+    private String replicationTaskArn;
+    private String startReplicationTaskType;
+
+    public DmsHook() {
+        this.client = createClient();
+    }
+
+    public static AWSDatabaseMigrationService createClient() {
+        final String awsAccessKeyId = PropertyUtils.getString(TaskConstants.AWS_ACCESS_KEY_ID);
+        final String awsSecretAccessKey = PropertyUtils.getString(TaskConstants.AWS_SECRET_ACCESS_KEY);
+        final String awsRegion = PropertyUtils.getString(TaskConstants.AWS_REGION);
+        final BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials(awsAccessKeyId, awsSecretAccessKey);
+        final AWSCredentialsProvider awsCredentialsProvider = new AWSStaticCredentialsProvider(basicAWSCredentials);
+
+        // create a DMS client
+        return AWSDatabaseMigrationServiceClientBuilder.standard()
+            .withCredentials(awsCredentialsProvider)
+            .withRegion(awsRegion)
+            .build();
+    }
+
+    public Boolean createReplicationTask() throws Exception {
+        logger.info("createReplicationTask ......");
+        CreateReplicationTaskRequest request = new CreateReplicationTaskRequest()
+            .withReplicationTaskIdentifier(replicationTaskIdentifier)
+            .withSourceEndpointArn(sourceEndpointArn)
+            .withTargetEndpointArn(targetEndpointArn)
+            .withReplicationInstanceArn(replicationInstanceArn)
+            .withMigrationType(migrationType)
+            .withTableMappings(tableMappings)
+            .withReplicationTaskSettings(replicationTaskSettings)
+            .withCdcStartTime(cdcStartTime)
+            .withCdcStartPosition(cdcStartPosition)
+            .withCdcStopPosition(cdcStopPosition)
+            .withTags(tags)
+            .withTaskData(taskData)
+            .withResourceIdentifier(resourceIdentifier);
+
+        request.setTableMappings(replaceFileParameters(request.getTableMappings()));
+        request.setReplicationTaskSettings(replaceFileParameters(request.getReplicationTaskSettings()));
+
+        CreateReplicationTaskResult result = client.createReplicationTask(request);
+        replicationTaskIdentifier = result.getReplicationTask().getReplicationTaskIdentifier();
+        replicationTaskArn = result.getReplicationTask().getReplicationTaskArn();
+        logger.info("replicationTaskIdentifier: {}, replicationTaskArn: {}", replicationTaskIdentifier, replicationTaskArn);
+        return awaitReplicationTaskStatus(STATUS.READY);
+    }
+
+
+    public Boolean startReplicationTask() {
+        logger.info("startReplicationTask ......");
+        StartReplicationTaskRequest request = new StartReplicationTaskRequest()
+            .withReplicationTaskArn(replicationTaskArn)
+            .withStartReplicationTaskType(startReplicationTaskType)
+            .withCdcStartTime(cdcStartTime)
+            .withCdcStartPosition(cdcStartPosition)
+            .withCdcStopPosition(cdcStopPosition);
+        StartReplicationTaskResult result = client.startReplicationTask(request);
+        replicationTaskArn = result.getReplicationTask().getReplicationTaskArn();
+        return awaitReplicationTaskStatus(STATUS.RUNNING);
+    }
+
+    public Boolean checkFinishedReplicationTask() {
+        logger.info("checkFinishedReplicationTask ......");
+        awaitReplicationTaskStatus(STATUS.STOPPED);
+        String stopReason = describeReplicationTasks().getStopReason();
+        return stopReason.endsWith(STATUS.FINISH_END_TOKEN);
+    }
+
+    public void stopReplicationTask() {
+        logger.info("stopReplicationTask ......");
+        if (replicationTaskArn == null) {
+            return;
+        }
+        StopReplicationTaskRequest request = new StopReplicationTaskRequest()
+            .withReplicationTaskArn(replicationTaskArn);
+        client.stopReplicationTask(request);
+        awaitReplicationTaskStatus(STATUS.STOPPED);
+    }
+
+    public Boolean deleteReplicationTask() {
+        logger.info("deleteReplicationTask ......");
+        DeleteReplicationTaskRequest request = new DeleteReplicationTaskRequest()
+            .withReplicationTaskArn(replicationTaskArn);
+        client.deleteReplicationTask(request);
+        Boolean isDeleteSuccessfully;
+        try {
+            isDeleteSuccessfully = awaitReplicationTaskStatus(STATUS.DELETE);
+        } catch (ResourceNotFoundException e) {
+            isDeleteSuccessfully = true;
+        }
+        return isDeleteSuccessfully;
+    }
+
+    public Boolean testConnectionEndpoint() {
+        return (testConnection(replicationInstanceArn, sourceEndpointArn) && testConnection(replicationInstanceArn, targetEndpointArn));
+    }
+
+    public Boolean testConnection(String replicationInstanceArn, String endpointArn) {
+        logger.info("Test connect replication instance: {} and endpoint: {}", replicationInstanceArn, endpointArn);
+        TestConnectionRequest request = new TestConnectionRequest().
+            withReplicationInstanceArn(replicationInstanceArn)
+            .withEndpointArn(endpointArn);
+        try {
+            client.testConnection(request);
+        } catch (InvalidResourceStateException e) {
+            logger.info(e.getErrorMessage());
+        }
+
+        return awaitConnectSuccess(replicationInstanceArn, endpointArn);
+    }
+
+    public Boolean awaitConnectSuccess(String replicationInstanceArn, String endpointArn) {
+        Filter instanceFilters = new Filter().withName(AWS_KEY.REPLICATION_INSTANCE_ARN).withValues(replicationInstanceArn);
+        Filter endpointFilters = new Filter().withName(AWS_KEY.ENDPOINT_ARN).withValues(endpointArn);
+        DescribeConnectionsRequest request = new DescribeConnectionsRequest().withFilters(endpointFilters, instanceFilters)
+            .withMarker("");
+        while (true) {
+            ThreadUtils.sleep(CONSTANTS.CHECK_INTERVAL);
+            DescribeConnectionsResult response = client.describeConnections(request);
+            String status = response.getConnections().get(0).getStatus();
+            if (status.equals(STATUS.SUCCESSFUL)) {
+                logger.info("Connect successful");
+                return true;
+            } else if (!status.equals(STATUS.TESTING)) {
+                break;
+            }
+        }
+        logger.info("Connect error");
+        return false;
+    }
+
+    public ReplicationTask describeReplicationTasks() {
+        Filter replicationTaskFilter = new Filter().withName(AWS_KEY.REPLICATION_TASK_ARN).withValues(replicationTaskArn);
+        DescribeReplicationTasksRequest request = new DescribeReplicationTasksRequest().withFilters(replicationTaskFilter).withMaxRecords(20).withMarker("");
+        DescribeReplicationTasksResult result = client.describeReplicationTasks(request);
+        ReplicationTask replicationTask = result.getReplicationTasks().get(0);
+
+        if (sourceEndpointArn == null) {
+            sourceEndpointArn = replicationTask.getSourceEndpointArn();
+        }
+
+        if (targetEndpointArn == null) {
+            targetEndpointArn = replicationTask.getTargetEndpointArn();
+        }
+
+        if (replicationInstanceArn == null) {
+            replicationInstanceArn = replicationTask.getReplicationInstanceArn();
+        }
+
+        if (replicationTaskArn == null) {
+            replicationTaskArn = replicationTask.getReplicationTaskArn();
+        }
+
+        return replicationTask;
+    }
+
+    public Boolean awaitReplicationTaskStatus(String exceptStatus, String... stopStatus) {
+        List<String> stopStatusSet = Arrays.asList(stopStatus);
+        Integer lastPercent = 0;
+        while (true) {
+            ThreadUtils.sleep(CONSTANTS.CHECK_INTERVAL);
+            ReplicationTask replicationTask = describeReplicationTasks();
+            String status = replicationTask.getStatus();
+
+            if (status.equals(STATUS.RUNNING) || status.equals(STATUS.STOPPED)) {
+                ReplicationTaskStats taskStats = replicationTask.getReplicationTaskStats();
+                Integer percent;
+                if (taskStats != null) {
+                    percent = taskStats.getFullLoadProgressPercent();
+                } else {
+                    percent = 0;
+                }
+                if (!lastPercent.equals(percent)) {
+                    String runningMessage = String.format("fullLoadProgressPercent: %s ", percent);
+                    logger.info(runningMessage);
+                }
+                lastPercent = percent;
+            }
+
+            if (exceptStatus.equals(status)) {
+                logger.info("success");

Review Comment:
   I think It is necessary to log messages about the DMS task connect status for every step.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #11868: [Feature][TASK-PLUGIN] AWS Database Migration Service (DMS)

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #11868:
URL: https://github.com/apache/dolphinscheduler/pull/11868#issuecomment-1242159312

   Kudos, SonarCloud Quality Gate passed!&nbsp; &nbsp; [![Quality Gate passed](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/QualityGateBadge/passed-16px.png 'Quality Gate passed')](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=11868)
   
   [![Bug](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug-16px.png 'Bug')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG)  
   [![Vulnerability](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability-16px.png 'Vulnerability')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY)  
   [![Security Hotspot](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot-16px.png 'Security Hotspot')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT)  
   [![Code Smell](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell-16px.png 'Code Smell')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL) [22 Code Smells](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL)
   
   [![59.9%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/50-16px.png '59.9%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_coverage&view=list) [59.9% Coverage](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_coverage&view=list)  
   [![2.0%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3-16px.png '2.0%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_duplicated_lines_density&view=list) [2.0% Duplication](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_duplicated_lines_density&view=list)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #11868: [Feature][TASK-PLUGIN] AWS Database Migration Service (DMS)

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #11868:
URL: https://github.com/apache/dolphinscheduler/pull/11868#issuecomment-1257381305

   Kudos, SonarCloud Quality Gate passed!&nbsp; &nbsp; [![Quality Gate passed](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/QualityGateBadge/passed-16px.png 'Quality Gate passed')](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=11868)
   
   [![Bug](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug-16px.png 'Bug')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG)  
   [![Vulnerability](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability-16px.png 'Vulnerability')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY)  
   [![Security Hotspot](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot-16px.png 'Security Hotspot')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT)  
   [![Code Smell](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell-16px.png 'Code Smell')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL) [23 Code Smells](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL)
   
   [![56.9%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/50-16px.png '56.9%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_coverage&view=list) [56.9% Coverage](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_coverage&view=list)  
   [![2.0%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3-16px.png '2.0%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_duplicated_lines_density&view=list) [2.0% Duplication](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_duplicated_lines_density&view=list)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #11868: [Feature][TASK-PLUGIN] AWS Database Migration Service (DMS)

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #11868:
URL: https://github.com/apache/dolphinscheduler/pull/11868#issuecomment-1260771241

   Kudos, SonarCloud Quality Gate passed!&nbsp; &nbsp; [![Quality Gate passed](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/QualityGateBadge/passed-16px.png 'Quality Gate passed')](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=11868)
   
   [![Bug](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug-16px.png 'Bug')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG)  
   [![Vulnerability](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability-16px.png 'Vulnerability')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY)  
   [![Security Hotspot](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot-16px.png 'Security Hotspot')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT)  
   [![Code Smell](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell-16px.png 'Code Smell')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL) [22 Code Smells](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL)
   
   [![54.9%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/50-16px.png '54.9%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_coverage&view=list) [54.9% Coverage](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_coverage&view=list)  
   [![2.0%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3-16px.png '2.0%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_duplicated_lines_density&view=list) [2.0% Duplication](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_duplicated_lines_density&view=list)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #11868: [Feature][TASK-PLUGIN] AWS Database Migration Service (DMS)

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #11868:
URL: https://github.com/apache/dolphinscheduler/pull/11868#issuecomment-1245240205

   Kudos, SonarCloud Quality Gate passed!&nbsp; &nbsp; [![Quality Gate passed](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/QualityGateBadge/passed-16px.png 'Quality Gate passed')](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=11868)
   
   [![Bug](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug-16px.png 'Bug')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG)  
   [![Vulnerability](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability-16px.png 'Vulnerability')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY)  
   [![Security Hotspot](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot-16px.png 'Security Hotspot')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT)  
   [![Code Smell](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell-16px.png 'Code Smell')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL) [22 Code Smells](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL)
   
   [![59.9%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/50-16px.png '59.9%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_coverage&view=list) [59.9% Coverage](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_coverage&view=list)  
   [![2.0%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3-16px.png '2.0%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_duplicated_lines_density&view=list) [2.0% Duplication](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_duplicated_lines_density&view=list)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [dolphinscheduler] github-code-scanning[bot] commented on a diff in pull request #11868: [Feature][TASK-PLUGIN] AWS Database Migration Service (DMS)

Posted by GitBox <gi...@apache.org>.
github-code-scanning[bot] commented on code in PR #11868:
URL: https://github.com/apache/dolphinscheduler/pull/11868#discussion_r969436369


##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsTask.java:
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dms;
+
+import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT;
+import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
+import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
+import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
+
+import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import org.apache.commons.beanutils.BeanUtils;
+
+import java.util.Collections;
+import java.util.Set;
+
+import com.amazonaws.services.databasemigrationservice.model.InvalidResourceStateException;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTask;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.PropertyNamingStrategy;
+
+public class DmsTask extends AbstractRemoteTask {
+
+    private static final ObjectMapper objectMapper =
+        new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
+            .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
+            .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
+            .configure(REQUIRE_SETTERS_FOR_GETTERS, true)

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [ObjectMapper.configure](1) should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/dolphinscheduler/security/code-scanning/1222)



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/test/java/org/apache/dolphinscheduler/plugin/task/dms/DmsHookTest.java:
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dms;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+
+import org.apache.commons.io.IOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import com.amazonaws.services.databasemigrationservice.AWSDatabaseMigrationService;
+import com.amazonaws.services.databasemigrationservice.model.CreateReplicationTaskResult;
+import com.amazonaws.services.databasemigrationservice.model.DescribeReplicationTasksResult;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTask;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTaskStats;
+import com.amazonaws.services.databasemigrationservice.model.StartReplicationTaskResult;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({
+    JSONUtils.class,
+    DmsHook.class
+})
+@PowerMockIgnore({"javax.*"})
+public class DmsHookTest {
+
+    AWSDatabaseMigrationService client;
+
+    @Before
+    public void before() {
+        mockStatic(DmsHook.class);
+        client = mock(AWSDatabaseMigrationService.class);
+        when(DmsHook.createClient()).thenReturn(client);
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateReplicationTask() throws Exception {
+
+        DmsHook dmsHook = spy(new DmsHook());
+        CreateReplicationTaskResult createReplicationTaskResult = mock(CreateReplicationTaskResult.class);
+        when(client.createReplicationTask(any())).thenReturn(createReplicationTaskResult);
+
+        ReplicationTask replicationTask = mock(ReplicationTask.class);
+        when(replicationTask.getReplicationTaskArn()).thenReturn("arn:aws:dms:ap-southeast-1:123456789012:task:task");
+        when(replicationTask.getReplicationTaskIdentifier()).thenReturn("task");
+        when(replicationTask.getStatus()).thenReturn(DmsHook.STATUS.READY);
+        when(createReplicationTaskResult.getReplicationTask()).thenReturn(replicationTask);
+
+        doReturn(replicationTask).when(dmsHook).describeReplicationTasks();
+        Assert.assertTrue(dmsHook.createReplicationTask());
+        Assert.assertEquals("arn:aws:dms:ap-southeast-1:123456789012:task:task", dmsHook.getReplicationTaskArn());
+        Assert.assertEquals("task", dmsHook.getReplicationTaskIdentifier());
+    }
+
+    @Test(timeout = 60000)
+    public void testStartReplicationTask() {
+        DmsHook dmsHook = spy(new DmsHook());
+        StartReplicationTaskResult startReplicationTaskResult = mock(StartReplicationTaskResult.class);
+        when(client.startReplicationTask(any())).thenReturn(startReplicationTaskResult);
+
+        ReplicationTask replicationTask = mock(ReplicationTask.class);
+        when(replicationTask.getReplicationTaskArn()).thenReturn("arn:aws:dms:ap-southeast-1:123456789012:task:task");
+        when(replicationTask.getStatus()).thenReturn(DmsHook.STATUS.RUNNING);
+        when(startReplicationTaskResult.getReplicationTask()).thenReturn(replicationTask);
+
+        doReturn(replicationTask).when(dmsHook).describeReplicationTasks();
+        Assert.assertTrue(dmsHook.startReplicationTask());
+        Assert.assertEquals("arn:aws:dms:ap-southeast-1:123456789012:task:task", dmsHook.getReplicationTaskArn());
+    }
+
+    @Test(timeout = 60000)
+    public void testCheckFinishedReplicationTask() {
+        DmsHook dmsHook = spy(new DmsHook());
+
+        ReplicationTask replicationTask = mock(ReplicationTask.class);
+        when(replicationTask.getStatus()).thenReturn(DmsHook.STATUS.STOPPED);
+
+        doReturn(replicationTask).when(dmsHook).describeReplicationTasks();
+
+        when(replicationTask.getStopReason()).thenReturn("*_FINISHED");
+        Assert.assertTrue(dmsHook.checkFinishedReplicationTask());
+
+        when(replicationTask.getStopReason()).thenReturn("*_ERROR");
+        Assert.assertFalse(dmsHook.checkFinishedReplicationTask());
+    }
+
+    @Test(timeout = 60000)
+    public void testDeleteReplicationTask() {
+        DmsHook dmsHook = spy(new DmsHook());
+
+        ReplicationTask replicationTask = mock(ReplicationTask.class);
+        when(replicationTask.getStatus()).thenReturn(DmsHook.STATUS.DELETE);
+        doReturn(replicationTask).when(dmsHook).describeReplicationTasks();
+        Assert.assertTrue(dmsHook.deleteReplicationTask());
+
+    }
+
+    @Test
+    public void testTestConnectionEndpoint() {
+        DmsHook dmsHook = spy(new DmsHook());
+
+        String replicationInstanceArn = "replicationInstanceArn";
+        String trueSourceEndpointArn = "trueSourceEndpointArn";
+        String trueTargetEndpointArn = "trueTargetEndpointArn";
+        String falseSourceEndpointArn = "falseSourceEndpointArn";
+        String falseTargetEndpointArn = "falseTargetEndpointArn";
+
+        doReturn(true).when(dmsHook).testConnection(replicationInstanceArn, trueSourceEndpointArn);
+        doReturn(true).when(dmsHook).testConnection(replicationInstanceArn, trueTargetEndpointArn);
+        doReturn(false).when(dmsHook).testConnection(replicationInstanceArn, falseSourceEndpointArn);
+        doReturn(false).when(dmsHook).testConnection(replicationInstanceArn, falseTargetEndpointArn);
+
+
+        dmsHook.setReplicationInstanceArn(replicationInstanceArn);
+
+        dmsHook.setSourceEndpointArn(trueSourceEndpointArn);
+        dmsHook.setTargetEndpointArn(trueTargetEndpointArn);
+        Assert.assertTrue(dmsHook.testConnectionEndpoint());
+
+        dmsHook.setSourceEndpointArn(falseSourceEndpointArn);
+        dmsHook.setTargetEndpointArn(falseTargetEndpointArn);
+        Assert.assertFalse(dmsHook.testConnectionEndpoint());
+
+        dmsHook.setSourceEndpointArn(trueSourceEndpointArn);
+        dmsHook.setTargetEndpointArn(falseTargetEndpointArn);
+        Assert.assertFalse(dmsHook.testConnectionEndpoint());
+
+        dmsHook.setSourceEndpointArn(falseSourceEndpointArn);
+        dmsHook.setTargetEndpointArn(trueTargetEndpointArn);
+        Assert.assertFalse(dmsHook.testConnectionEndpoint());
+
+    }
+
+
+    @Test
+    public void testDescribeReplicationTasks() {
+
+        DmsHook dmsHook = new DmsHook();
+        dmsHook.setReplicationInstanceArn("arn:aws:dms:ap-southeast-1:123456789012:task:task_exist");
+
+        DescribeReplicationTasksResult describeReplicationTasksResult = mock(DescribeReplicationTasksResult.class);
+        when(client.describeReplicationTasks(any())).thenReturn(describeReplicationTasksResult);
+
+        ReplicationTask replicationTask = mock(ReplicationTask.class);
+        when(replicationTask.getReplicationTaskArn()).thenReturn("arn:aws:dms:ap-southeast-1:123456789012:task:task");
+        when(replicationTask.getReplicationTaskIdentifier()).thenReturn("task");
+        when(replicationTask.getSourceEndpointArn()).thenReturn("arn:aws:dms:ap-southeast-1:123456789012:endpoint:source");
+        when(replicationTask.getTargetEndpointArn()).thenReturn("arn:aws:dms:ap-southeast-1:123456789012:endpoint:target");
+
+        when(describeReplicationTasksResult.getReplicationTasks()).thenReturn(Arrays.asList(replicationTask));
+
+        ReplicationTask replicationTaskOut = dmsHook.describeReplicationTasks();
+        Assert.assertNotEquals(dmsHook.getReplicationInstanceArn(), replicationTaskOut.getReplicationTaskArn());
+        Assert.assertEquals("task", replicationTaskOut.getReplicationTaskIdentifier());
+        Assert.assertEquals("arn:aws:dms:ap-southeast-1:123456789012:endpoint:source", replicationTaskOut.getSourceEndpointArn());
+        Assert.assertEquals("arn:aws:dms:ap-southeast-1:123456789012:endpoint:target", replicationTaskOut.getTargetEndpointArn());
+
+    }
+
+
+    @Test(timeout = 60000)
+    public void testAwaitReplicationTaskStatus() {
+
+        DmsHook dmsHook = spy(new DmsHook());
+
+        ReplicationTask replicationTask = mock(ReplicationTask.class);
+        doReturn(replicationTask).when(dmsHook).describeReplicationTasks();
+
+        ReplicationTaskStats taskStats = mock(ReplicationTaskStats.class);
+        when(replicationTask.getReplicationTaskStats()).thenReturn(taskStats);
+        when(taskStats.getFullLoadProgressPercent()).thenReturn(100);
+
+        when(replicationTask.getStatus()).thenReturn(
+            DmsHook.STATUS.STOPPED
+        );
+        Assert.assertTrue(dmsHook.awaitReplicationTaskStatus(DmsHook.STATUS.STOPPED));
+
+        when(replicationTask.getStatus()).thenReturn(
+            DmsHook.STATUS.RUNNING,
+            DmsHook.STATUS.STOPPED
+        );
+        Assert.assertTrue(dmsHook.awaitReplicationTaskStatus(DmsHook.STATUS.STOPPED));
+
+        when(replicationTask.getStatus()).thenReturn(
+            DmsHook.STATUS.RUNNING,
+            DmsHook.STATUS.STOPPED
+        );
+        Assert.assertFalse(dmsHook.awaitReplicationTaskStatus(DmsHook.STATUS.STOPPED, DmsHook.STATUS.RUNNING));
+    }
+
+    @Test
+    public void testReplaceFileParameters() throws IOException {
+        String path = this.getClass().getResource("table_mapping.json").getPath();

Review Comment:
   ## Unsafe use of getResource
   
   The idiom getClass().getResource() is unsafe for classes that may be extended.
   
   [Show more details](https://github.com/apache/dolphinscheduler/security/code-scanning/1223)



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/test/java/org/apache/dolphinscheduler/plugin/task/dms/DmsHookTest.java:
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dms;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+
+import org.apache.commons.io.IOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import com.amazonaws.services.databasemigrationservice.AWSDatabaseMigrationService;
+import com.amazonaws.services.databasemigrationservice.model.CreateReplicationTaskResult;
+import com.amazonaws.services.databasemigrationservice.model.DescribeReplicationTasksResult;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTask;
+import com.amazonaws.services.databasemigrationservice.model.ReplicationTaskStats;
+import com.amazonaws.services.databasemigrationservice.model.StartReplicationTaskResult;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({
+    JSONUtils.class,
+    DmsHook.class
+})
+@PowerMockIgnore({"javax.*"})
+public class DmsHookTest {
+
+    AWSDatabaseMigrationService client;
+
+    @Before
+    public void before() {
+        mockStatic(DmsHook.class);
+        client = mock(AWSDatabaseMigrationService.class);
+        when(DmsHook.createClient()).thenReturn(client);
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateReplicationTask() throws Exception {
+
+        DmsHook dmsHook = spy(new DmsHook());
+        CreateReplicationTaskResult createReplicationTaskResult = mock(CreateReplicationTaskResult.class);
+        when(client.createReplicationTask(any())).thenReturn(createReplicationTaskResult);
+
+        ReplicationTask replicationTask = mock(ReplicationTask.class);
+        when(replicationTask.getReplicationTaskArn()).thenReturn("arn:aws:dms:ap-southeast-1:123456789012:task:task");
+        when(replicationTask.getReplicationTaskIdentifier()).thenReturn("task");
+        when(replicationTask.getStatus()).thenReturn(DmsHook.STATUS.READY);
+        when(createReplicationTaskResult.getReplicationTask()).thenReturn(replicationTask);
+
+        doReturn(replicationTask).when(dmsHook).describeReplicationTasks();
+        Assert.assertTrue(dmsHook.createReplicationTask());
+        Assert.assertEquals("arn:aws:dms:ap-southeast-1:123456789012:task:task", dmsHook.getReplicationTaskArn());
+        Assert.assertEquals("task", dmsHook.getReplicationTaskIdentifier());
+    }
+
+    @Test(timeout = 60000)
+    public void testStartReplicationTask() {
+        DmsHook dmsHook = spy(new DmsHook());
+        StartReplicationTaskResult startReplicationTaskResult = mock(StartReplicationTaskResult.class);
+        when(client.startReplicationTask(any())).thenReturn(startReplicationTaskResult);
+
+        ReplicationTask replicationTask = mock(ReplicationTask.class);
+        when(replicationTask.getReplicationTaskArn()).thenReturn("arn:aws:dms:ap-southeast-1:123456789012:task:task");
+        when(replicationTask.getStatus()).thenReturn(DmsHook.STATUS.RUNNING);
+        when(startReplicationTaskResult.getReplicationTask()).thenReturn(replicationTask);
+
+        doReturn(replicationTask).when(dmsHook).describeReplicationTasks();
+        Assert.assertTrue(dmsHook.startReplicationTask());
+        Assert.assertEquals("arn:aws:dms:ap-southeast-1:123456789012:task:task", dmsHook.getReplicationTaskArn());
+    }
+
+    @Test(timeout = 60000)
+    public void testCheckFinishedReplicationTask() {
+        DmsHook dmsHook = spy(new DmsHook());
+
+        ReplicationTask replicationTask = mock(ReplicationTask.class);
+        when(replicationTask.getStatus()).thenReturn(DmsHook.STATUS.STOPPED);
+
+        doReturn(replicationTask).when(dmsHook).describeReplicationTasks();
+
+        when(replicationTask.getStopReason()).thenReturn("*_FINISHED");
+        Assert.assertTrue(dmsHook.checkFinishedReplicationTask());
+
+        when(replicationTask.getStopReason()).thenReturn("*_ERROR");
+        Assert.assertFalse(dmsHook.checkFinishedReplicationTask());
+    }
+
+    @Test(timeout = 60000)
+    public void testDeleteReplicationTask() {
+        DmsHook dmsHook = spy(new DmsHook());
+
+        ReplicationTask replicationTask = mock(ReplicationTask.class);
+        when(replicationTask.getStatus()).thenReturn(DmsHook.STATUS.DELETE);
+        doReturn(replicationTask).when(dmsHook).describeReplicationTasks();
+        Assert.assertTrue(dmsHook.deleteReplicationTask());
+
+    }
+
+    @Test
+    public void testTestConnectionEndpoint() {
+        DmsHook dmsHook = spy(new DmsHook());
+
+        String replicationInstanceArn = "replicationInstanceArn";
+        String trueSourceEndpointArn = "trueSourceEndpointArn";
+        String trueTargetEndpointArn = "trueTargetEndpointArn";
+        String falseSourceEndpointArn = "falseSourceEndpointArn";
+        String falseTargetEndpointArn = "falseTargetEndpointArn";
+
+        doReturn(true).when(dmsHook).testConnection(replicationInstanceArn, trueSourceEndpointArn);
+        doReturn(true).when(dmsHook).testConnection(replicationInstanceArn, trueTargetEndpointArn);
+        doReturn(false).when(dmsHook).testConnection(replicationInstanceArn, falseSourceEndpointArn);
+        doReturn(false).when(dmsHook).testConnection(replicationInstanceArn, falseTargetEndpointArn);
+
+
+        dmsHook.setReplicationInstanceArn(replicationInstanceArn);
+
+        dmsHook.setSourceEndpointArn(trueSourceEndpointArn);
+        dmsHook.setTargetEndpointArn(trueTargetEndpointArn);
+        Assert.assertTrue(dmsHook.testConnectionEndpoint());
+
+        dmsHook.setSourceEndpointArn(falseSourceEndpointArn);
+        dmsHook.setTargetEndpointArn(falseTargetEndpointArn);
+        Assert.assertFalse(dmsHook.testConnectionEndpoint());
+
+        dmsHook.setSourceEndpointArn(trueSourceEndpointArn);
+        dmsHook.setTargetEndpointArn(falseTargetEndpointArn);
+        Assert.assertFalse(dmsHook.testConnectionEndpoint());
+
+        dmsHook.setSourceEndpointArn(falseSourceEndpointArn);
+        dmsHook.setTargetEndpointArn(trueTargetEndpointArn);
+        Assert.assertFalse(dmsHook.testConnectionEndpoint());
+
+    }
+
+
+    @Test
+    public void testDescribeReplicationTasks() {
+
+        DmsHook dmsHook = new DmsHook();
+        dmsHook.setReplicationInstanceArn("arn:aws:dms:ap-southeast-1:123456789012:task:task_exist");
+
+        DescribeReplicationTasksResult describeReplicationTasksResult = mock(DescribeReplicationTasksResult.class);
+        when(client.describeReplicationTasks(any())).thenReturn(describeReplicationTasksResult);
+
+        ReplicationTask replicationTask = mock(ReplicationTask.class);
+        when(replicationTask.getReplicationTaskArn()).thenReturn("arn:aws:dms:ap-southeast-1:123456789012:task:task");
+        when(replicationTask.getReplicationTaskIdentifier()).thenReturn("task");
+        when(replicationTask.getSourceEndpointArn()).thenReturn("arn:aws:dms:ap-southeast-1:123456789012:endpoint:source");
+        when(replicationTask.getTargetEndpointArn()).thenReturn("arn:aws:dms:ap-southeast-1:123456789012:endpoint:target");
+
+        when(describeReplicationTasksResult.getReplicationTasks()).thenReturn(Arrays.asList(replicationTask));
+
+        ReplicationTask replicationTaskOut = dmsHook.describeReplicationTasks();
+        Assert.assertNotEquals(dmsHook.getReplicationInstanceArn(), replicationTaskOut.getReplicationTaskArn());
+        Assert.assertEquals("task", replicationTaskOut.getReplicationTaskIdentifier());
+        Assert.assertEquals("arn:aws:dms:ap-southeast-1:123456789012:endpoint:source", replicationTaskOut.getSourceEndpointArn());
+        Assert.assertEquals("arn:aws:dms:ap-southeast-1:123456789012:endpoint:target", replicationTaskOut.getTargetEndpointArn());
+
+    }
+
+
+    @Test(timeout = 60000)
+    public void testAwaitReplicationTaskStatus() {
+
+        DmsHook dmsHook = spy(new DmsHook());
+
+        ReplicationTask replicationTask = mock(ReplicationTask.class);
+        doReturn(replicationTask).when(dmsHook).describeReplicationTasks();
+
+        ReplicationTaskStats taskStats = mock(ReplicationTaskStats.class);
+        when(replicationTask.getReplicationTaskStats()).thenReturn(taskStats);
+        when(taskStats.getFullLoadProgressPercent()).thenReturn(100);
+
+        when(replicationTask.getStatus()).thenReturn(
+            DmsHook.STATUS.STOPPED
+        );
+        Assert.assertTrue(dmsHook.awaitReplicationTaskStatus(DmsHook.STATUS.STOPPED));
+
+        when(replicationTask.getStatus()).thenReturn(
+            DmsHook.STATUS.RUNNING,
+            DmsHook.STATUS.STOPPED
+        );
+        Assert.assertTrue(dmsHook.awaitReplicationTaskStatus(DmsHook.STATUS.STOPPED));
+
+        when(replicationTask.getStatus()).thenReturn(
+            DmsHook.STATUS.RUNNING,
+            DmsHook.STATUS.STOPPED
+        );
+        Assert.assertFalse(dmsHook.awaitReplicationTaskStatus(DmsHook.STATUS.STOPPED, DmsHook.STATUS.RUNNING));
+    }
+
+    @Test
+    public void testReplaceFileParameters() throws IOException {
+        String path = this.getClass().getResource("table_mapping.json").getPath();
+
+        String jsonData = loadJson("table_mapping.json");
+
+        DmsHook dmsHook = new DmsHook();
+
+        String pathParameter = "file://" + path;
+        Assert.assertEquals(jsonData, dmsHook.replaceFileParameters(pathParameter));
+
+//        String pathParameter2 = "file://" + "not_exist.json";
+//
+//        try {
+//            Assert.assertEquals(pathParameter2, dmsHook.replaceFileParameters(pathParameter2));
+//        }catch (Exception e) {
+//            Assert.assertTrue(e instanceof IOException);
+//        }
+
+        String pathParameter3 = "{}";
+        Assert.assertEquals(pathParameter3, dmsHook.replaceFileParameters(pathParameter3));
+
+    }
+
+//    this.getClass().getResourceAsStream("SagemakerRequestJson.json"))
+
+    private String loadJson(String fileName) {
+        String jsonData;
+        try (InputStream i = this.getClass().getResourceAsStream(fileName)) {

Review Comment:
   ## Unsafe use of getResource
   
   The idiom getClass().getResource() is unsafe for classes that may be extended.
   
   [Show more details](https://github.com/apache/dolphinscheduler/security/code-scanning/1224)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [dolphinscheduler] EricGao888 commented on a diff in pull request #11868: [Feature][TASK-PLUGIN] AWS Database Migration Service (DMS)

Posted by GitBox <gi...@apache.org>.
EricGao888 commented on code in PR #11868:
URL: https://github.com/apache/dolphinscheduler/pull/11868#discussion_r980699024


##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/test/java/org/apache/dolphinscheduler/plugin/task/dms/DmsHookTest.java:
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dms;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.powermock.api.mockito.PowerMockito.mock;

Review Comment:
   Could u plz removing the dependency of `powermock` in this PR? We have already had a PR to remove all `powermock` in `dolphinscheduler-task-plugin` module, see: https://github.com/apache/dolphinscheduler/pull/11778
   Thanks~



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [dolphinscheduler] jieguangzhou commented on a diff in pull request #11868: [Feature][TASK-PLUGIN] AWS Database Migration Service (DMS)

Posted by GitBox <gi...@apache.org>.
jieguangzhou commented on code in PR #11868:
URL: https://github.com/apache/dolphinscheduler/pull/11868#discussion_r980702563


##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/test/java/org/apache/dolphinscheduler/plugin/task/dms/DmsHookTest.java:
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.dms;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.powermock.api.mockito.PowerMockito.mock;

Review Comment:
   Yes, I will remove powermock



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #11868: [Feature][TASK-PLUGIN] AWS Database Migration Service (DMS)

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on PR #11868:
URL: https://github.com/apache/dolphinscheduler/pull/11868#issuecomment-1261880196

   Kudos, SonarCloud Quality Gate passed!&nbsp; &nbsp; [![Quality Gate passed](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/QualityGateBadge/passed-16px.png 'Quality Gate passed')](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler&pullRequest=11868)
   
   [![Bug](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug-16px.png 'Bug')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=BUG)  
   [![Vulnerability](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability-16px.png 'Vulnerability')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=VULNERABILITY)  
   [![Security Hotspot](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot-16px.png 'Security Hotspot')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=SECURITY_HOTSPOT)  
   [![Code Smell](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell-16px.png 'Code Smell')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL) [![A](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A-16px.png 'A')](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL) [22 Code Smells](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=11868&resolved=false&types=CODE_SMELL)
   
   [![54.9%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/50-16px.png '54.9%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_coverage&view=list) [54.9% Coverage](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_coverage&view=list)  
   [![2.0%](https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3-16px.png '2.0%')](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_duplicated_lines_density&view=list) [2.0% Duplication](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=11868&metric=new_duplicated_lines_density&view=list)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [dolphinscheduler] jieguangzhou commented on pull request #11868: [Feature][TASK-PLUGIN] AWS Database Migration Service (DMS)

Posted by GitBox <gi...@apache.org>.
jieguangzhou commented on PR #11868:
URL: https://github.com/apache/dolphinscheduler/pull/11868#issuecomment-1263025130

   Thanks all


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org