You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/08/18 07:38:44 UTC

[GitHub] [incubator-seatunnel] ic4y opened a new pull request, #2456: [Engine][Task] Add cancel Task

ic4y opened a new pull request, #2456:
URL: https://github.com/apache/incubator-seatunnel/pull/2456

   <!--
   
   Thank you for contributing to SeaTunnel! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   ## Contribution Checklist
   
     - Make sure that the pull request corresponds to a [GITHUB issue](https://github.com/apache/incubator-seatunnel/issues).
   
     - Name the pull request in the form "[Feature] [component] Title of the pull request", where *Feature* can be replaced by `Hotfix`, `Bug`, etc.
   
     - Minor fixes should be named following this pattern: `[hotfix] [docs] Fix typo in README.md doc`.
   
   -->
   
   Add task cancel and modify the test code
   
   ## Purpose of this pull request
   
   <!-- Describe the purpose of this pull request. For example: This pull request adds checkstyle plugin.-->
   
   ## Check list
   
   * [x] Code changed are covered with tests, or it does not need tests for reason:
   * [x] If any new Jar binary package adding in your PR, please add License Notice according
     [New License Guide](https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [x] If necessary, please update the documentation to describe the new feature. https://github.com/apache/incubator-seatunnel/tree/dev/docs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] ashulin commented on a diff in pull request #2456: [Engine][Task] Add cancel Task

Posted by GitBox <gi...@apache.org>.
ashulin commented on code in PR #2456:
URL: https://github.com/apache/incubator-seatunnel/pull/2456#discussion_r949745991


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java:
##########
@@ -129,10 +131,21 @@ public NonCompletableFuture<TaskExecutionState> deployTask(
 
             // TODO Use classloader load the connector jars and deserialize Task
             taskGroup = nodeEngine.getSerializationService().toObject(taskImmutableInfo.getGroup());
+            return deployLocalTask(taskGroup, resultFuture);
+        } catch (Throwable t) {
+            logger.severe(ExceptionUtils.getMessage(t));

Review Comment:
   Can we output logs based on job level?



##########
seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java:
##########
@@ -87,16 +87,15 @@ public void testCancel() {
         long sleepTime = 300;
 
         AtomicBoolean stop = new AtomicBoolean(false);
-        TestTask testTask1 = new TestTask(stop, logger, sleepTime,true);
-        TestTask testTask2 = new TestTask(stop, logger, sleepTime,false);
-
-        CompletableFuture<Void> cancellationFuture = new CompletableFuture<Void>();
+        TestTask testTask1 = new TestTask(stop, logger, sleepTime, true);
+        TestTask testTask2 = new TestTask(stop, logger, sleepTime, false);
 
-        CompletableFuture<TaskExecutionState> completableFuture = taskExecutionService.submitTaskGroup(new TaskGroupDefaultImpl("ts", Lists.newArrayList(testTask1,testTask2)), cancellationFuture);
+        long taskGroupId = flakeIdGenerator.newId();
+        CompletableFuture<TaskExecutionState> completableFuture = taskExecutionService.deployLocalTask(new TaskGroupDefaultImpl(taskGroupId, "ts", Lists.newArrayList(testTask1, testTask2)), new CompletableFuture<>());
 
-        cancellationFuture.cancel(true);
+        taskExecutionService.cancelTaskGroup(taskGroupId);
 
-        await().atMost(sleepTime + 300, TimeUnit.MILLISECONDS).untilAsserted(()->{
+        await().atMost(sleepTime + 300, TimeUnit.MILLISECONDS).untilAsserted(() -> {

Review Comment:
   Can't `CheckStyle` check for this style error?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] Hisoka-X merged pull request #2456: [Engine][Task] Add cancel Task

Posted by GitBox <gi...@apache.org>.
Hisoka-X merged PR #2456:
URL: https://github.com/apache/incubator-seatunnel/pull/2456


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] ic4y commented on pull request #2456: [Engine][Task] Add cancel Task

Posted by GitBox <gi...@apache.org>.
ic4y commented on PR #2456:
URL: https://github.com/apache/incubator-seatunnel/pull/2456#issuecomment-1221958471

   @Hisoka-X PTAL Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] ic4y commented on a diff in pull request #2456: [Engine][Task] Add cancel Task

Posted by GitBox <gi...@apache.org>.
ic4y commented on code in PR #2456:
URL: https://github.com/apache/incubator-seatunnel/pull/2456#discussion_r949752234


##########
seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java:
##########
@@ -87,16 +87,15 @@ public void testCancel() {
         long sleepTime = 300;
 
         AtomicBoolean stop = new AtomicBoolean(false);
-        TestTask testTask1 = new TestTask(stop, logger, sleepTime,true);
-        TestTask testTask2 = new TestTask(stop, logger, sleepTime,false);
-
-        CompletableFuture<Void> cancellationFuture = new CompletableFuture<Void>();
+        TestTask testTask1 = new TestTask(stop, logger, sleepTime, true);
+        TestTask testTask2 = new TestTask(stop, logger, sleepTime, false);
 
-        CompletableFuture<TaskExecutionState> completableFuture = taskExecutionService.submitTaskGroup(new TaskGroupDefaultImpl("ts", Lists.newArrayList(testTask1,testTask2)), cancellationFuture);
+        long taskGroupId = flakeIdGenerator.newId();
+        CompletableFuture<TaskExecutionState> completableFuture = taskExecutionService.deployLocalTask(new TaskGroupDefaultImpl(taskGroupId, "ts", Lists.newArrayList(testTask1, testTask2)), new CompletableFuture<>());
 
-        cancellationFuture.cancel(true);
+        taskExecutionService.cancelTaskGroup(taskGroupId);
 
-        await().atMost(sleepTime + 300, TimeUnit.MILLISECONDS).untilAsserted(()->{
+        await().atMost(sleepTime + 300, TimeUnit.MILLISECONDS).untilAsserted(() -> {

Review Comment:
   My mistake, my idea CheckStyle didn't cover the test, I've fixed it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] ic4y commented on a diff in pull request #2456: [Engine][Task] Add cancel Task

Posted by GitBox <gi...@apache.org>.
ic4y commented on code in PR #2456:
URL: https://github.com/apache/incubator-seatunnel/pull/2456#discussion_r949758167


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java:
##########
@@ -129,10 +131,21 @@ public NonCompletableFuture<TaskExecutionState> deployTask(
 
             // TODO Use classloader load the connector jars and deserialize Task
             taskGroup = nodeEngine.getSerializationService().toObject(taskImmutableInfo.getGroup());
+            return deployLocalTask(taskGroup, resultFuture);
+        } catch (Throwable t) {
+            logger.severe(ExceptionUtils.getMessage(t));

Review Comment:
   This is an optimization point. Let's do it later.  Now temporarily add the JobID to the log



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org