You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by jo...@apache.org on 2020/06/28 02:48:23 UTC
[incubator-dolphinscheduler] branch dev updated: Revise annotation
spelling errors & Enhanced code robustness (#3042)
This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 9bf67d8 Revise annotation spelling errors & Enhanced code robustness (#3042)
9bf67d8 is described below
commit 9bf67d80d0db866d2ead7fce1b9d33767ee0f66e
Author: zixi0825 <64...@qq.com>
AuthorDate: Sun Jun 28 10:48:14 2020 +0800
Revise annotation spelling errors & Enhanced code robustness (#3042)
* revise annotation spelling errors & enhanced code robustness
* revise annotation spelling errors & enhanced code robustness
Co-authored-by: sunchaohe <su...@linklogis.com>
Co-authored-by: dailidong <da...@gmail.com>
Co-authored-by: qiaozhanwei <qi...@outlook.com>
---
.../dolphinscheduler/api/utils/CheckUtils.java | 2 +-
.../dolphinscheduler/remote/command/TaskInfo.java | 2 +-
.../server/entity/TaskExecutionContext.java | 2 +-
.../worker/processor/NettyRemoteChannel.java | 2 +-
.../worker/processor/TaskCallbackService.java | 5 +-
.../worker/processor/TaskExecuteProcessor.java | 14 ++++--
.../worker/processor/TaskCallbackServiceTest.java | 56 ++++++++++++++++++++--
7 files changed, 68 insertions(+), 15 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java
index 9c3bbe9..9dee69b 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java
@@ -115,7 +115,7 @@ public class CheckUtils {
*
* @param parameter parameter
* @param taskType task type
- * @return true if taks node parameters are valid, otherwise return false
+ * @return true if task node parameters are valid, otherwise return false
*/
public static boolean checkTaskNodeParameters(String parameter, String taskType) {
AbstractParameters abstractParameters = TaskParametersUtils.getParameters(taskType, parameter);
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java
index 196d0a7..1adf5a8 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java
@@ -34,7 +34,7 @@ public class TaskInfo implements Serializable{
/**
- * taks name
+ * task name
*/
private String taskName;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
index 7b4c721..3fc65c1 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
@@ -39,7 +39,7 @@ public class TaskExecutionContext implements Serializable{
/**
- * taks name
+ * task name
*/
private String taskName;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java
index cbb8972..b1b67af 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java
@@ -34,7 +34,7 @@ public class NettyRemoteChannel {
private final Channel channel;
/**
- * equest unique identification
+ * request unique identification
*/
private final long opaque;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
index 1e8bf9d..0fe7524 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.worker.processor;
-
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
@@ -33,14 +32,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
/**
- * taks callback service
+ * task callback service
*/
@Service
public class TaskCallbackService {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index 50f8989..0af84b1 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -51,7 +51,6 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(TaskExecuteProcessor.class);
-
/**
* thread executor service
*/
@@ -83,9 +82,18 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
logger.info("received command : {}", taskRequestCommand);
- String contextJson = taskRequestCommand.getTaskExecutionContext();
+ if(taskRequestCommand == null){
+ logger.error("task execute request command is null");
+ return;
+ }
+ String contextJson = taskRequestCommand.getTaskExecutionContext();
TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(contextJson, TaskExecutionContext.class);
+ if(taskExecutionContext == null){
+ logger.error("task execution context is null");
+ return;
+ }
+
taskExecutionContext.setHost(OSUtils.getHost() + ":" + workerConfig.getListenPort());
// local execute path
@@ -102,7 +110,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
// tell master that task is in executing
final Command ackCommand = buildAckCommand(taskExecutionContext).convert2Command();
-
+
try {
RetryerUtils.retryCall(() -> {
taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(),ackCommand);
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
index 78ba3a6..e064f4c 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
@@ -18,14 +18,17 @@ package org.apache.dolphinscheduler.server.worker.processor;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
@@ -52,10 +55,23 @@ import java.util.Date;
* test task call back service
*/
@RunWith(SpringJUnit4ClassRunner.class)
-@ContextConfiguration(classes={TaskCallbackServiceTestConfig.class, SpringZKServer.class, SpringApplicationContext.class, MasterRegistry.class, WorkerRegistry.class,
- ZookeeperRegistryCenter.class, MasterConfig.class, WorkerConfig.class,
- ZookeeperCachedOperator.class, ZookeeperConfig.class, ZookeeperNodeManager.class, TaskCallbackService.class,
- TaskResponseService.class, TaskAckProcessor.class,TaskResponseProcessor.class})
+@ContextConfiguration(classes={
+ TaskCallbackServiceTestConfig.class,
+ SpringZKServer.class,
+ SpringApplicationContext.class,
+ MasterRegistry.class,
+ WorkerRegistry.class,
+ ZookeeperRegistryCenter.class,
+ MasterConfig.class,
+ WorkerConfig.class,
+ ZookeeperCachedOperator.class,
+ ZookeeperConfig.class,
+ ZookeeperNodeManager.class,
+ TaskCallbackService.class,
+ TaskResponseService.class,
+ TaskAckProcessor.class,
+ TaskResponseProcessor.class,
+ TaskExecuteProcessor.class})
public class TaskCallbackServiceTest {
@Autowired
@@ -70,6 +86,9 @@ public class TaskCallbackServiceTest {
@Autowired
private TaskResponseProcessor taskResponseProcessor;
+ @Autowired
+ private TaskExecuteProcessor taskExecuteProcessor;
+
/**
* send ack test
* @throws Exception
@@ -176,6 +195,35 @@ public class TaskCallbackServiceTest {
}
}
+ @Test
+ public void testTaskExecuteProcessor() throws Exception{
+ final NettyServerConfig serverConfig = new NettyServerConfig();
+ serverConfig.setListenPort(30000);
+ NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig);
+ nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, taskExecuteProcessor);
+ nettyRemotingServer.start();
+
+ final NettyClientConfig clientConfig = new NettyClientConfig();
+ NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(clientConfig);
+
+ TaskExecuteRequestCommand taskExecuteRequestCommand = new TaskExecuteRequestCommand();
+
+ nettyRemotingClient.send(new Host("localhost",30000),taskExecuteRequestCommand.convert2Command());
+
+ taskExecuteRequestCommand.setTaskExecutionContext(JSONUtils.toJsonString(new TaskExecutionContext()));
+
+ nettyRemotingClient.send(new Host("localhost",30000),taskExecuteRequestCommand.convert2Command());
+
+ Thread.sleep(5000);
+
+ Stopper.stop();
+
+ Thread.sleep(5000);
+
+ nettyRemotingServer.close();
+ nettyRemotingClient.close();
+ }
+
// @Test(expected = IllegalStateException.class)
// public void testSendAckWithIllegalStateException2(){
// masterRegistry.registry();