You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by te...@apache.org on 2020/03/13 07:28:30 UTC
[incubator-dolphinscheduler] branch refactor-worker updated:
DataxTask process test modify (#2162)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new 71b11e2 DataxTask process test modify (#2162)
71b11e2 is described below
commit 71b11e2c08b7813d3b14c4dc7e6724ff36692934
Author: qiaozhanwei <qi...@outlook.com>
AuthorDate: Fri Mar 13 15:28:22 2020 +0800
DataxTask process test modify (#2162)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
* add TaskInstanceCacheManager receive Worker report result
* TaskInstance setExecutePath
* add TaskInstanceCacheManager to receive Worker Task result report
* TaskInstanceCacheManager add remove method
* add license
* add dispatcht task method
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* 1,worker TaskPros use TaskExecutionContext replase
2,Master kill Task , KillTaskProcessor modify
* worker remove db
* ShellTask modify
* master persistence processId and appIds
* master persistence processId and appIds
* master add kill task logic
* master add kill task logic
* master add kill task logic
* javadoc error modify
* remove chinese log
* executeDirectly method add Override
* remote module modify
* TaskKillResponseProcessor command type modify
* create buildKillCommand
* host add host:port format
* host add host:port format
* TaskAckProcessor modify
* TaskAckProcessor modify
* task prioriry refator
* remove ITaskQueue
* task prioriry refator
* remove ITaskQueue
* TaskPriority refactor
* remove logs
* WorkerServer refactor
* MasterSchedulerService modify
* WorkerConfig listen port modify
* modify master and worker listen port
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
* Encapsulate the parameters required by sqltask
* 1,Encapsulate the parameters required by sqltask
2,SQLTask optimization
* AbstractTask modify
* ProcedureTask optimization
* MasterSchedulerService modify
* TaskUpdateQueueConsumer modify
* test
* DataxTask process run debug
* DataxTask process run debug
* add protobuf dependency,MR、Spark task etc need this
* TaskUpdateQueueConsumer modify
Co-authored-by: qiaozhanwei <qi...@analysys.com.cn>
---
dolphinscheduler-server/pom.xml | 4 ----
.../server/master/consumer/TaskUpdateQueueConsumer.java | 13 +++++++++++++
.../server/worker/task/datax/DataxTask.java | 3 +--
3 files changed, 14 insertions(+), 6 deletions(-)
diff --git a/dolphinscheduler-server/pom.xml b/dolphinscheduler-server/pom.xml
index 080b87e..8dfee12 100644
--- a/dolphinscheduler-server/pom.xml
+++ b/dolphinscheduler-server/pom.xml
@@ -37,10 +37,6 @@
<artifactId>dolphinscheduler-common</artifactId>
<exclusions>
<exclusion>
- <artifactId>protobuf-java</artifactId>
- <groupId>com.google.protobuf</groupId>
- </exclusion>
- <exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
index b54f39f..8b00133 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
@@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.UdfType;
import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters;
import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
import org.apache.dolphinscheduler.common.thread.Stopper;
@@ -178,7 +179,19 @@ public class TaskUpdateQueueConsumer extends Thread{
// DATAX task
if (taskType == TaskType.DATAX){
+ DataxParameters dataxParameters = JSONObject.parseObject(taskNode.getParams(), DataxParameters.class);
+ DataSource dataSource = processService.findDataSourceById(dataxParameters.getDataSource());
+ DataSource dataTarget = processService.findDataSourceById(dataxParameters.getDataTarget());
+
+
+ dataxTaskExecutionContext.setDataSourceId(dataxParameters.getDataSource());
+ dataxTaskExecutionContext.setSourcetype(dataSource.getType().getCode());
+ dataxTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams());
+
+ dataxTaskExecutionContext.setDataTargetId(dataxParameters.getDataTarget());
+ dataxTaskExecutionContext.setTargetType(dataTarget.getType().getCode());
+ dataxTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams());
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
index 1eee6f2..391f522 100755
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
@@ -216,8 +216,7 @@ public class DataxTask extends AbstractTask {
* @return collection of datax job config JSONObject
* @throws SQLException if error throws SQLException
*/
- private List<JSONObject> buildDataxJobContentJson()
- throws SQLException {
+ private List<JSONObject> buildDataxJobContentJson() throws SQLException {
DataxTaskExecutionContext dataxTaskExecutionContext = taskExecutionContext.getDataxTaskExecutionContext();