You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by jo...@apache.org on 2020/06/28 02:48:23 UTC

[incubator-dolphinscheduler] branch dev updated: Revise annotation spelling errors & Enhanced code robustness (#3042)

This is an automated email from the ASF dual-hosted git repository.

journey pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9bf67d8  Revise annotation spelling errors & Enhanced code robustness (#3042)
9bf67d8 is described below

commit 9bf67d80d0db866d2ead7fce1b9d33767ee0f66e
Author: zixi0825 <64...@qq.com>
AuthorDate: Sun Jun 28 10:48:14 2020 +0800

    Revise annotation spelling errors & Enhanced code robustness (#3042)
    
    * revise annotation spelling errors & enhanced code robustness
    
    * revise annotation spelling errors & enhanced code robustness
    
    Co-authored-by: sunchaohe <su...@linklogis.com>
    Co-authored-by: dailidong <da...@gmail.com>
    Co-authored-by: qiaozhanwei <qi...@outlook.com>
---
 .../dolphinscheduler/api/utils/CheckUtils.java     |  2 +-
 .../dolphinscheduler/remote/command/TaskInfo.java  |  2 +-
 .../server/entity/TaskExecutionContext.java        |  2 +-
 .../worker/processor/NettyRemoteChannel.java       |  2 +-
 .../worker/processor/TaskCallbackService.java      |  5 +-
 .../worker/processor/TaskExecuteProcessor.java     | 14 ++++--
 .../worker/processor/TaskCallbackServiceTest.java  | 56 ++++++++++++++++++++--
 7 files changed, 68 insertions(+), 15 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java
index 9c3bbe9..9dee69b 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java
@@ -115,7 +115,7 @@ public class CheckUtils {
    *
    * @param parameter parameter
    * @param taskType task type
-   * @return true if taks node parameters are valid, otherwise return false
+   * @return true if task node parameters are valid, otherwise return false
    */
   public static boolean checkTaskNodeParameters(String parameter, String taskType) {
     AbstractParameters abstractParameters = TaskParametersUtils.getParameters(taskType, parameter);
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java
index 196d0a7..1adf5a8 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java
@@ -34,7 +34,7 @@ public class TaskInfo implements Serializable{
 
 
     /**
-     *  taks name
+     *  task name
      */
     private String taskName;
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
index 7b4c721..3fc65c1 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
@@ -39,7 +39,7 @@ public class TaskExecutionContext implements Serializable{
 
 
     /**
-     *  taks name
+     *  task name
      */
     private String taskName;
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java
index cbb8972..b1b67af 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java
@@ -34,7 +34,7 @@ public class NettyRemoteChannel {
     private final Channel channel;
 
     /**
-     *  equest unique identification
+     *  request unique identification
      */
     private final long opaque;
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
index 1e8bf9d..0fe7524 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
@@ -17,7 +17,6 @@
 
 package org.apache.dolphinscheduler.server.worker.processor;
 
-
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
@@ -33,14 +32,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
-
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-
 import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
 
 /**
- *  taks callback service
+ *  task callback service
  */
 @Service
 public class TaskCallbackService {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index 50f8989..0af84b1 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -51,7 +51,6 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
 
     private final Logger logger = LoggerFactory.getLogger(TaskExecuteProcessor.class);
 
-
     /**
      *  thread executor service
      */
@@ -83,9 +82,18 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
 
         logger.info("received command : {}", taskRequestCommand);
 
-        String contextJson = taskRequestCommand.getTaskExecutionContext();
+        if(taskRequestCommand == null){
+            logger.error("task execute request command is null");
+            return;
+        }
 
+        String contextJson = taskRequestCommand.getTaskExecutionContext();
         TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(contextJson, TaskExecutionContext.class);
+        if(taskExecutionContext == null){
+            logger.error("task execution context is null");
+            return;
+        }
+
         taskExecutionContext.setHost(OSUtils.getHost() + ":" + workerConfig.getListenPort());
 
         // local execute path
@@ -102,7 +110,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
 
         // tell master that task is in executing
         final Command ackCommand = buildAckCommand(taskExecutionContext).convert2Command();
-        
+
         try {
             RetryerUtils.retryCall(() -> {
                 taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(),ackCommand);
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
index 78ba3a6..e064f4c 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
@@ -18,14 +18,17 @@ package org.apache.dolphinscheduler.server.worker.processor;
 
 import io.netty.channel.Channel;
 import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.remote.NettyRemotingClient;
 import org.apache.dolphinscheduler.remote.NettyRemotingServer;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
 import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
 import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
 import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
 import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
@@ -52,10 +55,23 @@ import java.util.Date;
  * test task call back service
  */
 @RunWith(SpringJUnit4ClassRunner.class)
-@ContextConfiguration(classes={TaskCallbackServiceTestConfig.class, SpringZKServer.class, SpringApplicationContext.class, MasterRegistry.class, WorkerRegistry.class,
-        ZookeeperRegistryCenter.class, MasterConfig.class, WorkerConfig.class,
-        ZookeeperCachedOperator.class, ZookeeperConfig.class, ZookeeperNodeManager.class, TaskCallbackService.class,
-        TaskResponseService.class, TaskAckProcessor.class,TaskResponseProcessor.class})
+@ContextConfiguration(classes={
+        TaskCallbackServiceTestConfig.class,
+        SpringZKServer.class,
+        SpringApplicationContext.class,
+        MasterRegistry.class,
+        WorkerRegistry.class,
+        ZookeeperRegistryCenter.class,
+        MasterConfig.class,
+        WorkerConfig.class,
+        ZookeeperCachedOperator.class,
+        ZookeeperConfig.class,
+        ZookeeperNodeManager.class,
+        TaskCallbackService.class,
+        TaskResponseService.class,
+        TaskAckProcessor.class,
+        TaskResponseProcessor.class,
+        TaskExecuteProcessor.class})
 public class TaskCallbackServiceTest {
 
     @Autowired
@@ -70,6 +86,9 @@ public class TaskCallbackServiceTest {
     @Autowired
     private TaskResponseProcessor taskResponseProcessor;
 
+    @Autowired
+    private TaskExecuteProcessor taskExecuteProcessor;
+
     /**
      * send ack test
      * @throws Exception
@@ -176,6 +195,35 @@ public class TaskCallbackServiceTest {
         }
     }
 
+    @Test
+    public void testTaskExecuteProcessor() throws Exception{
+        final NettyServerConfig serverConfig = new NettyServerConfig();
+        serverConfig.setListenPort(30000);
+        NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig);
+        nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, taskExecuteProcessor);
+        nettyRemotingServer.start();
+
+        final NettyClientConfig clientConfig = new NettyClientConfig();
+        NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(clientConfig);
+
+        TaskExecuteRequestCommand taskExecuteRequestCommand = new TaskExecuteRequestCommand();
+
+        nettyRemotingClient.send(new Host("localhost",30000),taskExecuteRequestCommand.convert2Command());
+
+        taskExecuteRequestCommand.setTaskExecutionContext(JSONUtils.toJsonString(new TaskExecutionContext()));
+
+        nettyRemotingClient.send(new Host("localhost",30000),taskExecuteRequestCommand.convert2Command());
+
+        Thread.sleep(5000);
+
+        Stopper.stop();
+
+        Thread.sleep(5000);
+
+        nettyRemotingServer.close();
+        nettyRemotingClient.close();
+    }
+
 //    @Test(expected = IllegalStateException.class)
 //    public void testSendAckWithIllegalStateException2(){
 //        masterRegistry.registry();