You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/08/19 02:30:28 UTC

[GitHub] [incubator-seatunnel] ashulin commented on a diff in pull request #2456: [Engine][Task] Add cancel Task

ashulin commented on code in PR #2456:
URL: https://github.com/apache/incubator-seatunnel/pull/2456#discussion_r949745991


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java:
##########
@@ -129,10 +131,21 @@ public NonCompletableFuture<TaskExecutionState> deployTask(
 
             // TODO Use classloader load the connector jars and deserialize Task
             taskGroup = nodeEngine.getSerializationService().toObject(taskImmutableInfo.getGroup());
+            return deployLocalTask(taskGroup, resultFuture);
+        } catch (Throwable t) {
+            logger.severe(ExceptionUtils.getMessage(t));

Review Comment:
   Can we output logs based on job level?



##########
seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java:
##########
@@ -87,16 +87,15 @@ public void testCancel() {
         long sleepTime = 300;
 
         AtomicBoolean stop = new AtomicBoolean(false);
-        TestTask testTask1 = new TestTask(stop, logger, sleepTime,true);
-        TestTask testTask2 = new TestTask(stop, logger, sleepTime,false);
-
-        CompletableFuture<Void> cancellationFuture = new CompletableFuture<Void>();
+        TestTask testTask1 = new TestTask(stop, logger, sleepTime, true);
+        TestTask testTask2 = new TestTask(stop, logger, sleepTime, false);
 
-        CompletableFuture<TaskExecutionState> completableFuture = taskExecutionService.submitTaskGroup(new TaskGroupDefaultImpl("ts", Lists.newArrayList(testTask1,testTask2)), cancellationFuture);
+        long taskGroupId = flakeIdGenerator.newId();
+        CompletableFuture<TaskExecutionState> completableFuture = taskExecutionService.deployLocalTask(new TaskGroupDefaultImpl(taskGroupId, "ts", Lists.newArrayList(testTask1, testTask2)), new CompletableFuture<>());
 
-        cancellationFuture.cancel(true);
+        taskExecutionService.cancelTaskGroup(taskGroupId);
 
-        await().atMost(sleepTime + 300, TimeUnit.MILLISECONDS).untilAsserted(()->{
+        await().atMost(sleepTime + 300, TimeUnit.MILLISECONDS).untilAsserted(() -> {

Review Comment:
   Can't `CheckStyle` check for this style error?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org