You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by li...@apache.org on 2020/12/22 07:32:48 UTC
[incubator-dolphinscheduler] branch dev updated: [Improvement]
Refactor code to support distributed tracing (#4270)
This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 3a28a71 [Improvement] Refactor code to support distributed tracing (#4270)
3a28a71 is described below
commit 3a28a71b007a26e7613d2f23094b8b4cbfe111ff
Author: hailin0 <ha...@foxmail.com>
AuthorDate: Tue Dec 22 15:32:41 2020 +0800
[Improvement] Refactor code to support distributed tracing (#4270)
* Refactor code to support tracing
* Extension network protocol, support context and version
* Extension master asynchronous queue support context
* Extract scan task method from MasterSchedulerService for tracing
* fix
* fix
* add test case
* fix
* fix
Co-authored-by: hailin0 <ha...@yeah.net>
---
.../remote/codec/NettyDecoder.java | 33 ++++++++
.../remote/codec/NettyEncoder.java | 7 ++
.../dolphinscheduler/remote/command/Command.java | 14 ++++
.../{CommandHeader.java => CommandContext.java} | 48 +++++------
.../remote/command/CommandHeader.java | 26 ++++++
.../master/consumer/TaskPriorityQueueConsumer.java | 20 ++---
.../master/runner/MasterBaseTaskExecThread.java | 37 +++------
.../master/runner/MasterSchedulerService.java | 77 +++++++++--------
.../consumer/TaskPriorityQueueConsumerTest.java | 27 +++---
.../service/queue}/TaskPriority.java | 97 ++++++++++++++--------
.../service/queue/TaskPriorityQueueImpl.java | 46 +---------
.../src/test/java/queue/TaskPriorityTest.java | 83 ++++++++++++++++++
.../src/test/java/queue/TaskUpdateQueueTest.java | 20 ++---
13 files changed, 338 insertions(+), 197 deletions(-)
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java
index 179ae1b..343e8c6 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java
@@ -22,6 +22,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandContext;
import org.apache.dolphinscheduler.remote.command.CommandHeader;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.slf4j.Logger;
@@ -54,16 +55,34 @@ public class NettyDecoder extends ReplayingDecoder<NettyDecoder.State> {
switch (state()){
case MAGIC:
checkMagic(in.readByte());
+ checkpoint(State.VERSION);
+ // fallthru
+ case VERSION:
+ checkVersion(in.readByte());
checkpoint(State.COMMAND);
+ // fallthru
case COMMAND:
commandHeader.setType(in.readByte());
checkpoint(State.OPAQUE);
+ // fallthru
case OPAQUE:
commandHeader.setOpaque(in.readLong());
+ checkpoint(State.CONTEXT_LENGTH);
+ // fallthru
+ case CONTEXT_LENGTH:
+ commandHeader.setContextLength(in.readInt());
+ checkpoint(State.CONTEXT);
+ // fallthru
+ case CONTEXT:
+ byte[] context = new byte[commandHeader.getContextLength()];
+ in.readBytes(context);
+ commandHeader.setContext(context);
checkpoint(State.BODY_LENGTH);
+ // fallthru
case BODY_LENGTH:
commandHeader.setBodyLength(in.readInt());
checkpoint(State.BODY);
+ // fallthru
case BODY:
byte[] body = new byte[commandHeader.getBodyLength()];
in.readBytes(body);
@@ -71,6 +90,7 @@ public class NettyDecoder extends ReplayingDecoder<NettyDecoder.State> {
Command packet = new Command();
packet.setType(commandType(commandHeader.getType()));
packet.setOpaque(commandHeader.getOpaque());
+ packet.setContext(CommandContext.valueOf(commandHeader.getContext()));
packet.setBody(body);
out.add(packet);
//
@@ -105,10 +125,23 @@ public class NettyDecoder extends ReplayingDecoder<NettyDecoder.State> {
}
}
+ /**
+ * check version
+ * @param version
+ */
+ private void checkVersion(byte version) {
+ if (version != Command.VERSION) {
+ throw new IllegalArgumentException("illegal protocol [version]" + version);
+ }
+ }
+
enum State{
MAGIC,
+ VERSION,
COMMAND,
OPAQUE,
+ CONTEXT_LENGTH,
+ CONTEXT,
BODY_LENGTH,
BODY;
}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java
index 4e9836a..785ee5a 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java
@@ -42,11 +42,18 @@ public class NettyEncoder extends MessageToByteEncoder<Command> {
throw new Exception("encode msg is null");
}
out.writeByte(Command.MAGIC);
+ out.writeByte(Command.VERSION);
out.writeByte(msg.getType().ordinal());
out.writeLong(msg.getOpaque());
+ writeContext(msg, out);
out.writeInt(msg.getBody().length);
out.writeBytes(msg.getBody());
}
+ private void writeContext(Command msg, ByteBuf out) {
+ byte[] headerBytes = msg.getContext().toBytes();
+ out.writeInt(headerBytes.length);
+ out.writeBytes(headerBytes);
+ }
}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java
index ed46e1f..9baa321 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java
@@ -28,6 +28,7 @@ public class Command implements Serializable {
private static final AtomicLong REQUEST_ID = new AtomicLong(1);
public static final byte MAGIC = (byte) 0xbabe;
+ public static final byte VERSION = 0;
public Command(){
this.opaque = REQUEST_ID.getAndIncrement();
@@ -48,6 +49,11 @@ public class Command implements Serializable {
private long opaque;
/**
+ * request context
+ */
+ private CommandContext context = new CommandContext();
+
+ /**
* data body
*/
private byte[] body;
@@ -76,6 +82,14 @@ public class Command implements Serializable {
this.body = body;
}
+ public CommandContext getContext() {
+ return context;
+ }
+
+ public void setContext(CommandContext context) {
+ this.context = context;
+ }
+
@Override
public int hashCode() {
final int prime = 31;
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandContext.java
similarity index 55%
copy from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java
copy to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandContext.java
index 78948a5..c9febee 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandContext.java
@@ -14,51 +14,43 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.remote.command;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+
import java.io.Serializable;
+import java.util.LinkedHashMap;
+import java.util.Map;
/**
- * command header
+ * command context
*/
-public class CommandHeader implements Serializable {
-
- /**
- * type
- */
- private byte type;
-
- /**
- * request unique identification
- */
- private long opaque;
+public class CommandContext implements Serializable {
- /**
- * body length
- */
- private int bodyLength;
+ private Map<String, String> items = new LinkedHashMap<>();
- public int getBodyLength() {
- return bodyLength;
+ public Map<String, String> getItems() {
+ return items;
}
- public void setBodyLength(int bodyLength) {
- this.bodyLength = bodyLength;
+ public void setItems(Map<String, String> items) {
+ this.items = items;
}
- public byte getType() {
- return type;
+ public void put(String key, String value) {
+ items.put(key, value);
}
- public void setType(byte type) {
- this.type = type;
+ public String get(String key) {
+ return items.get(key);
}
- public long getOpaque() {
- return opaque;
+ public byte[] toBytes() {
+ return JSONUtils.toJsonByteArray(this);
}
- public void setOpaque(long opaque) {
- this.opaque = opaque;
+ public static CommandContext valueOf(byte[] src) {
+ return JSONUtils.parseObject(src, CommandContext.class);
}
}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java
index 78948a5..9e83a42 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java
@@ -34,6 +34,16 @@ public class CommandHeader implements Serializable {
private long opaque;
/**
+ * context length
+ */
+ private int contextLength;
+
+ /**
+ * context
+ */
+ private byte[] context;
+
+ /**
* body length
*/
private int bodyLength;
@@ -61,4 +71,20 @@ public class CommandHeader implements Serializable {
public void setOpaque(long opaque) {
this.opaque = opaque;
}
+
+ public int getContextLength() {
+ return contextLength;
+ }
+
+ public void setContextLength(int contextLength) {
+ this.contextLength = contextLength;
+ }
+
+ public byte[] getContext() {
+ return context;
+ }
+
+ public void setContext(byte[] context) {
+ this.context = context;
+ }
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index a407e4e..2325508 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -50,13 +50,13 @@ import org.apache.dolphinscheduler.server.entity.ProcedureTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
-import org.apache.dolphinscheduler.server.entity.TaskPriority;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import java.util.ArrayList;
@@ -90,7 +90,7 @@ public class TaskPriorityQueueConsumer extends Thread {
* taskUpdateQueue
*/
@Autowired
- private TaskPriorityQueue<String> taskPriorityQueue;
+ private TaskPriorityQueue<TaskPriority> taskPriorityQueue;
/**
* processService
@@ -119,7 +119,7 @@ public class TaskPriorityQueueConsumer extends Thread {
@Override
public void run() {
- List<String> failedDispatchTasks = new ArrayList<>();
+ List<TaskPriority> failedDispatchTasks = new ArrayList<>();
while (Stopper.isRunning()) {
try {
int fetchTaskNum = masterConfig.getMasterDispatchTaskNumber();
@@ -130,15 +130,14 @@ public class TaskPriorityQueueConsumer extends Thread {
continue;
}
// if not task , blocking here
- String taskPriorityInfo = taskPriorityQueue.take();
- TaskPriority taskPriority = TaskPriority.of(taskPriorityInfo);
- boolean dispatchResult = dispatch(taskPriority.getTaskId());
+ TaskPriority taskPriority = taskPriorityQueue.take();
+ boolean dispatchResult = dispatch(taskPriority);
if (!dispatchResult) {
- failedDispatchTasks.add(taskPriorityInfo);
+ failedDispatchTasks.add(taskPriority);
}
}
if (!failedDispatchTasks.isEmpty()) {
- for (String dispatchFailedTask : failedDispatchTasks) {
+ for (TaskPriority dispatchFailedTask : failedDispatchTasks) {
taskPriorityQueue.put(dispatchFailedTask);
}
// If there are tasks in a cycle that cannot find the worker group,
@@ -157,12 +156,13 @@ public class TaskPriorityQueueConsumer extends Thread {
/**
* dispatch task
*
- * @param taskInstanceId taskInstanceId
+ * @param taskPriority taskPriority
* @return result
*/
- protected boolean dispatch(int taskInstanceId) {
+ protected boolean dispatch(TaskPriority taskPriority) {
boolean result = false;
try {
+ int taskInstanceId = taskPriority.getTaskId();
TaskExecutionContext context = getTaskExecutionContext(taskInstanceId);
ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup());
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
index f5c3708..fcff67f 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
@@ -16,8 +16,6 @@
*/
package org.apache.dolphinscheduler.server.master.runner;
-import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
-
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.model.TaskNode;
@@ -27,17 +25,15 @@ import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
-import java.util.concurrent.Callable;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.dolphinscheduler.common.Constants.*;
import java.util.Date;
import java.util.concurrent.Callable;
@@ -217,14 +213,14 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
logger.info("task ready to submit: {}", taskInstance);
/**
- * taskPriorityInfo
+ * taskPriority
*/
- String taskPriorityInfo = buildTaskPriorityInfo(processInstance.getProcessInstancePriority().getCode(),
+ TaskPriority taskPriority = buildTaskPriority(processInstance.getProcessInstancePriority().getCode(),
processInstance.getId(),
taskInstance.getProcessInstancePriority().getCode(),
taskInstance.getId(),
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP);
- taskUpdateQueue.put(taskPriorityInfo);
+ taskUpdateQueue.put(taskPriority);
logger.info(String.format("master submit success, task : %s", taskInstance.getName()) );
return true;
}catch (Exception e){
@@ -235,29 +231,22 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
}
/**
- * buildTaskPriorityInfo
+ * buildTaskPriority
*
* @param processInstancePriority processInstancePriority
* @param processInstanceId processInstanceId
* @param taskInstancePriority taskInstancePriority
* @param taskInstanceId taskInstanceId
* @param workerGroup workerGroup
- * @return TaskPriorityInfo
+ * @return TaskPriority
*/
- private String buildTaskPriorityInfo(int processInstancePriority,
- int processInstanceId,
- int taskInstancePriority,
- int taskInstanceId,
- String workerGroup) {
- return processInstancePriority +
- UNDERLINE +
- processInstanceId +
- UNDERLINE +
- taskInstancePriority +
- UNDERLINE +
- taskInstanceId +
- UNDERLINE +
- workerGroup;
+ private TaskPriority buildTaskPriority(int processInstancePriority,
+ int processInstanceId,
+ int taskInstancePriority,
+ int taskInstanceId,
+ String workerGroup) {
+ return new TaskPriority(processInstancePriority, processInstanceId,
+ taskInstancePriority, taskInstanceId, workerGroup);
}
/**
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index 30dd0f9..b0e0528 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -124,52 +124,57 @@ public class MasterSchedulerService extends Thread {
public void run() {
logger.info("master scheduler started");
while (Stopper.isRunning()){
- InterProcessMutex mutex = null;
try {
boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory());
- if(!runCheckFlag) {
+ if (!runCheckFlag) {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
continue;
}
if (zkMasterClient.getZkClient().getState() == CuratorFrameworkState.STARTED) {
+ scheduleProcess();
+ }
+ } catch (Exception e) {
+ logger.error("master scheduler thread error", e);
+ }
+ }
+ }
- mutex = zkMasterClient.blockAcquireMutex();
-
- int activeCount = masterExecService.getActiveCount();
- // make sure to scan and delete command table in one transaction
- Command command = processService.findOneCommand();
- if (command != null) {
- logger.info("find one command: id: {}, type: {}", command.getId(),command.getCommandType());
-
- try{
-
- ProcessInstance processInstance = processService.handleCommand(logger,
- getLocalAddress(),
- this.masterConfig.getMasterExecThreads() - activeCount, command);
- if (processInstance != null) {
- logger.info("start master exec thread , split DAG ...");
- masterExecService.execute(
- new MasterExecThread(
- processInstance
- , processService
- , nettyRemotingClient
- , alertManager
- , masterConfig));
- }
- }catch (Exception e){
- logger.error("scan command error ", e);
- processService.moveToErrorCommand(command, e.toString());
- }
- } else{
- //indicate that no command ,sleep for 1s
- Thread.sleep(Constants.SLEEP_TIME_MILLIS);
+ private void scheduleProcess() throws Exception {
+ InterProcessMutex mutex = null;
+ try {
+ mutex = zkMasterClient.blockAcquireMutex();
+
+ int activeCount = masterExecService.getActiveCount();
+ // make sure to scan and delete command table in one transaction
+ Command command = processService.findOneCommand();
+ if (command != null) {
+ logger.info("find one command: id: {}, type: {}", command.getId(),command.getCommandType());
+
+ try {
+
+ ProcessInstance processInstance = processService.handleCommand(logger,
+ getLocalAddress(),
+ this.masterConfig.getMasterExecThreads() - activeCount, command);
+ if (processInstance != null) {
+ logger.info("start master exec thread , split DAG ...");
+ masterExecService.execute(
+ new MasterExecThread(
+ processInstance
+ , processService
+ , nettyRemotingClient
+ , alertManager
+ , masterConfig));
}
+ } catch (Exception e) {
+ logger.error("scan command error ", e);
+ processService.moveToErrorCommand(command, e.toString());
}
- } catch (Exception e){
- logger.error("master scheduler thread error",e);
- } finally{
- zkMasterClient.releaseMutex(mutex);
+ } else {
+ //indicate that no command ,sleep for 1s
+ Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}
+ } finally {
+ zkMasterClient.releaseMutex(mutex);
}
}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
index 049e30e..8c2321d 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
@@ -42,6 +42,7 @@ import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
@@ -72,7 +73,7 @@ public class TaskPriorityQueueConsumerTest {
@Autowired
- private TaskPriorityQueue taskPriorityQueue;
+ private TaskPriorityQueue<TaskPriority> taskPriorityQueue;
@Autowired
private TaskPriorityQueueConsumer taskPriorityQueueConsumer;
@@ -142,9 +143,8 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setProcessDefine(processDefinition);
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
- Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
-
- taskPriorityQueue.put("2_1_2_1_default");
+ TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default");
+ taskPriorityQueue.put(taskPriority);
TimeUnit.SECONDS.sleep(10);
@@ -180,7 +180,8 @@ public class TaskPriorityQueueConsumerTest {
processDefinition.setProjectId(1);
taskInstance.setProcessDefine(processDefinition);
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
- taskPriorityQueue.put("2_1_2_1_default");
+ TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default");
+ taskPriorityQueue.put(taskPriority);
DataSource dataSource = new DataSource();
dataSource.setId(1);
@@ -243,7 +244,8 @@ public class TaskPriorityQueueConsumerTest {
processDefinition.setProjectId(1);
taskInstance.setProcessDefine(processDefinition);
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
- taskPriorityQueue.put("2_1_2_1_default");
+ TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default");
+ taskPriorityQueue.put(taskPriority);
DataSource dataSource = new DataSource();
dataSource.setId(80);
@@ -310,7 +312,8 @@ public class TaskPriorityQueueConsumerTest {
processDefinition.setProjectId(1);
taskInstance.setProcessDefine(processDefinition);
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
- taskPriorityQueue.put("2_1_2_1_default");
+ TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default");
+ taskPriorityQueue.put(taskPriority);
DataSource dataSource = new DataSource();
dataSource.setId(1);
@@ -402,7 +405,8 @@ public class TaskPriorityQueueConsumerTest {
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
- taskPriorityQueue.put("2_1_2_1_NoWorkGroup");
+ TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "NoWorkGroup");
+ taskPriorityQueue.put(taskPriority);
TimeUnit.SECONDS.sleep(10);
@@ -455,7 +459,9 @@ public class TaskPriorityQueueConsumerTest {
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
- boolean res = taskPriorityQueueConsumer.dispatch(1);
+ TaskPriority taskPriority = new TaskPriority();
+ taskPriority.setTaskId(1);
+ boolean res = taskPriorityQueueConsumer.dispatch(taskPriority);
Assert.assertFalse(res);
}
@@ -649,7 +655,8 @@ public class TaskPriorityQueueConsumerTest {
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
- taskPriorityQueue.put("2_1_2_1_NoWorkGroup");
+ TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "NoWorkGroup");
+ taskPriorityQueue.put(taskPriority);
taskPriorityQueueConsumer.run();
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskPriority.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
similarity index 56%
rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskPriority.java
rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
index 991eeed..a872f6d 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskPriority.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
@@ -15,14 +15,15 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.entity;
+package org.apache.dolphinscheduler.service.queue;
-import static org.apache.dolphinscheduler.common.Constants.*;
+import java.util.Map;
+import java.util.Objects;
/**
* task priority info
*/
-public class TaskPriority {
+public class TaskPriority implements Comparable<TaskPriority> {
/**
* processInstancePriority
@@ -50,9 +51,9 @@ public class TaskPriority {
private String groupName;
/**
- * ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_${groupName}
+ * context
*/
- private String taskPriorityInfo;
+ private Map<String, String> context;
public TaskPriority(){}
@@ -65,15 +66,6 @@ public class TaskPriority {
this.taskInstancePriority = taskInstancePriority;
this.taskId = taskId;
this.groupName = groupName;
- this.taskPriorityInfo = this.processInstancePriority +
- UNDERLINE +
- this.processInstanceId +
- UNDERLINE +
- this.taskInstancePriority +
- UNDERLINE +
- this.taskId +
- UNDERLINE +
- this.groupName;
}
public int getProcessInstancePriority() {
@@ -104,6 +96,10 @@ public class TaskPriority {
return taskId;
}
+ public Map<String, String> getContext() {
+ return context;
+ }
+
public void setTaskId(int taskId) {
this.taskId = taskId;
}
@@ -116,32 +112,61 @@ public class TaskPriority {
this.groupName = groupName;
}
- public String getTaskPriorityInfo() {
- return taskPriorityInfo;
+ public void setContext(Map<String, String> context) {
+ this.context = context;
}
- public void setTaskPriorityInfo(String taskPriorityInfo) {
- this.taskPriorityInfo = taskPriorityInfo;
- }
+ @Override
+ public int compareTo(TaskPriority other) {
+ if (this.getProcessInstancePriority() > other.getProcessInstancePriority()) {
+ return 1;
+ }
+ if (this.getProcessInstancePriority() < other.getProcessInstancePriority()) {
+ return -1;
+ }
- /**
- * taskPriorityInfo convert taskPriority
- *
- * @param taskPriorityInfo taskPriorityInfo
- * @return TaskPriority
- */
- public static TaskPriority of(String taskPriorityInfo){
- String[] parts = taskPriorityInfo.split(UNDERLINE);
+ if (this.getProcessInstanceId() > other.getProcessInstanceId()) {
+ return 1;
+ }
+ if (this.getProcessInstanceId() < other.getProcessInstanceId()) {
+ return -1;
+ }
+
+ if (this.getTaskInstancePriority() > other.getTaskInstancePriority()) {
+ return 1;
+ }
+ if (this.getTaskInstancePriority() < other.getTaskInstancePriority()) {
+ return -1;
+ }
+
+ if (this.getTaskId() > other.getTaskId()) {
+ return 1;
+ }
+ if (this.getTaskId() < other.getTaskId()) {
+ return -1;
+ }
- if (parts.length != 5) {
- throw new IllegalArgumentException(String.format("TaskPriority : %s illegal.", taskPriorityInfo));
+ return this.getGroupName().compareTo(other.getGroupName());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
}
- TaskPriority taskPriority = new TaskPriority(
- Integer.parseInt(parts[0]),
- Integer.parseInt(parts[1]),
- Integer.parseInt(parts[2]),
- Integer.parseInt(parts[3]),
- parts[4]);
- return taskPriority;
+ TaskPriority that = (TaskPriority) o;
+ return processInstancePriority == that.processInstancePriority
+ && processInstanceId == that.processInstanceId
+ && taskInstancePriority == that.taskInstancePriority
+ && taskId == that.taskId
+ && Objects.equals(groupName, that.groupName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(processInstancePriority, processInstanceId, taskInstancePriority, taskId, groupName);
}
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
index aefad84..694d4c4 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
@@ -17,24 +17,18 @@
package org.apache.dolphinscheduler.service.queue;
-import static org.apache.dolphinscheduler.common.Constants.TASK_INFO_LENGTH;
-import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
-
import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
-import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;
import org.springframework.stereotype.Service;
-
-
/**
* A singleton of a task queue implemented with zookeeper
* tasks queue implementation
*/
@Service
-public class TaskPriorityQueueImpl implements TaskPriorityQueue<String> {
+public class TaskPriorityQueueImpl implements TaskPriorityQueue<TaskPriority> {
/**
* queue size
*/
@@ -43,7 +37,7 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue<String> {
/**
* queue
*/
- private PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator());
+ private PriorityBlockingQueue<TaskPriority> queue = new PriorityBlockingQueue<>(QUEUE_MAX_SIZE);
/**
* put task takePriorityInfo
@@ -52,7 +46,7 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue<String> {
* @throws TaskPriorityQueueException
*/
@Override
- public void put(String taskPriorityInfo) throws TaskPriorityQueueException {
+ public void put(TaskPriority taskPriorityInfo) throws TaskPriorityQueueException {
queue.put(taskPriorityInfo);
}
@@ -63,7 +57,7 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue<String> {
* @throws TaskPriorityQueueException
*/
@Override
- public String take() throws TaskPriorityQueueException, InterruptedException {
+ public TaskPriority take() throws TaskPriorityQueueException, InterruptedException {
return queue.take();
}
@@ -77,36 +71,4 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue<String> {
public int size() throws TaskPriorityQueueException {
return queue.size();
}
-
- /**
- * TaskInfoComparator
- */
- private class TaskInfoComparator implements Comparator<String> {
-
- /**
- * compare o1 o2
- *
- * @param o1 o1
- * @param o2 o2
- * @return compare result
- */
- @Override
- public int compare(String o1, String o2) {
- String s1 = o1;
- String s2 = o2;
- String[] s1Array = s1.split(UNDERLINE);
- if (s1Array.length > TASK_INFO_LENGTH) {
- // warning: if this length > 5, need to be changed
- s1 = s1.substring(0, s1.lastIndexOf(UNDERLINE));
- }
-
- String[] s2Array = s2.split(UNDERLINE);
- if (s2Array.length > TASK_INFO_LENGTH) {
- // warning: if this length > 5, need to be changed
- s2 = s2.substring(0, s2.lastIndexOf(UNDERLINE));
- }
-
- return s1.compareTo(s2);
- }
- }
}
diff --git a/dolphinscheduler-service/src/test/java/queue/TaskPriorityTest.java b/dolphinscheduler-service/src/test/java/queue/TaskPriorityTest.java
new file mode 100644
index 0000000..1511770
--- /dev/null
+++ b/dolphinscheduler-service/src/test/java/queue/TaskPriorityTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package queue;
+
+import org.apache.dolphinscheduler.service.queue.TaskPriority;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TaskPriorityTest {
+
+ @Test
+ public void testSort() {
+ TaskPriority priorityOne = new TaskPriority(1, 0, 0, 0, "default");
+ TaskPriority priorityTwo = new TaskPriority(2, 0, 0, 0, "default");
+ TaskPriority priorityThree = new TaskPriority(3, 0, 0, 0, "default");
+ List<TaskPriority> taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo);
+ Collections.sort(taskPrioritys);
+ Assert.assertEquals(
+ Arrays.asList(priorityOne, priorityTwo, priorityThree),
+ taskPrioritys
+ );
+
+ priorityOne = new TaskPriority(0, 1, 0, 0, "default");
+ priorityTwo = new TaskPriority(0, 2, 0, 0, "default");
+ priorityThree = new TaskPriority(0, 3, 0, 0, "default");
+ taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo);
+ Collections.sort(taskPrioritys);
+ Assert.assertEquals(
+ Arrays.asList(priorityOne, priorityTwo, priorityThree),
+ taskPrioritys
+ );
+
+ priorityOne = new TaskPriority(0, 0, 1, 0, "default");
+ priorityTwo = new TaskPriority(0, 0, 2, 0, "default");
+ priorityThree = new TaskPriority(0, 0, 3, 0, "default");
+ taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo);
+ Collections.sort(taskPrioritys);
+ Assert.assertEquals(
+ Arrays.asList(priorityOne, priorityTwo, priorityThree),
+ taskPrioritys
+ );
+
+ priorityOne = new TaskPriority(0, 0, 0, 1, "default");
+ priorityTwo = new TaskPriority(0, 0, 0, 2, "default");
+ priorityThree = new TaskPriority(0, 0, 0, 3, "default");
+ taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo);
+ Collections.sort(taskPrioritys);
+ Assert.assertEquals(
+ Arrays.asList(priorityOne, priorityTwo, priorityThree),
+ taskPrioritys
+ );
+
+ priorityOne = new TaskPriority(0, 0, 0, 0, "default_1");
+ priorityTwo = new TaskPriority(0, 0, 0, 0, "default_2");
+ priorityThree = new TaskPriority(0, 0, 0, 0, "default_3");
+ taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo);
+ Collections.sort(taskPrioritys);
+ Assert.assertEquals(
+ Arrays.asList(priorityOne, priorityTwo, priorityThree),
+ taskPrioritys
+ );
+ }
+}
diff --git a/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java b/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java
index ca6c083..2c13afa 100644
--- a/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java
+++ b/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java
@@ -17,6 +17,7 @@
package queue;
+import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
import org.junit.Test;
@@ -31,19 +32,16 @@ public class TaskUpdateQueueTest {
@Test
public void testQueue() throws Exception{
- // ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_${groupName}
-
/**
* 1_1_2_1_default
* 1_1_2_2_default
* 1_1_0_3_default
* 1_1_0_4_default
*/
-
- String taskInfo1 = "1_1_2_1_default";
- String taskInfo2 = "1_1_2_2_default";
- String taskInfo3 = "1_1_0_3_default";
- String taskInfo4 = "1_1_0_4_default";
+ TaskPriority taskInfo1 = new TaskPriority(1, 1, 2, 1, "default");
+ TaskPriority taskInfo2 = new TaskPriority(1, 1, 2, 2, "default");
+ TaskPriority taskInfo3 = new TaskPriority(1, 1, 0, 3, "default");
+ TaskPriority taskInfo4 = new TaskPriority(1, 1, 0, 4, "default");
TaskPriorityQueue queue = new TaskPriorityQueueImpl();
queue.put(taskInfo1);
@@ -51,9 +49,9 @@ public class TaskUpdateQueueTest {
queue.put(taskInfo3);
queue.put(taskInfo4);
- assertEquals("1_1_0_3_default", queue.take());
- assertEquals("1_1_0_4_default", queue.take());
- assertEquals("1_1_2_1_default",queue.take());
- assertEquals("1_1_2_2_default",queue.take());
+ assertEquals(taskInfo3, queue.take());
+ assertEquals(taskInfo4, queue.take());
+ assertEquals(taskInfo1, queue.take());
+ assertEquals(taskInfo2, queue.take());
}
}