You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by te...@apache.org on 2020/02/10 10:02:08 UTC
[incubator-dolphinscheduler] branch refactor-architecture updated: 1,remove dolphinscheduler-rpc module 2,add dolphinscheduler-remote module 3,add dolphinscheduler-service module 4,refactor LoggerServer module (#1925)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch refactor-architecture
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-architecture by this push:
new 9a70e77 1,remove dolphinscheduler-rpc module 2,add dolphinscheduler-remote module 3,add dolphinscheduler-service module 4,refactor LoggerServer module (#1925)
9a70e77 is described below
commit 9a70e77955cbb954168cc2402ae510559305d194
Author: qiaozhanwei <qi...@outlook.com>
AuthorDate: Mon Feb 10 18:01:56 2020 +0800
1,remove dolphinscheduler-rpc module 2,add dolphinscheduler-remote module 3,add dolphinscheduler-service module 4,refactor LoggerServer module (#1925)
* 1,remove dolphinscheduler-rpc module
2,add dolphinscheduler-remote module
3,add dolphinscheduler-service module
4,refactor LoggerServer module
* ProcessUtils modify
---
dolphinscheduler-api/pom.xml | 8 +-
.../apache/dolphinscheduler/api/log/LogClient.java | 137 ------------
.../api/service/LoggerService.java | 6 +-
dolphinscheduler-remote/pom.xml | 44 ++++
.../remote/NettyRemotingClient.java | 197 +++++++++++++++++
.../remote/NettyRemotingServer.java | 165 ++++++++++++++
.../remote/codec/NettyDecoder.java | 92 ++++++++
.../remote/codec/NettyEncoder.java | 44 ++++
.../dolphinscheduler/remote/command/Command.java | 102 +++++++++
.../remote/command/CommandHeader.java | 55 +++++
.../remote/command/CommandType.java | 1 +
.../remote/command/ExecuteTaskRequestCommand.java | 1 +
.../remote/command/ExecuteTaskResponseCommand.java | 1 +
.../dolphinscheduler/remote/command/Ping.java | 57 +++++
.../dolphinscheduler/remote/command/Pong.java | 54 +++++
.../command/log/GetLogBytesRequestCommand.java | 62 ++++++
.../command/log/GetLogBytesResponseCommand.java | 56 +++++
.../command/log/RollViewLogRequestCommand.java | 80 +++++++
.../command/log/RollViewLogResponseCommand.java | 55 +++++
.../remote/command/log/ViewLogRequestCommand.java | 58 +++++
.../remote/command/log/ViewLogResponseCommand.java | 55 +++++
.../remote/config/NettyClientConfig.java | 76 +++++++
.../remote/config/NettyServerConfig.java | 95 ++++++++
.../remote/exceptions/RemotingException.java | 94 ++++++++
.../remote/handler/NettyClientHandler.java | 123 +++++++++++
.../remote/handler/NettyServerHandler.java | 121 +++++++++++
.../remote/processor/NettyRequestProcessor.java | 28 +++
.../dolphinscheduler/remote/utils/Address.java | 90 ++++++++
.../remote/utils/ChannelUtils.java | 41 ++++
.../dolphinscheduler/remote/utils/Constants.java | 31 +++
.../remote/utils/FastJsonSerializer.java | 39 ++++
.../apache/dolphinscheduler/remote/utils/Pair.java | 47 ++++
.../remote/NettyRemotingClientTest.java | 71 ++++++
dolphinscheduler-rpc/pom.xml | 113 ----------
.../src/main/proto/scheduler.proto | 101 ---------
dolphinscheduler-server/pom.xml | 2 +-
.../server/log/LoggerRequestProcessor.java | 179 ++++++++++++++++
.../dolphinscheduler/server/log/LoggerServer.java | 91 ++++++++
.../dolphinscheduler/server/rpc/LogClient.java | 149 -------------
.../dolphinscheduler/server/rpc/LoggerServer.java | 238 ---------------------
.../server/utils/ProcessUtils.java | 4 +-
dolphinscheduler-service/pom.xml | 29 +++
.../service/MasterResponseCommand.java | 55 +++++
.../service/WorkerRequestCommand.java | 58 +++++
.../service/log/LogClientService.java | 166 ++++++++++++++
.../dolphinscheduler/service/log/LogPromise.java | 81 +++++++
.../service/worker/WorkerClientService.java | 107 +++++++++
pom.xml | 13 +-
48 files changed, 2819 insertions(+), 753 deletions(-)
diff --git a/dolphinscheduler-api/pom.xml b/dolphinscheduler-api/pom.xml
index c10f443..11b23d9 100644
--- a/dolphinscheduler-api/pom.xml
+++ b/dolphinscheduler-api/pom.xml
@@ -129,13 +129,13 @@
</dependency>
<dependency>
- <groupId>com.github.xiaoymin</groupId>
- <artifactId>swagger-bootstrap-ui</artifactId>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-service</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.dolphinscheduler</groupId>
- <artifactId>dolphinscheduler-rpc</artifactId>
+ <groupId>com.github.xiaoymin</groupId>
+ <artifactId>swagger-bootstrap-ui</artifactId>
</dependency>
<dependency>
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/log/LogClient.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/log/LogClient.java
deleted file mode 100644
index 3452060..0000000
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/log/LogClient.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.api.log;
-
-import io.grpc.ManagedChannel;
-import io.grpc.ManagedChannelBuilder;
-import io.grpc.StatusRuntimeException;
-import org.apache.dolphinscheduler.rpc.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * log client
- */
-public class LogClient {
-
- private static final Logger logger = LoggerFactory.getLogger(LogClient.class);
-
- private final ManagedChannel channel;
- private final LogViewServiceGrpc.LogViewServiceBlockingStub blockingStub;
-
- /**
- * construct client connecting to HelloWorld server at {@code host:port}
- *
- * @param host host
- * @param port port
- */
- public LogClient(String host, int port) {
- this(ManagedChannelBuilder.forAddress(host, port)
- // Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid
- // needing certificates.
- .usePlaintext(true));
- }
-
- /**
- * construct client for accessing RouteGuide server using the existing channel
- *
- */
- LogClient(ManagedChannelBuilder<?> channelBuilder) {
- /**
- * set max read size
- */
- channelBuilder.maxInboundMessageSize(Integer.MAX_VALUE);
- channel = channelBuilder.build();
- blockingStub = LogViewServiceGrpc.newBlockingStub(channel);
- }
-
- /**
- * shutdown
- *
- * @throws InterruptedException InterruptedException
- */
- public void shutdown() throws InterruptedException {
- channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
- }
-
- /**
- * roll view log
- *
- * @param path path
- * @param skipLineNum skip line number
- * @param limit limit
- * @return log content
- */
- public String rollViewLog(String path,int skipLineNum,int limit) {
- logger.info("roll view log : path {},skipLineNum {} ,limit {}", path, skipLineNum, limit);
- LogParameter pathParameter = LogParameter
- .newBuilder()
- .setPath(path)
- .setSkipLineNum(skipLineNum)
- .setLimit(limit)
- .build();
- RetStrInfo retStrInfo;
- try {
- retStrInfo = blockingStub.rollViewLog(pathParameter);
- return retStrInfo.getMsg();
- } catch (StatusRuntimeException e) {
- logger.error("roll view log error", e);
- return null;
- }
- }
-
- /**
- * view log
- *
- * @param path path
- * @return log content
- */
- public String viewLog(String path) {
- logger.info("view log path {}",path);
- PathParameter pathParameter = PathParameter.newBuilder().setPath(path).build();
- RetStrInfo retStrInfo;
- try {
- retStrInfo = blockingStub.viewLog(pathParameter);
- return retStrInfo.getMsg();
- } catch (StatusRuntimeException e) {
- logger.error("view log error", e);
- return null;
- }
- }
-
- /**
- * get log size
- *
- * @param path log path
- * @return log content bytes
- */
- public byte[] getLogBytes(String path) {
- logger.info("log path {}",path);
- PathParameter pathParameter = PathParameter.newBuilder().setPath(path).build();
- RetByteInfo retByteInfo;
- try {
- retByteInfo = blockingStub.getLogBytes(pathParameter);
- return retByteInfo.getData().toByteArray();
- } catch (StatusRuntimeException e) {
- logger.error("log size error", e);
- return null;
- }
- }
-
-}
\ No newline at end of file
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java
index 61dc1a7..108d5d4 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java
@@ -17,12 +17,12 @@
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.enums.Status;
-import org.apache.dolphinscheduler.api.log.LogClient;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.service.log.LogClientService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -65,7 +65,7 @@ public class LoggerService {
logger.info("log host : {} , logPath : {} , logServer port : {}",host,taskInstance.getLogPath(),Constants.RPC_PORT);
- LogClient logClient = new LogClient(host, Constants.RPC_PORT);
+ LogClientService logClient = new LogClientService(host, Constants.RPC_PORT);
String log = logClient.rollViewLog(taskInstance.getLogPath(),skipLineNum,limit);
result.setData(log);
logger.info(log);
@@ -85,7 +85,7 @@ public class LoggerService {
throw new RuntimeException("task instance is null");
}
String host = taskInstance.getHost();
- LogClient logClient = new LogClient(host, Constants.RPC_PORT);
+ LogClientService logClient = new LogClientService(host, Constants.RPC_PORT);
return logClient.getLogBytes(taskInstance.getLogPath());
}
}
diff --git a/dolphinscheduler-remote/pom.xml b/dolphinscheduler-remote/pom.xml
new file mode 100644
index 0000000..b67b033
--- /dev/null
+++ b/dolphinscheduler-remote/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>dolphinscheduler</artifactId>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <version>1.2.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>dolphinscheduler-remote</artifactId>
+
+ <name>dolphinscheduler-remote</name>
+ <!-- FIXME change it to the project's website -->
+ <url>http://www.example.com</url>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <maven.compiler.source>1.7</maven.compiler.source>
+ <maven.compiler.target>1.7</maven.compiler.target>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>fastjson</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+</project>
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
new file mode 100644
index 0000000..678fe84
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.*;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.dolphinscheduler.remote.codec.NettyDecoder;
+import org.apache.dolphinscheduler.remote.codec.NettyEncoder;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.remote.handler.NettyClientHandler;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.Address;
+import org.apache.dolphinscheduler.remote.utils.Constants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * remoting netty client
+ */
+public class NettyRemotingClient {
+
+ private final Logger logger = LoggerFactory.getLogger(NettyRemotingClient.class);
+
+ private final Bootstrap bootstrap = new Bootstrap();
+
+ private final NettyEncoder encoder = new NettyEncoder();
+
+ private final ConcurrentHashMap<Address, Channel> channels = new ConcurrentHashMap();
+
+ private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS);
+
+ private final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+ private final NioEventLoopGroup workerGroup;
+
+ private final NettyClientHandler clientHandler = new NettyClientHandler(this);
+
+ private final NettyClientConfig clientConfig;
+
+ public NettyRemotingClient(final NettyClientConfig clientConfig){
+ this.clientConfig = clientConfig;
+ this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
+ private AtomicInteger threadIndex = new AtomicInteger(0);
+
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
+ }
+ });
+ this.start();
+ }
+
+ private void start(){
+
+ this.bootstrap
+ .group(this.workerGroup)
+ .channel(NioSocketChannel.class)
+ .option(ChannelOption.SO_KEEPALIVE, clientConfig.isSoKeepalive())
+ .option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNoDelay())
+ .option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize())
+ .option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize())
+ .handler(new ChannelInitializer<SocketChannel>() {
+ @Override
+ public void initChannel(SocketChannel ch) throws Exception {
+ ch.pipeline().addLast(
+ new NettyDecoder(),
+ clientHandler,
+ encoder);
+ }
+ });
+ //
+ isStarted.compareAndSet(false, true);
+ }
+
+ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
+ registerProcessor(commandType, processor, null);
+ }
+
+ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
+ this.clientHandler.registerProcessor(commandType, processor, executor);
+ }
+
+ public void send(final Address address, final Command command) throws RemotingException {
+ final Channel channel = getChannel(address);
+ if (channel == null) {
+ throw new RemotingException("network error");
+ }
+ try {
+ channel.writeAndFlush(command).addListener(new ChannelFutureListener(){
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if(future.isSuccess()){
+ logger.info("sent command {} to {}", command, address);
+ } else{
+ logger.error("send command {} to {} failed, error {}", command, address, future.cause());
+ }
+ }
+ });
+ } catch (Exception ex) {
+ String msg = String.format("send command %s to address %s encounter error", command, address);
+ throw new RemotingException(msg, ex);
+ }
+ }
+
+ public Channel getChannel(Address address) {
+ Channel channel = channels.get(address);
+ if(channel != null && channel.isActive()){
+ return channel;
+ }
+ return createChannel(address, true);
+ }
+
+ public Channel createChannel(Address address, boolean isSync) {
+ ChannelFuture future;
+ try {
+ synchronized (bootstrap){
+ future = bootstrap.connect(new InetSocketAddress(address.getHost(), address.getPort()));
+ }
+ if(isSync){
+ future.sync();
+ }
+ if (future.isSuccess()) {
+ Channel channel = future.channel();
+ channels.put(address, channel);
+ return channel;
+ }
+ } catch (Exception ex) {
+ logger.info("connect to {} error {}", address, ex);
+ }
+ return null;
+ }
+
+ public ExecutorService getDefaultExecutor() {
+ return defaultExecutor;
+ }
+
+ public void close() {
+ if(isStarted.compareAndSet(true, false)){
+ try {
+ closeChannels();
+ if(workerGroup != null){
+ this.workerGroup.shutdownGracefully();
+ }
+ if(defaultExecutor != null){
+ defaultExecutor.shutdown();
+ }
+ } catch (Exception ex) {
+ logger.error("netty client close exception", ex);
+ }
+ logger.info("netty client closed");
+ }
+ }
+
+ private void closeChannels(){
+ for (Channel channel : this.channels.values()) {
+ channel.close();
+ }
+ this.channels.clear();
+ }
+
+ public void removeChannel(Address address){
+ Channel channel = this.channels.remove(address);
+ if(channel != null){
+ channel.close();
+ }
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
new file mode 100644
index 0000000..5823dbb
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.dolphinscheduler.remote.codec.NettyDecoder;
+import org.apache.dolphinscheduler.remote.codec.NettyEncoder;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
+import org.apache.dolphinscheduler.remote.handler.NettyServerHandler;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.Constants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * remoting netty server
+ */
+public class NettyRemotingServer {
+
+ private final Logger logger = LoggerFactory.getLogger(NettyRemotingServer.class);
+
+ private final ServerBootstrap serverBootstrap = new ServerBootstrap();
+
+ private final NettyEncoder encoder = new NettyEncoder();
+
+ private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS);
+
+ private final NioEventLoopGroup bossGroup;
+
+ private final NioEventLoopGroup workGroup;
+
+ private final NettyServerConfig serverConfig;
+
+ private final NettyServerHandler serverHandler = new NettyServerHandler(this);
+
+ private final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+ public NettyRemotingServer(final NettyServerConfig serverConfig){
+ this.serverConfig = serverConfig;
+
+ this.bossGroup = new NioEventLoopGroup(1, new ThreadFactory() {
+ private AtomicInteger threadIndex = new AtomicInteger(0);
+
+ public Thread newThread(Runnable r) {
+ return new Thread(r, String.format("NettyServerBossThread_%d", this.threadIndex.incrementAndGet()));
+ }
+ });
+
+ this.workGroup = new NioEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() {
+ private AtomicInteger threadIndex = new AtomicInteger(0);
+
+ public Thread newThread(Runnable r) {
+ return new Thread(r, String.format("NettyServerWorkerThread_%d", this.threadIndex.incrementAndGet()));
+ }
+ });
+ }
+
+ public void start(){
+
+ if(this.isStarted.get()){
+ return;
+ }
+
+ this.serverBootstrap
+ .group(this.bossGroup, this.workGroup)
+ .channel(NioServerSocketChannel.class)
+ .option(ChannelOption.SO_REUSEADDR, true)
+ .option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog())
+ .childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive())
+ .childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay())
+ .childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize())
+ .childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize())
+ .childHandler(new ChannelInitializer<NioSocketChannel>() {
+
+ @Override
+ protected void initChannel(NioSocketChannel ch) throws Exception {
+ initNettyChannel(ch);
+ }
+ });
+
+ ChannelFuture future;
+ try {
+ future = serverBootstrap.bind(serverConfig.getListenPort()).sync();
+ } catch (Exception e) {
+ logger.error("NettyRemotingServer bind fail {}, exit", e);
+ throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()));
+ }
+ if (future.isSuccess()) {
+ logger.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort());
+ } else if (future.cause() != null) {
+ throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()), future.cause());
+ } else {
+ throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()));
+ }
+ //
+ isStarted.compareAndSet(false, true);
+ }
+
+ private void initNettyChannel(NioSocketChannel ch) throws Exception{
+ ChannelPipeline pipeline = ch.pipeline();
+ pipeline.addLast("encoder", encoder);
+ pipeline.addLast("decoder", new NettyDecoder());
+ pipeline.addLast("handler", serverHandler);
+ }
+
+ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
+ this.registerProcessor(commandType, processor, null);
+ }
+
+ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
+ this.serverHandler.registerProcessor(commandType, processor, executor);
+ }
+
+ public ExecutorService getDefaultExecutor() {
+ return defaultExecutor;
+ }
+
+ public void close() {
+ if(isStarted.compareAndSet(true, false)){
+ try {
+ if(bossGroup != null){
+ this.bossGroup.shutdownGracefully();
+ }
+ if(workGroup != null){
+ this.workGroup.shutdownGracefully();
+ }
+ if(defaultExecutor != null){
+ defaultExecutor.shutdown();
+ }
+ } catch (Exception ex) {
+ logger.error("netty server close exception", ex);
+ }
+ logger.info("netty server closed");
+ }
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java
new file mode 100644
index 0000000..998f4ee
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.codec;
+
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ReplayingDecoder;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandHeader;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+
+import java.util.List;
+
+/**
+ * netty decoder
+ */
+public class NettyDecoder extends ReplayingDecoder<NettyDecoder.State> {
+
+ public NettyDecoder(){
+ super(State.MAGIC);
+ }
+
+ private final CommandHeader commandHeader = new CommandHeader();
+
+ @Override
+ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+ switch (state()){
+ case MAGIC:
+ checkMagic(in.readByte());
+ checkpoint(State.COMMAND);
+ case COMMAND:
+ commandHeader.setType(in.readByte());
+ checkpoint(State.OPAQUE);
+ case OPAQUE:
+ commandHeader.setOpaque(in.readLong());
+ checkpoint(State.BODY_LENGTH);
+ case BODY_LENGTH:
+ commandHeader.setBodyLength(in.readInt());
+ checkpoint(State.BODY);
+ case BODY:
+ byte[] body = new byte[commandHeader.getBodyLength()];
+ in.readBytes(body);
+ //
+ Command packet = new Command();
+ packet.setType(commandType(commandHeader.getType()));
+ packet.setOpaque(commandHeader.getOpaque());
+ packet.setBody(body);
+ out.add(packet);
+ //
+ checkpoint(State.MAGIC);
+ }
+ }
+
+ private CommandType commandType(byte type){
+ for(CommandType ct : CommandType.values()){
+ if(ct.ordinal() == type){
+ return ct;
+ }
+ }
+ return null;
+ }
+
+ private void checkMagic(byte magic) {
+ if (magic != Command.MAGIC) {
+ throw new IllegalArgumentException("illegal packet [magic]" + magic);
+ }
+ }
+
+ enum State{
+ MAGIC,
+ COMMAND,
+ OPAQUE,
+ BODY_LENGTH,
+ BODY;
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java
new file mode 100644
index 0000000..dd4e523
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.remote.codec;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+import org.apache.dolphinscheduler.remote.command.Command;
+
+/**
+ * netty encoder
+ */
+@Sharable
+public class NettyEncoder extends MessageToByteEncoder<Command> {
+
+ @Override
+ protected void encode(ChannelHandlerContext ctx, Command msg, ByteBuf out) throws Exception {
+ if(msg == null){
+ throw new Exception("encode msg is null");
+ }
+ out.writeByte(Command.MAGIC);
+ out.writeByte(msg.getType().ordinal());
+ out.writeLong(msg.getOpaque());
+ out.writeInt(msg.getBody().length);
+ out.writeBytes(msg.getBody());
+ }
+
+}
+
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java
new file mode 100644
index 0000000..4687db3
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.remote.command;
+
+import java.io.Serializable;
+
+/**
+ * receive task log request command and content fill
+ * for netty data serializable transfer
+ */
+public class Command implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final byte MAGIC = (byte) 0xbabe;
+
+ public Command(){
+ }
+
+ public Command(long opaque){
+ this.opaque = opaque;
+ }
+
+ /**
+ * comman type
+ */
+ private CommandType type;
+
+ /**
+ * request unique identification
+ */
+ private long opaque;
+
+ private byte[] body;
+
+ public CommandType getType() {
+ return type;
+ }
+
+ public void setType(CommandType type) {
+ this.type = type;
+ }
+
+ public long getOpaque() {
+ return opaque;
+ }
+
+ public void setOpaque(long opaque) {
+ this.opaque = opaque;
+ }
+
+ public byte[] getBody() {
+ return body;
+ }
+
+ public void setBody(byte[] body) {
+ this.body = body;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (int) (opaque ^ (opaque >>> 32));
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ Command other = (Command) obj;
+ return opaque == other.opaque;
+ }
+
+ @Override
+ public String toString() {
+ return "Command [type=" + type + ", opaque=" + opaque + ", bodyLen=" + (body == null ? 0 : body.length) + "]";
+ }
+
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java
new file mode 100644
index 0000000..92f7ac3
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.remote.command;
+
+import java.io.Serializable;
+
+/**
+ * command header
+ */
+public class CommandHeader implements Serializable {
+
+ private byte type;
+
+ private long opaque;
+
+ private int bodyLength;
+
+ public int getBodyLength() {
+ return bodyLength;
+ }
+
+ public void setBodyLength(int bodyLength) {
+ this.bodyLength = bodyLength;
+ }
+
+ public byte getType() {
+ return type;
+ }
+
+ public void setType(byte type) {
+ this.type = type;
+ }
+
+ public long getOpaque() {
+ return opaque;
+ }
+
+ public void setOpaque(long opaque) {
+ this.opaque = opaque;
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
new file mode 100644
index 0000000..185358a
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -0,0 +1 @@
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
public enum CommandType {
/**
* roll view log request
*/
ROLL_VIEW_LOG_REQUEST,
/**
* roll view log response
*/
ROLL_VIEW_LOG_RESPONSE,
/**
* view whole log request
*/
VIEW_WHOLE_LOG_REQUEST,
/**
* view whole log response
*/
VIEW_WHOLE_LOG_RESPONSE,
/**
* get log bytes request
*/
GET_LOG_BYTES_REQUEST,
/**
* get log bytes response
*/
GET_LOG_BYTES_RESPONSE,
WORKER_REQUEST,
MASTER_RESPONSE,
/**
* execute task request
*/
EXECUTE_TASK_REQUEST,
/**
* execute task response
*/
EXECUTE_TASK_RESPONSE,
PING,
PONG;
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
new file mode 100644
index 0000000..e75c2de
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
@@ -0,0 +1 @@
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.List;
import java.util.concu
rrent.atomic.AtomicLong;
public class ExecuteTaskRequestCommand implements Serializable {
private static final AtomicLong REQUEST = new AtomicLong(1);
private String taskId;
private String attemptId;
private String applicationName;
private String groupName;
private String taskName;
private int connectorPort;
private String description;
private String className;
private String methodName;
private String params;
private List<Integer> shardItems;
public List<Integer> getShardItems() {
return shardItems;
}
public void setShardItems(List<Integer> shardItems) {
this.shardItems = shardItems;
}
public String getParams() {
return params;
}
public void setParams(String params) {
this.params = params;
}
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
public String getAp
plicationName() {
return applicationName;
}
public void setApplicationName(String applicationName) {
this.applicationName = applicationName;
}
public String getGroupName() {
return groupName;
}
public void setGroupName(String groupName) {
this.groupName = groupName;
}
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
public int getConnectorPort() {
return connectorPort;
}
public void setConnectorPort(int connectorPort) {
this.connectorPort = connectorPort;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Command convert2Command(){
Command command = new Command(REQUEST.getAndIncrement());
command.setType(CommandType.EXECUTE_TASK_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
new file mode 100644
index 0000000..fafb575
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
@@ -0,0 +1 @@
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong
;
public class ExecuteTaskResponseCommand implements Serializable {
private static final AtomicLong REQUEST = new AtomicLong(1);
private String taskId;
private String attemptId;
private Object result;
private long receivedTime;
private int executeCount;
private long executeTime;
public String getAttemptId() {
return attemptId;
}
public void setAttemptId(String attemptId) {
this.attemptId = attemptId;
}
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
public Object getResult() {
return result;
}
public void setResult(Object result) {
this.result = result;
}
public long getReceivedTime() {
return receivedTime;
}
public void setReceivedTime(long receivedTime) {
this.receivedTime = receivedTime;
}
public int getExecuteCount() {
return executeCount
;
}
public void setExecuteCount(int executeCount) {
this.executeCount = executeCount;
}
public long getExecuteTime() {
return executeTime;
}
public void setExecuteTime(long executeTime) {
this.executeTime = executeTime;
}
public Command convert2Command(){
Command command = new Command(REQUEST.getAndIncrement());
command.setType(CommandType.EXECUTE_TASK_RESPONSE);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java
new file mode 100644
index 0000000..365d451
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+public class Ping implements Serializable {
+
+ private static final AtomicLong ID = new AtomicLong(1);
+
+ protected static ByteBuf EMPTY_BODY = Unpooled.EMPTY_BUFFER;
+
+ private static byte[] EMPTY_BODY_ARRAY = new byte[0];
+
+ private static final ByteBuf PING_BUF;
+
+ static {
+ ByteBuf ping = Unpooled.buffer();
+ ping.writeByte(Command.MAGIC);
+ ping.writeByte(CommandType.PING.ordinal());
+ ping.writeLong(0);
+ ping.writeInt(0);
+ ping.writeBytes(EMPTY_BODY);
+ PING_BUF = Unpooled.unreleasableBuffer(ping).asReadOnly();
+ }
+
+ public static ByteBuf pingContent(){
+ return PING_BUF.duplicate();
+ }
+
+ public static Command create(){
+ Command command = new Command(ID.getAndIncrement());
+ command.setType(CommandType.PING);
+ command.setBody(EMPTY_BODY_ARRAY);
+ return command;
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Pong.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Pong.java
new file mode 100644
index 0000000..bc5abda
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Pong.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.Serializable;
+
+
+public class Pong implements Serializable {
+
+ protected static ByteBuf EMPTY_BODY = Unpooled.EMPTY_BUFFER;
+
+ private static byte[] EMPTY_BODY_ARRAY = new byte[0];
+
+ private static final ByteBuf PONG_BUF;
+
+ static {
+ ByteBuf ping = Unpooled.buffer();
+ ping.writeByte(Command.MAGIC);
+ ping.writeByte(CommandType.PONG.ordinal());
+ ping.writeLong(0);
+ ping.writeInt(0);
+ ping.writeBytes(EMPTY_BODY);
+ PONG_BUF = Unpooled.unreleasableBuffer(ping).asReadOnly();
+ }
+
+ public static ByteBuf pingContent(){
+ return PONG_BUF.duplicate();
+ }
+
+ public static Command create(long opaque){
+ Command command = new Command(opaque);
+ command.setType(CommandType.PONG);
+ command.setBody(EMPTY_BODY_ARRAY);
+ return command;
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java
new file mode 100644
index 0000000..1a2e6e4
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.log;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * get log bytes request command
+ */
+public class GetLogBytesRequestCommand implements Serializable {
+
+ private static final AtomicLong REQUEST = new AtomicLong(1);
+
+ private String path;
+
+ public GetLogBytesRequestCommand() {
+ }
+
+ public GetLogBytesRequestCommand(String path) {
+ this.path = path;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ /**
+ *
+ * @return
+ */
+ public Command convert2Command(){
+ Command command = new Command(REQUEST.getAndIncrement());
+ command.setType(CommandType.GET_LOG_BYTES_REQUEST);
+ byte[] body = FastJsonSerializer.serialize(this);
+ command.setBody(body);
+ return command;
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesResponseCommand.java
new file mode 100644
index 0000000..05692fb
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesResponseCommand.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.log;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
+import java.io.Serializable;
+
+/**
+ * get log bytes response command
+ */
+public class GetLogBytesResponseCommand implements Serializable {
+
+ private byte[] data;
+
+ public GetLogBytesResponseCommand() {
+ }
+
+ public GetLogBytesResponseCommand(byte[] data) {
+ this.data = data;
+ }
+
+ public byte[] getData() {
+ return data;
+ }
+
+ public void setData(byte[] data) {
+ this.data = data;
+ }
+
+ public Command convert2Command(long opaque){
+ Command command = new Command(opaque);
+ command.setType(CommandType.GET_LOG_BYTES_RESPONSE);
+ byte[] body = FastJsonSerializer.serialize(this);
+ command.setBody(body);
+ return command;
+ }
+
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java
new file mode 100644
index 0000000..49d19aa
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.log;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * roll view log request command
+ */
+public class RollViewLogRequestCommand implements Serializable {
+
+ private static final AtomicLong REQUEST = new AtomicLong(1);
+
+ private String path;
+
+ private int skipLineNum;
+
+ private int limit;
+
+ public RollViewLogRequestCommand() {
+ }
+
+ public RollViewLogRequestCommand(String path, int skipLineNum, int limit) {
+ this.path = path;
+ this.skipLineNum = skipLineNum;
+ this.limit = limit;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ public int getSkipLineNum() {
+ return skipLineNum;
+ }
+
+ public void setSkipLineNum(int skipLineNum) {
+ this.skipLineNum = skipLineNum;
+ }
+
+ public int getLimit() {
+ return limit;
+ }
+
+ public void setLimit(int limit) {
+ this.limit = limit;
+ }
+
+ public Command convert2Command(){
+ Command command = new Command(REQUEST.getAndIncrement());
+ command.setType(CommandType.ROLL_VIEW_LOG_REQUEST);
+ byte[] body = FastJsonSerializer.serialize(this);
+ command.setBody(body);
+ return command;
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogResponseCommand.java
new file mode 100644
index 0000000..def3257
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogResponseCommand.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.log;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
+import java.io.Serializable;
+
+/**
+ * roll view log response command
+ */
+public class RollViewLogResponseCommand implements Serializable {
+
+ private String msg;
+
+ public RollViewLogResponseCommand() {
+ }
+
+ public RollViewLogResponseCommand(String msg) {
+ this.msg = msg;
+ }
+
+ public String getMsg() {
+ return msg;
+ }
+
+ public void setMsg(String msg) {
+ this.msg = msg;
+ }
+
+ public Command convert2Command(long opaque){
+ Command command = new Command(opaque);
+ command.setType(CommandType.ROLL_VIEW_LOG_RESPONSE);
+ byte[] body = FastJsonSerializer.serialize(this);
+ command.setBody(body);
+ return command;
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java
new file mode 100644
index 0000000..9ba9cd3
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.log;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * view log request command
+ */
+public class ViewLogRequestCommand implements Serializable {
+
+ private static final AtomicLong REQUEST = new AtomicLong(1);
+
+ private String path;
+
+ public ViewLogRequestCommand() {
+ }
+
+ public ViewLogRequestCommand(String path) {
+ this.path = path;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ public Command convert2Command(){
+ Command command = new Command(REQUEST.getAndIncrement());
+ command.setType(CommandType.VIEW_WHOLE_LOG_REQUEST);
+ byte[] body = FastJsonSerializer.serialize(this);
+ command.setBody(body);
+ return command;
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogResponseCommand.java
new file mode 100644
index 0000000..6e3c799
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogResponseCommand.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.log;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
+import java.io.Serializable;
+
+/**
+ * view log response command
+ */
+public class ViewLogResponseCommand implements Serializable {
+
+ private String msg;
+
+ public ViewLogResponseCommand() {
+ }
+
+ public ViewLogResponseCommand(String msg) {
+ this.msg = msg;
+ }
+
+ public String getMsg() {
+ return msg;
+ }
+
+ public void setMsg(String msg) {
+ this.msg = msg;
+ }
+
+ public Command convert2Command(long opaque){
+ Command command = new Command(opaque);
+ command.setType(CommandType.VIEW_WHOLE_LOG_RESPONSE);
+ byte[] body = FastJsonSerializer.serialize(this);
+ command.setBody(body);
+ return command;
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java
new file mode 100644
index 0000000..56d2643
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.remote.config;
+
+import org.apache.dolphinscheduler.remote.utils.Constants;
+
+/**
+ * netty client config
+ */
+public class NettyClientConfig {
+
+ private int workerThreads = Constants.CPUS;
+
+ private boolean tcpNoDelay = true;
+
+ private boolean soKeepalive = true;
+
+ private int sendBufferSize = 65535;
+
+ private int receiveBufferSize = 65535;
+
+ public int getWorkerThreads() {
+ return workerThreads;
+ }
+
+ public void setWorkerThreads(int workerThreads) {
+ this.workerThreads = workerThreads;
+ }
+
+ public boolean isTcpNoDelay() {
+ return tcpNoDelay;
+ }
+
+ public void setTcpNoDelay(boolean tcpNoDelay) {
+ this.tcpNoDelay = tcpNoDelay;
+ }
+
+ public boolean isSoKeepalive() {
+ return soKeepalive;
+ }
+
+ public void setSoKeepalive(boolean soKeepalive) {
+ this.soKeepalive = soKeepalive;
+ }
+
+ public int getSendBufferSize() {
+ return sendBufferSize;
+ }
+
+ public void setSendBufferSize(int sendBufferSize) {
+ this.sendBufferSize = sendBufferSize;
+ }
+
+ public int getReceiveBufferSize() {
+ return receiveBufferSize;
+ }
+
+ public void setReceiveBufferSize(int receiveBufferSize) {
+ this.receiveBufferSize = receiveBufferSize;
+ }
+
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyServerConfig.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyServerConfig.java
new file mode 100644
index 0000000..847f316
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyServerConfig.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.remote.config;
+
+import org.apache.dolphinscheduler.remote.utils.Constants;
+
+/**
+ * netty server config
+ */
+public class NettyServerConfig {
+
+ private int soBacklog = 1024;
+
+ private boolean tcpNoDelay = true;
+
+ private boolean soKeepalive = true;
+
+ private int sendBufferSize = 65535;
+
+ private int receiveBufferSize = 65535;
+
+ private int workerThread = Constants.CPUS;
+
+ private int listenPort = 12346;
+
+ public int getListenPort() {
+ return listenPort;
+ }
+
+ public void setListenPort(int listenPort) {
+ this.listenPort = listenPort;
+ }
+
+ public int getSoBacklog() {
+ return soBacklog;
+ }
+
+ public void setSoBacklog(int soBacklog) {
+ this.soBacklog = soBacklog;
+ }
+
+ public boolean isTcpNoDelay() {
+ return tcpNoDelay;
+ }
+
+ public void setTcpNoDelay(boolean tcpNoDelay) {
+ this.tcpNoDelay = tcpNoDelay;
+ }
+
+ public boolean isSoKeepalive() {
+ return soKeepalive;
+ }
+
+ public void setSoKeepalive(boolean soKeepalive) {
+ this.soKeepalive = soKeepalive;
+ }
+
+ public int getSendBufferSize() {
+ return sendBufferSize;
+ }
+
+ public void setSendBufferSize(int sendBufferSize) {
+ this.sendBufferSize = sendBufferSize;
+ }
+
+ public int getReceiveBufferSize() {
+ return receiveBufferSize;
+ }
+
+ public void setReceiveBufferSize(int receiveBufferSize) {
+ this.receiveBufferSize = receiveBufferSize;
+ }
+
+ public int getWorkerThread() {
+ return workerThread;
+ }
+
+ public void setWorkerThread(int workerThread) {
+ this.workerThread = workerThread;
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingException.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingException.java
new file mode 100644
index 0000000..29d48db
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingException.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.exceptions;
+
+/**
+ * remote exception
+ */
+public class RemotingException extends Exception {
+
+ public RemotingException() {
+ super();
+ }
+
+ /** Constructs a new runtime exception with the specified detail message.
+ * The cause is not initialized, and may subsequently be initialized by a
+ * call to {@link #initCause}.
+ *
+ * @param message the detail message. The detail message is saved for
+ * later retrieval by the {@link #getMessage()} method.
+ */
+ public RemotingException(String message) {
+ super(message);
+ }
+
+ /**
+ * Constructs a new runtime exception with the specified detail message and
+ * cause. <p>Note that the detail message associated with
+ * {@code cause} is <i>not</i> automatically incorporated in
+ * this runtime exception's detail message.
+ *
+ * @param message the detail message (which is saved for later retrieval
+ * by the {@link #getMessage()} method).
+ * @param cause the cause (which is saved for later retrieval by the
+ * {@link #getCause()} method). (A <tt>null</tt> value is
+ * permitted, and indicates that the cause is nonexistent or
+ * unknown.)
+ * @since 1.4
+ */
+ public RemotingException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /** Constructs a new runtime exception with the specified cause and a
+ * detail message of <tt>(cause==null ? null : cause.toString())</tt>
+ * (which typically contains the class and detail message of
+ * <tt>cause</tt>). This constructor is useful for runtime exceptions
+ * that are little more than wrappers for other throwables.
+ *
+ * @param cause the cause (which is saved for later retrieval by the
+ * {@link #getCause()} method). (A <tt>null</tt> value is
+ * permitted, and indicates that the cause is nonexistent or
+ * unknown.)
+ * @since 1.4
+ */
+ public RemotingException(Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * Constructs a new runtime exception with the specified detail
+ * message, cause, suppression enabled or disabled, and writable
+ * stack trace enabled or disabled.
+ *
+ * @param message the detail message.
+ * @param cause the cause. (A {@code null} value is permitted,
+ * and indicates that the cause is nonexistent or unknown.)
+ * @param enableSuppression whether or not suppression is enabled
+ * or disabled
+ * @param writableStackTrace whether or not the stack trace should
+ * be writable
+ *
+ * @since 1.7
+ */
+ protected RemotingException(String message, Throwable cause,
+ boolean enableSuppression,
+ boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
new file mode 100644
index 0000000..b063080
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.remote.handler;
+
+import io.netty.channel.*;
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
+import org.apache.dolphinscheduler.remote.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+
+/**
+ * netty client request handler
+ */
+@ChannelHandler.Sharable
+public class NettyClientHandler extends ChannelInboundHandlerAdapter {
+
+ private final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);
+
+ private final NettyRemotingClient nettyRemotingClient;
+
+ private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>> processors = new ConcurrentHashMap();
+
+ public NettyClientHandler(NettyRemotingClient nettyRemotingClient){
+ this.nettyRemotingClient = nettyRemotingClient;
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ nettyRemotingClient.removeChannel(ChannelUtils.toAddress(ctx.channel()));
+ ctx.channel().close();
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ processReceived(ctx.channel(), (Command)msg);
+ }
+
+ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
+ this.registerProcessor(commandType, processor, nettyRemotingClient.getDefaultExecutor());
+ }
+
+ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
+ ExecutorService executorRef = executor;
+ if(executorRef == null){
+ executorRef = nettyRemotingClient.getDefaultExecutor();
+ }
+ this.processors.putIfAbsent(commandType, new Pair<NettyRequestProcessor, ExecutorService>(processor, executorRef));
+ }
+
+ private void processReceived(final Channel channel, final Command msg) {
+ final CommandType commandType = msg.getType();
+ final Pair<NettyRequestProcessor, ExecutorService> pair = processors.get(commandType);
+ if (pair != null) {
+ Runnable r = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ pair.getLeft().process(channel, msg);
+ } catch (Throwable ex) {
+ logger.error("process msg {} error : {}", msg, ex);
+ }
+ }
+ };
+ try {
+ pair.getRight().submit(r);
+ } catch (RejectedExecutionException e) {
+ logger.warn("thread pool is full, discard msg {} from {}", msg, ChannelUtils.getRemoteAddress(channel));
+ }
+ } else {
+ logger.warn("commandType {} not support", commandType);
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ logger.error("exceptionCaught : {}", cause);
+ nettyRemotingClient.removeChannel(ChannelUtils.toAddress(ctx.channel()));
+ ctx.channel().close();
+ }
+
+ @Override
+ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
+ Channel ch = ctx.channel();
+ ChannelConfig config = ch.config();
+
+ if (!ch.isWritable()) {
+ if (logger.isWarnEnabled()) {
+ logger.warn("{} is not writable, over high water level : {}",
+ new Object[]{ch, config.getWriteBufferHighWaterMark()});
+ }
+
+ config.setAutoRead(false);
+ } else {
+ if (logger.isWarnEnabled()) {
+ logger.warn("{} is writable, to low water : {}",
+ new Object[]{ch, config.getWriteBufferLowWaterMark()});
+ }
+ config.setAutoRead(true);
+ }
+ }
+}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java
new file mode 100644
index 0000000..8a7ee39
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.handler;
+
+import io.netty.channel.*;
+import org.apache.dolphinscheduler.remote.NettyRemotingServer;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
+import org.apache.dolphinscheduler.remote.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+
+/**
+ * netty server request handler
+ */
+@ChannelHandler.Sharable
+public class NettyServerHandler extends ChannelInboundHandlerAdapter {
+
+ private final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
+
+ private final NettyRemotingServer nettyRemotingServer;
+
+ private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>> processors = new ConcurrentHashMap();
+
+ public NettyServerHandler(NettyRemotingServer nettyRemotingServer){
+ this.nettyRemotingServer = nettyRemotingServer;
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ ctx.channel().close();
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ processReceived(ctx.channel(), (Command)msg);
+ }
+
+ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
+ this.registerProcessor(commandType, processor, null);
+ }
+
+ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
+ ExecutorService executorRef = executor;
+ if(executorRef == null){
+ executorRef = nettyRemotingServer.getDefaultExecutor();
+ }
+ this.processors.putIfAbsent(commandType, new Pair<NettyRequestProcessor, ExecutorService>(processor, executorRef));
+ }
+
+ private void processReceived(final Channel channel, final Command msg) {
+ final CommandType commandType = msg.getType();
+ final Pair<NettyRequestProcessor, ExecutorService> pair = processors.get(commandType);
+ if (pair != null) {
+ Runnable r = new Runnable() {
+ public void run() {
+ try {
+ pair.getLeft().process(channel, msg);
+ } catch (Throwable ex) {
+ logger.error("process msg {} error : {}", msg, ex);
+ }
+ }
+ };
+ try {
+ pair.getRight().submit(r);
+ } catch (RejectedExecutionException e) {
+ logger.warn("thread pool is full, discard msg {} from {}", msg, ChannelUtils.getRemoteAddress(channel));
+ }
+ } else {
+ logger.warn("commandType {} not support", commandType);
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ logger.error("exceptionCaught : {}", cause);
+ ctx.channel().close();
+ }
+
+ @Override
+ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
+ Channel ch = ctx.channel();
+ ChannelConfig config = ch.config();
+
+ if (!ch.isWritable()) {
+ if (logger.isWarnEnabled()) {
+ logger.warn("{} is not writable, over high water level : {}",
+ new Object[]{ch, config.getWriteBufferHighWaterMark()});
+ }
+
+ config.setAutoRead(false);
+ } else {
+ if (logger.isWarnEnabled()) {
+ logger.warn("{} is writable, to low water : {}",
+ new Object[]{ch, config.getWriteBufferLowWaterMark()});
+ }
+ config.setAutoRead(true);
+ }
+ }
+}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/NettyRequestProcessor.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/NettyRequestProcessor.java
new file mode 100644
index 0000000..10a8195
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/NettyRequestProcessor.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.remote.processor;
+
+import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.remote.command.Command;
+
+/**
+ * netty request processor
+ */
+public interface NettyRequestProcessor {
+
+ void process(final Channel channel, final Command command);
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Address.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Address.java
new file mode 100644
index 0000000..221b895
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Address.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.remote.utils;
+
+import java.io.Serializable;
+
+/**
+ * server address
+ */
+public class Address implements Serializable {
+
+ private String host;
+
+ private int port;
+
+ public Address(){
+ //NOP
+ }
+
+ public Address(String host, int port){
+ this.host = host;
+ this.port = port;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((host == null) ? 0 : host.hashCode());
+ result = prime * result + port;
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ Address other = (Address) obj;
+ if (host == null) {
+ if (other.host != null) {
+ return false;
+ }
+ } else if (!host.equals(other.host)) {
+ return false;
+ }
+ return port == other.port;
+ }
+
+ @Override
+ public String toString() {
+ return "Address [host=" + host + ", port=" + port + "]";
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java
new file mode 100644
index 0000000..e9d93da
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.remote.utils;
+
+import io.netty.channel.Channel;
+
+import java.net.InetSocketAddress;
+
+/**
+ * channel utils
+ */
+public class ChannelUtils {
+
+ public static String getLocalAddress(Channel channel){
+ return ((InetSocketAddress)channel.localAddress()).getAddress().getHostAddress();
+ }
+
+ public static String getRemoteAddress(Channel channel){
+ return ((InetSocketAddress)channel.remoteAddress()).getAddress().getHostAddress();
+ }
+
+ public static Address toAddress(Channel channel){
+ InetSocketAddress socketAddress = ((InetSocketAddress)channel.remoteAddress());
+ return new Address(socketAddress.getAddress().getHostAddress(), socketAddress.getPort());
+ }
+
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java
new file mode 100644
index 0000000..c0a930c
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.remote.utils;
+
+import java.nio.charset.Charset;
+
+public class Constants {
+
+ public static final String COMMA = ",";
+
+ public static final String SLASH = "/";
+
+ public static final Charset UTF8 = Charset.forName("UTF-8");
+
+ public static final int CPUS = Runtime.getRuntime().availableProcessors();
+
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/FastJsonSerializer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/FastJsonSerializer.java
new file mode 100644
index 0000000..a9b8546
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/FastJsonSerializer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.remote.utils;
+
+import com.alibaba.fastjson.JSON;
+
+/**
+ * json serialize or deserialize
+ */
+public class FastJsonSerializer {
+
+ public static <T> byte[] serialize(T obj) {
+ String json = JSON.toJSONString(obj);
+ return json.getBytes(Constants.UTF8);
+ }
+
+ public static <T> String serializeToString(T obj) {
+ return JSON.toJSONString(obj);
+ }
+
+ public static <T> T deserialize(byte[] src, Class<T> clazz) {
+ return JSON.parseObject(new String(src, Constants.UTF8), clazz);
+ }
+
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Pair.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Pair.java
new file mode 100644
index 0000000..a79a374
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Pair.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.utils;
+
+
+public class Pair<L, R> {
+
+ private L left;
+
+ private R right;
+
+ public Pair(L left, R right) {
+ this.left = left;
+ this.right = right;
+ }
+
+ public L getLeft() {
+ return left;
+ }
+
+ public void setLeft(L left) {
+ this.left = left;
+ }
+
+ public R getRight() {
+ return right;
+ }
+
+ public void setRight(R right) {
+ this.right = right;
+ }
+}
diff --git a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java
new file mode 100644
index 0000000..19fd564
--- /dev/null
+++ b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java
@@ -0,0 +1,71 @@
+package org.apache.dolphinscheduler.remote;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.NettyRemotingServer;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.Ping;
+import org.apache.dolphinscheduler.remote.command.Pong;
+import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.Address;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class NettyRemotingClientTest {
+
+
+ @Test
+ public void testSend(){
+ NettyServerConfig serverConfig = new NettyServerConfig();
+
+ NettyRemotingServer server = new NettyRemotingServer(serverConfig);
+ server.registerProcessor(CommandType.PING, new NettyRequestProcessor() {
+ @Override
+ public void process(Channel channel, Command command) {
+ channel.writeAndFlush(Pong.create(command.getOpaque()));
+ }
+ });
+ server.start();
+ //
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicLong opaque = new AtomicLong(1);
+ final NettyClientConfig clientConfig = new NettyClientConfig();
+ NettyRemotingClient client = new NettyRemotingClient(clientConfig);
+ client.registerProcessor(CommandType.PONG, new NettyRequestProcessor() {
+ @Override
+ public void process(Channel channel, Command command) {
+ opaque.set(command.getOpaque());
+ latch.countDown();
+ }
+ });
+ Command commandPing = Ping.create();
+ try {
+ client.send(new Address("127.0.0.1", serverConfig.getListenPort()), commandPing);
+ latch.await();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ Assert.assertEquals(opaque.get(), commandPing.getOpaque());
+ }
+}
diff --git a/dolphinscheduler-rpc/pom.xml b/dolphinscheduler-rpc/pom.xml
deleted file mode 100644
index 680a4a2..0000000
--- a/dolphinscheduler-rpc/pom.xml
+++ /dev/null
@@ -1,113 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <groupId>org.apache.dolphinscheduler</groupId>
- <artifactId>dolphinscheduler</artifactId>
- <version>1.2.1-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>dolphinscheduler-rpc</artifactId>
-
- <name>dolphinscheduler-rpc</name>
- <url>https://github.com/apache/incubator-dolphinscheduler</url>
-
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <maven.compiler.source>1.8</maven.compiler.source>
- <maven.compiler.target>1.8</maven.compiler.target>
-
- <protobuf.version>3.5.1</protobuf.version>
- <grpc.version>1.9.0</grpc.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- <version>${protobuf.version}</version>
- </dependency>
- <dependency>
- <groupId>io.grpc</groupId>
- <artifactId>grpc-netty</artifactId>
- <version>${grpc.version}</version>
- </dependency>
- <dependency>
- <groupId>io.grpc</groupId>
- <artifactId>grpc-protobuf</artifactId>
- <version>${grpc.version}</version>
- </dependency>
- <dependency>
- <groupId>io.grpc</groupId>
- <artifactId>grpc-stub</artifactId>
- <version>${grpc.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
- </dependencies>
-
- <build>
- <extensions>
- <extension>
- <groupId>kr.motd.maven</groupId>
- <artifactId>os-maven-plugin</artifactId>
- <version>1.5.0.Final</version>
- </extension>
- </extensions>
- <plugins>
- <plugin>
- <groupId>org.xolstice.maven.plugins</groupId>
- <artifactId>protobuf-maven-plugin</artifactId>
- <version>0.5.0</version>
- <configuration>
- <protocArtifact>com.google.protobuf:protoc:3.5.1-1:exe:${os.detected.classifier}</protocArtifact>
- <pluginId>grpc-java</pluginId>
- <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
- </configuration>
- <executions>
- <execution>
- <id>compile</id>
- <goals>
- <goal>compile</goal>
- </goals>
- </execution>
- <execution>
- <id>compile-custom</id>
- <goals>
- <goal>compile-custom</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>${java.version}</source>
- <target>${java.version}</target>
- <encoding>${project.build.sourceEncoding}</encoding>
- </configuration>
- </plugin>
- </plugins>
- </build>
-</project>
diff --git a/dolphinscheduler-rpc/src/main/proto/scheduler.proto b/dolphinscheduler-rpc/src/main/proto/scheduler.proto
deleted file mode 100644
index b8b595c..0000000
--- a/dolphinscheduler-rpc/src/main/proto/scheduler.proto
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-syntax = "proto3";
-
-package schduler;
-
-option java_multiple_files = true;
-option java_package = "org.apache.dolphinscheduler.rpc";
-option java_outer_classname = "SchdulerProto";
-
-
-/**
- * return str info
- */
-message RetStrInfo {
- /**
- * str msg info
- */
- string msg = 1 ;
-}
-
-/**
- * return byte info
- */
-message RetByteInfo {
- /**
- * byte data info
- */
- bytes data = 1;
-}
-
-/**
- * log parameter
- */
-message LogParameter {
-
- /**
- * path
- */
- string path = 1 ;
-
- /**
- * skip line num
- */
- int32 skipLineNum = 2 ;
-
- /**
- * display limt num
- */
- int32 limit = 3 ;
-}
-
-
-/**
- * path parameter
- */
-message PathParameter {
-
- /**
- * path
- */
- string path = 1 ;
-}
-
-/**
- * log view service
- */
-service LogViewService {
-
- /**
- * roll view log
- */
- rpc rollViewLog(LogParameter) returns (RetStrInfo) {};
-
- /**
- * view all log
- */
- rpc viewLog(PathParameter) returns (RetStrInfo) {};
-
- /**
- * get log bytes
- */
- rpc getLogBytes(PathParameter) returns (RetByteInfo) {};
-}
-
diff --git a/dolphinscheduler-server/pom.xml b/dolphinscheduler-server/pom.xml
index 751fd91..080b87e 100644
--- a/dolphinscheduler-server/pom.xml
+++ b/dolphinscheduler-server/pom.xml
@@ -71,7 +71,7 @@
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
- <artifactId>dolphinscheduler-rpc</artifactId>
+ <artifactId>dolphinscheduler-service</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
new file mode 100644
index 0000000..4e4404e
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.server.log;
+
+import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.log.*;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * logger request process logic
+ */
+public class LoggerRequestProcessor implements NettyRequestProcessor {
+
+ private final Logger logger = LoggerFactory.getLogger(LoggerRequestProcessor.class);
+
+ private final ThreadPoolExecutor executor;
+
+ public LoggerRequestProcessor(){
+ this.executor = new ThreadPoolExecutor(4, 4, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
+ }
+
+ @Override
+ public void process(Channel channel, Command command) {
+ logger.info("received command : {}", command);
+
+ /**
+ * reuqest task log command type
+ */
+ final CommandType commandType = command.getType();
+ switch (commandType){
+ case GET_LOG_BYTES_REQUEST:
+ GetLogBytesRequestCommand getLogRequest = FastJsonSerializer.deserialize(
+ command.getBody(), GetLogBytesRequestCommand.class);
+ byte[] bytes = getFileContentBytes(getLogRequest.getPath());
+ GetLogBytesResponseCommand getLogResponse = new GetLogBytesResponseCommand(bytes);
+ channel.writeAndFlush(getLogResponse.convert2Command(command.getOpaque()));
+ break;
+ case VIEW_WHOLE_LOG_REQUEST:
+ ViewLogRequestCommand viewLogRequest = FastJsonSerializer.deserialize(
+ command.getBody(), ViewLogRequestCommand.class);
+ String msg = readWholeFileContent(viewLogRequest.getPath());
+ ViewLogResponseCommand viewLogResponse = new ViewLogResponseCommand(msg);
+ channel.writeAndFlush(viewLogResponse.convert2Command(command.getOpaque()));
+ break;
+ case ROLL_VIEW_LOG_REQUEST:
+ RollViewLogRequestCommand rollViewLogRequest = FastJsonSerializer.deserialize(
+ command.getBody(), RollViewLogRequestCommand.class);
+ List<String> lines = readPartFileContent(rollViewLogRequest.getPath(),
+ rollViewLogRequest.getSkipLineNum(), rollViewLogRequest.getLimit());
+ StringBuilder builder = new StringBuilder();
+ for (String line : lines){
+ builder.append(line + "\r\n");
+ }
+ RollViewLogResponseCommand rollViewLogRequestResponse = new RollViewLogResponseCommand(builder.toString());
+ channel.writeAndFlush(rollViewLogRequestResponse.convert2Command(command.getOpaque()));
+ break;
+ default:
+ throw new IllegalArgumentException("unknown commandType");
+ }
+ }
+
+ public ExecutorService getExecutor(){
+ return this.executor;
+ }
+
+ /**
+ * get files content bytes,for down load file
+ *
+ * @param filePath file path
+ * @return byte array of file
+ * @throws Exception exception
+ */
+ private byte[] getFileContentBytes(String filePath){
+ InputStream in = null;
+ ByteArrayOutputStream bos = null;
+ try {
+ in = new FileInputStream(filePath);
+ bos = new ByteArrayOutputStream();
+ byte[] buf = new byte[1024];
+ int len;
+ while ((len = in.read(buf)) != -1) {
+ bos.write(buf, 0, len);
+ }
+ return bos.toByteArray();
+ }catch (IOException e){
+ logger.error("get file bytes error",e);
+ }finally {
+ if (bos != null){
+ try {
+ bos.close();
+ } catch (IOException ignore) {}
+ }
+ if (in != null){
+ try {
+ in.close();
+ } catch (IOException ignore) {}
+ }
+ }
+ return new byte[0];
+ }
+
+ /**
+ * read part file content,can skip any line and read some lines
+ *
+ * @param filePath file path
+ * @param skipLine skip line
+ * @param limit read lines limit
+ * @return part file content
+ */
+ private List<String> readPartFileContent(String filePath,
+ int skipLine,
+ int limit){
+ try (Stream<String> stream = Files.lines(Paths.get(filePath))) {
+ return stream.skip(skipLine).limit(limit).collect(Collectors.toList());
+ } catch (IOException e) {
+ logger.error("read file error",e);
+ }
+ return Collections.EMPTY_LIST;
+ }
+
+ /**
+ * read whole file content
+ *
+ * @param filePath file path
+ * @return whole file content
+ */
+ private String readWholeFileContent(String filePath){
+ BufferedReader br = null;
+ String line;
+ StringBuilder sb = new StringBuilder();
+ try {
+ br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)));
+ while ((line = br.readLine()) != null){
+ sb.append(line + "\r\n");
+ }
+ return sb.toString();
+ }catch (IOException e){
+ logger.error("read file error",e);
+ }finally {
+ try {
+ if (br != null){
+ br.close();
+ }
+ } catch (IOException ignore) {}
+ }
+ return "";
+ }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java
new file mode 100644
index 0000000..3520fb0
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.log;
+
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.remote.NettyRemotingServer;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * logger server
+ */
+public class LoggerServer {
+
+ private static final Logger logger = LoggerFactory.getLogger(LoggerServer.class);
+
+ /**
+ * netty server
+ */
+ private final NettyRemotingServer server;
+
+ /**
+ * netty server config
+ */
+ private final NettyServerConfig serverConfig;
+
+ /**
+ * loggger request processor
+ */
+ private final LoggerRequestProcessor requestProcessor;
+
+ public LoggerServer(){
+ this.serverConfig = new NettyServerConfig();
+ this.serverConfig.setListenPort(Constants.RPC_PORT);
+ this.server = new NettyRemotingServer(serverConfig);
+ this.requestProcessor = new LoggerRequestProcessor();
+ this.server.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, requestProcessor, requestProcessor.getExecutor());
+ this.server.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, requestProcessor, requestProcessor.getExecutor());
+ this.server.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, requestProcessor, requestProcessor.getExecutor());
+ }
+
+ /**
+ * main launches the server from the command line.
+ * @param args arguments
+ */
+ public static void main(String[] args) {
+ final LoggerServer server = new LoggerServer();
+ server.start();
+ }
+
+ /**
+ * server start
+ */
+ public void start() {
+ this.server.start();
+ logger.info("logger server started, listening on port : {}" , Constants.RPC_PORT);
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ LoggerServer.this.stop();
+ }
+ });
+ }
+
+ /**
+ * stop
+ */
+ public void stop() {
+ this.server.close();
+ logger.info("logger server shut down");
+ }
+
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/rpc/LogClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/rpc/LogClient.java
deleted file mode 100644
index 1c6c97b..0000000
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/rpc/LogClient.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.server.rpc;
-
-import io.grpc.ManagedChannel;
-import io.grpc.ManagedChannelBuilder;
-import io.grpc.StatusRuntimeException;
-import org.apache.dolphinscheduler.rpc.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * log client
- */
-public class LogClient {
-
- /**
- * logger of LogClient
- */
- private static final Logger logger = LoggerFactory.getLogger(LogClient.class);
-
- /**
- * managed channel
- */
- private final ManagedChannel channel;
-
- /**
- * blocking stub
- */
- private final LogViewServiceGrpc.LogViewServiceBlockingStub blockingStub;
-
- /**
- * Construct client connecting to HelloWorld server at host:port.
- *
- * @param host host
- * @param port port
- */
- public LogClient(String host, int port) {
- this(ManagedChannelBuilder.forAddress(host, port)
- // Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid
- // needing certificates.
- .usePlaintext(true));
- }
-
- /**
- * Construct client for accessing RouteGuide server using the existing channel.
- *
- * @param channelBuilder channel builder
- */
- LogClient(ManagedChannelBuilder<?> channelBuilder) {
- /**
- * set max message read size
- */
- channelBuilder.maxInboundMessageSize(Integer.MAX_VALUE);
- channel = channelBuilder.build();
- blockingStub = LogViewServiceGrpc.newBlockingStub(channel);
- }
-
- /**
- * shut down channel
- *
- * @throws InterruptedException interrupted exception
- */
- public void shutdown() throws InterruptedException {
- channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
- }
-
- /**
- * roll view log
- *
- * @param path log path
- * @param skipLineNum skip line num
- * @param limit limit
- * @return log content
- */
- public String rollViewLog(String path,int skipLineNum,int limit) {
- logger.info("roll view log , path : {},skipLineNum : {} ,limit :{}", path, skipLineNum, limit);
- LogParameter pathParameter = LogParameter
- .newBuilder()
- .setPath(path)
- .setSkipLineNum(skipLineNum)
- .setLimit(limit)
- .build();
- RetStrInfo retStrInfo;
- try {
- retStrInfo = blockingStub.rollViewLog(pathParameter);
- return retStrInfo.getMsg();
- } catch (StatusRuntimeException e) {
- logger.error("roll view log failed", e);
- return null;
- }
- }
-
- /**
- * view all log
- *
- * @param path log path
- * @return log content
- */
- public String viewLog(String path) {
- logger.info("view log path : {}",path);
-
- PathParameter pathParameter = PathParameter.newBuilder().setPath(path).build();
- RetStrInfo retStrInfo;
- try {
- retStrInfo = blockingStub.viewLog(pathParameter);
- return retStrInfo.getMsg();
- } catch (StatusRuntimeException e) {
- logger.error("view log failed", e);
- return null;
- }
- }
-
- /**
- * get log bytes
- *
- * @param path log path
- * @return log content
- */
- public byte[] getLogBytes(String path) {
- logger.info("get log bytes {}",path);
-
- PathParameter pathParameter = PathParameter.newBuilder().setPath(path).build();
- RetByteInfo retByteInfo;
- try {
- retByteInfo = blockingStub.getLogBytes(pathParameter);
- return retByteInfo.getData().toByteArray();
- } catch (StatusRuntimeException e) {
- logger.error("get log bytes failed ", e);
- return null;
- }
- }
-}
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/rpc/LoggerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/rpc/LoggerServer.java
deleted file mode 100644
index 5ec5df9..0000000
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/rpc/LoggerServer.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.server.rpc;
-
-import io.grpc.stub.StreamObserver;
-import org.apache.dolphinscheduler.common.Constants;
-import com.google.protobuf.ByteString;
-import io.grpc.Server;
-import io.grpc.ServerBuilder;
-import org.apache.dolphinscheduler.rpc.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/**
- * logger server
- */
-public class LoggerServer {
-
- private static final Logger logger = LoggerFactory.getLogger(LoggerServer.class);
-
- /**
- * server
- */
- private Server server;
-
- /**
- * server start
- * @throws IOException io exception
- */
- public void start() throws IOException {
- /* The port on which the server should run */
- int port = Constants.RPC_PORT;
- server = ServerBuilder.forPort(port)
- .addService(new LogViewServiceGrpcImpl())
- .build()
- .start();
- logger.info("server started, listening on port : {}" , port);
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- // Use stderr here since the logger may have been reset by its JVM shutdown hook.
- logger.info("shutting down gRPC server since JVM is shutting down");
- LoggerServer.this.stop();
- logger.info("server shut down");
- }
- });
- }
-
- /**
- * stop
- */
- private void stop() {
- if (server != null) {
- server.shutdown();
- }
- }
-
- /**
- * await termination on the main thread since the grpc library uses daemon threads.
- */
- private void blockUntilShutdown() throws InterruptedException {
- if (server != null) {
- server.awaitTermination();
- }
- }
-
- /**
- * main launches the server from the command line.
- */
-
- /**
- * main launches the server from the command line.
- * @param args arguments
- * @throws IOException io exception
- * @throws InterruptedException interrupted exception
- */
- public static void main(String[] args) throws IOException, InterruptedException {
- final LoggerServer server = new LoggerServer();
- server.start();
- server.blockUntilShutdown();
- }
-
- /**
- * Log View Service Grpc Implementation
- */
- static class LogViewServiceGrpcImpl extends LogViewServiceGrpc.LogViewServiceImplBase {
- @Override
- public void rollViewLog(LogParameter request, StreamObserver<RetStrInfo> responseObserver) {
-
- logger.info("log parameter path : {} ,skip line : {}, limit : {}",
- request.getPath(),
- request.getSkipLineNum(),
- request.getLimit());
- List<String> list = readFile(request.getPath(), request.getSkipLineNum(), request.getLimit());
- StringBuilder sb = new StringBuilder();
- boolean errorLineFlag = false;
- for (String line : list){
- sb.append(line + "\r\n");
- }
- RetStrInfo retInfoBuild = RetStrInfo.newBuilder().setMsg(sb.toString()).build();
- responseObserver.onNext(retInfoBuild);
- responseObserver.onCompleted();
- }
-
- @Override
- public void viewLog(PathParameter request, StreamObserver<RetStrInfo> responseObserver) {
- logger.info("task path is : {} " , request.getPath());
- RetStrInfo retInfoBuild = RetStrInfo.newBuilder().setMsg(readFile(request.getPath())).build();
- responseObserver.onNext(retInfoBuild);
- responseObserver.onCompleted();
- }
-
- @Override
- public void getLogBytes(PathParameter request, StreamObserver<RetByteInfo> responseObserver) {
- try {
- ByteString bytes = ByteString.copyFrom(getFileBytes(request.getPath()));
- RetByteInfo.Builder builder = RetByteInfo.newBuilder();
- builder.setData(bytes);
- responseObserver.onNext(builder.build());
- responseObserver.onCompleted();
- }catch (Exception e){
- logger.error("get log bytes failed",e);
- }
- }
- }
-
- /**
- * get files bytes
- *
- * @param path path
- * @return byte array of file
- * @throws Exception exception
- */
- private static byte[] getFileBytes(String path){
- InputStream in = null;
- ByteArrayOutputStream bos = null;
- try {
- in = new FileInputStream(path);
- bos = new ByteArrayOutputStream();
- byte[] buf = new byte[1024];
- int len = 0;
- while ((len = in.read(buf)) != -1) {
- bos.write(buf, 0, len);
- }
- return bos.toByteArray();
- }catch (IOException e){
- logger.error("get file bytes error",e);
- }finally {
- if (bos != null){
- try {
- bos.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- if (in != null){
- try {
- in.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- return null;
- }
-
- /**
- * read file content
- *
- * @param path
- * @param skipLine
- * @param limit
- * @return
- */
- private static List<String> readFile(String path,int skipLine,int limit){
- try (Stream<String> stream = Files.lines(Paths.get(path))) {
- return stream.skip(skipLine).limit(limit).collect(Collectors.toList());
- } catch (IOException e) {
- logger.error("read file failed",e);
- }
- return null;
- }
-
- /**
- * read file content
- *
- * @param path path
- * @return string of file content
- * @throws Exception exception
- */
- private static String readFile(String path){
- BufferedReader br = null;
- String line = null;
- StringBuilder sb = new StringBuilder();
- try {
- br = new BufferedReader(new InputStreamReader(new FileInputStream(path)));
- boolean errorLineFlag = false;
- while ((line = br.readLine()) != null){
- sb.append(line + "\r\n");
- }
-
- return sb.toString();
- }catch (IOException e){
- logger.error("read file failed",e);
- }finally {
- try {
- if (br != null){
- br.close();
- }
- } catch (IOException e) {
- logger.error(e.getMessage(),e);
- }
- }
- return null;
- }
-
-}
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
index fd0a08c..69284ee 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
@@ -22,8 +22,8 @@ import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.server.rpc.LogClient;
import org.apache.commons.io.FileUtils;
+import org.apache.dolphinscheduler.service.log.LogClientService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -375,7 +375,7 @@ public class ProcessUtils {
public static void killYarnJob(TaskInstance taskInstance) {
try {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
- LogClient logClient = new LogClient(taskInstance.getHost(), Constants.RPC_PORT);
+ LogClientService logClient = new LogClientService(taskInstance.getHost(), Constants.RPC_PORT);
String log = logClient.viewLog(taskInstance.getLogPath());
if (StringUtils.isNotEmpty(log)) {
diff --git a/dolphinscheduler-service/pom.xml b/dolphinscheduler-service/pom.xml
new file mode 100644
index 0000000..31a2837
--- /dev/null
+++ b/dolphinscheduler-service/pom.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>dolphinscheduler</artifactId>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <version>1.2.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>dolphinscheduler-service</artifactId>
+
+ <name>dolphinscheduler-service</name>
+ <url>http://www.example.com</url>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <maven.compiler.source>1.7</maven.compiler.source>
+ <maven.compiler.target>1.7</maven.compiler.target>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-remote</artifactId>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/MasterResponseCommand.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/MasterResponseCommand.java
new file mode 100644
index 0000000..7607159
--- /dev/null
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/MasterResponseCommand.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
+import java.io.Serializable;
+
+/**
+ * view log response command
+ */
+public class MasterResponseCommand implements Serializable {
+
+ private String msg;
+
+ public MasterResponseCommand() {
+ }
+
+ public MasterResponseCommand(String msg) {
+ this.msg = msg;
+ }
+
+ public String getMsg() {
+ return msg;
+ }
+
+ public void setMsg(String msg) {
+ this.msg = msg;
+ }
+
+ public Command convert2Command(long opaque){
+ Command command = new Command(opaque);
+ command.setType(CommandType.MASTER_RESPONSE);
+ byte[] body = FastJsonSerializer.serialize(this);
+ command.setBody(body);
+ return command;
+ }
+}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/WorkerRequestCommand.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/WorkerRequestCommand.java
new file mode 100644
index 0000000..419add4
--- /dev/null
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/WorkerRequestCommand.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * view log request command
+ */
+public class WorkerRequestCommand implements Serializable {
+
+ private static final AtomicLong REQUEST = new AtomicLong(1);
+
+ private String path;
+
+ public WorkerRequestCommand() {
+ }
+
+ public WorkerRequestCommand(String path) {
+ this.path = path;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ public Command convert2Command(){
+ Command command = new Command(REQUEST.getAndIncrement());
+ command.setType(CommandType.WORKER_REQUEST);
+ byte[] body = FastJsonSerializer.serialize(this);
+ command.setBody(body);
+ return command;
+ }
+}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
new file mode 100644
index 0000000..a316c70
--- /dev/null
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.service.log;
+
+import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.log.*;
+import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.Address;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * log client
+ */
+public class LogClientService implements NettyRequestProcessor {
+
+ private static final Logger logger = LoggerFactory.getLogger(LogClientService.class);
+
+ private final NettyClientConfig clientConfig;
+
+ private final NettyRemotingClient client;
+
+ private final Address address;
+
+ /**
+ * request time out
+ */
+ private final long logRequestTimeout = 10 * 1000;
+
+ /**
+ * construct client
+ * @param host host
+ * @param port port
+ */
+ public LogClientService(String host, int port) {
+ this.address = new Address(host, port);
+ this.clientConfig = new NettyClientConfig();
+ this.clientConfig.setWorkerThreads(1);
+ this.client = new NettyRemotingClient(clientConfig);
+ this.client.registerProcessor(CommandType.ROLL_VIEW_LOG_RESPONSE,this);
+ this.client.registerProcessor(CommandType.VIEW_WHOLE_LOG_RESPONSE, this);
+ this.client.registerProcessor(CommandType.GET_LOG_BYTES_RESPONSE, this);
+
+ }
+
+ /**
+ * shutdown
+ */
+ public void shutdown() {
+ this.client.close();
+ logger.info("logger client shutdown");
+ }
+
+ /**
+ * roll view log
+ * @param path path
+ * @param skipLineNum skip line number
+ * @param limit limit
+ * @return log content
+ */
+ public String rollViewLog(String path,int skipLineNum,int limit) {
+ logger.info("roll view log, path {}, skipLineNum {} ,limit {}", path, skipLineNum, limit);
+ RollViewLogRequestCommand request = new RollViewLogRequestCommand(path, skipLineNum, limit);
+ String result = "";
+ try {
+ Command command = request.convert2Command();
+ this.client.send(address, command);
+ LogPromise promise = new LogPromise(command.getOpaque(), logRequestTimeout);
+ result = ((String)promise.getResult());
+ } catch (Exception e) {
+ logger.error("roll view log error", e);
+ }
+ return result;
+ }
+
+ /**
+ * view log
+ * @param path path
+ * @return log content
+ */
+ public String viewLog(String path) {
+ logger.info("view log path {}", path);
+ ViewLogRequestCommand request = new ViewLogRequestCommand(path);
+ String result = "";
+ try {
+ Command command = request.convert2Command();
+ this.client.send(address, command);
+ LogPromise promise = new LogPromise(command.getOpaque(), logRequestTimeout);
+ result = ((String)promise.getResult());
+ } catch (Exception e) {
+ logger.error("view log error", e);
+ }
+ return result;
+ }
+
+ /**
+ * get log size
+ * @param path log path
+ * @return log content bytes
+ */
+ public byte[] getLogBytes(String path) {
+ logger.info("log path {}", path);
+ GetLogBytesRequestCommand request = new GetLogBytesRequestCommand(path);
+ byte[] result = null;
+ try {
+ Command command = request.convert2Command();
+ this.client.send(address, command);
+ LogPromise promise = new LogPromise(command.getOpaque(), logRequestTimeout);
+ result = (byte[])promise.getResult();
+ } catch (Exception e) {
+ logger.error("get log size error", e);
+ }
+ return result;
+ }
+
+ @Override
+ public void process(Channel channel, Command command) {
+ logger.info("received log response : {}", command);
+ switch (command.getType()){
+ case ROLL_VIEW_LOG_RESPONSE:
+ RollViewLogResponseCommand rollReviewLog = FastJsonSerializer.deserialize(
+ command.getBody(), RollViewLogResponseCommand.class);
+ LogPromise.notify(command.getOpaque(), rollReviewLog.getMsg());
+ break;
+ case VIEW_WHOLE_LOG_RESPONSE:
+ ViewLogResponseCommand viewLog = FastJsonSerializer.deserialize(
+ command.getBody(), ViewLogResponseCommand.class);
+ LogPromise.notify(command.getOpaque(), viewLog.getMsg());
+ break;
+ case GET_LOG_BYTES_RESPONSE:
+ GetLogBytesResponseCommand getLog = FastJsonSerializer.deserialize(
+ command.getBody(), GetLogBytesResponseCommand.class);
+ LogPromise.notify(command.getOpaque(), getLog.getData());
+ break;
+ default:
+ throw new UnsupportedOperationException(String.format("command type : %s is not supported ", command.getType()));
+ }
+ }
+
+ public static void main(String[] args) throws Exception{
+ LogClientService logClient = new LogClientService("192.168.220.247", 50051);
+ byte[] logBytes = logClient.getLogBytes("/opt/program/incubator-dolphinscheduler/logs/1/463/540.log");
+ System.out.println(new String(logBytes));
+ }
+
+}
\ No newline at end of file
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java
new file mode 100644
index 0000000..8920b8a
--- /dev/null
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.log;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * log asyc callback
+ */
+public class LogPromise {
+
+ private static final ConcurrentHashMap<Long, LogPromise> PROMISES = new ConcurrentHashMap<>();
+
+ private long opaque;
+
+ private final long start;
+
+ private final long timeout;
+
+ private final CountDownLatch latch;
+
+ private Object result;
+
+ public LogPromise(long opaque, long timeout){
+ this.opaque = opaque;
+ this.timeout = timeout;
+ this.start = System.currentTimeMillis();
+ this.latch = new CountDownLatch(1);
+ PROMISES.put(opaque, this);
+ }
+
+
+ /**
+ * notify client finish
+ * @param opaque unique identification
+ * @param result result
+ */
+ public static void notify(long opaque, Object result){
+ LogPromise promise = PROMISES.remove(opaque);
+ if(promise != null){
+ promise.doCountDown(result);
+ }
+ }
+
+ private void doCountDown(Object result){
+ this.result = result;
+ this.latch.countDown();
+ }
+
+ public boolean isTimeout(){
+ return System.currentTimeMillis() - start > timeout;
+ }
+
+ public Object getResult(){
+ try {
+ latch.await(timeout, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException ignore) {
+ }
+ PROMISES.remove(opaque);
+ return this.result;
+ }
+
+
+}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/worker/WorkerClientService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/worker/WorkerClientService.java
new file mode 100644
index 0000000..c107122
--- /dev/null
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/worker/WorkerClientService.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.service.worker;
+
+import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.log.*;
+import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.Address;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import org.apache.dolphinscheduler.service.MasterResponseCommand;
+import org.apache.dolphinscheduler.service.WorkerRequestCommand;
+import org.apache.dolphinscheduler.service.log.LogPromise;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * log client
+ */
+public class WorkerClientService implements NettyRequestProcessor {
+
+ private static final Logger logger = LoggerFactory.getLogger(WorkerClientService.class);
+
+ private final NettyClientConfig clientConfig;
+
+ private final NettyRemotingClient client;
+
+ private final Address address;
+
+ /**
+ * request time out
+ */
+ private final long logRequestTimeout = 10 * 1000;
+
+ /**
+ * construct client
+ * @param host host
+ * @param port port
+ */
+ public WorkerClientService(String host, int port) {
+ this.address = new Address(host, port);
+ this.clientConfig = new NettyClientConfig();
+ this.clientConfig.setWorkerThreads(1);
+ this.client = new NettyRemotingClient(clientConfig);
+ this.client.registerProcessor(CommandType.MASTER_RESPONSE, this);
+
+ }
+
+ /**
+ * shutdown
+ */
+ public void shutdown() {
+ this.client.close();
+ logger.info("logger client shutdown");
+ }
+
+
+ public String reportResult() {
+ WorkerRequestCommand request = new WorkerRequestCommand();
+ String result = "";
+ try {
+ Command command = request.convert2Command();
+ this.client.send(address, command);
+ LogPromise promise = new LogPromise(command.getOpaque(), logRequestTimeout);
+ result = ((String)promise.getResult());
+ } catch (Exception e) {
+ e.printStackTrace();
+ logger.error("roll view log error", e);
+ }
+ return result;
+ }
+
+
+ @Override
+ public void process(Channel channel, Command command) {
+ logger.info("received log response : {}", command);
+ MasterResponseCommand masterResponseCommand = FastJsonSerializer.deserialize(
+ command.getBody(), MasterResponseCommand.class);
+ LogPromise.notify(command.getOpaque(), masterResponseCommand.getMsg());
+ }
+
+ public static void main(String[] args) throws Exception{
+ WorkerClientService workerClientService = new WorkerClientService("192.168.220.247", 1128);
+ String result = workerClientService.reportResult();
+ System.out.println(result);
+
+ }
+
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 875577c..f6009dc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -229,7 +229,12 @@
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
- <artifactId>dolphinscheduler-rpc</artifactId>
+ <artifactId>dolphinscheduler-remote</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-service</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
@@ -770,9 +775,6 @@
<exclude>**/dolphinscheduler-ui/src/view/common/outro.inc</exclude>
<exclude>**/dolphinscheduler-ui/src/view/common/meta.inc</exclude>
<exclude>**/dolphinscheduler-ui/src/combo/1.0.0/3rd.css</exclude>
- <exclude>
- **/dolphinscheduler-rpc/src/main/java/org/apache/dolphinscheduler/rpc/LogViewServiceGrpc.java
- </exclude>
</excludes>
<consoleOutput>true</consoleOutput>
</configuration>
@@ -859,8 +861,9 @@
<module>dolphinscheduler-api</module>
<module>dolphinscheduler-dao</module>
<module>dolphinscheduler-alert</module>
- <module>dolphinscheduler-rpc</module>
<module>dolphinscheduler-dist</module>
+ <module>dolphinscheduler-remote</module>
+ <module>dolphinscheduler-service</module>
</modules>
</project>