You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/02/10 11:07:57 UTC
[dolphinscheduler] branch dev updated: Optimize some code (#8324)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 66dcf23 Optimize some code (#8324)
66dcf23 is described below
commit 66dcf2376a18995142dd4b3f0cb54df385720e95
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Thu Feb 10 19:07:53 2022 +0800
Optimize some code (#8324)
---
.../master/registry/MasterRegistryClient.java | 1 +
.../registry/MasterRegistryDataListener.java | 4 +-
.../server/master/registry/ServerNodeManager.java | 31 +++++++-------
.../master/runner/MasterSchedulerService.java | 38 +++++++---------
.../remote/NettyRemotingServer.java | 50 ++++------------------
.../service/process/ProcessService.java | 2 +-
.../service/queue/MasterPriorityQueue.java | 4 +-
7 files changed, 46 insertions(+), 84 deletions(-)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index d23ab36..77e27ec 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -122,6 +122,7 @@ public class MasterRegistryClient {
registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener());
} catch (Exception e) {
logger.error("master start up exception", e);
+ throw new RuntimeException("master start up error", e);
} finally {
registryClient.releaseLock(nodeLock);
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
index 361f09f..30cea36 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
@@ -56,7 +56,7 @@ public class MasterRegistryDataListener implements SubscribeListener {
}
}
- public void handleMasterEvent(Event event) {
+ private void handleMasterEvent(Event event) {
final String path = event.path();
switch (event.type()) {
case ADD:
@@ -70,7 +70,7 @@ public class MasterRegistryDataListener implements SubscribeListener {
}
}
- public void handleWorkerEvent(Event event) {
+ private void handleWorkerEvent(Event event) {
final String path = event.path();
switch (event.type()) {
case ADD:
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
index b7e904b..02f7f98 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
@@ -37,7 +37,6 @@ import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -117,7 +116,7 @@ public class ServerNodeManager implements InitializingBean {
@Autowired
private WorkerGroupMapper workerGroupMapper;
- private MasterPriorityQueue masterPriorityQueue = new MasterPriorityQueue();
+ private final MasterPriorityQueue masterPriorityQueue = new MasterPriorityQueue();
/**
* alert dao
@@ -125,15 +124,16 @@ public class ServerNodeManager implements InitializingBean {
@Autowired
private AlertDao alertDao;
- public static volatile List<Integer> SLOT_LIST = new ArrayList<>();
+ private static volatile int MASTER_SLOT = 0;
- public static volatile Integer MASTER_SIZE = 0;
+ private static volatile int MASTER_SIZE = 0;
- public static Integer getSlot() {
- if (SLOT_LIST.size() > 0) {
- return SLOT_LIST.get(0);
- }
- return 0;
+ public static int getSlot() {
+ return MASTER_SLOT;
+ }
+
+ public static int getMasterSize() {
+ return MASTER_SIZE;
}
@@ -295,7 +295,7 @@ public class ServerNodeManager implements InitializingBean {
}
private void updateMasterNodes() {
- SLOT_LIST.clear();
+ MASTER_SLOT = 0;
this.masterNodes.clear();
String nodeLock = Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_MASTERS;
try {
@@ -333,17 +333,18 @@ public class ServerNodeManager implements InitializingBean {
private void syncMasterNodes(Collection<String> nodes, List<Server> masterNodes) {
masterLock.lock();
try {
+ String host = NetUtils.getHost();
this.masterNodes.addAll(nodes);
this.masterPriorityQueue.clear();
this.masterPriorityQueue.putList(masterNodes);
- int index = masterPriorityQueue.getIndex(NetUtils.getHost());
+ int index = masterPriorityQueue.getIndex(host);
if (index >= 0) {
MASTER_SIZE = nodes.size();
- SLOT_LIST.add(masterPriorityQueue.getIndex(NetUtils.getHost()));
+ MASTER_SLOT = index;
+ } else {
+ logger.warn("current host:{} is not in active master list", host);
}
- logger.info("update master nodes, master size: {}, slot: {}",
- MASTER_SIZE, SLOT_LIST.toString()
- );
+ logger.info("update master nodes, master size: {}, slot: {}", MASTER_SIZE, MASTER_SLOT);
} finally {
masterLock.unlock();
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index ff30f28..f8a1ef6 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -30,14 +30,12 @@ import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheM
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
-import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.collections4.CollectionUtils;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
@@ -185,29 +183,24 @@ public class MasterSchedulerService extends Thread {
return null;
}
- ProcessInstance[] processInstances = new ProcessInstance[commands.size()];
+ List<ProcessInstance> processInstances = new ArrayList<>(commands.size());
CountDownLatch latch = new CountDownLatch(commands.size());
- for (int i = 0; i < commands.size(); i++) {
- int index = i;
- this.masterPrepareExecService.execute(() -> {
- Command command = commands.get(index);
- // slot check again
- if (!slotCheck(command)) {
- latch.countDown();
- return;
- }
-
+ for (final Command command : commands) {
+ masterPrepareExecService.execute(() -> {
try {
+ // slot check again
+ if (!slotCheck(command)) {
+ return;
+ }
ProcessInstance processInstance = processService.handleCommand(logger,
getLocalAddress(),
command);
if (processInstance != null) {
- processInstances[index] = processInstance;
- logger.info("handle command command {} end, create process instance {}",
- command.getId(), processInstance.getId());
+ processInstances.add(processInstance);
+ logger.info("handle command {} end, create process instance {}", command.getId(), processInstance.getId());
}
} catch (Exception e) {
- logger.error("scan command error ", e);
+ logger.error("handle command error ", e);
processService.moveToErrorCommand(command, e.toString());
} finally {
latch.countDown();
@@ -222,7 +215,7 @@ public class MasterSchedulerService extends Thread {
logger.error("countDownLatch await error ", e);
}
- return Arrays.asList(processInstances);
+ return processInstances;
}
private List<Command> findCommands() {
@@ -230,9 +223,10 @@ public class MasterSchedulerService extends Thread {
int pageSize = masterConfig.getFetchCommandNum();
List<Command> result = new ArrayList<>();
while (Stopper.isRunning()) {
- if (ServerNodeManager.MASTER_SIZE == 0) {
+ if (ServerNodeManager.getMasterSize() == 0) {
return result;
}
+ // todo: Can we use the slot to scan database?
List<Command> commandList = processService.findCommandPage(pageSize, pageNumber);
if (commandList.size() == 0) {
return result;
@@ -253,10 +247,8 @@ public class MasterSchedulerService extends Thread {
private boolean slotCheck(Command command) {
int slot = ServerNodeManager.getSlot();
- if (ServerNodeManager.MASTER_SIZE != 0 && command.getId() % ServerNodeManager.MASTER_SIZE == slot) {
- return true;
- }
- return false;
+ int masterSize = ServerNodeManager.getMasterSize();
+ return masterSize != 0 && command.getId() % masterSize == slot;
}
private String getLocalAddress() {
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
index f4a8869..665779d 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
@@ -32,11 +32,12 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
@@ -60,11 +61,6 @@ public class NettyRemotingServer {
private final ServerBootstrap serverBootstrap = new ServerBootstrap();
/**
- * encoder
- */
- private final NettyEncoder encoder = new NettyEncoder();
-
- /**
* default executor
*/
private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS);
@@ -106,42 +102,14 @@ public class NettyRemotingServer {
*/
public NettyRemotingServer(final NettyServerConfig serverConfig) {
this.serverConfig = serverConfig;
+ ThreadFactory bossThreadFactory = new ThreadFactoryBuilder().setNameFormat("NettyServerBossThread_%s").build();
+ ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setNameFormat("NettyServerWorkerThread_%s").build();
if (NettyUtils.useEpoll()) {
- this.bossGroup = new EpollEventLoopGroup(1, new ThreadFactory() {
- private final AtomicInteger threadIndex = new AtomicInteger(0);
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, String.format("NettyServerBossThread_%d", this.threadIndex.incrementAndGet()));
- }
- });
-
- this.workGroup = new EpollEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() {
- private final AtomicInteger threadIndex = new AtomicInteger(0);
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, String.format("NettyServerWorkerThread_%d", this.threadIndex.incrementAndGet()));
- }
- });
+ this.bossGroup = new EpollEventLoopGroup(1, bossThreadFactory);
+ this.workGroup = new EpollEventLoopGroup(serverConfig.getWorkerThread(), workerThreadFactory);
} else {
- this.bossGroup = new NioEventLoopGroup(1, new ThreadFactory() {
- private final AtomicInteger threadIndex = new AtomicInteger(0);
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, String.format("NettyServerBossThread_%d", this.threadIndex.incrementAndGet()));
- }
- });
-
- this.workGroup = new NioEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() {
- private final AtomicInteger threadIndex = new AtomicInteger(0);
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, String.format("NettyServerWorkerThread_%d", this.threadIndex.incrementAndGet()));
- }
- });
+ this.bossGroup = new NioEventLoopGroup(1, bossThreadFactory);
+ this.workGroup = new NioEventLoopGroup(serverConfig.getWorkerThread(), workerThreadFactory);
}
}
@@ -191,7 +159,7 @@ public class NettyRemotingServer {
*/
private void initNettyChannel(SocketChannel ch) {
ch.pipeline()
- .addLast("encoder", encoder)
+ .addLast("encoder", new NettyEncoder())
.addLast("decoder", new NettyDecoder())
.addLast("server-idle-handle", new IdleStateHandler(0, 0, Constants.NETTY_SERVER_HEART_BEAT_TIME, TimeUnit.MILLISECONDS))
.addLast("handler", serverHandler);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index b795385..d6251d5 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -275,7 +275,7 @@ public class ProcessService {
}
processInstance.setCommandType(command.getCommandType());
processInstance.addHistoryCmd(command.getCommandType());
- //if the processDefination is serial
+ //if the processDefinition is serial
ProcessDefinition processDefinition = this.findProcessDefinition(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
if (processDefinition.getExecutionType().typeIsSerial()) {
saveSerialProcess(processInstance, processDefinition);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/MasterPriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/MasterPriorityQueue.java
index 7743203..66385db 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/MasterPriorityQueue.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/MasterPriorityQueue.java
@@ -97,12 +97,12 @@ public class MasterPriorityQueue implements TaskPriorityQueue<Server> {
}
/**
- * server comparator
+ * server comparator, used to sort server by createTime in reverse order.
*/
private class ServerComparator implements Comparator<Server> {
@Override
public int compare(Server o1, Server o2) {
- return o1.getCreateTime().before(o2.getCreateTime()) ? 1 : 0;
+ return o2.getCreateTime().compareTo(o1.getCreateTime());
}
}