You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by te...@apache.org on 2020/03/13 07:28:30 UTC

[incubator-dolphinscheduler] branch refactor-worker updated: DataxTask process test modify (#2162)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/refactor-worker by this push:
     new 71b11e2  DataxTask process test modify (#2162)
71b11e2 is described below

commit 71b11e2c08b7813d3b14c4dc7e6724ff36692934
Author: qiaozhanwei <qi...@outlook.com>
AuthorDate: Fri Mar 13 15:28:22 2020 +0800

    DataxTask process test modify (#2162)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type
    
    * add TaskInstanceCacheManager receive Worker report result
    
    * TaskInstance setExecutePath
    
    * add TaskInstanceCacheManager to receive Worker Task result report
    
    * TaskInstanceCacheManager add remove method
    
    * add license
    
    * add dispatch task method
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * taskInstanceCache is null ,need load from db
    
    * taskInstanceCache is null ,need load from db
    
    * taskInstanceCache is null ,need load from db
    
    * 1,worker TaskPros use TaskExecutionContext replace
    2,Master kill Task , KillTaskProcessor modify
    
    * worker remove db
    
    * ShellTask modify
    
    * master persistence processId and appIds
    
    * master persistence processId and appIds
    
    * master add kill task logic
    
    * master add kill task logic
    
    * master add kill task logic
    
    * javadoc error modify
    
    * remove Chinese log
    
    * executeDirectly method add Override
    
    * remote module modify
    
    * TaskKillResponseProcessor command type modify
    
    * create buildKillCommand
    
    * host add host:port format
    
    * host add host:port format
    
    * TaskAckProcessor modify
    
    * TaskAckProcessor modify
    
    * task priority refactor
    
    * remove ITaskQueue
    
    * task priority refactor
    
    * remove ITaskQueue
    
    * TaskPriority refactor
    
    * remove logs
    
    * WorkerServer refactor
    
    * MasterSchedulerService modify
    
    * WorkerConfig listen port modify
    
    * modify master and worker listen port
    
    * cancelTaskInstance set TaskExecutionContext host,logPath,executePath
    
    * cancelTaskInstance set TaskExecutionContext host,logPath,executePath
    
    * Encapsulate the parameters required by sqltask
    
    * 1,Encapsulate the parameters required by sqltask
    2,SQLTask optimization
    
    * AbstractTask modify
    
    * ProcedureTask optimization
    
    * MasterSchedulerService modify
    
    * TaskUpdateQueueConsumer modify
    
    * test
    
    * DataxTask process run debug
    
    * DataxTask process run debug
    
    * add protobuf dependency; MR, Spark tasks etc. need this
    
    * TaskUpdateQueueConsumer modify
    
    Co-authored-by: qiaozhanwei <qi...@analysys.com.cn>
---
 dolphinscheduler-server/pom.xml                             |  4 ----
 .../server/master/consumer/TaskUpdateQueueConsumer.java     | 13 +++++++++++++
 .../server/worker/task/datax/DataxTask.java                 |  3 +--
 3 files changed, 14 insertions(+), 6 deletions(-)

diff --git a/dolphinscheduler-server/pom.xml b/dolphinscheduler-server/pom.xml
index 080b87e..8dfee12 100644
--- a/dolphinscheduler-server/pom.xml
+++ b/dolphinscheduler-server/pom.xml
@@ -37,10 +37,6 @@
 			<artifactId>dolphinscheduler-common</artifactId>
 			<exclusions>
 				<exclusion>
-					<artifactId>protobuf-java</artifactId>
-					<groupId>com.google.protobuf</groupId>
-				</exclusion>
-				<exclusion>
 					<groupId>io.netty</groupId>
 					<artifactId>netty</artifactId>
 				</exclusion>
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
index b54f39f..8b00133 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
@@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.TaskType;
 import org.apache.dolphinscheduler.common.enums.UdfType;
 import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
 import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters;
 import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
 import org.apache.dolphinscheduler.common.thread.Stopper;
@@ -178,7 +179,19 @@ public class TaskUpdateQueueConsumer extends Thread{
 
         // DATAX task
         if (taskType == TaskType.DATAX){
+            DataxParameters dataxParameters = JSONObject.parseObject(taskNode.getParams(), DataxParameters.class);
 
+            DataSource dataSource = processService.findDataSourceById(dataxParameters.getDataSource());
+            DataSource dataTarget = processService.findDataSourceById(dataxParameters.getDataTarget());
+
+
+            dataxTaskExecutionContext.setDataSourceId(dataxParameters.getDataSource());
+            dataxTaskExecutionContext.setSourcetype(dataSource.getType().getCode());
+            dataxTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams());
+
+            dataxTaskExecutionContext.setDataTargetId(dataxParameters.getDataTarget());
+            dataxTaskExecutionContext.setTargetType(dataTarget.getType().getCode());
+            dataxTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams());
         }
 
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
index 1eee6f2..391f522 100755
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
@@ -216,8 +216,7 @@ public class DataxTask extends AbstractTask {
      * @return collection of datax job config JSONObject
      * @throws SQLException if error throws SQLException
      */
-    private List<JSONObject> buildDataxJobContentJson()
-        throws SQLException {
+    private List<JSONObject> buildDataxJobContentJson() throws SQLException {
         DataxTaskExecutionContext dataxTaskExecutionContext = taskExecutionContext.getDataxTaskExecutionContext();