You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by jo...@apache.org on 2020/02/04 03:45:00 UTC
[incubator-dolphinscheduler] branch refactor-logger updated:
refactor Logger (#1887)
This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch refactor-logger
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-logger by this push:
new ccd3194 refactor Logger (#1887)
ccd3194 is described below
commit ccd3194af967b6942aaf38ec6ff6b2343f91a3cd
Author: Tboy <gu...@immomo.com>
AuthorDate: Tue Feb 4 11:44:51 2020 +0800
refactor Logger (#1887)
* move version to parent pom
* move version properties to parent pom for easy management
* remove freemarker dependency
* add remote module
* add license and remove author
* rewrite LoggerServer and LoggerClient, remove rpc module
* remove unused class
* updates
* updates for log part
* updates
* remove author
* updates
* add licence and remove author
* add service module
* add log class
* add ping/pong and test case for client
---
dolphinscheduler-api/pom.xml | 2 +-
.../dolphinscheduler/api/ApiApplicationServer.java | 1 +
.../api/CombinedApplicationServer.java | 49 -----
.../apache/dolphinscheduler/api/log/LogClient.java | 137 ------------
.../api/service/LoggerService.java | 6 +-
.../api/service/ResourcesService.java | 10 +-
.../api/service/UdfFuncService.java | 2 +-
dolphinscheduler-remote/pom.xml | 34 +++
.../remote/NettyRemotingClient.java | 190 ++++++++++++++++
.../remote/NettyRemotingServer.java | 162 ++++++++++++++
.../remote/codec/NettyDecoder.java | 89 ++++++++
.../remote/codec/NettyEncoder.java | 34 +--
.../dolphinscheduler/remote/command/Command.java | 89 ++++++++
.../remote/command/CommandHeader.java | 44 ++--
.../remote/command/CommandType.java | 1 +
.../remote/command/ExecuteTaskRequestCommand.java | 1 +
.../remote/command/ExecuteTaskResponseCommand.java | 1 +
.../dolphinscheduler/remote/command/Ping.java | 57 +++++
.../dolphinscheduler/remote/command/Pong.java | 54 +++++
.../remote/command/log/GetLogRequestCommand.java | 55 +++++
.../remote/command/log/GetLogResponseCommand.java | 53 +++++
.../command/log/RollViewLogRequestCommand.java | 77 +++++++
.../command/log/RollViewLogResponseCommand.java | 52 +++++
.../remote/command/log/ViewLogRequestCommand.java | 55 +++++
.../remote/command/log/ViewLogResponseCommand.java | 52 +++++
.../remote/config/NettyClientConfig.java | 73 +++++++
.../remote/config/NettyServerConfig.java | 92 ++++++++
.../remote/exceptions/RemotingException.java | 91 ++++++++
.../remote/handler/NettyClientHandler.java | 120 +++++++++++
.../remote/handler/NettyServerHandler.java | 118 ++++++++++
.../remote/processor/NettyRequestProcessor.java | 21 +-
.../dolphinscheduler/remote/utils/Address.java | 84 ++++++++
.../remote/utils/ChannelUtils.java | 30 +--
.../dolphinscheduler/remote/utils/Constants.java | 23 +-
.../remote/utils/FastJsonSerializer.java | 28 +--
.../apache/dolphinscheduler/remote/utils/Pair.java | 39 ++--
.../src/test/java/NettyRemotingClientTest.java | 71 ++++++
dolphinscheduler-rpc/pom.xml | 113 ----------
.../src/main/proto/scheduler.proto | 101 ---------
dolphinscheduler-server/pom.xml | 8 +-
.../server/log/LoggerRequestProcessor.java | 168 +++++++++++++++
.../dolphinscheduler/server/log/LoggerServer.java | 79 +++++++
.../dolphinscheduler/server/rpc/LogClient.java | 149 -------------
.../dolphinscheduler/server/rpc/LoggerServer.java | 238 ---------------------
.../server/utils/ProcessUtils.java | 4 +-
dolphinscheduler-service/pom.xml | 20 ++
.../service/log/LogClientService.java | 153 +++++++++++++
.../dolphinscheduler/service/log/LogPromise.java | 74 +++++++
pom.xml | 72 ++++---
49 files changed, 2340 insertions(+), 936 deletions(-)
diff --git a/dolphinscheduler-api/pom.xml b/dolphinscheduler-api/pom.xml
index ae28a48..e185cb2 100644
--- a/dolphinscheduler-api/pom.xml
+++ b/dolphinscheduler-api/pom.xml
@@ -153,7 +153,7 @@
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
- <artifactId>dolphinscheduler-rpc</artifactId>
+ <artifactId>dolphinscheduler-service</artifactId>
</dependency>
<dependency>
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
index 8376c28..a0947f7 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
@@ -29,6 +29,7 @@ import springfox.documentation.swagger2.annotations.EnableSwagger2;
public class ApiApplicationServer extends SpringBootServletInitializer {
public static void main(String[] args) {
+
SpringApplication.run(ApiApplicationServer.class, args);
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/CombinedApplicationServer.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/CombinedApplicationServer.java
deleted file mode 100644
index 5030890..0000000
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/CombinedApplicationServer.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.api;
-
-import org.apache.dolphinscheduler.alert.AlertServer;
-import org.apache.dolphinscheduler.server.master.MasterServer;
-import org.apache.dolphinscheduler.server.rpc.LoggerServer;
-import org.apache.dolphinscheduler.server.worker.WorkerServer;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.boot.web.servlet.ServletComponentScan;
-import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.context.annotation.Import;
-import springfox.documentation.swagger2.annotations.EnableSwagger2;
-
-@SpringBootApplication
-@ConditionalOnProperty(prefix = "server", name = "is-combined-server", havingValue = "true")
-@ServletComponentScan
-@ComponentScan("org.apache.dolphinscheduler")
-@Import({MasterServer.class, WorkerServer.class})
-@EnableSwagger2
-public class CombinedApplicationServer extends SpringBootServletInitializer {
-
- public static void main(String[] args) throws Exception {
-
- ApiApplicationServer.main(args);
-
- LoggerServer server = new LoggerServer();
- server.start();
-
- AlertServer alertServer = AlertServer.getInstance();
- alertServer.start();
- }
-}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/log/LogClient.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/log/LogClient.java
deleted file mode 100644
index 3452060..0000000
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/log/LogClient.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.api.log;
-
-import io.grpc.ManagedChannel;
-import io.grpc.ManagedChannelBuilder;
-import io.grpc.StatusRuntimeException;
-import org.apache.dolphinscheduler.rpc.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * log client
- */
-public class LogClient {
-
- private static final Logger logger = LoggerFactory.getLogger(LogClient.class);
-
- private final ManagedChannel channel;
- private final LogViewServiceGrpc.LogViewServiceBlockingStub blockingStub;
-
- /**
- * construct client connecting to HelloWorld server at {@code host:port}
- *
- * @param host host
- * @param port port
- */
- public LogClient(String host, int port) {
- this(ManagedChannelBuilder.forAddress(host, port)
- // Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid
- // needing certificates.
- .usePlaintext(true));
- }
-
- /**
- * construct client for accessing RouteGuide server using the existing channel
- *
- */
- LogClient(ManagedChannelBuilder<?> channelBuilder) {
- /**
- * set max read size
- */
- channelBuilder.maxInboundMessageSize(Integer.MAX_VALUE);
- channel = channelBuilder.build();
- blockingStub = LogViewServiceGrpc.newBlockingStub(channel);
- }
-
- /**
- * shutdown
- *
- * @throws InterruptedException InterruptedException
- */
- public void shutdown() throws InterruptedException {
- channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
- }
-
- /**
- * roll view log
- *
- * @param path path
- * @param skipLineNum skip line number
- * @param limit limit
- * @return log content
- */
- public String rollViewLog(String path,int skipLineNum,int limit) {
- logger.info("roll view log : path {},skipLineNum {} ,limit {}", path, skipLineNum, limit);
- LogParameter pathParameter = LogParameter
- .newBuilder()
- .setPath(path)
- .setSkipLineNum(skipLineNum)
- .setLimit(limit)
- .build();
- RetStrInfo retStrInfo;
- try {
- retStrInfo = blockingStub.rollViewLog(pathParameter);
- return retStrInfo.getMsg();
- } catch (StatusRuntimeException e) {
- logger.error("roll view log error", e);
- return null;
- }
- }
-
- /**
- * view log
- *
- * @param path path
- * @return log content
- */
- public String viewLog(String path) {
- logger.info("view log path {}",path);
- PathParameter pathParameter = PathParameter.newBuilder().setPath(path).build();
- RetStrInfo retStrInfo;
- try {
- retStrInfo = blockingStub.viewLog(pathParameter);
- return retStrInfo.getMsg();
- } catch (StatusRuntimeException e) {
- logger.error("view log error", e);
- return null;
- }
- }
-
- /**
- * get log size
- *
- * @param path log path
- * @return log content bytes
- */
- public byte[] getLogBytes(String path) {
- logger.info("log path {}",path);
- PathParameter pathParameter = PathParameter.newBuilder().setPath(path).build();
- RetByteInfo retByteInfo;
- try {
- retByteInfo = blockingStub.getLogBytes(pathParameter);
- return retByteInfo.getData().toByteArray();
- } catch (StatusRuntimeException e) {
- logger.error("log size error", e);
- return null;
- }
- }
-
-}
\ No newline at end of file
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java
index 61dc1a7..108d5d4 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java
@@ -17,12 +17,12 @@
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.enums.Status;
-import org.apache.dolphinscheduler.api.log.LogClient;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.service.log.LogClientService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -65,7 +65,7 @@ public class LoggerService {
logger.info("log host : {} , logPath : {} , logServer port : {}",host,taskInstance.getLogPath(),Constants.RPC_PORT);
- LogClient logClient = new LogClient(host, Constants.RPC_PORT);
+ LogClientService logClient = new LogClientService(host, Constants.RPC_PORT);
String log = logClient.rollViewLog(taskInstance.getLogPath(),skipLineNum,limit);
result.setData(log);
logger.info(log);
@@ -85,7 +85,7 @@ public class LoggerService {
throw new RuntimeException("task instance is null");
}
String host = taskInstance.getHost();
- LogClient logClient = new LogClient(host, Constants.RPC_PORT);
+ LogClientService logClient = new LogClientService(host, Constants.RPC_PORT);
return logClient.getLogBytes(taskInstance.getLogPath());
}
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
index 3093dae..09b1d31 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.api.service;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.commons.collections.BeanMap;
-import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
@@ -28,6 +27,7 @@ import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
@@ -104,7 +104,7 @@ public class ResourcesService extends BaseService {
String nameSuffix = FileUtils.suffix(name);
// determine file suffix
- if (!StringUtils.equals(fileSuffix, nameSuffix)) {
+ if (!(StringUtils.isNotEmpty(fileSuffix) && fileSuffix.equalsIgnoreCase(nameSuffix))) {
/**
* rename file suffix and original suffix must be consistent
*/
@@ -341,7 +341,7 @@ public class ResourcesService extends BaseService {
String nameSuffix = FileUtils.suffix(name);
// determine file suffix
- if (!StringUtils.equals(fileSuffix, nameSuffix)) {
+ if (!(StringUtils.isNotEmpty(fileSuffix) && fileSuffix.equalsIgnoreCase(nameSuffix))) {
return false;
}
// query tenant
@@ -539,7 +539,7 @@ public class ResourcesService extends BaseService {
putMsg(result, Status.SUCCESS);
Map<String, Object> map = new HashMap<>();
map.put(ALIAS, resource.getAlias());
- map.put(CONTENT, StringUtils.join(content.toArray(), "\n"));
+ map.put(CONTENT, StringUtils.join(content, "\n"));
result.setData(map);
}else{
logger.error("read file {} not exist in hdfs", hdfsFileName);
@@ -602,7 +602,7 @@ public class ResourcesService extends BaseService {
putMsg(result, Status.SUCCESS);
Map<Object, Object> dataMap = new BeanMap(resource);
- Map<String, Object> resultMap = new HashMap<>(5);
+ Map<String, Object> resultMap = new HashMap<>();
for (Map.Entry<Object, Object> entry: dataMap.entrySet()) {
if (!Constants.CLASS.equalsIgnoreCase(entry.getKey().toString())) {
resultMap.put(entry.getKey().toString(), entry.getValue());
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UdfFuncService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UdfFuncService.java
index 2032492..249c7ec 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UdfFuncService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UdfFuncService.java
@@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.UdfType;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.entity.User;
@@ -30,7 +31,6 @@ import org.apache.dolphinscheduler.dao.mapper.UDFUserMapper;
import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
-import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
diff --git a/dolphinscheduler-remote/pom.xml b/dolphinscheduler-remote/pom.xml
new file mode 100644
index 0000000..858f40a
--- /dev/null
+++ b/dolphinscheduler-remote/pom.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>dolphinscheduler</artifactId>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <version>1.2.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>dolphinscheduler-remote</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>fastjson</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
new file mode 100644
index 0000000..dda78f3
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.*;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.dolphinscheduler.remote.codec.NettyDecoder;
+import org.apache.dolphinscheduler.remote.codec.NettyEncoder;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.remote.handler.NettyClientHandler;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.Address;
+import org.apache.dolphinscheduler.remote.utils.Constants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class NettyRemotingClient {
+
+ private final Logger logger = LoggerFactory.getLogger(NettyRemotingClient.class);
+
+ private final Bootstrap bootstrap = new Bootstrap();
+
+ private final NettyEncoder encoder = new NettyEncoder();
+
+ private final ConcurrentHashMap<Address, Channel> channels = new ConcurrentHashMap();
+
+ private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS);
+
+ private final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+ private final NioEventLoopGroup workerGroup;
+
+ private final NettyClientHandler clientHandler = new NettyClientHandler(this);
+
+ private final NettyClientConfig clientConfig;
+
+ public NettyRemotingClient(final NettyClientConfig clientConfig){
+ this.clientConfig = clientConfig;
+ this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
+ private AtomicInteger threadIndex = new AtomicInteger(0);
+
+ public Thread newThread(Runnable r) {
+ return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
+ }
+ });
+ this.start();
+ }
+
+ private void start(){
+
+ this.bootstrap
+ .group(this.workerGroup)
+ .channel(NioSocketChannel.class)
+ .option(ChannelOption.SO_KEEPALIVE, clientConfig.isSoKeepalive())
+ .option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNoDelay())
+ .option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize())
+ .option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize())
+ .handler(new ChannelInitializer<SocketChannel>() {
+ public void initChannel(SocketChannel ch) throws Exception {
+ ch.pipeline().addLast(
+ new NettyDecoder(),
+ clientHandler,
+ encoder);
+ }
+ });
+ //
+ isStarted.compareAndSet(false, true);
+ }
+
+ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
+ registerProcessor(commandType, processor, null);
+ }
+
+ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
+ this.clientHandler.registerProcessor(commandType, processor, executor);
+ }
+
+ public void send(final Address address, final Command command) throws RemotingException {
+ final Channel channel = getChannel(address);
+ if (channel == null) {
+ throw new RemotingException("network error");
+ }
+ try {
+ channel.writeAndFlush(command).addListener(new ChannelFutureListener(){
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if(future.isSuccess()){
+ logger.info("sent command {} to {}", command, address);
+ } else{
+ logger.error("send command {} to {} failed, error {}", command, address, future.cause());
+ }
+ }
+ });
+ } catch (Exception ex) {
+ String msg = String.format("send command %s to address %s encounter error", command, address);
+ throw new RemotingException(msg, ex);
+ }
+ }
+
+ public Channel getChannel(Address address) {
+ Channel channel = channels.get(address);
+ if(channel != null && channel.isActive()){
+ return channel;
+ }
+ return createChannel(address, true);
+ }
+
+ public Channel createChannel(Address address, boolean isSync) {
+ ChannelFuture future;
+ try {
+ synchronized (bootstrap){
+ future = bootstrap.connect(new InetSocketAddress(address.getHost(), address.getPort()));
+ }
+ if(isSync){
+ future.sync();
+ }
+ if (future.isSuccess()) {
+ Channel channel = future.channel();
+ channels.put(address, channel);
+ return channel;
+ }
+ } catch (Exception ex) {
+ logger.info("connect to {} error {}", address, ex);
+ }
+ return null;
+ }
+
+ public ExecutorService getDefaultExecutor() {
+ return defaultExecutor;
+ }
+
+ public void close() {
+ if(isStarted.compareAndSet(true, false)){
+ try {
+ closeChannels();
+ if(workerGroup != null){
+ this.workerGroup.shutdownGracefully();
+ }
+ if(defaultExecutor != null){
+ defaultExecutor.shutdown();
+ }
+ } catch (Exception ex) {
+ logger.error("netty client close exception", ex);
+ }
+ logger.info("netty client closed");
+ }
+ }
+
+ private void closeChannels(){
+ for (Channel channel : this.channels.values()) {
+ channel.close();
+ }
+ this.channels.clear();
+ }
+
+ public void removeChannel(Address address){
+ Channel channel = this.channels.remove(address);
+ if(channel != null){
+ channel.close();
+ }
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
new file mode 100644
index 0000000..7fd7331
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.dolphinscheduler.remote.codec.NettyDecoder;
+import org.apache.dolphinscheduler.remote.codec.NettyEncoder;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
+import org.apache.dolphinscheduler.remote.handler.NettyServerHandler;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.Constants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+public class NettyRemotingServer {
+
+ private final Logger logger = LoggerFactory.getLogger(NettyRemotingServer.class);
+
+ private final ServerBootstrap serverBootstrap = new ServerBootstrap();
+
+ private final NettyEncoder encoder = new NettyEncoder();
+
+ private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS);
+
+ private final NioEventLoopGroup bossGroup;
+
+ private final NioEventLoopGroup workGroup;
+
+ private final NettyServerConfig serverConfig;
+
+ private final NettyServerHandler serverHandler = new NettyServerHandler(this);
+
+ private final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+ public NettyRemotingServer(final NettyServerConfig serverConfig){
+ this.serverConfig = serverConfig;
+
+ this.bossGroup = new NioEventLoopGroup(1, new ThreadFactory() {
+ private AtomicInteger threadIndex = new AtomicInteger(0);
+
+ public Thread newThread(Runnable r) {
+ return new Thread(r, String.format("NettyServerBossThread_%d", this.threadIndex.incrementAndGet()));
+ }
+ });
+
+ this.workGroup = new NioEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() {
+ private AtomicInteger threadIndex = new AtomicInteger(0);
+
+ public Thread newThread(Runnable r) {
+ return new Thread(r, String.format("NettyServerWorkerThread_%d", this.threadIndex.incrementAndGet()));
+ }
+ });
+ }
+
+ public void start(){
+
+ if(this.isStarted.get()){
+ return;
+ }
+
+ this.serverBootstrap
+ .group(this.bossGroup, this.workGroup)
+ .channel(NioServerSocketChannel.class)
+ .option(ChannelOption.SO_REUSEADDR, true)
+ .option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog())
+ .childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive())
+ .childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay())
+ .childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize())
+ .childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize())
+ .childHandler(new ChannelInitializer<NioSocketChannel>() {
+
+ protected void initChannel(NioSocketChannel ch) throws Exception {
+ initNettyChannel(ch);
+ }
+ });
+
+ ChannelFuture future;
+ try {
+ future = serverBootstrap.bind(serverConfig.getListenPort()).sync();
+ } catch (Exception e) {
+ logger.error("NettyRemotingServer bind fail {}, exit", e);
+ throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()));
+ }
+ if (future.isSuccess()) {
+ logger.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort());
+ } else if (future.cause() != null) {
+ throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()), future.cause());
+ } else {
+ throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()));
+ }
+ //
+ isStarted.compareAndSet(false, true);
+ }
+
+ private void initNettyChannel(NioSocketChannel ch) throws Exception{
+ ChannelPipeline pipeline = ch.pipeline();
+ pipeline.addLast("encoder", encoder);
+ pipeline.addLast("decoder", new NettyDecoder());
+ pipeline.addLast("handler", serverHandler);
+ }
+
+ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
+ this.registerProcessor(commandType, processor, null);
+ }
+
+ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
+ this.serverHandler.registerProcessor(commandType, processor, executor);
+ }
+
+ public ExecutorService getDefaultExecutor() {
+ return defaultExecutor;
+ }
+
+ public void close() {
+ if(isStarted.compareAndSet(true, false)){
+ try {
+ if(bossGroup != null){
+ this.bossGroup.shutdownGracefully();
+ }
+ if(workGroup != null){
+ this.workGroup.shutdownGracefully();
+ }
+ if(defaultExecutor != null){
+ defaultExecutor.shutdown();
+ }
+ } catch (Exception ex) {
+ logger.error("netty server close exception", ex);
+ }
+ logger.info("netty server closed");
+ }
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java
new file mode 100644
index 0000000..dc37334
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.codec;
+
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ReplayingDecoder;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandHeader;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+
+import java.util.List;
+
+public class NettyDecoder extends ReplayingDecoder<NettyDecoder.State> {
+
+ public NettyDecoder(){
+ super(State.MAGIC);
+ }
+
+ private final CommandHeader commandHeader = new CommandHeader();
+
+ @Override
+ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+ switch (state()){
+ case MAGIC:
+ checkMagic(in.readByte());
+ checkpoint(State.COMMAND);
+ case COMMAND:
+ commandHeader.setType(in.readByte());
+ checkpoint(State.OPAQUE);
+ case OPAQUE:
+ commandHeader.setOpaque(in.readLong());
+ checkpoint(State.BODY_LENGTH);
+ case BODY_LENGTH:
+ commandHeader.setBodyLength(in.readInt());
+ checkpoint(State.BODY);
+ case BODY:
+ byte[] body = new byte[commandHeader.getBodyLength()];
+ in.readBytes(body);
+ //
+ Command packet = new Command();
+ packet.setType(commandType(commandHeader.getType()));
+ packet.setOpaque(commandHeader.getOpaque());
+ packet.setBody(body);
+ out.add(packet);
+ //
+ checkpoint(State.MAGIC);
+ }
+ }
+
+ private CommandType commandType(byte type){
+ for(CommandType ct : CommandType.values()){
+ if(ct.ordinal() == type){
+ return ct;
+ }
+ }
+ return null;
+ }
+
+ private void checkMagic(byte magic) {
+ if (magic != Command.MAGIC) {
+ throw new IllegalArgumentException("illegal packet [magic]" + magic);
+ }
+ }
+
+ enum State{
+ MAGIC,
+ COMMAND,
+ OPAQUE,
+ BODY_LENGTH,
+ BODY;
+ }
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java
similarity index 51%
copy from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
copy to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java
index 8376c28..fb5b36a 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java
@@ -14,23 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.api;
+package org.apache.dolphinscheduler.remote.codec;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.web.servlet.ServletComponentScan;
-import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
-import org.springframework.context.annotation.ComponentScan;
-import springfox.documentation.swagger2.annotations.EnableSwagger2;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+import org.apache.dolphinscheduler.remote.command.Command;
-@SpringBootApplication
-@ServletComponentScan
-@ComponentScan("org.apache.dolphinscheduler")
-public class ApiApplicationServer extends SpringBootServletInitializer {
-
- public static void main(String[] args) {
- SpringApplication.run(ApiApplicationServer.class, args);
- }
+@Sharable
+public class NettyEncoder extends MessageToByteEncoder<Command> {
+ protected void encode(ChannelHandlerContext ctx, Command msg, ByteBuf out) throws Exception {
+ if(msg == null){
+ throw new Exception("encode msg is null");
+ }
+ out.writeByte(Command.MAGIC);
+ out.writeByte(msg.getType().ordinal());
+ out.writeLong(msg.getOpaque());
+ out.writeInt(msg.getBody().length);
+ out.writeBytes(msg.getBody());
+ }
}
+
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java
new file mode 100644
index 0000000..3f0a394
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.remote.command;
+
+import java.io.Serializable;
+
+public class Command implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final byte MAGIC = (byte) 0xbabe;
+
+ public Command(){
+ }
+
+ public Command(long opaque){
+ this.opaque = opaque;
+ }
+
+ private CommandType type;
+
+ private long opaque;
+
+ private byte[] body;
+
+ public CommandType getType() {
+ return type;
+ }
+
+ public void setType(CommandType type) {
+ this.type = type;
+ }
+
+ public long getOpaque() {
+ return opaque;
+ }
+
+ public void setOpaque(long opaque) {
+ this.opaque = opaque;
+ }
+
+ public byte[] getBody() {
+ return body;
+ }
+
+ public void setBody(byte[] body) {
+ this.body = body;
+ }
+
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (int) (opaque ^ (opaque >>> 32));
+ return result;
+ }
+
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ Command other = (Command) obj;
+ if (opaque != other.opaque)
+ return false;
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "Command [type=" + type + ", opaque=" + opaque + ", bodyLen=" + (body == null ? 0 : body.length) + "]";
+ }
+
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java
similarity index 53%
copy from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
copy to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java
index 8376c28..ac51d01 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java
@@ -14,23 +14,39 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.api;
+package org.apache.dolphinscheduler.remote.command;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.web.servlet.ServletComponentScan;
-import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
-import org.springframework.context.annotation.ComponentScan;
-import springfox.documentation.swagger2.annotations.EnableSwagger2;
+import java.io.Serializable;
-@SpringBootApplication
-@ServletComponentScan
-@ComponentScan("org.apache.dolphinscheduler")
-public class ApiApplicationServer extends SpringBootServletInitializer {
+public class CommandHeader implements Serializable {
- public static void main(String[] args) {
- SpringApplication.run(ApiApplicationServer.class, args);
- }
+ private byte type;
+ private long opaque;
+ private int bodyLength;
+
+ public int getBodyLength() {
+ return bodyLength;
+ }
+
+ public void setBodyLength(int bodyLength) {
+ this.bodyLength = bodyLength;
+ }
+
+ public byte getType() {
+ return type;
+ }
+
+ public void setType(byte type) {
+ this.type = type;
+ }
+
+ public long getOpaque() {
+ return opaque;
+ }
+
+ public void setOpaque(long opaque) {
+ this.opaque = opaque;
+ }
}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
new file mode 100644
index 0000000..468a5cc
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -0,0 +1 @@
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
public enum CommandType {
ROLL_VIEW_LOG_REQ,
ROLL_VIEW_LOG_RES,
VIEW_LOG_REQ,
VIEW_LOG_RES,
GET_LOG_REQ,
GET_LOG
_RES,
EXECUTE_TASK_REQUEST,
EXECUTE_TASK_RESPONSE,
PING,
PONG;
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
new file mode 100644
index 0000000..e75c2de
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
@@ -0,0 +1 @@
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.List;
import java.util.concu
rrent.atomic.AtomicLong;
public class ExecuteTaskRequestCommand implements Serializable {
private static final AtomicLong REQUEST = new AtomicLong(1);
private String taskId;
private String attemptId;
private String applicationName;
private String groupName;
private String taskName;
private int connectorPort;
private String description;
private String className;
private String methodName;
private String params;
private List<Integer> shardItems;
public List<Integer> getShardItems() {
return shardItems;
}
public void setShardItems(List<Integer> shardItems) {
this.shardItems = shardItems;
}
public String getParams() {
return params;
}
public void setParams(String params) {
this.params = params;
}
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
public String getAp
plicationName() {
return applicationName;
}
public void setApplicationName(String applicationName) {
this.applicationName = applicationName;
}
public String getGroupName() {
return groupName;
}
public void setGroupName(String groupName) {
this.groupName = groupName;
}
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
public int getConnectorPort() {
return connectorPort;
}
public void setConnectorPort(int connectorPort) {
this.connectorPort = connectorPort;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Command convert2Command(){
Command command = new Command(REQUEST.getAndIncrement());
command.setType(CommandType.EXECUTE_TASK_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
new file mode 100644
index 0000000..fafb575
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
@@ -0,0 +1 @@
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong
;
public class ExecuteTaskResponseCommand implements Serializable {
private static final AtomicLong REQUEST = new AtomicLong(1);
private String taskId;
private String attemptId;
private Object result;
private long receivedTime;
private int executeCount;
private long executeTime;
public String getAttemptId() {
return attemptId;
}
public void setAttemptId(String attemptId) {
this.attemptId = attemptId;
}
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
public Object getResult() {
return result;
}
public void setResult(Object result) {
this.result = result;
}
public long getReceivedTime() {
return receivedTime;
}
public void setReceivedTime(long receivedTime) {
this.receivedTime = receivedTime;
}
public int getExecuteCount() {
return executeCount
;
}
public void setExecuteCount(int executeCount) {
this.executeCount = executeCount;
}
public long getExecuteTime() {
return executeTime;
}
public void setExecuteTime(long executeTime) {
this.executeTime = executeTime;
}
public Command convert2Command(){
Command command = new Command(REQUEST.getAndIncrement());
command.setType(CommandType.EXECUTE_TASK_RESPONSE);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java
new file mode 100644
index 0000000..365d451
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+public class Ping implements Serializable {
+
+ private static final AtomicLong ID = new AtomicLong(1);
+
+ protected static ByteBuf EMPTY_BODY = Unpooled.EMPTY_BUFFER;
+
+ private static byte[] EMPTY_BODY_ARRAY = new byte[0];
+
+ private static final ByteBuf PING_BUF;
+
+ static {
+ ByteBuf ping = Unpooled.buffer();
+ ping.writeByte(Command.MAGIC);
+ ping.writeByte(CommandType.PING.ordinal());
+ ping.writeLong(0);
+ ping.writeInt(0);
+ ping.writeBytes(EMPTY_BODY);
+ PING_BUF = Unpooled.unreleasableBuffer(ping).asReadOnly();
+ }
+
+ public static ByteBuf pingContent(){
+ return PING_BUF.duplicate();
+ }
+
+ public static Command create(){
+ Command command = new Command(ID.getAndIncrement());
+ command.setType(CommandType.PING);
+ command.setBody(EMPTY_BODY_ARRAY);
+ return command;
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Pong.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Pong.java
new file mode 100644
index 0000000..bc5abda
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Pong.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.Serializable;
+
+
+public class Pong implements Serializable {
+
+ protected static ByteBuf EMPTY_BODY = Unpooled.EMPTY_BUFFER;
+
+ private static byte[] EMPTY_BODY_ARRAY = new byte[0];
+
+ private static final ByteBuf PONG_BUF;
+
+ static {
+ ByteBuf ping = Unpooled.buffer();
+ ping.writeByte(Command.MAGIC);
+ ping.writeByte(CommandType.PONG.ordinal());
+ ping.writeLong(0);
+ ping.writeInt(0);
+ ping.writeBytes(EMPTY_BODY);
+ PONG_BUF = Unpooled.unreleasableBuffer(ping).asReadOnly();
+ }
+
+ public static ByteBuf pingContent(){
+ return PONG_BUF.duplicate();
+ }
+
+ public static Command create(long opaque){
+ Command command = new Command(opaque);
+ command.setType(CommandType.PONG);
+ command.setBody(EMPTY_BODY_ARRAY);
+ return command;
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogRequestCommand.java
new file mode 100644
index 0000000..72c5fb8
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogRequestCommand.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.log;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class GetLogRequestCommand implements Serializable {
+
+ private static final AtomicLong REQUEST = new AtomicLong(1);
+
+ private String path;
+
+ public GetLogRequestCommand() {
+ }
+
+ public GetLogRequestCommand(String path) {
+ this.path = path;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ public Command convert2Command(){
+ Command command = new Command(REQUEST.getAndIncrement());
+ command.setType(CommandType.VIEW_LOG_REQ);
+ byte[] body = FastJsonSerializer.serialize(this);
+ command.setBody(body);
+ return command;
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogResponseCommand.java
new file mode 100644
index 0000000..39cb11f
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogResponseCommand.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.log;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
+import java.io.Serializable;
+
+public class GetLogResponseCommand implements Serializable {
+
+ private byte[] data;
+
+ public GetLogResponseCommand() {
+ }
+
+ public GetLogResponseCommand(byte[] data) {
+ this.data = data;
+ }
+
+ public byte[] getData() {
+ return data;
+ }
+
+ public void setData(byte[] data) {
+ this.data = data;
+ }
+
+ public Command convert2Command(long opaque){
+ Command command = new Command(opaque);
+ command.setType(CommandType.GET_LOG_REQ);
+ byte[] body = FastJsonSerializer.serialize(this);
+ command.setBody(body);
+ return command;
+ }
+
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java
new file mode 100644
index 0000000..f655a8c
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.log;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class RollViewLogRequestCommand implements Serializable {
+
+ private static final AtomicLong REQUEST = new AtomicLong(1);
+
+ private String path;
+
+ private int skipLineNum;
+
+ private int limit;
+
+ public RollViewLogRequestCommand() {
+ }
+
+ public RollViewLogRequestCommand(String path, int skipLineNum, int limit) {
+ this.path = path;
+ this.skipLineNum = skipLineNum;
+ this.limit = limit;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ public int getSkipLineNum() {
+ return skipLineNum;
+ }
+
+ public void setSkipLineNum(int skipLineNum) {
+ this.skipLineNum = skipLineNum;
+ }
+
+ public int getLimit() {
+ return limit;
+ }
+
+ public void setLimit(int limit) {
+ this.limit = limit;
+ }
+
+ public Command convert2Command(){
+ Command command = new Command(REQUEST.getAndIncrement());
+ command.setType(CommandType.ROLL_VIEW_LOG_REQ);
+ byte[] body = FastJsonSerializer.serialize(this);
+ command.setBody(body);
+ return command;
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogResponseCommand.java
new file mode 100644
index 0000000..95d476f
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogResponseCommand.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.log;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
+import java.io.Serializable;
+
+public class RollViewLogResponseCommand implements Serializable {
+
+ private String msg;
+
+ public RollViewLogResponseCommand() {
+ }
+
+ public RollViewLogResponseCommand(String msg) {
+ this.msg = msg;
+ }
+
+ public String getMsg() {
+ return msg;
+ }
+
+ public void setMsg(String msg) {
+ this.msg = msg;
+ }
+
+ public Command convert2Command(long opaque){
+ Command command = new Command(opaque);
+ command.setType(CommandType.ROLL_VIEW_LOG_REQ);
+ byte[] body = FastJsonSerializer.serialize(this);
+ command.setBody(body);
+ return command;
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java
new file mode 100644
index 0000000..f9e01fe
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.log;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class ViewLogRequestCommand implements Serializable {
+
+ private static final AtomicLong REQUEST = new AtomicLong(1);
+
+ private String path;
+
+ public ViewLogRequestCommand() {
+ }
+
+ public ViewLogRequestCommand(String path) {
+ this.path = path;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ public Command convert2Command(){
+ Command command = new Command(REQUEST.getAndIncrement());
+ command.setType(CommandType.VIEW_LOG_REQ);
+ byte[] body = FastJsonSerializer.serialize(this);
+ command.setBody(body);
+ return command;
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogResponseCommand.java
new file mode 100644
index 0000000..d5a59c8
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogResponseCommand.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.log;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
+import java.io.Serializable;
+
+public class ViewLogResponseCommand implements Serializable {
+
+ private String msg;
+
+ public ViewLogResponseCommand() {
+ }
+
+ public ViewLogResponseCommand(String msg) {
+ this.msg = msg;
+ }
+
+ public String getMsg() {
+ return msg;
+ }
+
+ public void setMsg(String msg) {
+ this.msg = msg;
+ }
+
+ public Command convert2Command(long opaque){
+ Command command = new Command(opaque);
+ command.setType(CommandType.VIEW_LOG_RES);
+ byte[] body = FastJsonSerializer.serialize(this);
+ command.setBody(body);
+ return command;
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java
new file mode 100644
index 0000000..6b1ea5b
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.remote.config;
+
+import org.apache.dolphinscheduler.remote.utils.Constants;
+
+public class NettyClientConfig {
+
+ private int workerThreads = Constants.CPUS;
+
+ private boolean tcpNoDelay = true;
+
+ private boolean soKeepalive = true;
+
+ private int sendBufferSize = 65535;
+
+ private int receiveBufferSize = 65535;
+
+ public int getWorkerThreads() {
+ return workerThreads;
+ }
+
+ public void setWorkerThreads(int workerThreads) {
+ this.workerThreads = workerThreads;
+ }
+
+ public boolean isTcpNoDelay() {
+ return tcpNoDelay;
+ }
+
+ public void setTcpNoDelay(boolean tcpNoDelay) {
+ this.tcpNoDelay = tcpNoDelay;
+ }
+
+ public boolean isSoKeepalive() {
+ return soKeepalive;
+ }
+
+ public void setSoKeepalive(boolean soKeepalive) {
+ this.soKeepalive = soKeepalive;
+ }
+
+ public int getSendBufferSize() {
+ return sendBufferSize;
+ }
+
+ public void setSendBufferSize(int sendBufferSize) {
+ this.sendBufferSize = sendBufferSize;
+ }
+
+ public int getReceiveBufferSize() {
+ return receiveBufferSize;
+ }
+
+ public void setReceiveBufferSize(int receiveBufferSize) {
+ this.receiveBufferSize = receiveBufferSize;
+ }
+
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyServerConfig.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyServerConfig.java
new file mode 100644
index 0000000..9afaeb3
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyServerConfig.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.remote.config;
+
+import org.apache.dolphinscheduler.remote.utils.Constants;
+
+public class NettyServerConfig {
+
+ private int soBacklog = 1024;
+
+ private boolean tcpNoDelay = true;
+
+ private boolean soKeepalive = true;
+
+ private int sendBufferSize = 65535;
+
+ private int receiveBufferSize = 65535;
+
+ private int workerThread = Constants.CPUS;
+
+ private int listenPort = 12346;
+
+ public int getListenPort() {
+ return listenPort;
+ }
+
+ public void setListenPort(int listenPort) {
+ this.listenPort = listenPort;
+ }
+
+ public int getSoBacklog() {
+ return soBacklog;
+ }
+
+ public void setSoBacklog(int soBacklog) {
+ this.soBacklog = soBacklog;
+ }
+
+ public boolean isTcpNoDelay() {
+ return tcpNoDelay;
+ }
+
+ public void setTcpNoDelay(boolean tcpNoDelay) {
+ this.tcpNoDelay = tcpNoDelay;
+ }
+
+ public boolean isSoKeepalive() {
+ return soKeepalive;
+ }
+
+ public void setSoKeepalive(boolean soKeepalive) {
+ this.soKeepalive = soKeepalive;
+ }
+
+ public int getSendBufferSize() {
+ return sendBufferSize;
+ }
+
+ public void setSendBufferSize(int sendBufferSize) {
+ this.sendBufferSize = sendBufferSize;
+ }
+
+ public int getReceiveBufferSize() {
+ return receiveBufferSize;
+ }
+
+ public void setReceiveBufferSize(int receiveBufferSize) {
+ this.receiveBufferSize = receiveBufferSize;
+ }
+
+ public int getWorkerThread() {
+ return workerThread;
+ }
+
+ public void setWorkerThread(int workerThread) {
+ this.workerThread = workerThread;
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingException.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingException.java
new file mode 100644
index 0000000..62ab907
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingException.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.exceptions;
+
+public class RemotingException extends Exception {
+
+ public RemotingException() {
+ super();
+ }
+
+ /** Constructs a new runtime exception with the specified detail message.
+ * The cause is not initialized, and may subsequently be initialized by a
+ * call to {@link #initCause}.
+ *
+ * @param message the detail message. The detail message is saved for
+ * later retrieval by the {@link #getMessage()} method.
+ */
+ public RemotingException(String message) {
+ super(message);
+ }
+
+ /**
+ * Constructs a new runtime exception with the specified detail message and
+ * cause. <p>Note that the detail message associated with
+ * {@code cause} is <i>not</i> automatically incorporated in
+ * this runtime exception's detail message.
+ *
+ * @param message the detail message (which is saved for later retrieval
+ * by the {@link #getMessage()} method).
+ * @param cause the cause (which is saved for later retrieval by the
+ * {@link #getCause()} method). (A <tt>null</tt> value is
+ * permitted, and indicates that the cause is nonexistent or
+ * unknown.)
+ * @since 1.4
+ */
+ public RemotingException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /** Constructs a new runtime exception with the specified cause and a
+ * detail message of <tt>(cause==null ? null : cause.toString())</tt>
+ * (which typically contains the class and detail message of
+ * <tt>cause</tt>). This constructor is useful for runtime exceptions
+ * that are little more than wrappers for other throwables.
+ *
+ * @param cause the cause (which is saved for later retrieval by the
+ * {@link #getCause()} method). (A <tt>null</tt> value is
+ * permitted, and indicates that the cause is nonexistent or
+ * unknown.)
+ * @since 1.4
+ */
+ public RemotingException(Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * Constructs a new runtime exception with the specified detail
+ * message, cause, suppression enabled or disabled, and writable
+ * stack trace enabled or disabled.
+ *
+ * @param message the detail message.
+ * @param cause the cause. (A {@code null} value is permitted,
+ * and indicates that the cause is nonexistent or unknown.)
+ * @param enableSuppression whether or not suppression is enabled
+ * or disabled
+ * @param writableStackTrace whether or not the stack trace should
+ * be writable
+ *
+ * @since 1.7
+ */
+ protected RemotingException(String message, Throwable cause,
+ boolean enableSuppression,
+ boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
new file mode 100644
index 0000000..093614f
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.remote.handler;
+
+import io.netty.channel.*;
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
+import org.apache.dolphinscheduler.remote.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+
+
+@ChannelHandler.Sharable
+public class NettyClientHandler extends ChannelInboundHandlerAdapter {
+
+ private final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);
+
+ private final NettyRemotingClient nettyRemotingClient;
+
+ private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>> processors = new ConcurrentHashMap();
+
+ public NettyClientHandler(NettyRemotingClient nettyRemotingClient){
+ this.nettyRemotingClient = nettyRemotingClient;
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ nettyRemotingClient.removeChannel(ChannelUtils.toAddress(ctx.channel()));
+ ctx.channel().close();
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ processReceived(ctx.channel(), (Command)msg);
+ }
+
+ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
+ this.registerProcessor(commandType, processor, nettyRemotingClient.getDefaultExecutor());
+ }
+
+ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
+ ExecutorService executorRef = executor;
+ if(executorRef == null){
+ executorRef = nettyRemotingClient.getDefaultExecutor();
+ }
+ this.processors.putIfAbsent(commandType, new Pair<NettyRequestProcessor, ExecutorService>(processor, executorRef));
+ }
+
+ private void processReceived(final Channel channel, final Command msg) {
+ final CommandType commandType = msg.getType();
+ final Pair<NettyRequestProcessor, ExecutorService> pair = processors.get(commandType);
+ if (pair != null) {
+ Runnable r = new Runnable() {
+ public void run() {
+ try {
+ pair.getLeft().process(channel, msg);
+ } catch (Throwable ex) {
+ logger.error("process msg {} error : {}", msg, ex);
+ }
+ }
+ };
+ try {
+ pair.getRight().submit(r);
+ } catch (RejectedExecutionException e) {
+ logger.warn("thread pool is full, discard msg {} from {}", msg, ChannelUtils.getRemoteAddress(channel));
+ }
+ } else {
+ logger.warn("commandType {} not support", commandType);
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ logger.error("exceptionCaught : {}", cause);
+ nettyRemotingClient.removeChannel(ChannelUtils.toAddress(ctx.channel()));
+ ctx.channel().close();
+ }
+
+ @Override
+ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
+ Channel ch = ctx.channel();
+ ChannelConfig config = ch.config();
+
+ if (!ch.isWritable()) {
+ if (logger.isWarnEnabled()) {
+ logger.warn("{} is not writable, over high water level : {}",
+ new Object[]{ch, config.getWriteBufferHighWaterMark()});
+ }
+
+ config.setAutoRead(false);
+ } else {
+ if (logger.isWarnEnabled()) {
+ logger.warn("{} is writable, to low water : {}",
+ new Object[]{ch, config.getWriteBufferLowWaterMark()});
+ }
+ config.setAutoRead(true);
+ }
+ }
+}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java
new file mode 100644
index 0000000..433f8b0
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.handler;
+
+import io.netty.channel.*;
+import org.apache.dolphinscheduler.remote.NettyRemotingServer;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
+import org.apache.dolphinscheduler.remote.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+
+@ChannelHandler.Sharable
+public class NettyServerHandler extends ChannelInboundHandlerAdapter {
+
+ private final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
+
+ private final NettyRemotingServer nettyRemotingServer;
+
+ private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>> processors = new ConcurrentHashMap();
+
+ public NettyServerHandler(NettyRemotingServer nettyRemotingServer){
+ this.nettyRemotingServer = nettyRemotingServer;
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ ctx.channel().close();
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ processReceived(ctx.channel(), (Command)msg);
+ }
+
+ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
+ this.registerProcessor(commandType, processor, null);
+ }
+
+ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
+ ExecutorService executorRef = executor;
+ if(executorRef == null){
+ executorRef = nettyRemotingServer.getDefaultExecutor();
+ }
+ this.processors.putIfAbsent(commandType, new Pair<NettyRequestProcessor, ExecutorService>(processor, executorRef));
+ }
+
+ private void processReceived(final Channel channel, final Command msg) {
+ final CommandType commandType = msg.getType();
+ final Pair<NettyRequestProcessor, ExecutorService> pair = processors.get(commandType);
+ if (pair != null) {
+ Runnable r = new Runnable() {
+ public void run() {
+ try {
+ pair.getLeft().process(channel, msg);
+ } catch (Throwable ex) {
+ logger.error("process msg {} error : {}", msg, ex);
+ }
+ }
+ };
+ try {
+ pair.getRight().submit(r);
+ } catch (RejectedExecutionException e) {
+ logger.warn("thread pool is full, discard msg {} from {}", msg, ChannelUtils.getRemoteAddress(channel));
+ }
+ } else {
+ logger.warn("commandType {} not support", commandType);
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ logger.error("exceptionCaught : {}", cause);
+ ctx.channel().close();
+ }
+
+ @Override
+ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
+ Channel ch = ctx.channel();
+ ChannelConfig config = ch.config();
+
+ if (!ch.isWritable()) {
+ if (logger.isWarnEnabled()) {
+ logger.warn("{} is not writable, over high water level : {}",
+ new Object[]{ch, config.getWriteBufferHighWaterMark()});
+ }
+
+ config.setAutoRead(false);
+ } else {
+ if (logger.isWarnEnabled()) {
+ logger.warn("{} is writable, to low water : {}",
+ new Object[]{ch, config.getWriteBufferLowWaterMark()});
+ }
+ config.setAutoRead(true);
+ }
+ }
+}
\ No newline at end of file
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/NettyRequestProcessor.java
similarity index 53%
copy from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
copy to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/NettyRequestProcessor.java
index 8376c28..7b19b9c 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/NettyRequestProcessor.java
@@ -14,23 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.api;
+package org.apache.dolphinscheduler.remote.processor;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.web.servlet.ServletComponentScan;
-import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
-import org.springframework.context.annotation.ComponentScan;
-import springfox.documentation.swagger2.annotations.EnableSwagger2;
-
-@SpringBootApplication
-@ServletComponentScan
-@ComponentScan("org.apache.dolphinscheduler")
-public class ApiApplicationServer extends SpringBootServletInitializer {
-
- public static void main(String[] args) {
- SpringApplication.run(ApiApplicationServer.class, args);
- }
+import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.remote.command.Command;
+public interface NettyRequestProcessor {
+ void process(final Channel channel, final Command command);
}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Address.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Address.java
new file mode 100644
index 0000000..4d311be
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Address.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.remote.utils;
+
+import java.io.Serializable;
+
+public class Address implements Serializable {
+
+ private String host;
+
+ private int port;
+
+ public Address(){
+ //NOP
+ }
+
+ public Address(String host, int port){
+ this.host = host;
+ this.port = port;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((host == null) ? 0 : host.hashCode());
+ result = prime * result + port;
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ Address other = (Address) obj;
+ if (host == null) {
+ if (other.host != null)
+ return false;
+ } else if (!host.equals(other.host))
+ return false;
+ if (port != other.port)
+ return false;
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "Address [host=" + host + ", port=" + port + "]";
+ }
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java
similarity index 53%
copy from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
copy to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java
index 8376c28..aca2241 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java
@@ -14,23 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.api;
+package org.apache.dolphinscheduler.remote.utils;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.web.servlet.ServletComponentScan;
-import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
-import org.springframework.context.annotation.ComponentScan;
-import springfox.documentation.swagger2.annotations.EnableSwagger2;
+import io.netty.channel.Channel;
-@SpringBootApplication
-@ServletComponentScan
-@ComponentScan("org.apache.dolphinscheduler")
-public class ApiApplicationServer extends SpringBootServletInitializer {
+import java.net.InetSocketAddress;
- public static void main(String[] args) {
- SpringApplication.run(ApiApplicationServer.class, args);
- }
+public class ChannelUtils {
+ public static String getLocalAddress(Channel channel){
+ return ((InetSocketAddress)channel.localAddress()).getAddress().getHostAddress();
+ }
+
+ public static String getRemoteAddress(Channel channel){
+ return ((InetSocketAddress)channel.remoteAddress()).getAddress().getHostAddress();
+ }
+
+ public static Address toAddress(Channel channel){
+ InetSocketAddress socketAddress = ((InetSocketAddress)channel.remoteAddress());
+ return new Address(socketAddress.getAddress().getHostAddress(), socketAddress.getPort());
+ }
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java
similarity index 53%
copy from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
copy to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java
index 8376c28..c0a930c 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java
@@ -14,23 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.api;
+package org.apache.dolphinscheduler.remote.utils;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.web.servlet.ServletComponentScan;
-import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
-import org.springframework.context.annotation.ComponentScan;
-import springfox.documentation.swagger2.annotations.EnableSwagger2;
+import java.nio.charset.Charset;
-@SpringBootApplication
-@ServletComponentScan
-@ComponentScan("org.apache.dolphinscheduler")
-public class ApiApplicationServer extends SpringBootServletInitializer {
+public class Constants {
- public static void main(String[] args) {
- SpringApplication.run(ApiApplicationServer.class, args);
- }
+ public static final String COMMA = ",";
+ public static final String SLASH = "/";
+
+ public static final Charset UTF8 = Charset.forName("UTF-8");
+
+ public static final int CPUS = Runtime.getRuntime().availableProcessors();
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/FastJsonSerializer.java
similarity index 53%
copy from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
copy to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/FastJsonSerializer.java
index 8376c28..32569ed 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/FastJsonSerializer.java
@@ -14,23 +14,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.api;
+package org.apache.dolphinscheduler.remote.utils;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.web.servlet.ServletComponentScan;
-import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
-import org.springframework.context.annotation.ComponentScan;
-import springfox.documentation.swagger2.annotations.EnableSwagger2;
+import com.alibaba.fastjson.JSON;
-@SpringBootApplication
-@ServletComponentScan
-@ComponentScan("org.apache.dolphinscheduler")
-public class ApiApplicationServer extends SpringBootServletInitializer {
+public class FastJsonSerializer {
- public static void main(String[] args) {
- SpringApplication.run(ApiApplicationServer.class, args);
- }
+ public static <T> byte[] serialize(T obj) {
+ String json = JSON.toJSONString(obj);
+ return json.getBytes(Constants.UTF8);
+ }
+ public static <T> String serializeToString(T obj) {
+ return JSON.toJSONString(obj);
+ }
+
+ public static <T> T deserialize(byte[] src, Class<T> clazz) {
+ return JSON.parseObject(new String(src, Constants.UTF8), clazz);
+ }
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Pair.java
similarity index 53%
copy from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
copy to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Pair.java
index 8376c28..a79a374 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Pair.java
@@ -14,23 +14,34 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.api;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.web.servlet.ServletComponentScan;
-import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
-import org.springframework.context.annotation.ComponentScan;
-import springfox.documentation.swagger2.annotations.EnableSwagger2;
+package org.apache.dolphinscheduler.remote.utils;
-@SpringBootApplication
-@ServletComponentScan
-@ComponentScan("org.apache.dolphinscheduler")
-public class ApiApplicationServer extends SpringBootServletInitializer {
- public static void main(String[] args) {
- SpringApplication.run(ApiApplicationServer.class, args);
- }
+public class Pair<L, R> {
+ private L left;
+ private R right;
+
+ public Pair(L left, R right) {
+ this.left = left;
+ this.right = right;
+ }
+
+ public L getLeft() {
+ return left;
+ }
+
+ public void setLeft(L left) {
+ this.left = left;
+ }
+
+ public R getRight() {
+ return right;
+ }
+
+ public void setRight(R right) {
+ this.right = right;
+ }
}
diff --git a/dolphinscheduler-remote/src/test/java/NettyRemotingClientTest.java b/dolphinscheduler-remote/src/test/java/NettyRemotingClientTest.java
new file mode 100644
index 0000000..0c4601b
--- /dev/null
+++ b/dolphinscheduler-remote/src/test/java/NettyRemotingClientTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.NettyRemotingServer;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.Ping;
+import org.apache.dolphinscheduler.remote.command.Pong;
+import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.Address;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class NettyRemotingClientTest {
+
+
+ @Test
+ public void testSend(){
+ NettyServerConfig serverConfig = new NettyServerConfig();
+
+ NettyRemotingServer server = new NettyRemotingServer(serverConfig);
+ server.registerProcessor(CommandType.PING, new NettyRequestProcessor() {
+ @Override
+ public void process(Channel channel, Command command) {
+ channel.writeAndFlush(Pong.create(command.getOpaque()));
+ }
+ });
+ server.start();
+ //
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicLong opaque = new AtomicLong(1);
+ final NettyClientConfig clientConfig = new NettyClientConfig();
+ NettyRemotingClient client = new NettyRemotingClient(clientConfig);
+ client.registerProcessor(CommandType.PONG, new NettyRequestProcessor() {
+ @Override
+ public void process(Channel channel, Command command) {
+ opaque.set(command.getOpaque());
+ latch.countDown();
+ }
+ });
+ Command commandPing = Ping.create();
+ try {
+ client.send(new Address("127.0.0.1", serverConfig.getListenPort()), commandPing);
+ latch.await();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ Assert.assertEquals(opaque.get(), commandPing.getOpaque());
+ }
+}
diff --git a/dolphinscheduler-rpc/pom.xml b/dolphinscheduler-rpc/pom.xml
deleted file mode 100644
index 680a4a2..0000000
--- a/dolphinscheduler-rpc/pom.xml
+++ /dev/null
@@ -1,113 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <groupId>org.apache.dolphinscheduler</groupId>
- <artifactId>dolphinscheduler</artifactId>
- <version>1.2.1-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>dolphinscheduler-rpc</artifactId>
-
- <name>dolphinscheduler-rpc</name>
- <url>https://github.com/apache/incubator-dolphinscheduler</url>
-
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <maven.compiler.source>1.8</maven.compiler.source>
- <maven.compiler.target>1.8</maven.compiler.target>
-
- <protobuf.version>3.5.1</protobuf.version>
- <grpc.version>1.9.0</grpc.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- <version>${protobuf.version}</version>
- </dependency>
- <dependency>
- <groupId>io.grpc</groupId>
- <artifactId>grpc-netty</artifactId>
- <version>${grpc.version}</version>
- </dependency>
- <dependency>
- <groupId>io.grpc</groupId>
- <artifactId>grpc-protobuf</artifactId>
- <version>${grpc.version}</version>
- </dependency>
- <dependency>
- <groupId>io.grpc</groupId>
- <artifactId>grpc-stub</artifactId>
- <version>${grpc.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
- </dependencies>
-
- <build>
- <extensions>
- <extension>
- <groupId>kr.motd.maven</groupId>
- <artifactId>os-maven-plugin</artifactId>
- <version>1.5.0.Final</version>
- </extension>
- </extensions>
- <plugins>
- <plugin>
- <groupId>org.xolstice.maven.plugins</groupId>
- <artifactId>protobuf-maven-plugin</artifactId>
- <version>0.5.0</version>
- <configuration>
- <protocArtifact>com.google.protobuf:protoc:3.5.1-1:exe:${os.detected.classifier}</protocArtifact>
- <pluginId>grpc-java</pluginId>
- <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
- </configuration>
- <executions>
- <execution>
- <id>compile</id>
- <goals>
- <goal>compile</goal>
- </goals>
- </execution>
- <execution>
- <id>compile-custom</id>
- <goals>
- <goal>compile-custom</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>${java.version}</source>
- <target>${java.version}</target>
- <encoding>${project.build.sourceEncoding}</encoding>
- </configuration>
- </plugin>
- </plugins>
- </build>
-</project>
diff --git a/dolphinscheduler-rpc/src/main/proto/scheduler.proto b/dolphinscheduler-rpc/src/main/proto/scheduler.proto
deleted file mode 100644
index b8b595c..0000000
--- a/dolphinscheduler-rpc/src/main/proto/scheduler.proto
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-syntax = "proto3";
-
-package schduler;
-
-option java_multiple_files = true;
-option java_package = "org.apache.dolphinscheduler.rpc";
-option java_outer_classname = "SchdulerProto";
-
-
-/**
- * return str info
- */
-message RetStrInfo {
- /**
- * str msg info
- */
- string msg = 1 ;
-}
-
-/**
- * return byte info
- */
-message RetByteInfo {
- /**
- * byte data info
- */
- bytes data = 1;
-}
-
-/**
- * log parameter
- */
-message LogParameter {
-
- /**
- * path
- */
- string path = 1 ;
-
- /**
- * skip line num
- */
- int32 skipLineNum = 2 ;
-
- /**
- * display limt num
- */
- int32 limit = 3 ;
-}
-
-
-/**
- * path parameter
- */
-message PathParameter {
-
- /**
- * path
- */
- string path = 1 ;
-}
-
-/**
- * log view service
- */
-service LogViewService {
-
- /**
- * roll view log
- */
- rpc rollViewLog(LogParameter) returns (RetStrInfo) {};
-
- /**
- * view all log
- */
- rpc viewLog(PathParameter) returns (RetStrInfo) {};
-
- /**
- * get log bytes
- */
- rpc getLogBytes(PathParameter) returns (RetByteInfo) {};
-}
-
diff --git a/dolphinscheduler-server/pom.xml b/dolphinscheduler-server/pom.xml
index 2ccc880..808f5d1 100644
--- a/dolphinscheduler-server/pom.xml
+++ b/dolphinscheduler-server/pom.xml
@@ -71,7 +71,7 @@
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
- <artifactId>dolphinscheduler-rpc</artifactId>
+ <artifactId>dolphinscheduler-service</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
@@ -110,8 +110,12 @@
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-alert</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-service</artifactId>
+ </dependency>
- </dependencies>
+ </dependencies>
<build>
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
new file mode 100644
index 0000000..c30875b
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.server.log;
+
+import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.log.*;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+
+public class LoggerRequestProcessor implements NettyRequestProcessor {
+
+ private final Logger logger = LoggerFactory.getLogger(LoggerRequestProcessor.class);
+
+ private final ThreadPoolExecutor executor;
+
+ public LoggerRequestProcessor(){
+ this.executor = new ThreadPoolExecutor(4, 4, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
+ }
+
+ @Override
+ public void process(Channel channel, Command command) {
+ logger.info("received command : {}", command);
+ final CommandType commandType = command.getType();
+ switch (commandType){
+ case GET_LOG_REQ:
+ GetLogRequestCommand getLogRequest = FastJsonSerializer.deserialize(command.getBody(), GetLogRequestCommand.class);
+ byte[] bytes = getFileBytes(getLogRequest.getPath());
+ GetLogResponseCommand getLogResponse = new GetLogResponseCommand(bytes);
+ channel.writeAndFlush(getLogResponse.convert2Command(command.getOpaque()));
+ break;
+ case VIEW_LOG_REQ:
+ ViewLogRequestCommand viewLogRequest = FastJsonSerializer.deserialize(command.getBody(), ViewLogRequestCommand.class);
+ String msg = readFile(viewLogRequest.getPath());
+ ViewLogResponseCommand viewLogResponse = new ViewLogResponseCommand(msg);
+ channel.writeAndFlush(viewLogResponse.convert2Command(command.getOpaque()));
+ break;
+ case ROLL_VIEW_LOG_REQ:
+ RollViewLogRequestCommand rollViewLogRequest = FastJsonSerializer.deserialize(command.getBody(), RollViewLogRequestCommand.class);
+ List<String> lines = readFile(rollViewLogRequest.getPath(), rollViewLogRequest.getSkipLineNum(), rollViewLogRequest.getLimit());
+ StringBuilder builder = new StringBuilder();
+ for (String line : lines){
+ builder.append(line + "\r\n");
+ }
+ RollViewLogResponseCommand rollViewLogRequestResponse = new RollViewLogResponseCommand(builder.toString());
+ channel.writeAndFlush(rollViewLogRequestResponse.convert2Command(command.getOpaque()));
+ break;
+ default:
+ throw new IllegalArgumentException(String.format("unknown commandType : %s"));
+ }
+ }
+
+ public ExecutorService getExecutor(){
+ return this.executor;
+ }
+
+ /**
+ * get files bytes
+ *
+ * @param path path
+ * @return byte array of file
+ * @throws Exception exception
+ */
+ private byte[] getFileBytes(String path){
+ InputStream in = null;
+ ByteArrayOutputStream bos = null;
+ try {
+ in = new FileInputStream(path);
+ bos = new ByteArrayOutputStream();
+ byte[] buf = new byte[1024];
+ int len = 0;
+ while ((len = in.read(buf)) != -1) {
+ bos.write(buf, 0, len);
+ }
+ return bos.toByteArray();
+ }catch (IOException e){
+ logger.error("get file bytes error",e);
+ }finally {
+ if (bos != null){
+ try {
+ bos.close();
+ } catch (IOException ignore) {}
+ }
+ if (in != null){
+ try {
+ in.close();
+ } catch (IOException ignore) {}
+ }
+ }
+ return new byte[0];
+ }
+
+ /**
+ * read file content
+ *
+ * @param path
+ * @param skipLine
+ * @param limit
+ * @return
+ */
+ private List<String> readFile(String path, int skipLine, int limit){
+ try (Stream<String> stream = Files.lines(Paths.get(path))) {
+ return stream.skip(skipLine).limit(limit).collect(Collectors.toList());
+ } catch (IOException e) {
+ logger.error("read file failed",e);
+ }
+ return Collections.EMPTY_LIST;
+ }
+
+ /**
+ * read file content
+ *
+ * @param path path
+ * @return string of file content
+ * @throws Exception exception
+ */
+ private String readFile(String path){
+ BufferedReader br = null;
+ String line = null;
+ StringBuilder sb = new StringBuilder();
+ try {
+ br = new BufferedReader(new InputStreamReader(new FileInputStream(path)));
+ while ((line = br.readLine()) != null){
+ sb.append(line + "\r\n");
+ }
+ return sb.toString();
+ }catch (IOException e){
+ logger.error("read file failed",e);
+ }finally {
+ try {
+ if (br != null){
+ br.close();
+ }
+ } catch (IOException ignore) {}
+ }
+ return "";
+ }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java
new file mode 100644
index 0000000..83b9499
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.log;
+
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.remote.NettyRemotingServer;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LoggerServer {
+
+ private static final Logger logger = LoggerFactory.getLogger(LoggerServer.class);
+
+ private final NettyRemotingServer server;
+
+ private final NettyServerConfig serverConfig;
+
+ private final LoggerRequestProcessor requestProcessor;
+
+ public LoggerServer(){
+ this.serverConfig = new NettyServerConfig();
+ this.serverConfig.setListenPort(Constants.RPC_PORT);
+ this.server = new NettyRemotingServer(serverConfig);
+ this.requestProcessor = new LoggerRequestProcessor();
+ this.server.registerProcessor(CommandType.GET_LOG_REQ, requestProcessor, requestProcessor.getExecutor());
+ this.server.registerProcessor(CommandType.ROLL_VIEW_LOG_REQ, requestProcessor, requestProcessor.getExecutor());
+ this.server.registerProcessor(CommandType.VIEW_LOG_REQ, requestProcessor, requestProcessor.getExecutor());
+ }
+
+ /**
+ * main launches the server from the command line.
+ * @param args arguments
+ */
+ public static void main(String[] args) {
+ final LoggerServer server = new LoggerServer();
+ server.start();
+ }
+
+ /**
+ * server start
+ */
+ public void start() {
+ this.server.start();
+ logger.info("logger server started, listening on port : {}" , Constants.RPC_PORT);
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ LoggerServer.this.stop();
+ }
+ });
+ }
+
+ /**
+ * stop
+ */
+ public void stop() {
+ this.server.close();
+ logger.info("logger server shut down");
+ }
+
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/rpc/LogClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/rpc/LogClient.java
deleted file mode 100644
index 1c6c97b..0000000
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/rpc/LogClient.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.server.rpc;
-
-import io.grpc.ManagedChannel;
-import io.grpc.ManagedChannelBuilder;
-import io.grpc.StatusRuntimeException;
-import org.apache.dolphinscheduler.rpc.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * log client
- */
-public class LogClient {
-
- /**
- * logger of LogClient
- */
- private static final Logger logger = LoggerFactory.getLogger(LogClient.class);
-
- /**
- * managed channel
- */
- private final ManagedChannel channel;
-
- /**
- * blocking stub
- */
- private final LogViewServiceGrpc.LogViewServiceBlockingStub blockingStub;
-
- /**
- * Construct client connecting to HelloWorld server at host:port.
- *
- * @param host host
- * @param port port
- */
- public LogClient(String host, int port) {
- this(ManagedChannelBuilder.forAddress(host, port)
- // Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid
- // needing certificates.
- .usePlaintext(true));
- }
-
- /**
- * Construct client for accessing RouteGuide server using the existing channel.
- *
- * @param channelBuilder channel builder
- */
- LogClient(ManagedChannelBuilder<?> channelBuilder) {
- /**
- * set max message read size
- */
- channelBuilder.maxInboundMessageSize(Integer.MAX_VALUE);
- channel = channelBuilder.build();
- blockingStub = LogViewServiceGrpc.newBlockingStub(channel);
- }
-
- /**
- * shut down channel
- *
- * @throws InterruptedException interrupted exception
- */
- public void shutdown() throws InterruptedException {
- channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
- }
-
- /**
- * roll view log
- *
- * @param path log path
- * @param skipLineNum skip line num
- * @param limit limit
- * @return log content
- */
- public String rollViewLog(String path,int skipLineNum,int limit) {
- logger.info("roll view log , path : {},skipLineNum : {} ,limit :{}", path, skipLineNum, limit);
- LogParameter pathParameter = LogParameter
- .newBuilder()
- .setPath(path)
- .setSkipLineNum(skipLineNum)
- .setLimit(limit)
- .build();
- RetStrInfo retStrInfo;
- try {
- retStrInfo = blockingStub.rollViewLog(pathParameter);
- return retStrInfo.getMsg();
- } catch (StatusRuntimeException e) {
- logger.error("roll view log failed", e);
- return null;
- }
- }
-
- /**
- * view all log
- *
- * @param path log path
- * @return log content
- */
- public String viewLog(String path) {
- logger.info("view log path : {}",path);
-
- PathParameter pathParameter = PathParameter.newBuilder().setPath(path).build();
- RetStrInfo retStrInfo;
- try {
- retStrInfo = blockingStub.viewLog(pathParameter);
- return retStrInfo.getMsg();
- } catch (StatusRuntimeException e) {
- logger.error("view log failed", e);
- return null;
- }
- }
-
- /**
- * get log bytes
- *
- * @param path log path
- * @return log content
- */
- public byte[] getLogBytes(String path) {
- logger.info("get log bytes {}",path);
-
- PathParameter pathParameter = PathParameter.newBuilder().setPath(path).build();
- RetByteInfo retByteInfo;
- try {
- retByteInfo = blockingStub.getLogBytes(pathParameter);
- return retByteInfo.getData().toByteArray();
- } catch (StatusRuntimeException e) {
- logger.error("get log bytes failed ", e);
- return null;
- }
- }
-}
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/rpc/LoggerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/rpc/LoggerServer.java
deleted file mode 100644
index 5ec5df9..0000000
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/rpc/LoggerServer.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.server.rpc;
-
-import io.grpc.stub.StreamObserver;
-import org.apache.dolphinscheduler.common.Constants;
-import com.google.protobuf.ByteString;
-import io.grpc.Server;
-import io.grpc.ServerBuilder;
-import org.apache.dolphinscheduler.rpc.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/**
- * logger server
- */
-public class LoggerServer {
-
- private static final Logger logger = LoggerFactory.getLogger(LoggerServer.class);
-
- /**
- * server
- */
- private Server server;
-
- /**
- * server start
- * @throws IOException io exception
- */
- public void start() throws IOException {
- /* The port on which the server should run */
- int port = Constants.RPC_PORT;
- server = ServerBuilder.forPort(port)
- .addService(new LogViewServiceGrpcImpl())
- .build()
- .start();
- logger.info("server started, listening on port : {}" , port);
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- // Use stderr here since the logger may have been reset by its JVM shutdown hook.
- logger.info("shutting down gRPC server since JVM is shutting down");
- LoggerServer.this.stop();
- logger.info("server shut down");
- }
- });
- }
-
- /**
- * stop
- */
- private void stop() {
- if (server != null) {
- server.shutdown();
- }
- }
-
- /**
- * await termination on the main thread since the grpc library uses daemon threads.
- */
- private void blockUntilShutdown() throws InterruptedException {
- if (server != null) {
- server.awaitTermination();
- }
- }
-
- /**
- * main launches the server from the command line.
- */
-
- /**
- * main launches the server from the command line.
- * @param args arguments
- * @throws IOException io exception
- * @throws InterruptedException interrupted exception
- */
- public static void main(String[] args) throws IOException, InterruptedException {
- final LoggerServer server = new LoggerServer();
- server.start();
- server.blockUntilShutdown();
- }
-
- /**
- * Log View Service Grpc Implementation
- */
- static class LogViewServiceGrpcImpl extends LogViewServiceGrpc.LogViewServiceImplBase {
- @Override
- public void rollViewLog(LogParameter request, StreamObserver<RetStrInfo> responseObserver) {
-
- logger.info("log parameter path : {} ,skip line : {}, limit : {}",
- request.getPath(),
- request.getSkipLineNum(),
- request.getLimit());
- List<String> list = readFile(request.getPath(), request.getSkipLineNum(), request.getLimit());
- StringBuilder sb = new StringBuilder();
- boolean errorLineFlag = false;
- for (String line : list){
- sb.append(line + "\r\n");
- }
- RetStrInfo retInfoBuild = RetStrInfo.newBuilder().setMsg(sb.toString()).build();
- responseObserver.onNext(retInfoBuild);
- responseObserver.onCompleted();
- }
-
- @Override
- public void viewLog(PathParameter request, StreamObserver<RetStrInfo> responseObserver) {
- logger.info("task path is : {} " , request.getPath());
- RetStrInfo retInfoBuild = RetStrInfo.newBuilder().setMsg(readFile(request.getPath())).build();
- responseObserver.onNext(retInfoBuild);
- responseObserver.onCompleted();
- }
-
- @Override
- public void getLogBytes(PathParameter request, StreamObserver<RetByteInfo> responseObserver) {
- try {
- ByteString bytes = ByteString.copyFrom(getFileBytes(request.getPath()));
- RetByteInfo.Builder builder = RetByteInfo.newBuilder();
- builder.setData(bytes);
- responseObserver.onNext(builder.build());
- responseObserver.onCompleted();
- }catch (Exception e){
- logger.error("get log bytes failed",e);
- }
- }
- }
-
- /**
- * get files bytes
- *
- * @param path path
- * @return byte array of file
- * @throws Exception exception
- */
- private static byte[] getFileBytes(String path){
- InputStream in = null;
- ByteArrayOutputStream bos = null;
- try {
- in = new FileInputStream(path);
- bos = new ByteArrayOutputStream();
- byte[] buf = new byte[1024];
- int len = 0;
- while ((len = in.read(buf)) != -1) {
- bos.write(buf, 0, len);
- }
- return bos.toByteArray();
- }catch (IOException e){
- logger.error("get file bytes error",e);
- }finally {
- if (bos != null){
- try {
- bos.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- if (in != null){
- try {
- in.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- return null;
- }
-
- /**
- * read file content
- *
- * @param path
- * @param skipLine
- * @param limit
- * @return
- */
- private static List<String> readFile(String path,int skipLine,int limit){
- try (Stream<String> stream = Files.lines(Paths.get(path))) {
- return stream.skip(skipLine).limit(limit).collect(Collectors.toList());
- } catch (IOException e) {
- logger.error("read file failed",e);
- }
- return null;
- }
-
- /**
- * read file content
- *
- * @param path path
- * @return string of file content
- * @throws Exception exception
- */
- private static String readFile(String path){
- BufferedReader br = null;
- String line = null;
- StringBuilder sb = new StringBuilder();
- try {
- br = new BufferedReader(new InputStreamReader(new FileInputStream(path)));
- boolean errorLineFlag = false;
- while ((line = br.readLine()) != null){
- sb.append(line + "\r\n");
- }
-
- return sb.toString();
- }catch (IOException e){
- logger.error("read file failed",e);
- }finally {
- try {
- if (br != null){
- br.close();
- }
- } catch (IOException e) {
- logger.error(e.getMessage(),e);
- }
- }
- return null;
- }
-
-}
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
index 0b621a9..9afde60 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
@@ -21,8 +21,8 @@ import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.server.rpc.LogClient;
import org.apache.commons.io.FileUtils;
+import org.apache.dolphinscheduler.service.log.LogClientService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -374,7 +374,7 @@ public class ProcessUtils {
public static void killYarnJob(TaskInstance taskInstance) {
try {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
- LogClient logClient = new LogClient(taskInstance.getHost(), Constants.RPC_PORT);
+ LogClientService logClient = new LogClientService(taskInstance.getHost(), Constants.RPC_PORT);
String log = logClient.viewLog(taskInstance.getLogPath());
if (StringUtils.isNotEmpty(log)) {
diff --git a/dolphinscheduler-service/pom.xml b/dolphinscheduler-service/pom.xml
new file mode 100644
index 0000000..64d3481
--- /dev/null
+++ b/dolphinscheduler-service/pom.xml
@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>dolphinscheduler</artifactId>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <version>1.2.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>dolphinscheduler-service</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-remote</artifactId>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
new file mode 100644
index 0000000..d6e1b9b
--- /dev/null
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.service.log;
+
+import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.log.*;
+import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.Address;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * log client
+ */
+public class LogClientService implements NettyRequestProcessor {
+
+ private static final Logger logger = LoggerFactory.getLogger(LogClientService.class);
+
+ private final NettyClientConfig clientConfig;
+
+ private final NettyRemotingClient client;
+
+ private final Address address;
+
+ private final long logRequestTimeout = 10 * 1000; //10s
+
+ /**
+ * construct client
+ * @param host host
+ * @param port port
+ */
+ public LogClientService(String host, int port) {
+ this.address = new Address(host, port);
+ this.clientConfig = new NettyClientConfig();
+ this.clientConfig.setWorkerThreads(1);
+ this.client = new NettyRemotingClient(clientConfig);
+ this.client.registerProcessor(CommandType.ROLL_VIEW_LOG_RES,this);
+ this.client.registerProcessor(CommandType.VIEW_LOG_RES, this);
+ this.client.registerProcessor(CommandType.GET_LOG_RES, this);
+
+ }
+
+ /**
+ * shutdown
+ */
+ public void shutdown() {
+ this.client.close();
+ logger.info("logger client shutdown");
+ }
+
+ /**
+ * roll view log
+ * @param path path
+ * @param skipLineNum skip line number
+ * @param limit limit
+ * @return log content
+ */
+ public String rollViewLog(String path,int skipLineNum,int limit) {
+ logger.info("roll view log, path {}, skipLineNum {} ,limit {}", path, skipLineNum, limit);
+ RollViewLogRequestCommand request = new RollViewLogRequestCommand(path, skipLineNum, limit);
+ String result = "";
+ try {
+ Command command = request.convert2Command();
+ this.client.send(address, command);
+ LogPromise promise = new LogPromise(command.getOpaque(), logRequestTimeout);
+ result = ((String)promise.getResult());
+ } catch (Exception e) {
+ logger.error("roll view log error", e);
+ }
+ return result;
+ }
+
+ /**
+ * view log
+ * @param path path
+ * @return log content
+ */
+ public String viewLog(String path) {
+ logger.info("view log path {}", path);
+ ViewLogRequestCommand request = new ViewLogRequestCommand(path);
+ String result = "";
+ try {
+ Command command = request.convert2Command();
+ this.client.send(address, command);
+ LogPromise promise = new LogPromise(command.getOpaque(), logRequestTimeout);
+ result = ((String)promise.getResult());
+ } catch (Exception e) {
+ logger.error("view log error", e);
+ }
+ return result;
+ }
+
+ /**
+ * get log size
+ * @param path log path
+ * @return log content bytes
+ */
+ public byte[] getLogBytes(String path) {
+ logger.info("log path {}", path);
+ GetLogRequestCommand request = new GetLogRequestCommand(path);
+ byte[] result = null;
+ try {
+ Command command = request.convert2Command();
+ this.client.send(address, command);
+ LogPromise promise = new LogPromise(command.getOpaque(), logRequestTimeout);
+ result = (byte[])promise.getResult();
+ } catch (Exception e) {
+ logger.error("get log size error", e);
+ }
+ return result;
+ }
+
+ @Override
+ public void process(Channel channel, Command command) {
+ logger.info("received log response : {}", command);
+ switch (command.getType()){
+ case ROLL_VIEW_LOG_RES:
+ RollViewLogResponseCommand rollReviewLog = FastJsonSerializer.deserialize(command.getBody(), RollViewLogResponseCommand.class);
+ LogPromise.notify(command.getOpaque(), rollReviewLog.getMsg());
+ break;
+ case VIEW_LOG_RES:
+ ViewLogResponseCommand viewLog = FastJsonSerializer.deserialize(command.getBody(), ViewLogResponseCommand.class);
+ LogPromise.notify(command.getOpaque(), viewLog.getMsg());
+ break;
+ case GET_LOG_RES:
+ GetLogResponseCommand getLog = FastJsonSerializer.deserialize(command.getBody(), GetLogResponseCommand.class);
+ LogPromise.notify(command.getOpaque(), getLog.getData());
+ break;
+ default:
+ throw new UnsupportedOperationException(String.format("command type : %s is not supported ", command.getType()));
+ }
+ }
+}
\ No newline at end of file
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java
new file mode 100644
index 0000000..ec9cac6
--- /dev/null
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.log;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+
+public class LogPromise {
+
+ private static final ConcurrentHashMap<Long, LogPromise> PROMISES = new ConcurrentHashMap<>();
+
+ private long opaque;
+
+ private final long start;
+
+ private final long timeout;
+
+ private final CountDownLatch latch;
+
+ private Object result;
+
+ public LogPromise(long opaque, long timeout){
+ this.opaque = opaque;
+ this.timeout = timeout;
+ this.start = System.currentTimeMillis();
+ this.latch = new CountDownLatch(1);
+ PROMISES.put(opaque, this);
+ }
+
+
+ public static void notify(long opaque, Object result){
+ LogPromise promise = PROMISES.remove(opaque);
+ if(promise != null){
+ promise.doCountDown(result);
+ }
+ }
+
+ private void doCountDown(Object result){
+ this.result = result;
+ this.latch.countDown();
+ }
+
+ public boolean isTimeout(){
+ return System.currentTimeMillis() - start > timeout;
+ }
+
+ public Object getResult(){
+ try {
+ latch.await(timeout, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException ignore) {
+ }
+ PROMISES.remove(opaque);
+ return this.result;
+ }
+
+
+}
diff --git a/pom.xml b/pom.xml
index 6451dce..5e10813 100644
--- a/pom.xml
+++ b/pom.xml
@@ -86,7 +86,6 @@
<commons.configuration.version>1.10</commons.configuration.version>
<commons.email.version>1.5</commons.email.version>
<poi.version>3.17</poi.version>
- <freemarker.version>2.3.21</freemarker.version>
<javax.servlet.api.version>3.1.0</javax.servlet.api.version>
<commons.collections4.version>4.1</commons.collections4.version>
<guava.version>20.0</guava.version>
@@ -120,6 +119,7 @@
<servlet-api.version>2.5</servlet-api.version>
<swagger.version>1.9.3</swagger.version>
<springfox.version>2.9.2</springfox.version>
+ <netty.version>4.1.42.Final</netty.version>
</properties>
<dependencyManagement>
@@ -230,7 +230,7 @@
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
- <artifactId>dolphinscheduler-rpc</artifactId>
+ <artifactId>dolphinscheduler-remote</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
@@ -238,6 +238,11 @@
<artifactId>dolphinscheduler-alert</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-service</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.curator</groupId>
@@ -358,11 +363,11 @@
<version>${slf4j.log4j12.version}</version>
</dependency>
- <dependency>
- <groupId>commons-collections</groupId>
- <artifactId>commons-collections</artifactId>
- <version>${commons.collections.version}</version>
- </dependency>
+ <dependency>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
+ <version>${commons.collections.version}</version>
+ </dependency>
<dependency>
<groupId>commons-httpclient</groupId>
@@ -406,13 +411,6 @@
<version>${poi.version}</version>
</dependency>
- <dependency>
- <groupId>org.freemarker</groupId>
- <artifactId>freemarker</artifactId>
- <version>${freemarker.version}</version>
- </dependency>
-
-
<!-- hadoop -->
<dependency>
<groupId>org.apache.hadoop</groupId>
@@ -521,23 +519,29 @@
<artifactId>servlet-api</artifactId>
<version>${servlet-api.version}</version>
</dependency>
- <dependency>
- <groupId>io.springfox</groupId>
- <artifactId>springfox-swagger2</artifactId>
- <version>${springfox.version}</version>
- </dependency>
-
- <dependency>
- <groupId>io.springfox</groupId>
- <artifactId>springfox-swagger-ui</artifactId>
- <version>${springfox.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.github.xiaoymin</groupId>
- <artifactId>swagger-bootstrap-ui</artifactId>
- <version>${swagger.version}</version>
- </dependency>
+
+ <dependency>
+ <groupId>io.springfox</groupId>
+ <artifactId>springfox-swagger2</artifactId>
+ <version>${springfox.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.springfox</groupId>
+ <artifactId>springfox-swagger-ui</artifactId>
+ <version>${springfox.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.github.xiaoymin</groupId>
+ <artifactId>swagger-bootstrap-ui</artifactId>
+ <version>${swagger.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ <version>${netty.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
@@ -768,7 +772,6 @@
<exclude>**/dolphinscheduler-ui/src/view/common/outro.inc</exclude>
<exclude>**/dolphinscheduler-ui/src/view/common/meta.inc</exclude>
<exclude>**/dolphinscheduler-ui/src/combo/1.0.0/3rd.css</exclude>
- <exclude>**/dolphinscheduler-rpc/src/main/java/org/apache/dolphinscheduler/rpc/LogViewServiceGrpc.java</exclude>
</excludes>
<consoleOutput>true</consoleOutput>
</configuration>
@@ -860,8 +863,9 @@
<module>dolphinscheduler-api</module>
<module>dolphinscheduler-dao</module>
<module>dolphinscheduler-alert</module>
- <module>dolphinscheduler-rpc</module>
<module>dolphinscheduler-dist</module>
- </modules>
+ <module>dolphinscheduler-remote</module>
+ <module>dolphinscheduler-service</module>
+ </modules>
</project>