You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by jo...@apache.org on 2020/02/04 03:45:00 UTC

[incubator-dolphinscheduler] branch refactor-logger updated: refactor Logger (#1887)

This is an automated email from the ASF dual-hosted git repository.

journey pushed a commit to branch refactor-logger
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/refactor-logger by this push:
     new ccd3194  refactor Logger (#1887)
ccd3194 is described below

commit ccd3194af967b6942aaf38ec6ff6b2343f91a3cd
Author: Tboy <gu...@immomo.com>
AuthorDate: Tue Feb 4 11:44:51 2020 +0800

    refactor Logger (#1887)
    
    * move version to parent pom
    
    * move version properties to parent pom for easy management
    
    * remove freemarker dependency
    
    * add remote module
    
    * add license and remove author
    
    * rewrite LoggerServer and LoggerClient, remove rpc module
    
    * remove unused class
    
    * updates
    
    * updates for log part
    
    * updates
    
    * remove author
    
    * updates
    
    * add license and remove author
    
    * add service module
    
    * add log class
    
    * add ping/pong and test case for client
---
 dolphinscheduler-api/pom.xml                       |   2 +-
 .../dolphinscheduler/api/ApiApplicationServer.java |   1 +
 .../api/CombinedApplicationServer.java             |  49 -----
 .../apache/dolphinscheduler/api/log/LogClient.java | 137 ------------
 .../api/service/LoggerService.java                 |   6 +-
 .../api/service/ResourcesService.java              |  10 +-
 .../api/service/UdfFuncService.java                |   2 +-
 dolphinscheduler-remote/pom.xml                    |  34 +++
 .../remote/NettyRemotingClient.java                | 190 ++++++++++++++++
 .../remote/NettyRemotingServer.java                | 162 ++++++++++++++
 .../remote/codec/NettyDecoder.java                 |  89 ++++++++
 .../remote/codec/NettyEncoder.java                 |  34 +--
 .../dolphinscheduler/remote/command/Command.java   |  89 ++++++++
 .../remote/command/CommandHeader.java              |  44 ++--
 .../remote/command/CommandType.java                |   1 +
 .../remote/command/ExecuteTaskRequestCommand.java  |   1 +
 .../remote/command/ExecuteTaskResponseCommand.java |   1 +
 .../dolphinscheduler/remote/command/Ping.java      |  57 +++++
 .../dolphinscheduler/remote/command/Pong.java      |  54 +++++
 .../remote/command/log/GetLogRequestCommand.java   |  55 +++++
 .../remote/command/log/GetLogResponseCommand.java  |  53 +++++
 .../command/log/RollViewLogRequestCommand.java     |  77 +++++++
 .../command/log/RollViewLogResponseCommand.java    |  52 +++++
 .../remote/command/log/ViewLogRequestCommand.java  |  55 +++++
 .../remote/command/log/ViewLogResponseCommand.java |  52 +++++
 .../remote/config/NettyClientConfig.java           |  73 +++++++
 .../remote/config/NettyServerConfig.java           |  92 ++++++++
 .../remote/exceptions/RemotingException.java       |  91 ++++++++
 .../remote/handler/NettyClientHandler.java         | 120 +++++++++++
 .../remote/handler/NettyServerHandler.java         | 118 ++++++++++
 .../remote/processor/NettyRequestProcessor.java    |  21 +-
 .../dolphinscheduler/remote/utils/Address.java     |  84 ++++++++
 .../remote/utils/ChannelUtils.java                 |  30 +--
 .../dolphinscheduler/remote/utils/Constants.java   |  23 +-
 .../remote/utils/FastJsonSerializer.java           |  28 +--
 .../apache/dolphinscheduler/remote/utils/Pair.java |  39 ++--
 .../src/test/java/NettyRemotingClientTest.java     |  71 ++++++
 dolphinscheduler-rpc/pom.xml                       | 113 ----------
 .../src/main/proto/scheduler.proto                 | 101 ---------
 dolphinscheduler-server/pom.xml                    |   8 +-
 .../server/log/LoggerRequestProcessor.java         | 168 +++++++++++++++
 .../dolphinscheduler/server/log/LoggerServer.java  |  79 +++++++
 .../dolphinscheduler/server/rpc/LogClient.java     | 149 -------------
 .../dolphinscheduler/server/rpc/LoggerServer.java  | 238 ---------------------
 .../server/utils/ProcessUtils.java                 |   4 +-
 dolphinscheduler-service/pom.xml                   |  20 ++
 .../service/log/LogClientService.java              | 153 +++++++++++++
 .../dolphinscheduler/service/log/LogPromise.java   |  74 +++++++
 pom.xml                                            |  72 ++++---
 49 files changed, 2340 insertions(+), 936 deletions(-)

diff --git a/dolphinscheduler-api/pom.xml b/dolphinscheduler-api/pom.xml
index ae28a48..e185cb2 100644
--- a/dolphinscheduler-api/pom.xml
+++ b/dolphinscheduler-api/pom.xml
@@ -153,7 +153,7 @@
 
     <dependency>
       <groupId>org.apache.dolphinscheduler</groupId>
-      <artifactId>dolphinscheduler-rpc</artifactId>
+      <artifactId>dolphinscheduler-service</artifactId>
     </dependency>
 
     <dependency>
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
index 8376c28..a0947f7 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
@@ -29,6 +29,7 @@ import springfox.documentation.swagger2.annotations.EnableSwagger2;
 public class ApiApplicationServer extends SpringBootServletInitializer {
 
   public static void main(String[] args) {
+
     SpringApplication.run(ApiApplicationServer.class, args);
   }
 
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/CombinedApplicationServer.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/CombinedApplicationServer.java
deleted file mode 100644
index 5030890..0000000
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/CombinedApplicationServer.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.api;
-
-import org.apache.dolphinscheduler.alert.AlertServer;
-import org.apache.dolphinscheduler.server.master.MasterServer;
-import org.apache.dolphinscheduler.server.rpc.LoggerServer;
-import org.apache.dolphinscheduler.server.worker.WorkerServer;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.boot.web.servlet.ServletComponentScan;
-import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.context.annotation.Import;
-import springfox.documentation.swagger2.annotations.EnableSwagger2;
-
-@SpringBootApplication
-@ConditionalOnProperty(prefix = "server", name = "is-combined-server", havingValue = "true")
-@ServletComponentScan
-@ComponentScan("org.apache.dolphinscheduler")
-@Import({MasterServer.class, WorkerServer.class})
-@EnableSwagger2
-public class CombinedApplicationServer extends SpringBootServletInitializer {
-
-    public static void main(String[] args) throws Exception {
-
-        ApiApplicationServer.main(args);
-
-        LoggerServer server = new LoggerServer();
-        server.start();
-
-        AlertServer alertServer = AlertServer.getInstance();
-        alertServer.start();
-    }
-}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/log/LogClient.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/log/LogClient.java
deleted file mode 100644
index 3452060..0000000
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/log/LogClient.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.api.log;
-
-import io.grpc.ManagedChannel;
-import io.grpc.ManagedChannelBuilder;
-import io.grpc.StatusRuntimeException;
-import org.apache.dolphinscheduler.rpc.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * log client
- */
-public class LogClient {
-
-    private static final Logger logger = LoggerFactory.getLogger(LogClient.class);
-
-    private final ManagedChannel channel;
-    private final LogViewServiceGrpc.LogViewServiceBlockingStub blockingStub;
-
-    /**
-     * construct client connecting to HelloWorld server at {@code host:port}
-     *
-     * @param host host
-     * @param port port
-     */
-    public LogClient(String host, int port) {
-        this(ManagedChannelBuilder.forAddress(host, port)
-                // Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid
-                // needing certificates.
-                .usePlaintext(true));
-    }
-
-    /**
-     * construct client for accessing RouteGuide server using the existing channel
-     *
-     */
-    LogClient(ManagedChannelBuilder<?> channelBuilder) {
-        /**
-         * set max read size
-         */
-        channelBuilder.maxInboundMessageSize(Integer.MAX_VALUE);
-        channel = channelBuilder.build();
-        blockingStub = LogViewServiceGrpc.newBlockingStub(channel);
-    }
-
-    /**
-     * shutdown
-     *
-     * @throws InterruptedException InterruptedException
-     */
-    public void shutdown() throws InterruptedException {
-        channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
-    }
-
-    /**
-     * roll view log
-     *
-     * @param path path
-     * @param skipLineNum skip line number
-     * @param limit limit
-     * @return log content
-     */
-    public String rollViewLog(String path,int skipLineNum,int limit) {
-        logger.info("roll view log : path {},skipLineNum {} ,limit {}", path, skipLineNum, limit);
-        LogParameter pathParameter = LogParameter
-                .newBuilder()
-                .setPath(path)
-                .setSkipLineNum(skipLineNum)
-                .setLimit(limit)
-                .build();
-        RetStrInfo retStrInfo;
-        try {
-            retStrInfo = blockingStub.rollViewLog(pathParameter);
-            return retStrInfo.getMsg();
-        } catch (StatusRuntimeException e) {
-            logger.error("roll view log error", e);
-            return null;
-        }
-    }
-
-    /**
-     * view log
-     *
-     * @param path path
-     * @return log content
-     */
-    public String viewLog(String path) {
-        logger.info("view log path {}",path);
-        PathParameter pathParameter = PathParameter.newBuilder().setPath(path).build();
-        RetStrInfo retStrInfo;
-        try {
-            retStrInfo = blockingStub.viewLog(pathParameter);
-            return retStrInfo.getMsg();
-        } catch (StatusRuntimeException e) {
-            logger.error("view log error", e);
-            return null;
-        }
-    }
-
-    /**
-     * get log size
-     *
-     * @param path log path
-     * @return log content bytes
-     */
-    public byte[] getLogBytes(String path) {
-        logger.info("log path {}",path);
-        PathParameter pathParameter = PathParameter.newBuilder().setPath(path).build();
-        RetByteInfo retByteInfo;
-        try {
-            retByteInfo = blockingStub.getLogBytes(pathParameter);
-            return retByteInfo.getData().toByteArray();
-        } catch (StatusRuntimeException e) {
-            logger.error("log size error", e);
-            return null;
-        }
-    }
-
-}
\ No newline at end of file
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java
index 61dc1a7..108d5d4 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java
@@ -17,12 +17,12 @@
 package org.apache.dolphinscheduler.api.service;
 
 import org.apache.dolphinscheduler.api.enums.Status;
-import org.apache.dolphinscheduler.api.log.LogClient;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.dao.ProcessDao;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.service.log.LogClientService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -65,7 +65,7 @@ public class LoggerService {
 
     logger.info("log host : {} , logPath : {} , logServer port : {}",host,taskInstance.getLogPath(),Constants.RPC_PORT);
 
-    LogClient logClient = new LogClient(host, Constants.RPC_PORT);
+    LogClientService logClient = new LogClientService(host, Constants.RPC_PORT);
     String log = logClient.rollViewLog(taskInstance.getLogPath(),skipLineNum,limit);
     result.setData(log);
     logger.info(log);
@@ -85,7 +85,7 @@ public class LoggerService {
       throw new RuntimeException("task instance is null");
     }
     String host = taskInstance.getHost();
-    LogClient logClient = new LogClient(host, Constants.RPC_PORT);
+    LogClientService logClient = new LogClientService(host, Constants.RPC_PORT);
     return logClient.getLogBytes(taskInstance.getLogPath());
   }
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
index 3093dae..09b1d31 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.api.service;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import org.apache.commons.collections.BeanMap;
-import org.apache.commons.lang.StringUtils;
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.utils.PageInfo;
 import org.apache.dolphinscheduler.api.utils.Result;
@@ -28,6 +27,7 @@ import org.apache.dolphinscheduler.common.enums.ResourceType;
 import org.apache.dolphinscheduler.common.utils.FileUtils;
 import org.apache.dolphinscheduler.common.utils.HadoopUtils;
 import org.apache.dolphinscheduler.common.utils.PropertyUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.dao.entity.Resource;
 import org.apache.dolphinscheduler.dao.entity.Tenant;
 import org.apache.dolphinscheduler.dao.entity.UdfFunc;
@@ -104,7 +104,7 @@ public class ResourcesService extends BaseService {
         String nameSuffix = FileUtils.suffix(name);
 
         // determine file suffix
-        if (!StringUtils.equals(fileSuffix, nameSuffix)) {
+        if (!(StringUtils.isNotEmpty(fileSuffix) && fileSuffix.equalsIgnoreCase(nameSuffix))) {
             /**
              * rename file suffix and original suffix must be consistent
              */
@@ -341,7 +341,7 @@ public class ResourcesService extends BaseService {
         String nameSuffix = FileUtils.suffix(name);
 
         // determine file suffix
-        if (!StringUtils.equals(fileSuffix, nameSuffix)) {
+        if (!(StringUtils.isNotEmpty(fileSuffix) && fileSuffix.equalsIgnoreCase(nameSuffix))) {
             return false;
         }
         // query tenant
@@ -539,7 +539,7 @@ public class ResourcesService extends BaseService {
                 putMsg(result, Status.SUCCESS);
                 Map<String, Object> map = new HashMap<>();
                 map.put(ALIAS, resource.getAlias());
-                map.put(CONTENT, StringUtils.join(content.toArray(), "\n"));
+                map.put(CONTENT, StringUtils.join(content, "\n"));
                 result.setData(map);
             }else{
                 logger.error("read file {} not exist in hdfs", hdfsFileName);
@@ -602,7 +602,7 @@ public class ResourcesService extends BaseService {
 
         putMsg(result, Status.SUCCESS);
         Map<Object, Object> dataMap = new BeanMap(resource);
-        Map<String, Object> resultMap = new HashMap<>(5);
+        Map<String, Object> resultMap = new HashMap<>();
         for (Map.Entry<Object, Object> entry: dataMap.entrySet()) {
             if (!Constants.CLASS.equalsIgnoreCase(entry.getKey().toString())) {
                 resultMap.put(entry.getKey().toString(), entry.getValue());
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UdfFuncService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UdfFuncService.java
index 2032492..249c7ec 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UdfFuncService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UdfFuncService.java
@@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.UdfType;
 import org.apache.dolphinscheduler.common.utils.PropertyUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.dao.entity.Resource;
 import org.apache.dolphinscheduler.dao.entity.UdfFunc;
 import org.apache.dolphinscheduler.dao.entity.User;
@@ -30,7 +31,6 @@ import org.apache.dolphinscheduler.dao.mapper.UDFUserMapper;
 import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
-import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
diff --git a/dolphinscheduler-remote/pom.xml b/dolphinscheduler-remote/pom.xml
new file mode 100644
index 0000000..858f40a
--- /dev/null
+++ b/dolphinscheduler-remote/pom.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>dolphinscheduler</artifactId>
+        <groupId>org.apache.dolphinscheduler</groupId>
+        <version>1.2.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>dolphinscheduler-remote</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
new file mode 100644
index 0000000..dda78f3
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.*;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.dolphinscheduler.remote.codec.NettyDecoder;
+import org.apache.dolphinscheduler.remote.codec.NettyEncoder;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.remote.handler.NettyClientHandler;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.Address;
+import org.apache.dolphinscheduler.remote.utils.Constants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class NettyRemotingClient {
+
+    private final Logger logger = LoggerFactory.getLogger(NettyRemotingClient.class);
+
+    private final Bootstrap bootstrap = new Bootstrap();
+
+    private final NettyEncoder encoder = new NettyEncoder();
+
+    private final ConcurrentHashMap<Address, Channel> channels = new ConcurrentHashMap();
+
+    private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS);
+
+    private final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+    private final NioEventLoopGroup workerGroup;
+
+    private final NettyClientHandler clientHandler = new NettyClientHandler(this);
+
+    private final NettyClientConfig clientConfig;
+
+    public NettyRemotingClient(final NettyClientConfig clientConfig){
+        this.clientConfig = clientConfig;
+        this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
+            private AtomicInteger threadIndex = new AtomicInteger(0);
+
+            public Thread newThread(Runnable r) {
+                return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
+            }
+        });
+        this.start();
+    }
+
+    private void start(){
+
+        this.bootstrap
+                .group(this.workerGroup)
+                .channel(NioSocketChannel.class)
+                .option(ChannelOption.SO_KEEPALIVE, clientConfig.isSoKeepalive())
+                .option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNoDelay())
+                .option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize())
+                .option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize())
+                .handler(new ChannelInitializer<SocketChannel>() {
+                    public void initChannel(SocketChannel ch) throws Exception {
+                        ch.pipeline().addLast(
+                                new NettyDecoder(),
+                                clientHandler,
+                                encoder);
+                    }
+                });
+        //
+        isStarted.compareAndSet(false, true);
+    }
+
+    public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
+        registerProcessor(commandType, processor, null);
+    }
+
+    public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
+        this.clientHandler.registerProcessor(commandType, processor, executor);
+    }
+
+    public void send(final Address address, final Command command) throws RemotingException {
+        final Channel channel = getChannel(address);
+        if (channel == null) {
+            throw new RemotingException("network error");
+        }
+        try {
+            channel.writeAndFlush(command).addListener(new ChannelFutureListener(){
+                public void operationComplete(ChannelFuture future) throws Exception {
+                    if(future.isSuccess()){
+                        logger.info("sent command {} to {}", command, address);
+                    } else{
+                        logger.error("send command {} to {} failed, error {}", command, address, future.cause());
+                    }
+                }
+            });
+        } catch (Exception ex) {
+            String msg = String.format("send command %s to address %s encounter error", command, address);
+            throw new RemotingException(msg, ex);
+        }
+    }
+
+    public Channel getChannel(Address address) {
+        Channel channel = channels.get(address);
+        if(channel != null && channel.isActive()){
+            return channel;
+        }
+        return createChannel(address, true);
+    }
+
+    public Channel createChannel(Address address, boolean isSync) {
+        ChannelFuture future;
+        try {
+            synchronized (bootstrap){
+                future = bootstrap.connect(new InetSocketAddress(address.getHost(), address.getPort()));
+            }
+            if(isSync){
+                future.sync();
+            }
+            if (future.isSuccess()) {
+                Channel channel = future.channel();
+                channels.put(address, channel);
+                return channel;
+            }
+        } catch (Exception ex) {
+            logger.info("connect to {} error  {}", address, ex);
+        }
+        return null;
+    }
+
+    public ExecutorService getDefaultExecutor() {
+        return defaultExecutor;
+    }
+
+    public void close() {
+        if(isStarted.compareAndSet(true, false)){
+            try {
+                closeChannels();
+                if(workerGroup != null){
+                    this.workerGroup.shutdownGracefully();
+                }
+                if(defaultExecutor != null){
+                    defaultExecutor.shutdown();
+                }
+            } catch (Exception ex) {
+                logger.error("netty client close exception", ex);
+            }
+            logger.info("netty client closed");
+        }
+    }
+
+    private void closeChannels(){
+        for (Channel channel : this.channels.values()) {
+            channel.close();
+        }
+        this.channels.clear();
+    }
+
+    public void removeChannel(Address address){
+        Channel channel = this.channels.remove(address);
+        if(channel != null){
+            channel.close();
+        }
+    }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
new file mode 100644
index 0000000..7fd7331
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.dolphinscheduler.remote.codec.NettyDecoder;
+import org.apache.dolphinscheduler.remote.codec.NettyEncoder;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
+import org.apache.dolphinscheduler.remote.handler.NettyServerHandler;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.Constants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+public class NettyRemotingServer {
+
+    private final Logger logger = LoggerFactory.getLogger(NettyRemotingServer.class);
+
+    private final ServerBootstrap serverBootstrap = new ServerBootstrap();
+
+    private final NettyEncoder encoder = new NettyEncoder();
+
+    private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS);
+
+    private final NioEventLoopGroup bossGroup;
+
+    private final NioEventLoopGroup workGroup;
+
+    private final NettyServerConfig serverConfig;
+
+    private final NettyServerHandler serverHandler = new NettyServerHandler(this);
+
+    private final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+    public NettyRemotingServer(final NettyServerConfig serverConfig){
+        this.serverConfig = serverConfig;
+
+        this.bossGroup = new NioEventLoopGroup(1, new ThreadFactory() {
+            private AtomicInteger threadIndex = new AtomicInteger(0);
+
+            public Thread newThread(Runnable r) {
+                return new Thread(r, String.format("NettyServerBossThread_%d", this.threadIndex.incrementAndGet()));
+            }
+        });
+
+        this.workGroup = new NioEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() {
+            private AtomicInteger threadIndex = new AtomicInteger(0);
+
+            public Thread newThread(Runnable r) {
+                return new Thread(r, String.format("NettyServerWorkerThread_%d", this.threadIndex.incrementAndGet()));
+            }
+        });
+    }
+
+    public void start(){
+
+        if(this.isStarted.get()){
+            return;
+        }
+
+        this.serverBootstrap
+                .group(this.bossGroup, this.workGroup)
+                .channel(NioServerSocketChannel.class)
+                .option(ChannelOption.SO_REUSEADDR, true)
+                .option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog())
+                .childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive())
+                .childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay())
+                .childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize())
+                .childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize())
+                .childHandler(new ChannelInitializer<NioSocketChannel>() {
+
+                    protected void initChannel(NioSocketChannel ch) throws Exception {
+                        initNettyChannel(ch);
+                    }
+                });
+
+        ChannelFuture future;
+        try {
+            future = serverBootstrap.bind(serverConfig.getListenPort()).sync();
+        } catch (Exception e) {
+            logger.error("NettyRemotingServer bind fail {}, exit", e);
+            throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()));
+        }
+        if (future.isSuccess()) {
+            logger.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort());
+        } else if (future.cause() != null) {
+            throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()), future.cause());
+        } else {
+            throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()));
+        }
+        //
+        isStarted.compareAndSet(false, true);
+    }
+
+    private void initNettyChannel(NioSocketChannel ch) throws Exception{
+        ChannelPipeline pipeline = ch.pipeline();
+        pipeline.addLast("encoder", encoder);
+        pipeline.addLast("decoder", new NettyDecoder());
+        pipeline.addLast("handler", serverHandler);
+    }
+
+    public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
+        this.registerProcessor(commandType, processor, null);
+    }
+
+    public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
+        this.serverHandler.registerProcessor(commandType, processor, executor);
+    }
+
+    public ExecutorService getDefaultExecutor() {
+        return defaultExecutor;
+    }
+
+    public void close() {
+        if(isStarted.compareAndSet(true, false)){
+            try {
+                if(bossGroup != null){
+                    this.bossGroup.shutdownGracefully();
+                }
+                if(workGroup != null){
+                    this.workGroup.shutdownGracefully();
+                }
+                if(defaultExecutor != null){
+                    defaultExecutor.shutdown();
+                }
+            } catch (Exception ex) {
+                logger.error("netty server close exception", ex);
+            }
+            logger.info("netty server closed");
+        }
+    }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java
new file mode 100644
index 0000000..dc37334
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.codec;
+
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ReplayingDecoder;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandHeader;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+
+import java.util.List;
+
+/**
+ * Decodes the wire format written by NettyEncoder into {@link Command} objects:
+ * magic (1 byte), command type (1 byte), opaque (8 bytes), body length (4 bytes), body.
+ * The decoder keeps per-connection header state, so each channel needs its own instance.
+ */
+public class NettyDecoder extends ReplayingDecoder<NettyDecoder.State> {
+
+    /** cached once; CommandType.values() returns a new array on every call */
+    private static final CommandType[] COMMAND_TYPES = CommandType.values();
+
+    public NettyDecoder(){
+        super(State.MAGIC);
+    }
+
+    /** header fields read so far for the frame currently being decoded */
+    private final CommandHeader commandHeader = new CommandHeader();
+
+    /**
+     * Reads one frame field by field. Each case deliberately falls through to the next one; the
+     * checkpoint after every field records how far decoding got, so that a replay after more
+     * bytes arrive resumes at the first field that is still incomplete.
+     *
+     * @throws IllegalArgumentException when the magic byte is wrong or the body length is negative
+     */
+    @Override
+    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+        switch (state()){
+            case MAGIC:
+                checkMagic(in.readByte());
+                checkpoint(State.COMMAND);
+                // fall through
+            case COMMAND:
+                commandHeader.setType(in.readByte());
+                checkpoint(State.OPAQUE);
+                // fall through
+            case OPAQUE:
+                commandHeader.setOpaque(in.readLong());
+                checkpoint(State.BODY_LENGTH);
+                // fall through
+            case BODY_LENGTH:
+                commandHeader.setBodyLength(checkBodyLength(in.readInt()));
+                checkpoint(State.BODY);
+                // fall through
+            case BODY:
+                // NOTE(review): the length comes from the peer and has no upper bound here;
+                // consider capping it once a maximum frame size is defined
+                byte[] body = new byte[commandHeader.getBodyLength()];
+                in.readBytes(body);
+                // frame complete: hand the command to the next handler
+                Command packet = new Command();
+                packet.setType(commandType(commandHeader.getType()));
+                packet.setOpaque(commandHeader.getOpaque());
+                packet.setBody(body);
+                out.add(packet);
+                // start over with the next frame
+                checkpoint(State.MAGIC);
+        }
+    }
+
+    /**
+     * Maps the wire byte to the command type with that ordinal.
+     *
+     * @return the matching type, or {@code null} when no type has that ordinal
+     */
+    private CommandType commandType(byte type){
+        if(type < 0 || type >= COMMAND_TYPES.length){
+            return null;
+        }
+        return COMMAND_TYPES[type];
+    }
+
+    /** Rejects a frame whose first byte is not {@link Command#MAGIC}. */
+    private void checkMagic(byte magic) {
+        if (magic != Command.MAGIC) {
+            throw new IllegalArgumentException("illegal packet [magic]" + magic);
+        }
+    }
+
+    /**
+     * Rejects a negative body length, which would otherwise surface as a
+     * NegativeArraySizeException when the body array is allocated.
+     *
+     * @return {@code bodyLength} unchanged when it is valid
+     */
+    private int checkBodyLength(int bodyLength) {
+        if (bodyLength < 0) {
+            throw new IllegalArgumentException("illegal packet [body length]" + bodyLength);
+        }
+        return bodyLength;
+    }
+
+    /** decoding position within a frame, in wire order */
+    enum State{
+        MAGIC,
+        COMMAND,
+        OPAQUE,
+        BODY_LENGTH,
+        BODY;
+    }
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java
similarity index 51%
copy from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
copy to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java
index 8376c28..fb5b36a 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java
@@ -14,23 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dolphinscheduler.api;
+package org.apache.dolphinscheduler.remote.codec;
 
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.web.servlet.ServletComponentScan;
-import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
-import org.springframework.context.annotation.ComponentScan;
-import springfox.documentation.swagger2.annotations.EnableSwagger2;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+import org.apache.dolphinscheduler.remote.command.Command;
 
-@SpringBootApplication
-@ServletComponentScan
-@ComponentScan("org.apache.dolphinscheduler")
-public class ApiApplicationServer extends SpringBootServletInitializer {
-
-  public static void main(String[] args) {
-    SpringApplication.run(ApiApplicationServer.class, args);
-  }
+@Sharable
+public class NettyEncoder extends MessageToByteEncoder<Command> {
 
+    /**
+     * Writes {@code msg} as: magic (1 byte), type ordinal (1 byte), opaque (8 bytes),
+     * body length (4 bytes), body. A command without a body is written with length 0.
+     *
+     * @throws Exception                when {@code msg} is null
+     * @throws IllegalArgumentException when {@code msg} has no command type
+     */
+    @Override
+    protected void encode(ChannelHandlerContext ctx, Command msg, ByteBuf out) throws Exception {
+        if(msg == null){
+            throw new Exception("encode msg is null");
+        }
+        if(msg.getType() == null){
+            // fail with a clear message rather than a bare NullPointerException
+            throw new IllegalArgumentException("encode msg type is null");
+        }
+        // Command allows a null body (its constructors never set one); treat it as empty
+        byte[] body = msg.getBody();
+        int bodyLength = body == null ? 0 : body.length;
+        out.writeByte(Command.MAGIC);
+        out.writeByte(msg.getType().ordinal());
+        out.writeLong(msg.getOpaque());
+        out.writeInt(bodyLength);
+        if(bodyLength > 0){
+            out.writeBytes(body);
+        }
+    }
 
 }
+
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java
new file mode 100644
index 0000000..3f0a394
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.remote.command;
+
+import java.io.Serializable;
+
+/**
+ * A single message on the wire: a command type, an opaque id and a raw body.
+ * Equality and hash code are based on the opaque id only.
+ */
+public class Command implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** first byte of every frame; the cast keeps only the low byte of the literal, i.e. 0xbe */
+    public static final byte MAGIC = (byte) 0xbabe;
+
+    public Command(){
+    }
+
+    public Command(long opaque){
+        this.opaque = opaque;
+    }
+
+    /** kind of command carried by this message */
+    private CommandType type;
+
+    /** id carried alongside the command; the only field used by equals and hashCode */
+    private long opaque;
+
+    /** raw payload; stays null until {@link #setBody(byte[])} is called */
+    private byte[] body;
+
+    public CommandType getType() {
+        return this.type;
+    }
+
+    public void setType(CommandType type) {
+        this.type = type;
+    }
+
+    public long getOpaque() {
+        return this.opaque;
+    }
+
+    public void setOpaque(long opaque) {
+        this.opaque = opaque;
+    }
+
+    public byte[] getBody() {
+        return this.body;
+    }
+
+    public void setBody(byte[] body) {
+        this.body = body;
+    }
+
+    /** Hash of the opaque id, combined with the prime 31. */
+    @Override
+    public int hashCode() {
+        return 31 + Long.hashCode(opaque);
+    }
+
+    /** Two commands are equal when they are of the same class and carry the same opaque id. */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        return opaque == ((Command) obj).opaque;
+    }
+
+    @Override
+    public String toString() {
+        int bodyLen = (body == null) ? 0 : body.length;
+        return "Command [type=" + type + ", opaque=" + opaque + ", bodyLen=" + bodyLen + "]";
+    }
+
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java
similarity index 53%
copy from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
copy to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java
index 8376c28..ac51d01 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java
@@ -14,23 +14,39 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dolphinscheduler.api;
+package org.apache.dolphinscheduler.remote.command;
 
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.web.servlet.ServletComponentScan;
-import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
-import org.springframework.context.annotation.ComponentScan;
-import springfox.documentation.swagger2.annotations.EnableSwagger2;
+import java.io.Serializable;
 
-@SpringBootApplication
-@ServletComponentScan
-@ComponentScan("org.apache.dolphinscheduler")
-public class ApiApplicationServer extends SpringBootServletInitializer {
+public class CommandHeader implements Serializable {
 
-  public static void main(String[] args) {
-    SpringApplication.run(ApiApplicationServer.class, args);
-  }
+    private byte type;
 
+    private long opaque;
 
+    private int bodyLength;
+
+    /** @return the command type as the raw byte stored in this header */
+    public byte getType() {
+        return this.type;
+    }
+
+    /** @param type command type as a raw byte */
+    public void setType(byte type) {
+        this.type = type;
+    }
+
+    /** @return the opaque id stored in this header */
+    public long getOpaque() {
+        return this.opaque;
+    }
+
+    /** @param opaque opaque id to store */
+    public void setOpaque(long opaque) {
+        this.opaque = opaque;
+    }
+
+    /** @return the body length stored in this header */
+    public int getBodyLength() {
+        return this.bodyLength;
+    }
+
+    /** @param bodyLength body length to store */
+    public void setBodyLength(int bodyLength) {
+        this.bodyLength = bodyLength;
+    }
 }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
new file mode 100644
index 0000000..468a5cc
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -0,0 +1 @@
+/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;


public enum CommandType {

    ROLL_VIEW_LOG_REQ,

    ROLL_VIEW_LOG_RES,

    VIEW_LOG_REQ,

    VIEW_LOG_RES,

    GET_LOG_REQ,

    GET_LOG
 _RES,

    EXECUTE_TASK_REQUEST,

    EXECUTE_TASK_RESPONSE,

    PING,

    PONG;
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
new file mode 100644
index 0000000..e75c2de
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
@@ -0,0 +1 @@
+/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;

import java.io.Serializable;
import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;

public class ExecuteTaskRequestCommand implements Serializable {

    private static final AtomicLong REQUEST = new AtomicLong(1);

    private String taskId;

    private String attemptId;

    private String applicationName;

    private String groupName;

    private String taskName;

    private int connectorPort;

    private String description;

    private String className;

    private String methodName;

    private String params;

    private List<Integer> shardItems;

    public List<Integer> getShardItems() {
        return shardItems;
    }

    public void setShardItems(List<Integer> shardItems) {
        this.shardItems = shardItems;
    }

    public String getParams() {
        return params;
    }

    public void setParams(String params) {
        this.params = params;
    }

    public String getTaskId() {
        return taskId;
    }

    public void setTaskId(String taskId) {
        this.taskId = taskId;
    }

    public String getAp
 plicationName() {
        return applicationName;
    }

    public void setApplicationName(String applicationName) {
        this.applicationName = applicationName;
    }

    public String getGroupName() {
        return groupName;
    }

    public void setGroupName(String groupName) {
        this.groupName = groupName;
    }

    public String getTaskName() {
        return taskName;
    }

    public void setTaskName(String taskName) {
        this.taskName = taskName;
    }

    public int getConnectorPort() {
        return connectorPort;
    }

    public void setConnectorPort(int connectorPort) {
        this.connectorPort = connectorPort;
    }

    public String getDescription() {
        return description;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }
     public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Command convert2Command(){
        Command command = new Command(REQUEST.getAndIncrement());
        command.setType(CommandType.EXECUTE_TASK_REQUEST);
        byte[] body = FastJsonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
new file mode 100644
index 0000000..fafb575
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
@@ -0,0 +1 @@
+/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;

import java.io.Serializable;
 import java.util.concurrent.atomic.AtomicLong;

public class ExecuteTaskResponseCommand implements Serializable {

    private static final AtomicLong REQUEST = new AtomicLong(1);

    private String taskId;

    private String attemptId;

    private Object result;

    private long receivedTime;

    private int executeCount;

    private long executeTime;

    public String getAttemptId() {
        return attemptId;
    }

    public void setAttemptId(String attemptId) {
        this.attemptId = attemptId;
    }

    public String getTaskId() {
        return taskId;
    }

    public void setTaskId(String taskId) {
        this.taskId = taskId;
    }

    public Object getResult() {
        return result;
    }

    public void setResult(Object result) {
        this.result = result;
    }

    public long getReceivedTime() {
        return receivedTime;
    }

    public void setReceivedTime(long receivedTime) {
        this.receivedTime = receivedTime;
    }

    public int getExecuteCount() {
        return executeCount
 ;
    }

    public void setExecuteCount(int executeCount) {
        this.executeCount = executeCount;
    }

    public long getExecuteTime() {
        return executeTime;
    }

    public void setExecuteTime(long executeTime) {
        this.executeTime = executeTime;
    }

    public Command convert2Command(){
        Command command = new Command(REQUEST.getAndIncrement());
        command.setType(CommandType.EXECUTE_TASK_RESPONSE);
        byte[] body = FastJsonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java
new file mode 100644
index 0000000..365d451
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * Factory for PING messages, available either as a pre-encoded frame ({@link #pingContent()})
+ * or as a {@link Command} object ({@link #create()}).
+ */
+public class Ping implements Serializable {
+
+    /** source of the opaque id for commands built by {@link #create()}; starts at 1 */
+    private static final AtomicLong ID = new AtomicLong(1);
+
+    // constants are final so they cannot be reassigned at runtime
+    protected static final ByteBuf EMPTY_BODY = Unpooled.EMPTY_BUFFER;
+
+    /** zero-length body shared by every command built by {@link #create()} */
+    private static final byte[] EMPTY_BODY_ARRAY = new byte[0];
+
+    /** pre-encoded frame: magic, PING ordinal, opaque 0, body length 0, no body */
+    private static final ByteBuf PING_BUF;
+
+    static {
+        ByteBuf ping = Unpooled.buffer();
+        ping.writeByte(Command.MAGIC);
+        ping.writeByte(CommandType.PING.ordinal());
+        ping.writeLong(0);
+        ping.writeInt(0);
+        ping.writeBytes(EMPTY_BODY);
+        PING_BUF = Unpooled.unreleasableBuffer(ping).asReadOnly();
+    }
+
+    /**
+     * @return a duplicate of the pre-encoded, read-only ping frame
+     */
+    public static ByteBuf pingContent(){
+        return PING_BUF.duplicate();
+    }
+
+    /**
+     * @return a new PING command with an empty body and the next opaque id from this class's counter
+     */
+    public static Command create(){
+        Command command = new Command(ID.getAndIncrement());
+        command.setType(CommandType.PING);
+        command.setBody(EMPTY_BODY_ARRAY);
+        return command;
+    }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Pong.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Pong.java
new file mode 100644
index 0000000..bc5abda
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Pong.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.Serializable;
+
+
+/**
+ * Factory for PONG messages, available either as a pre-encoded frame ({@link #pongContent()})
+ * or as a {@link Command} object ({@link #create(long)}).
+ */
+public class Pong implements Serializable {
+
+    // constants are final so they cannot be reassigned at runtime
+    protected static final ByteBuf EMPTY_BODY = Unpooled.EMPTY_BUFFER;
+
+    /** zero-length body shared by every command built by {@link #create(long)} */
+    private static final byte[] EMPTY_BODY_ARRAY = new byte[0];
+
+    /** pre-encoded frame: magic, PONG ordinal, opaque 0, body length 0, no body */
+    private static final ByteBuf PONG_BUF;
+
+    static {
+        ByteBuf pong = Unpooled.buffer();
+        pong.writeByte(Command.MAGIC);
+        pong.writeByte(CommandType.PONG.ordinal());
+        pong.writeLong(0);
+        pong.writeInt(0);
+        pong.writeBytes(EMPTY_BODY);
+        PONG_BUF = Unpooled.unreleasableBuffer(pong).asReadOnly();
+    }
+
+    /**
+     * @return a duplicate of the pre-encoded, read-only pong frame
+     */
+    public static ByteBuf pongContent(){
+        return PONG_BUF.duplicate();
+    }
+
+    /**
+     * Misnamed original accessor, kept so existing callers continue to compile.
+     *
+     * @return a duplicate of the pre-encoded pong frame (not a ping frame, despite the name)
+     * @deprecated use {@link #pongContent()}
+     */
+    @Deprecated
+    public static ByteBuf pingContent(){
+        return pongContent();
+    }
+
+    /**
+     * @param opaque opaque id to put on the command
+     * @return a new PONG command with an empty body and the given opaque id
+     */
+    public static Command create(long opaque){
+        Command command = new Command(opaque);
+        command.setType(CommandType.PONG);
+        command.setBody(EMPTY_BODY_ARRAY);
+        return command;
+    }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogRequestCommand.java
new file mode 100644
index 0000000..72c5fb8
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogRequestCommand.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.log;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Request body carrying the path of a log, convertible into a {@link Command}.
+ */
+public class GetLogRequestCommand implements Serializable {
+
+    /**
+     * Source of the opaque values for commands built by this class; starts at 1
+     * and is shared by all instances of this class only.
+     */
+    private static final AtomicLong REQUEST = new AtomicLong(1);
+
+    /**
+     * Path carried by this request.
+     */
+    private String path;
+
+    public GetLogRequestCommand() {
+    }
+
+    public GetLogRequestCommand(String path) {
+        this.path = path;
+    }
+
+    public String getPath() {
+        return path;
+    }
+
+    public void setPath(String path) {
+        this.path = path;
+    }
+
+    /**
+     * Wraps this request into a {@link Command} whose body is this object
+     * serialized by {@link FastJsonSerializer}.
+     *
+     * @return command of type {@link CommandType#GET_LOG_REQ} with a fresh opaque value
+     */
+    public Command convert2Command(){
+        Command command = new Command(REQUEST.getAndIncrement());
+        // was VIEW_LOG_REQ, which made a get-log request indistinguishable from a view-log request
+        command.setType(CommandType.GET_LOG_REQ);
+        byte[] body = FastJsonSerializer.serialize(this);
+        command.setBody(body);
+        return command;
+    }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogResponseCommand.java
new file mode 100644
index 0000000..39cb11f
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogResponseCommand.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.log;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
+import java.io.Serializable;
+
+/**
+ * Response body carrying raw bytes, convertible into a {@link Command}.
+ */
+public class GetLogResponseCommand implements Serializable {
+
+    /**
+     * Payload carried by this response.
+     */
+    private byte[] data;
+
+    public GetLogResponseCommand() {
+    }
+
+    public GetLogResponseCommand(byte[] data) {
+        this.data = data;
+    }
+
+    public byte[] getData() {
+        return data;
+    }
+
+    public void setData(byte[] data) {
+        this.data = data;
+    }
+
+    /**
+     * Wraps this response into a {@link Command} whose body is this object
+     * serialized by {@link FastJsonSerializer}.
+     *
+     * @param opaque value passed to the {@link Command} constructor
+     * @return command of type {@link CommandType#GET_LOG_RES}
+     */
+    public Command convert2Command(long opaque){
+        Command command = new Command(opaque);
+        // was GET_LOG_REQ: a response must carry the response type, as ViewLogResponseCommand does with VIEW_LOG_RES
+        command.setType(CommandType.GET_LOG_RES);
+        byte[] body = FastJsonSerializer.serialize(this);
+        command.setBody(body);
+        return command;
+    }
+
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java
new file mode 100644
index 0000000..f655a8c
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.log;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Request body carrying a path together with a skip count and a limit,
+ * convertible into a {@link Command}.
+ */
+public class RollViewLogRequestCommand implements Serializable {
+
+    /**
+     * Source of the opaque values for commands built by this class; starts at 1
+     * and is shared by all instances of this class only.
+     */
+    private static final AtomicLong REQUEST = new AtomicLong(1);
+
+    /** Path carried by this request. */
+    private String path;
+
+    /** Skip count carried by this request. */
+    private int skipLineNum;
+
+    /** Limit carried by this request. */
+    private int limit;
+
+    public RollViewLogRequestCommand() {
+    }
+
+    public RollViewLogRequestCommand(String path, int skipLineNum, int limit) {
+        this.path = path;
+        this.skipLineNum = skipLineNum;
+        this.limit = limit;
+    }
+
+    /**
+     * Wraps this request into a {@link Command} whose body is this object
+     * serialized by {@link FastJsonSerializer}.
+     *
+     * @return command of type {@link CommandType#ROLL_VIEW_LOG_REQ} with a fresh opaque value
+     */
+    public Command convert2Command(){
+        final Command request = new Command(REQUEST.getAndIncrement());
+        request.setType(CommandType.ROLL_VIEW_LOG_REQ);
+        request.setBody(FastJsonSerializer.serialize(this));
+        return request;
+    }
+
+    public String getPath() {
+        return this.path;
+    }
+
+    public void setPath(String path) {
+        this.path = path;
+    }
+
+    public int getSkipLineNum() {
+        return this.skipLineNum;
+    }
+
+    public void setSkipLineNum(int skipLineNum) {
+        this.skipLineNum = skipLineNum;
+    }
+
+    public int getLimit() {
+        return this.limit;
+    }
+
+    public void setLimit(int limit) {
+        this.limit = limit;
+    }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogResponseCommand.java
new file mode 100644
index 0000000..95d476f
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogResponseCommand.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.log;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
+import java.io.Serializable;
+
+/**
+ * Response body carrying a text message, convertible into a {@link Command}.
+ */
+public class RollViewLogResponseCommand implements Serializable {
+
+    /**
+     * Message carried by this response.
+     */
+    private String msg;
+
+    public RollViewLogResponseCommand() {
+    }
+
+    public RollViewLogResponseCommand(String msg) {
+        this.msg = msg;
+    }
+
+    public String getMsg() {
+        return msg;
+    }
+
+    public void setMsg(String msg) {
+        this.msg = msg;
+    }
+
+    /**
+     * Wraps this response into a {@link Command} whose body is this object
+     * serialized by {@link FastJsonSerializer}.
+     *
+     * @param opaque value passed to the {@link Command} constructor
+     * @return command of type {@link CommandType#ROLL_VIEW_LOG_RES}
+     */
+    public Command convert2Command(long opaque){
+        Command command = new Command(opaque);
+        // was ROLL_VIEW_LOG_REQ: a response must carry the response type, as ViewLogResponseCommand does with VIEW_LOG_RES
+        command.setType(CommandType.ROLL_VIEW_LOG_RES);
+        byte[] body = FastJsonSerializer.serialize(this);
+        command.setBody(body);
+        return command;
+    }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java
new file mode 100644
index 0000000..f9e01fe
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.log;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Request body carrying a path, convertible into a {@link Command}.
+ */
+public class ViewLogRequestCommand implements Serializable {
+
+    /**
+     * Source of the opaque values for commands built by this class; starts at 1
+     * and is shared by all instances of this class only.
+     */
+    private static final AtomicLong REQUEST = new AtomicLong(1);
+
+    /** Path carried by this request. */
+    private String path;
+
+    public ViewLogRequestCommand() {
+    }
+
+    public ViewLogRequestCommand(String path) {
+        this.path = path;
+    }
+
+    /**
+     * Wraps this request into a {@link Command} whose body is this object
+     * serialized by {@link FastJsonSerializer}.
+     *
+     * @return command of type {@link CommandType#VIEW_LOG_REQ} with a fresh opaque value
+     */
+    public Command convert2Command(){
+        final Command request = new Command(REQUEST.getAndIncrement());
+        request.setType(CommandType.VIEW_LOG_REQ);
+        request.setBody(FastJsonSerializer.serialize(this));
+        return request;
+    }
+
+    public String getPath() {
+        return this.path;
+    }
+
+    public void setPath(String path) {
+        this.path = path;
+    }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogResponseCommand.java
new file mode 100644
index 0000000..d5a59c8
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogResponseCommand.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.log;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
+import java.io.Serializable;
+
+/**
+ * Response body carrying a text message, convertible into a {@link Command}.
+ */
+public class ViewLogResponseCommand implements Serializable {
+
+    /** Message carried by this response. */
+    private String msg;
+
+    public ViewLogResponseCommand() {
+    }
+
+    public ViewLogResponseCommand(String msg) {
+        this.msg = msg;
+    }
+
+    /**
+     * Wraps this response into a {@link Command} whose body is this object
+     * serialized by {@link FastJsonSerializer}.
+     *
+     * @param opaque value passed to the {@link Command} constructor
+     * @return command of type {@link CommandType#VIEW_LOG_RES}
+     */
+    public Command convert2Command(long opaque){
+        final Command response = new Command(opaque);
+        response.setType(CommandType.VIEW_LOG_RES);
+        response.setBody(FastJsonSerializer.serialize(this));
+        return response;
+    }
+
+    public String getMsg() {
+        return this.msg;
+    }
+
+    public void setMsg(String msg) {
+        this.msg = msg;
+    }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java
new file mode 100644
index 0000000..6b1ea5b
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.remote.config;
+
+import org.apache.dolphinscheduler.remote.utils.Constants;
+
+/**
+ * Mutable settings holder for the netty client.
+ * NOTE(review): how each value is applied to the bootstrap is not visible here - see the client that consumes it.
+ */
+public class NettyClientConfig {
+
+    /** Worker thread count; defaults to {@link Constants#CPUS}. */
+    private int workerThreads = Constants.CPUS;
+
+    /** Defaults to {@code true}. */
+    private boolean tcpNoDelay = true;
+
+    /** Defaults to {@code true}. */
+    private boolean soKeepalive = true;
+
+    /** Defaults to 65535. */
+    private int sendBufferSize = 65535;
+
+    /** Defaults to 65535. */
+    private int receiveBufferSize = 65535;
+
+    public int getWorkerThreads() {
+        return this.workerThreads;
+    }
+
+    public void setWorkerThreads(int workerThreads) {
+        this.workerThreads = workerThreads;
+    }
+
+    public boolean isTcpNoDelay() {
+        return this.tcpNoDelay;
+    }
+
+    public void setTcpNoDelay(boolean tcpNoDelay) {
+        this.tcpNoDelay = tcpNoDelay;
+    }
+
+    public boolean isSoKeepalive() {
+        return this.soKeepalive;
+    }
+
+    public void setSoKeepalive(boolean soKeepalive) {
+        this.soKeepalive = soKeepalive;
+    }
+
+    public int getSendBufferSize() {
+        return this.sendBufferSize;
+    }
+
+    public void setSendBufferSize(int sendBufferSize) {
+        this.sendBufferSize = sendBufferSize;
+    }
+
+    public int getReceiveBufferSize() {
+        return this.receiveBufferSize;
+    }
+
+    public void setReceiveBufferSize(int receiveBufferSize) {
+        this.receiveBufferSize = receiveBufferSize;
+    }
+
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyServerConfig.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyServerConfig.java
new file mode 100644
index 0000000..9afaeb3
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyServerConfig.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.remote.config;
+
+import org.apache.dolphinscheduler.remote.utils.Constants;
+
+/**
+ * Mutable settings holder for the netty server.
+ * NOTE(review): how each value is applied to the bootstrap is not visible here - see the server that consumes it.
+ */
+public class NettyServerConfig {
+
+    /** Defaults to 1024. */
+    private int soBacklog = 1024;
+
+    /** Defaults to {@code true}. */
+    private boolean tcpNoDelay = true;
+
+    /** Defaults to {@code true}. */
+    private boolean soKeepalive = true;
+
+    /** Defaults to 65535. */
+    private int sendBufferSize = 65535;
+
+    /** Defaults to 65535. */
+    private int receiveBufferSize = 65535;
+
+    /** Worker thread count; defaults to {@link Constants#CPUS}. */
+    private int workerThread = Constants.CPUS;
+
+    /** Listen port; defaults to 12346. */
+    private int listenPort = 12346;
+
+    public int getListenPort() {
+        return this.listenPort;
+    }
+
+    public void setListenPort(int listenPort) {
+        this.listenPort = listenPort;
+    }
+
+    public int getSoBacklog() {
+        return this.soBacklog;
+    }
+
+    public void setSoBacklog(int soBacklog) {
+        this.soBacklog = soBacklog;
+    }
+
+    public boolean isTcpNoDelay() {
+        return this.tcpNoDelay;
+    }
+
+    public void setTcpNoDelay(boolean tcpNoDelay) {
+        this.tcpNoDelay = tcpNoDelay;
+    }
+
+    public boolean isSoKeepalive() {
+        return this.soKeepalive;
+    }
+
+    public void setSoKeepalive(boolean soKeepalive) {
+        this.soKeepalive = soKeepalive;
+    }
+
+    public int getSendBufferSize() {
+        return this.sendBufferSize;
+    }
+
+    public void setSendBufferSize(int sendBufferSize) {
+        this.sendBufferSize = sendBufferSize;
+    }
+
+    public int getReceiveBufferSize() {
+        return this.receiveBufferSize;
+    }
+
+    public void setReceiveBufferSize(int receiveBufferSize) {
+        this.receiveBufferSize = receiveBufferSize;
+    }
+
+    public int getWorkerThread() {
+        return this.workerThread;
+    }
+
+    public void setWorkerThread(int workerThread) {
+        this.workerThread = workerThread;
+    }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingException.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingException.java
new file mode 100644
index 0000000..62ab907
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingException.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.exceptions;
+
+/**
+ * Checked exception of the remoting module.
+ * It extends {@link Exception}, not {@link RuntimeException}, so callers must
+ * catch or declare it.
+ */
+public class RemotingException extends Exception {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Constructs a new exception with no detail message and no cause.
+     */
+    public RemotingException() {
+        super();
+    }
+
+    /**
+     * Constructs a new exception with the specified detail message.
+     * The cause is not initialized, and may subsequently be initialized by a
+     * call to {@link #initCause}.
+     *
+     * @param message the detail message, saved for later retrieval by
+     *                {@link #getMessage()}
+     */
+    public RemotingException(String message) {
+        super(message);
+    }
+
+    /**
+     * Constructs a new exception with the specified detail message and cause.
+     * The detail message of {@code cause} is <i>not</i> automatically
+     * incorporated into this exception's detail message.
+     *
+     * @param message the detail message, saved for later retrieval by
+     *                {@link #getMessage()}
+     * @param cause   the cause, saved for later retrieval by {@link #getCause()};
+     *                {@code null} is permitted and means the cause is
+     *                nonexistent or unknown
+     */
+    public RemotingException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Constructs a new exception with the specified cause and a detail message
+     * of {@code (cause == null ? null : cause.toString())}.
+     *
+     * @param cause the cause, saved for later retrieval by {@link #getCause()};
+     *              {@code null} is permitted and means the cause is
+     *              nonexistent or unknown
+     */
+    public RemotingException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Constructs a new exception with the specified detail message, cause,
+     * suppression enabled or disabled, and writable stack trace enabled or
+     * disabled.
+     *
+     * @param message            the detail message
+     * @param cause              the cause; {@code null} is permitted and means
+     *                           the cause is nonexistent or unknown
+     * @param enableSuppression  whether or not suppression is enabled
+     * @param writableStackTrace whether or not the stack trace should be writable
+     */
+    protected RemotingException(String message, Throwable cause,
+                                boolean enableSuppression,
+                                boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
new file mode 100644
index 0000000..093614f
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.remote.handler;
+
+import io.netty.channel.*;
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
+import org.apache.dolphinscheduler.remote.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+
+
+@ChannelHandler.Sharable
+public class NettyClientHandler extends ChannelInboundHandlerAdapter {
+
+    private final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);
+
+    private final NettyRemotingClient nettyRemotingClient;
+
+    private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>> processors = new ConcurrentHashMap();
+
+    public NettyClientHandler(NettyRemotingClient nettyRemotingClient){
+        this.nettyRemotingClient = nettyRemotingClient;
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        nettyRemotingClient.removeChannel(ChannelUtils.toAddress(ctx.channel()));
+        ctx.channel().close();
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        processReceived(ctx.channel(), (Command)msg);
+    }
+
+    public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
+         this.registerProcessor(commandType, processor, nettyRemotingClient.getDefaultExecutor());
+    }
+
+    public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
+        ExecutorService executorRef = executor;
+        if(executorRef == null){
+            executorRef = nettyRemotingClient.getDefaultExecutor();
+        }
+        this.processors.putIfAbsent(commandType, new Pair<NettyRequestProcessor, ExecutorService>(processor, executorRef));
+    }
+
+    private void processReceived(final Channel channel, final Command msg) {
+        final CommandType commandType = msg.getType();
+        final Pair<NettyRequestProcessor, ExecutorService> pair = processors.get(commandType);
+        if (pair != null) {
+            Runnable r = new Runnable() {
+                public void run() {
+                    try {
+                        pair.getLeft().process(channel, msg);
+                    } catch (Throwable ex) {
+                        logger.error("process msg {} error : {}", msg, ex);
+                    }
+                }
+            };
+            try {
+                pair.getRight().submit(r);
+            } catch (RejectedExecutionException e) {
+                logger.warn("thread pool is full, discard msg {} from {}", msg, ChannelUtils.getRemoteAddress(channel));
+            }
+        } else {
+            logger.warn("commandType {} not support", commandType);
+        }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        logger.error("exceptionCaught : {}", cause);
+        nettyRemotingClient.removeChannel(ChannelUtils.toAddress(ctx.channel()));
+        ctx.channel().close();
+    }
+
+    @Override
+    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
+        Channel ch = ctx.channel();
+        ChannelConfig config = ch.config();
+
+        if (!ch.isWritable()) {
+            if (logger.isWarnEnabled()) {
+                logger.warn("{} is not writable, over high water level : {}",
+                        new Object[]{ch, config.getWriteBufferHighWaterMark()});
+            }
+
+            config.setAutoRead(false);
+        } else {
+            if (logger.isWarnEnabled()) {
+                logger.warn("{} is writable, to low water : {}",
+                        new Object[]{ch, config.getWriteBufferLowWaterMark()});
+            }
+            config.setAutoRead(true);
+        }
+    }
+}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java
new file mode 100644
index 0000000..433f8b0
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.handler;
+
+import io.netty.channel.*;
+import org.apache.dolphinscheduler.remote.NettyRemotingServer;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
+import org.apache.dolphinscheduler.remote.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+
+@ChannelHandler.Sharable
+public class NettyServerHandler extends ChannelInboundHandlerAdapter {
+
+    private final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
+
+    private final NettyRemotingServer nettyRemotingServer;
+
+    private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>> processors = new ConcurrentHashMap();
+
+    public NettyServerHandler(NettyRemotingServer nettyRemotingServer){
+        this.nettyRemotingServer = nettyRemotingServer;
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        ctx.channel().close();
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        processReceived(ctx.channel(), (Command)msg);
+    }
+
+    public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
+        this.registerProcessor(commandType, processor, null);
+    }
+
+    public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
+        ExecutorService executorRef = executor;
+        if(executorRef == null){
+            executorRef = nettyRemotingServer.getDefaultExecutor();
+        }
+        this.processors.putIfAbsent(commandType, new Pair<NettyRequestProcessor, ExecutorService>(processor, executorRef));
+    }
+
+    private void processReceived(final Channel channel, final Command msg) {
+        final CommandType commandType = msg.getType();
+        final Pair<NettyRequestProcessor, ExecutorService> pair = processors.get(commandType);
+        if (pair != null) {
+            Runnable r = new Runnable() {
+                public void run() {
+                    try {
+                        pair.getLeft().process(channel, msg);
+                    } catch (Throwable ex) {
+                        logger.error("process msg {} error : {}", msg, ex);
+                    }
+                }
+            };
+            try {
+                pair.getRight().submit(r);
+            } catch (RejectedExecutionException e) {
+                logger.warn("thread pool is full, discard msg {} from {}", msg, ChannelUtils.getRemoteAddress(channel));
+            }
+        } else {
+            logger.warn("commandType {} not support", commandType);
+        }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        logger.error("exceptionCaught : {}", cause);
+        ctx.channel().close();
+    }
+
+    @Override
+    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
+        Channel ch = ctx.channel();
+        ChannelConfig config = ch.config();
+
+        if (!ch.isWritable()) {
+            if (logger.isWarnEnabled()) {
+                logger.warn("{} is not writable, over high water level : {}",
+                        new Object[]{ch, config.getWriteBufferHighWaterMark()});
+            }
+
+            config.setAutoRead(false);
+        } else {
+            if (logger.isWarnEnabled()) {
+                logger.warn("{} is writable, to low water : {}",
+                        new Object[]{ch, config.getWriteBufferLowWaterMark()});
+            }
+            config.setAutoRead(true);
+        }
+    }
+}
\ No newline at end of file
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/NettyRequestProcessor.java
similarity index 53%
copy from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
copy to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/NettyRequestProcessor.java
index 8376c28..7b19b9c 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/NettyRequestProcessor.java
@@ -14,23 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dolphinscheduler.api;
+package org.apache.dolphinscheduler.remote.processor;
 
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.web.servlet.ServletComponentScan;
-import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
-import org.springframework.context.annotation.ComponentScan;
-import springfox.documentation.swagger2.annotations.EnableSwagger2;
-
-@SpringBootApplication
-@ServletComponentScan
-@ComponentScan("org.apache.dolphinscheduler")
-public class ApiApplicationServer extends SpringBootServletInitializer {
-
-  public static void main(String[] args) {
-    SpringApplication.run(ApiApplicationServer.class, args);
-  }
+import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.remote.command.Command;
 
+/**
+ * Callback that handles a {@link Command} together with the {@link Channel} it
+ * is associated with.
+ */
+public interface NettyRequestProcessor {
 
+    /**
+     * Handles one command.
+     *
+     * @param channel channel associated with the command
+     * @param command command to handle
+     */
+    void process(final Channel channel, final Command command);
 }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Address.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Address.java
new file mode 100644
index 0000000..4d311be
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Address.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.remote.utils;
+
+import java.io.Serializable;
+
+/**
+ * Mutable host/port pair. Two addresses are equal when both their host and
+ * their port are equal; the hash code is derived from the same two fields.
+ */
+public class Address implements Serializable {
+
+    /** host part of the address; may be {@code null} */
+    private String host;
+
+    /** port part of the address */
+    private int port;
+
+    /** Creates an address with a {@code null} host and port 0. */
+    public Address(){
+        //NOP
+    }
+
+    /**
+     * @param host host part of the address
+     * @param port port part of the address
+     */
+    public Address(String host, int port){
+        this.host = host;
+        this.port = port;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public void setPort(int port) {
+        this.port = port;
+    }
+
+    /**
+     * @return hash combining host (0 when {@code null}) and port with multiplier 31
+     */
+    @Override
+    public int hashCode() {
+        final int hostHash = (host == null) ? 0 : host.hashCode();
+        return 31 * (31 + hostHash) + port;
+    }
+
+    /**
+     * @param obj object to compare with
+     * @return {@code true} only for an object of exactly this class with the same host and port
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        final Address that = (Address) obj;
+        final boolean sameHost = (host == null) ? that.host == null : host.equals(that.host);
+        return sameHost && port == that.port;
+    }
+
+    @Override
+    public String toString() {
+        return new StringBuilder("Address [host=")
+                .append(host)
+                .append(", port=")
+                .append(port)
+                .append("]")
+                .toString();
+    }
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java
similarity index 53%
copy from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
copy to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java
index 8376c28..aca2241 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java
@@ -14,23 +14,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dolphinscheduler.api;
+package org.apache.dolphinscheduler.remote.utils;
 
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.web.servlet.ServletComponentScan;
-import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
-import org.springframework.context.annotation.ComponentScan;
-import springfox.documentation.swagger2.annotations.EnableSwagger2;
+import io.netty.channel.Channel;
 
-@SpringBootApplication
-@ServletComponentScan
-@ComponentScan("org.apache.dolphinscheduler")
-public class ApiApplicationServer extends SpringBootServletInitializer {
+import java.net.InetSocketAddress;
 
-  public static void main(String[] args) {
-    SpringApplication.run(ApiApplicationServer.class, args);
-  }
+/**
+ * Static helpers that read the socket addresses of a {@link Channel}. Every
+ * method casts the channel's address to {@link InetSocketAddress}.
+ */
+public class ChannelUtils {
 
+    /**
+     * @param channel channel to inspect
+     * @return textual IP address of the channel's local address
+     */
+    public static String getLocalAddress(Channel channel){
+        return hostAddressOf((InetSocketAddress) channel.localAddress());
+    }
+
+    /**
+     * @param channel channel to inspect
+     * @return textual IP address of the channel's remote address
+     */
+    public static String getRemoteAddress(Channel channel){
+        return hostAddressOf((InetSocketAddress) channel.remoteAddress());
+    }
+
+    /**
+     * @param channel channel to inspect
+     * @return the channel's remote IP address and port as an {@link Address}
+     */
+    public static Address toAddress(Channel channel){
+        final InetSocketAddress remote = (InetSocketAddress) channel.remoteAddress();
+        return new Address(hostAddressOf(remote), remote.getPort());
+    }
+
+    /** Returns the textual IP address held by the given socket address. */
+    private static String hostAddressOf(InetSocketAddress socketAddress){
+        return socketAddress.getAddress().getHostAddress();
+    }
 
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java
similarity index 53%
copy from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
copy to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java
index 8376c28..c0a930c 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java
@@ -14,23 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dolphinscheduler.api;
+package org.apache.dolphinscheduler.remote.utils;
 
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.web.servlet.ServletComponentScan;
-import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
-import org.springframework.context.annotation.ComponentScan;
-import springfox.documentation.swagger2.annotations.EnableSwagger2;
+import java.nio.charset.Charset;
 
-@SpringBootApplication
-@ServletComponentScan
-@ComponentScan("org.apache.dolphinscheduler")
-public class ApiApplicationServer extends SpringBootServletInitializer {
+/**
+ * Shared constants of the remote module.
+ */
+public class Constants {
 
-  public static void main(String[] args) {
-    SpringApplication.run(ApiApplicationServer.class, args);
-  }
+    /** the comma character as a string */
+    public static final String COMMA = ",";
 
+    /** the forward slash character as a string */
+    public static final String SLASH = "/";
+
+    /** the UTF-8 charset, looked up by name once when the class is initialized */
+    public static final Charset UTF8 = Charset.forName("UTF-8");
+
+    /** number of processors reported by the runtime, read once when the class is initialized */
+    public static final int CPUS = Runtime.getRuntime().availableProcessors();
 
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/FastJsonSerializer.java
similarity index 53%
copy from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
copy to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/FastJsonSerializer.java
index 8376c28..32569ed 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/FastJsonSerializer.java
@@ -14,23 +14,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dolphinscheduler.api;
+package org.apache.dolphinscheduler.remote.utils;
 
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.web.servlet.ServletComponentScan;
-import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
-import org.springframework.context.annotation.ComponentScan;
-import springfox.documentation.swagger2.annotations.EnableSwagger2;
+import com.alibaba.fastjson.JSON;
 
-@SpringBootApplication
-@ServletComponentScan
-@ComponentScan("org.apache.dolphinscheduler")
-public class ApiApplicationServer extends SpringBootServletInitializer {
+/**
+ * JSON (de)serialization helpers built on fastjson's {@code JSON} class.
+ */
+public class FastJsonSerializer {
 
-  public static void main(String[] args) {
-    SpringApplication.run(ApiApplicationServer.class, args);
-  }
+	/**
+	 * Converts an object to its JSON text and encodes that text with {@code Constants.UTF8}.
+	 *
+	 * @param obj object to serialize
+	 * @param <T> type of the object
+	 * @return encoded JSON bytes
+	 */
+	public static <T> byte[] serialize(T obj)  {
+		String json = JSON.toJSONString(obj);
+		return json.getBytes(Constants.UTF8);
+	}
 
+	/**
+	 * Converts an object to its JSON text.
+	 *
+	 * @param obj object to serialize
+	 * @param <T> type of the object
+	 * @return JSON string
+	 */
+	public static <T> String serializeToString(T obj)  {
+		return JSON.toJSONString(obj);
+	}
+
+	/**
+	 * Decodes the bytes with {@code Constants.UTF8} and parses the resulting JSON text.
+	 *
+	 * @param src   encoded JSON bytes
+	 * @param clazz target class
+	 * @param <T>   target type
+	 * @return parsed instance of {@code clazz}
+	 */
+	public static <T> T deserialize(byte[] src, Class<T> clazz) {
+		return JSON.parseObject(new String(src, Constants.UTF8), clazz);
+	}
 
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Pair.java
similarity index 53%
copy from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
copy to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Pair.java
index 8376c28..a79a374 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Pair.java
@@ -14,23 +14,34 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dolphinscheduler.api;
 
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.web.servlet.ServletComponentScan;
-import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
-import org.springframework.context.annotation.ComponentScan;
-import springfox.documentation.swagger2.annotations.EnableSwagger2;
+package org.apache.dolphinscheduler.remote.utils;
 
-@SpringBootApplication
-@ServletComponentScan
-@ComponentScan("org.apache.dolphinscheduler")
-public class ApiApplicationServer extends SpringBootServletInitializer {
 
-  public static void main(String[] args) {
-    SpringApplication.run(ApiApplicationServer.class, args);
-  }
+/**
+ * Mutable holder for two values.
+ *
+ * @param <L> type of the left value
+ * @param <R> type of the right value
+ */
+public class Pair<L, R> {
 
+    /** left value */
+    private L left;
 
+    /** right value */
+    private R right;
+
+    /**
+     * @param left  left value
+     * @param right right value
+     */
+    public Pair(L left, R right) {
+        this.left = left;
+        this.right = right;
+    }
+
+    /** @return the left value */
+    public L getLeft() {
+        return left;
+    }
+
+    /** @param left new left value */
+    public void setLeft(L left) {
+        this.left = left;
+    }
+
+    /** @return the right value */
+    public R getRight() {
+        return right;
+    }
+
+    /** @param right new right value */
+    public void setRight(R right) {
+        this.right = right;
+    }
 }
diff --git a/dolphinscheduler-remote/src/test/java/NettyRemotingClientTest.java b/dolphinscheduler-remote/src/test/java/NettyRemotingClientTest.java
new file mode 100644
index 0000000..0c4601b
--- /dev/null
+++ b/dolphinscheduler-remote/src/test/java/NettyRemotingClientTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.NettyRemotingServer;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.Ping;
+import org.apache.dolphinscheduler.remote.command.Pong;
+import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.Address;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Round-trip test: a client sends a PING to a local server, the server answers
+ * with a PONG built from the request's opaque, and the client checks that the
+ * opaque it receives matches the one it sent.
+ */
+public class NettyRemotingClientTest {
+
+    /** upper bound for waiting on the PONG, so a lost reply fails the test instead of hanging it */
+    private static final long PONG_TIMEOUT_SECONDS = 10L;
+
+    /**
+     * Sends one PING and verifies the PONG that comes back.
+     *
+     * @throws Exception if sending fails or the wait is interrupted; either one fails the test
+     */
+    @Test
+    public void testSend() throws Exception {
+        NettyServerConfig serverConfig = new NettyServerConfig();
+
+        NettyRemotingServer server = new NettyRemotingServer(serverConfig);
+        // server side: answer a PING with a PONG that echoes the request's opaque
+        server.registerProcessor(CommandType.PING, new NettyRequestProcessor() {
+            @Override
+            public void process(Channel channel, Command command) {
+                channel.writeAndFlush(Pong.create(command.getOpaque()));
+            }
+        });
+        server.start();
+        // NOTE(review): server and client are never shut down here; release them
+        // after the assertions if the remoting classes offer a way to do so.
+
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicLong opaque = new AtomicLong(1);
+        final NettyClientConfig clientConfig = new NettyClientConfig();
+        NettyRemotingClient client = new NettyRemotingClient(clientConfig);
+        // client side: record the opaque of the PONG and release the waiting test thread
+        client.registerProcessor(CommandType.PONG, new NettyRequestProcessor() {
+            @Override
+            public void process(Channel channel, Command command) {
+                opaque.set(command.getOpaque());
+                latch.countDown();
+            }
+        });
+
+        Command commandPing = Ping.create();
+        // any exception propagates and fails the test rather than being printed and ignored
+        client.send(new Address("127.0.0.1", serverConfig.getListenPort()), commandPing);
+        Assert.assertTrue("no PONG received within " + PONG_TIMEOUT_SECONDS + " seconds",
+                latch.await(PONG_TIMEOUT_SECONDS, java.util.concurrent.TimeUnit.SECONDS));
+        // expected value first, actual value second
+        Assert.assertEquals(commandPing.getOpaque(), opaque.get());
+    }
+}
diff --git a/dolphinscheduler-rpc/pom.xml b/dolphinscheduler-rpc/pom.xml
deleted file mode 100644
index 680a4a2..0000000
--- a/dolphinscheduler-rpc/pom.xml
+++ /dev/null
@@ -1,113 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <groupId>org.apache.dolphinscheduler</groupId>
-        <artifactId>dolphinscheduler</artifactId>
-        <version>1.2.1-SNAPSHOT</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>dolphinscheduler-rpc</artifactId>
-
-    <name>dolphinscheduler-rpc</name>
-    <url>https://github.com/apache/incubator-dolphinscheduler</url>
-
-    <properties>
-        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <maven.compiler.source>1.8</maven.compiler.source>
-        <maven.compiler.target>1.8</maven.compiler.target>
-
-        <protobuf.version>3.5.1</protobuf.version>
-        <grpc.version>1.9.0</grpc.version>
-    </properties>
-
-    <dependencies>
-        <dependency>
-            <groupId>com.google.protobuf</groupId>
-            <artifactId>protobuf-java</artifactId>
-            <version>${protobuf.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>io.grpc</groupId>
-            <artifactId>grpc-netty</artifactId>
-            <version>${grpc.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>io.grpc</groupId>
-            <artifactId>grpc-protobuf</artifactId>
-            <version>${grpc.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>io.grpc</groupId>
-            <artifactId>grpc-stub</artifactId>
-            <version>${grpc.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <extensions>
-            <extension>
-                <groupId>kr.motd.maven</groupId>
-                <artifactId>os-maven-plugin</artifactId>
-                <version>1.5.0.Final</version>
-            </extension>
-        </extensions>
-        <plugins>
-            <plugin>
-                <groupId>org.xolstice.maven.plugins</groupId>
-                <artifactId>protobuf-maven-plugin</artifactId>
-                <version>0.5.0</version>
-                <configuration>
-                    <protocArtifact>com.google.protobuf:protoc:3.5.1-1:exe:${os.detected.classifier}</protocArtifact>
-                    <pluginId>grpc-java</pluginId>
-                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
-                </configuration>
-                <executions>
-                    <execution>
-                        <id>compile</id>
-                        <goals>
-                            <goal>compile</goal>
-                        </goals>
-                    </execution>
-                    <execution>
-                        <id>compile-custom</id>
-                        <goals>
-                            <goal>compile-custom</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-compiler-plugin</artifactId>
-                <configuration>
-                    <source>${java.version}</source>
-                    <target>${java.version}</target>
-                    <encoding>${project.build.sourceEncoding}</encoding>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-</project>
diff --git a/dolphinscheduler-rpc/src/main/proto/scheduler.proto b/dolphinscheduler-rpc/src/main/proto/scheduler.proto
deleted file mode 100644
index b8b595c..0000000
--- a/dolphinscheduler-rpc/src/main/proto/scheduler.proto
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-syntax = "proto3";
-
-package schduler;
-
-option java_multiple_files = true;
-option java_package = "org.apache.dolphinscheduler.rpc";
-option java_outer_classname = "SchdulerProto";
-
-
-/**
- *  return str info
- */
-message RetStrInfo {
-  /**
-   *  str msg info
-   */
-  string msg = 1 ;
-}
-
-/**
- * return byte info
- */
-message RetByteInfo {
-  /**
-   *  byte data info
-   */
-  bytes data  = 1;
-}
-
-/**
- *  log parameter
- */
-message LogParameter {
-
-  /**
-   * path
-   */
-  string path = 1 ;
-
-   /**
-    * skip line num
-   */
-  int32 skipLineNum = 2 ;
-
-  /**
-   * display limt num
-  */
-  int32 limit = 3 ;
-}
-
-
-/**
- *  path parameter
- */
-message PathParameter {
-
-  /**
-   * path
-   */
-  string path = 1 ;
-}
-
-/**
- *  log view service
- */
-service LogViewService {
-
-  /**
-   *  roll view log
-   */
-  rpc rollViewLog(LogParameter) returns (RetStrInfo) {};
-
-  /**
-     * view all log
-   */
-  rpc viewLog(PathParameter) returns (RetStrInfo) {};
-
-  /**
-    * get log bytes
-   */
-  rpc getLogBytes(PathParameter) returns (RetByteInfo) {};
-}
-
diff --git a/dolphinscheduler-server/pom.xml b/dolphinscheduler-server/pom.xml
index 2ccc880..808f5d1 100644
--- a/dolphinscheduler-server/pom.xml
+++ b/dolphinscheduler-server/pom.xml
@@ -71,7 +71,7 @@
 
 		<dependency>
 			<groupId>org.apache.dolphinscheduler</groupId>
-			<artifactId>dolphinscheduler-rpc</artifactId>
+			<artifactId>dolphinscheduler-service</artifactId>
 		</dependency>
 		<dependency>
 			<groupId>org.apache.curator</groupId>
@@ -110,8 +110,12 @@
 			<groupId>org.apache.dolphinscheduler</groupId>
 			<artifactId>dolphinscheduler-alert</artifactId>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.dolphinscheduler</groupId>
+			<artifactId>dolphinscheduler-service</artifactId>
+		</dependency>
 
-    </dependencies>
+	</dependencies>
 
 
 	<build>
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
new file mode 100644
index 0000000..c30875b
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.server.log;
+
+import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.log.*;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+
+public class LoggerRequestProcessor implements NettyRequestProcessor {
+
+    private final Logger logger = LoggerFactory.getLogger(LoggerRequestProcessor.class);
+
+    private final ThreadPoolExecutor executor;
+
+    public LoggerRequestProcessor(){
+        this.executor = new ThreadPoolExecutor(4, 4, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
+    }
+
+    @Override
+    public void process(Channel channel, Command command) {
+        logger.info("received command : {}", command);
+        final CommandType commandType = command.getType();
+        switch (commandType){
+            case GET_LOG_REQ:
+                GetLogRequestCommand getLogRequest = FastJsonSerializer.deserialize(command.getBody(), GetLogRequestCommand.class);
+                byte[] bytes = getFileBytes(getLogRequest.getPath());
+                GetLogResponseCommand getLogResponse = new GetLogResponseCommand(bytes);
+                channel.writeAndFlush(getLogResponse.convert2Command(command.getOpaque()));
+                break;
+            case VIEW_LOG_REQ:
+                ViewLogRequestCommand viewLogRequest = FastJsonSerializer.deserialize(command.getBody(), ViewLogRequestCommand.class);
+                String msg = readFile(viewLogRequest.getPath());
+                ViewLogResponseCommand viewLogResponse = new ViewLogResponseCommand(msg);
+                channel.writeAndFlush(viewLogResponse.convert2Command(command.getOpaque()));
+                break;
+            case ROLL_VIEW_LOG_REQ:
+                RollViewLogRequestCommand rollViewLogRequest = FastJsonSerializer.deserialize(command.getBody(), RollViewLogRequestCommand.class);
+                List<String> lines = readFile(rollViewLogRequest.getPath(), rollViewLogRequest.getSkipLineNum(), rollViewLogRequest.getLimit());
+                StringBuilder builder = new StringBuilder();
+                for (String line : lines){
+                    builder.append(line + "\r\n");
+                }
+                RollViewLogResponseCommand rollViewLogRequestResponse = new RollViewLogResponseCommand(builder.toString());
+                channel.writeAndFlush(rollViewLogRequestResponse.convert2Command(command.getOpaque()));
+                break;
+            default:
+                throw new IllegalArgumentException(String.format("unknown commandType : %s"));
+        }
+    }
+
+    public ExecutorService getExecutor(){
+        return this.executor;
+    }
+
+    /**
+     * get files bytes
+     *
+     * @param path path
+     * @return byte array of file
+     * @throws Exception exception
+     */
+    private byte[] getFileBytes(String path){
+        InputStream in = null;
+        ByteArrayOutputStream bos = null;
+        try {
+            in = new FileInputStream(path);
+            bos  = new ByteArrayOutputStream();
+            byte[] buf = new byte[1024];
+            int len = 0;
+            while ((len = in.read(buf)) != -1) {
+                bos.write(buf, 0, len);
+            }
+            return bos.toByteArray();
+        }catch (IOException e){
+            logger.error("get file bytes error",e);
+        }finally {
+            if (bos != null){
+                try {
+                    bos.close();
+                } catch (IOException ignore) {}
+            }
+            if (in != null){
+                try {
+                    in.close();
+                } catch (IOException ignore) {}
+            }
+        }
+        return new byte[0];
+    }
+
+    /**
+     * read file content
+     *
+     * @param path
+     * @param skipLine
+     * @param limit
+     * @return
+     */
+    private List<String> readFile(String path, int skipLine, int limit){
+        try (Stream<String> stream = Files.lines(Paths.get(path))) {
+            return stream.skip(skipLine).limit(limit).collect(Collectors.toList());
+        } catch (IOException e) {
+            logger.error("read file failed",e);
+        }
+        return Collections.EMPTY_LIST;
+    }
+
+    /**
+     * read  file content
+     *
+     * @param path path
+     * @return string of file content
+     * @throws Exception exception
+     */
+    private String readFile(String path){
+        BufferedReader br = null;
+        String line = null;
+        StringBuilder sb = new StringBuilder();
+        try {
+            br = new BufferedReader(new InputStreamReader(new FileInputStream(path)));
+            while ((line = br.readLine()) != null){
+                sb.append(line + "\r\n");
+            }
+            return sb.toString();
+        }catch (IOException e){
+            logger.error("read file failed",e);
+        }finally {
+            try {
+                if (br != null){
+                    br.close();
+                }
+            } catch (IOException ignore) {}
+        }
+        return "";
+    }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java
new file mode 100644
index 0000000..83b9499
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.log;
+
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.remote.NettyRemotingServer;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LoggerServer {
+
+    private static  final Logger logger = LoggerFactory.getLogger(LoggerServer.class);
+
+    private final NettyRemotingServer server;
+
+    private final NettyServerConfig serverConfig;
+
+    private final LoggerRequestProcessor requestProcessor;
+
+    public LoggerServer(){
+        this.serverConfig = new NettyServerConfig();
+        this.serverConfig.setListenPort(Constants.RPC_PORT);
+        this.server = new NettyRemotingServer(serverConfig);
+        this.requestProcessor = new LoggerRequestProcessor();
+        this.server.registerProcessor(CommandType.GET_LOG_REQ, requestProcessor, requestProcessor.getExecutor());
+        this.server.registerProcessor(CommandType.ROLL_VIEW_LOG_REQ, requestProcessor, requestProcessor.getExecutor());
+        this.server.registerProcessor(CommandType.VIEW_LOG_REQ, requestProcessor, requestProcessor.getExecutor());
+    }
+
+    /**
+     * main launches the server from the command line.
+     * @param args arguments
+     */
+    public static void main(String[] args)  {
+        final LoggerServer server = new LoggerServer();
+        server.start();
+    }
+
+    /**
+     * server start
+     */
+    public void start()  {
+        this.server.start();
+        logger.info("logger server started, listening on port : {}" , Constants.RPC_PORT);
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                LoggerServer.this.stop();
+            }
+        });
+    }
+
+    /**
+     * stop
+     */
+    public void stop() {
+        this.server.close();
+        logger.info("logger server shut down");
+    }
+
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/rpc/LogClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/rpc/LogClient.java
deleted file mode 100644
index 1c6c97b..0000000
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/rpc/LogClient.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.server.rpc;
-
-import io.grpc.ManagedChannel;
-import io.grpc.ManagedChannelBuilder;
-import io.grpc.StatusRuntimeException;
-import org.apache.dolphinscheduler.rpc.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- *  log client
- */
-public class LogClient {
-
-    /**
-     * logger of LogClient
-     */
-    private static  final Logger logger = LoggerFactory.getLogger(LogClient.class);
-
-    /**
-     * managed channel
-     */
-    private final ManagedChannel channel;
-
-    /**
-     * blocking stub
-     */
-    private final LogViewServiceGrpc.LogViewServiceBlockingStub blockingStub;
-
-    /**
-     * Construct client connecting to HelloWorld server at host:port.
-     *
-     * @param host host
-     * @param port port
-     */
-    public LogClient(String host, int port) {
-        this(ManagedChannelBuilder.forAddress(host, port)
-                // Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid
-                // needing certificates.
-                .usePlaintext(true));
-    }
-
-    /**
-     * Construct client for accessing RouteGuide server using the existing channel.
-     *
-     * @param channelBuilder channel builder
-     */
-    LogClient(ManagedChannelBuilder<?> channelBuilder) {
-        /**
-         *  set max message read size
-         */
-        channelBuilder.maxInboundMessageSize(Integer.MAX_VALUE);
-        channel = channelBuilder.build();
-        blockingStub = LogViewServiceGrpc.newBlockingStub(channel);
-    }
-
-    /**
-     * shut down channel
-     *
-     * @throws InterruptedException interrupted exception
-     */
-    public void shutdown() throws InterruptedException {
-        channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
-    }
-
-    /**
-     * roll view log
-     *
-     * @param path          log path
-     * @param skipLineNum   skip line num
-     * @param limit         limit
-     * @return log content
-     */
-    public String rollViewLog(String path,int skipLineNum,int limit) {
-        logger.info("roll view log , path : {},skipLineNum : {} ,limit :{}", path, skipLineNum, limit);
-        LogParameter pathParameter = LogParameter
-                .newBuilder()
-                .setPath(path)
-                .setSkipLineNum(skipLineNum)
-                .setLimit(limit)
-                .build();
-        RetStrInfo retStrInfo;
-        try {
-            retStrInfo = blockingStub.rollViewLog(pathParameter);
-            return retStrInfo.getMsg();
-        } catch (StatusRuntimeException e) {
-            logger.error("roll view log failed", e);
-            return null;
-        }
-    }
-
-    /**
-     * view all log
-     *
-     * @param path log path
-     * @return log content
-     */
-    public String viewLog(String path) {
-        logger.info("view log path : {}",path);
-
-        PathParameter pathParameter = PathParameter.newBuilder().setPath(path).build();
-        RetStrInfo retStrInfo;
-        try {
-            retStrInfo = blockingStub.viewLog(pathParameter);
-            return retStrInfo.getMsg();
-        } catch (StatusRuntimeException e) {
-            logger.error("view log failed", e);
-            return null;
-        }
-    }
-
-    /**
-     * get log bytes
-     *
-     * @param path log path
-     * @return log content
-     */
-    public byte[] getLogBytes(String path) {
-        logger.info("get log bytes {}",path);
-
-        PathParameter pathParameter = PathParameter.newBuilder().setPath(path).build();
-        RetByteInfo retByteInfo;
-        try {
-            retByteInfo = blockingStub.getLogBytes(pathParameter);
-            return retByteInfo.getData().toByteArray();
-        } catch (StatusRuntimeException e) {
-            logger.error("get log bytes failed ", e);
-            return null;
-        }
-    }
-}
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/rpc/LoggerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/rpc/LoggerServer.java
deleted file mode 100644
index 5ec5df9..0000000
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/rpc/LoggerServer.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.server.rpc;
-
-import io.grpc.stub.StreamObserver;
-import org.apache.dolphinscheduler.common.Constants;
-import com.google.protobuf.ByteString;
-import io.grpc.Server;
-import io.grpc.ServerBuilder;
-import org.apache.dolphinscheduler.rpc.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/**
- * logger server
- */
-public class LoggerServer {
-
-    private static  final Logger logger = LoggerFactory.getLogger(LoggerServer.class);
-
-    /**
-     * server
-     */
-    private Server server;
-
-    /**
-     * server start
-     * @throws IOException io exception
-     */
-    public void start() throws IOException {
-	    /* The port on which the server should run */
-        int port = Constants.RPC_PORT;
-        server = ServerBuilder.forPort(port)
-                .addService(new LogViewServiceGrpcImpl())
-                .build()
-                .start();
-        logger.info("server started, listening on port : {}" , port);
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            @Override
-            public void run() {
-                // Use stderr here since the logger may have been reset by its JVM shutdown hook.
-                logger.info("shutting down gRPC server since JVM is shutting down");
-                LoggerServer.this.stop();
-                logger.info("server shut down");
-            }
-        });
-    }
-
-    /**
-     * stop
-     */
-    private void stop() {
-        if (server != null) {
-            server.shutdown();
-        }
-    }
-
-    /**
-     * await termination on the main thread since the grpc library uses daemon threads.
-     */
-    private void blockUntilShutdown() throws InterruptedException {
-        if (server != null) {
-            server.awaitTermination();
-        }
-    }
-
-    /**
-     * main launches the server from the command line.
-     */
-
-    /**
-     * main launches the server from the command line.
-     * @param args arguments
-     * @throws IOException          io exception
-     * @throws InterruptedException interrupted exception
-     */
-    public static void main(String[] args) throws IOException, InterruptedException {
-        final LoggerServer server = new LoggerServer();
-        server.start();
-        server.blockUntilShutdown();
-    }
-
-    /**
-     * Log View Service Grpc Implementation
-     */
-    static class LogViewServiceGrpcImpl extends LogViewServiceGrpc.LogViewServiceImplBase {
-        @Override
-        public void rollViewLog(LogParameter request, StreamObserver<RetStrInfo> responseObserver) {
-
-            logger.info("log parameter path : {} ,skip line : {}, limit : {}",
-                    request.getPath(),
-                    request.getSkipLineNum(),
-                    request.getLimit());
-            List<String> list = readFile(request.getPath(), request.getSkipLineNum(), request.getLimit());
-            StringBuilder sb = new StringBuilder();
-            boolean errorLineFlag = false;
-            for (String line : list){
-                sb.append(line + "\r\n");
-            }
-            RetStrInfo retInfoBuild = RetStrInfo.newBuilder().setMsg(sb.toString()).build();
-            responseObserver.onNext(retInfoBuild);
-            responseObserver.onCompleted();
-        }
-
-        @Override
-        public void viewLog(PathParameter request, StreamObserver<RetStrInfo> responseObserver) {
-            logger.info("task path is : {} " , request.getPath());
-            RetStrInfo retInfoBuild = RetStrInfo.newBuilder().setMsg(readFile(request.getPath())).build();
-            responseObserver.onNext(retInfoBuild);
-            responseObserver.onCompleted();
-        }
-
-        @Override
-        public void getLogBytes(PathParameter request, StreamObserver<RetByteInfo> responseObserver) {
-            try {
-                ByteString bytes = ByteString.copyFrom(getFileBytes(request.getPath()));
-                RetByteInfo.Builder builder = RetByteInfo.newBuilder();
-                builder.setData(bytes);
-                responseObserver.onNext(builder.build());
-                responseObserver.onCompleted();
-            }catch (Exception e){
-                logger.error("get log bytes failed",e);
-            }
-        }
-    }
-
-    /**
-     * get files bytes
-     *
-     * @param path path
-     * @return byte array of file
-     * @throws Exception exception
-     */
-    private static byte[] getFileBytes(String path){
-        InputStream in = null;
-        ByteArrayOutputStream bos = null;
-        try {
-            in = new FileInputStream(path);
-            bos  = new ByteArrayOutputStream();
-            byte[] buf = new byte[1024];
-            int len = 0;
-            while ((len = in.read(buf)) != -1) {
-                bos.write(buf, 0, len);
-            }
-            return bos.toByteArray();
-        }catch (IOException e){
-            logger.error("get file bytes error",e);
-        }finally {
-            if (bos != null){
-                try {
-                    bos.close();
-                } catch (IOException e) {
-                    e.printStackTrace();
-                }
-            }
-            if (in != null){
-                try {
-                    in.close();
-                } catch (IOException e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-        return null;
-    }
-
-    /**
-     * read file content
-     *
-     * @param path
-     * @param skipLine
-     * @param limit
-     * @return
-     */
-    private static List<String> readFile(String path,int skipLine,int limit){
-        try (Stream<String> stream = Files.lines(Paths.get(path))) {
-            return stream.skip(skipLine).limit(limit).collect(Collectors.toList());
-        } catch (IOException e) {
-            logger.error("read file failed",e);
-        }
-        return null;
-    }
-
-    /**
-     * read  file content
-     *
-     * @param path path
-     * @return string of file content
-     * @throws Exception exception
-     */
-    private static String readFile(String path){
-        BufferedReader br = null;
-        String line = null;
-        StringBuilder sb = new StringBuilder();
-        try {
-            br = new BufferedReader(new InputStreamReader(new FileInputStream(path)));
-            boolean errorLineFlag = false;
-            while ((line = br.readLine()) != null){
-                sb.append(line + "\r\n");
-            }
-
-            return sb.toString();
-        }catch (IOException e){
-            logger.error("read file failed",e);
-        }finally {
-            try {
-                if (br != null){
-                    br.close();
-                }
-            } catch (IOException e) {
-                logger.error(e.getMessage(),e);
-            }
-        }
-        return null;
-    }
-
-}
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
index 0b621a9..9afde60 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
@@ -21,8 +21,8 @@ import org.apache.dolphinscheduler.common.utils.CommonUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.server.rpc.LogClient;
 import org.apache.commons.io.FileUtils;
+import org.apache.dolphinscheduler.service.log.LogClientService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -374,7 +374,7 @@ public class ProcessUtils {
   public static void killYarnJob(TaskInstance taskInstance) {
     try {
       Thread.sleep(Constants.SLEEP_TIME_MILLIS);
-      LogClient logClient = new LogClient(taskInstance.getHost(), Constants.RPC_PORT);
+      LogClientService logClient = new LogClientService(taskInstance.getHost(), Constants.RPC_PORT);
 
       String log = logClient.viewLog(taskInstance.getLogPath());
       if (StringUtils.isNotEmpty(log)) {
diff --git a/dolphinscheduler-service/pom.xml b/dolphinscheduler-service/pom.xml
new file mode 100644
index 0000000..64d3481
--- /dev/null
+++ b/dolphinscheduler-service/pom.xml
@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>dolphinscheduler</artifactId>
+        <groupId>org.apache.dolphinscheduler</groupId>
+        <version>1.2.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>dolphinscheduler-service</artifactId>
+
+    <dependencies>
+        <!-- no version here: it is managed by the parent pom's dependencyManagement -->
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-remote</artifactId>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
new file mode 100644
index 0000000..d6e1b9b
--- /dev/null
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.service.log;
+
+import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.log.*;
+import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.Address;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Client that requests task log content from a remote logger server.
+ * <p>
+ * Requests go out through a {@link NettyRemotingClient}. Responses come back in
+ * {@link #process(Channel, Command)} and are matched to the waiting caller through a
+ * {@link LogPromise} keyed by the command's opaque.
+ */
+public class LogClientService implements NettyRequestProcessor {
+
+    private static final Logger logger = LoggerFactory.getLogger(LogClientService.class);
+
+    private final NettyClientConfig clientConfig;
+
+    private final NettyRemotingClient client;
+
+    /** address every request is sent to */
+    private final Address address;
+
+    /** how long a caller waits for a response, in milliseconds */
+    private final long logRequestTimeout = 10 * 1000; //10s
+
+    /**
+     * construct client
+     * @param host host
+     * @param port port
+     */
+    public LogClientService(String host, int port) {
+        this.address = new Address(host, port);
+        this.clientConfig = new NettyClientConfig();
+        this.clientConfig.setWorkerThreads(1);
+        this.client = new NettyRemotingClient(clientConfig);
+        // this instance is the processor for all three response types
+        this.client.registerProcessor(CommandType.ROLL_VIEW_LOG_RES,this);
+        this.client.registerProcessor(CommandType.VIEW_LOG_RES, this);
+        this.client.registerProcessor(CommandType.GET_LOG_RES, this);
+
+    }
+
+    /**
+     * shutdown
+     */
+    public void shutdown()  {
+        this.client.close();
+        logger.info("logger client shutdown");
+    }
+
+    /**
+     * roll view log
+     * @param path path
+     * @param skipLineNum skip line number
+     * @param limit limit
+     * @return log content; empty string if sending failed, null if no response arrived in time
+     */
+    public String rollViewLog(String path,int skipLineNum,int limit) {
+        logger.info("roll view log, path {}, skipLineNum {} ,limit {}", path, skipLineNum, limit);
+        RollViewLogRequestCommand request = new RollViewLogRequestCommand(path, skipLineNum, limit);
+        String result = "";
+        try {
+            result = (String) sendAndWait(request.convert2Command());
+        } catch (Exception e) {
+            logger.error("roll view log error", e);
+        }
+        return result;
+    }
+
+    /**
+     * view log
+     * @param path path
+     * @return log content; empty string if sending failed, null if no response arrived in time
+     */
+    public String viewLog(String path) {
+        logger.info("view log path {}", path);
+        ViewLogRequestCommand request = new ViewLogRequestCommand(path);
+        String result = "";
+        try {
+            result = (String) sendAndWait(request.convert2Command());
+        } catch (Exception e) {
+            logger.error("view log error", e);
+        }
+        return result;
+    }
+
+    /**
+     * get log bytes
+     * @param path log path
+     * @return log content bytes, or null if sending failed or no response arrived in time
+     */
+    public byte[] getLogBytes(String path) {
+        logger.info("log path {}", path);
+        GetLogRequestCommand request = new GetLogRequestCommand(path);
+        byte[] result = null;
+        try {
+            result = (byte[]) sendAndWait(request.convert2Command());
+        } catch (Exception e) {
+            logger.error("get log size error", e);
+        }
+        return result;
+    }
+
+    /**
+     * Send a request and block until its response arrives or the timeout elapses.
+     * <p>
+     * The promise is registered BEFORE the command is sent. Registering it afterwards lets a fast
+     * response reach {@link LogPromise#notify(long, Object)} while no promise exists yet; that
+     * response is dropped and the caller waits out the whole timeout for nothing.
+     *
+     * @param command request to send
+     * @return response payload, or null when no response arrived within the timeout
+     * @throws Exception anything the remoting client throws while sending
+     */
+    private Object sendAndWait(Command command) throws Exception {
+        final long opaque = command.getOpaque();
+        LogPromise promise = new LogPromise(opaque, logRequestTimeout);
+        try {
+            this.client.send(address, command);
+        } catch (Exception e) {
+            // the send failed, so no response is expected: unregister the promise instead of leaking it
+            LogPromise.notify(opaque, null);
+            throw e;
+        }
+        return promise.getResult();
+    }
+
+    /**
+     * Deserialize a log response and complete the promise waiting on its opaque.
+     * @param channel channel passed in by the remoting client (not used here)
+     * @param command response command
+     * @throws UnsupportedOperationException if the command is not one of the three log response types
+     */
+    @Override
+    public void process(Channel channel, Command command) {
+        logger.info("received log response : {}", command);
+        switch (command.getType()){
+            case ROLL_VIEW_LOG_RES:
+                RollViewLogResponseCommand rollReviewLog = FastJsonSerializer.deserialize(command.getBody(), RollViewLogResponseCommand.class);
+                LogPromise.notify(command.getOpaque(), rollReviewLog.getMsg());
+                break;
+            case VIEW_LOG_RES:
+                ViewLogResponseCommand viewLog = FastJsonSerializer.deserialize(command.getBody(), ViewLogResponseCommand.class);
+                LogPromise.notify(command.getOpaque(), viewLog.getMsg());
+                break;
+            case GET_LOG_RES:
+                GetLogResponseCommand getLog = FastJsonSerializer.deserialize(command.getBody(), GetLogResponseCommand.class);
+                LogPromise.notify(command.getOpaque(), getLog.getData());
+                break;
+            default:
+                throw new UnsupportedOperationException(String.format("command type : %s is not supported ", command.getType()));
+        }
+    }
+}
\ No newline at end of file
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java
new file mode 100644
index 0000000..ec9cac6
--- /dev/null
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.log;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Pending result of a log request, registered under the request's opaque id until it is
+ * notified or read.
+ */
+public class LogPromise {
+
+    /** promises that have not been notified or read yet, keyed by opaque id */
+    private static final ConcurrentHashMap<Long, LogPromise> PROMISES = new ConcurrentHashMap<>();
+
+    private final long opaque;
+
+    private final long start;
+
+    private final long timeout;
+
+    private final CountDownLatch latch;
+
+    private Object result;
+
+    /**
+     * Create a promise and register it under {@code opaque}.
+     * @param opaque id used by {@link #notify(long, Object)} to find this promise
+     * @param timeout maximum time {@link #getResult()} waits, in milliseconds
+     */
+    public LogPromise(long opaque, long timeout){
+        this.opaque = opaque;
+        this.timeout = timeout;
+        this.start = System.currentTimeMillis();
+        this.latch = new CountDownLatch(1);
+        PROMISES.put(opaque, this);
+    }
+
+    /**
+     * Unregister the promise stored under {@code opaque}, if any, and hand it the result.
+     * @param opaque id the promise was registered under
+     * @param result value the waiting caller receives from {@link #getResult()}
+     */
+    public static void notify(long opaque, Object result){
+        LogPromise promise = PROMISES.remove(opaque);
+        if(promise != null){
+            promise.doCountDown(result);
+        }
+    }
+
+    private void doCountDown(Object result){
+        this.result = result;
+        this.latch.countDown();
+    }
+
+    /**
+     * @return true once more than {@code timeout} milliseconds have elapsed since construction
+     */
+    public boolean isTimeout(){
+        return System.currentTimeMillis() - start > timeout;
+    }
+
+    /**
+     * Wait up to the timeout for the result, then unregister this promise.
+     * @return the notified result, or null if none has arrived when the wait ends
+     *         (timeout or interruption)
+     */
+    public Object getResult(){
+        try {
+            latch.await(timeout, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            // restore the flag so the caller can still observe the interruption
+            Thread.currentThread().interrupt();
+        }
+        // two-arg remove: never evict a newer promise registered under the same opaque
+        PROMISES.remove(opaque, this);
+        return this.result;
+    }
+
+
+}
diff --git a/pom.xml b/pom.xml
index 6451dce..5e10813 100644
--- a/pom.xml
+++ b/pom.xml
@@ -86,7 +86,6 @@
 		<commons.configuration.version>1.10</commons.configuration.version>
 		<commons.email.version>1.5</commons.email.version>
 		<poi.version>3.17</poi.version>
-		<freemarker.version>2.3.21</freemarker.version>
 		<javax.servlet.api.version>3.1.0</javax.servlet.api.version>
 		<commons.collections4.version>4.1</commons.collections4.version>
 		<guava.version>20.0</guava.version>
@@ -120,6 +119,7 @@
 		<servlet-api.version>2.5</servlet-api.version>
 		<swagger.version>1.9.3</swagger.version>
 		<springfox.version>2.9.2</springfox.version>
+		<netty.version>4.1.42.Final</netty.version>
 	</properties>
 
 	<dependencyManagement>
@@ -230,7 +230,7 @@
 			</dependency>
 			<dependency>
 				<groupId>org.apache.dolphinscheduler</groupId>
-				<artifactId>dolphinscheduler-rpc</artifactId>
+				<artifactId>dolphinscheduler-remote</artifactId>
 				<version>${project.version}</version>
 			</dependency>
 			<dependency>
@@ -238,6 +238,11 @@
 				<artifactId>dolphinscheduler-alert</artifactId>
 				<version>${project.version}</version>
 			</dependency>
+			<dependency>
+				<groupId>org.apache.dolphinscheduler</groupId>
+				<artifactId>dolphinscheduler-service</artifactId>
+				<version>${project.version}</version>
+			</dependency>
 
 			<dependency>
 				<groupId>org.apache.curator</groupId>
@@ -358,11 +363,11 @@
 				<version>${slf4j.log4j12.version}</version>
 			</dependency>
 
-      <dependency>
-        <groupId>commons-collections</groupId>
-        <artifactId>commons-collections</artifactId>
-        <version>${commons.collections.version}</version>
-      </dependency>
+			<dependency>
+				<groupId>commons-collections</groupId>
+				<artifactId>commons-collections</artifactId>
+				<version>${commons.collections.version}</version>
+		  	</dependency>
 
 			<dependency>
 				<groupId>commons-httpclient</groupId>
@@ -406,13 +411,6 @@
 				<version>${poi.version}</version>
 			</dependency>
 
-			<dependency>
-				<groupId>org.freemarker</groupId>
-				<artifactId>freemarker</artifactId>
-				<version>${freemarker.version}</version>
-			</dependency>
-
-
 			<!-- hadoop -->
 			<dependency>
 				<groupId>org.apache.hadoop</groupId>
@@ -521,23 +519,29 @@
 				<artifactId>servlet-api</artifactId>
 				<version>${servlet-api.version}</version>
 			</dependency>
-      <dependency>
-        <groupId>io.springfox</groupId>
-        <artifactId>springfox-swagger2</artifactId>
-        <version>${springfox.version}</version>
-      </dependency>
-
-      <dependency>
-        <groupId>io.springfox</groupId>
-        <artifactId>springfox-swagger-ui</artifactId>
-        <version>${springfox.version}</version>
-      </dependency>
-
-      <dependency>
-        <groupId>com.github.xiaoymin</groupId>
-        <artifactId>swagger-bootstrap-ui</artifactId>
-        <version>${swagger.version}</version>
-      </dependency>
+
+			<dependency>
+				<groupId>io.springfox</groupId>
+				<artifactId>springfox-swagger2</artifactId>
+				<version>${springfox.version}</version>
+			</dependency>
+
+			<dependency>
+				<groupId>io.springfox</groupId>
+				<artifactId>springfox-swagger-ui</artifactId>
+				<version>${springfox.version}</version>
+			</dependency>
+
+			<dependency>
+				<groupId>com.github.xiaoymin</groupId>
+				<artifactId>swagger-bootstrap-ui</artifactId>
+				<version>${swagger.version}</version>
+			  </dependency>
+			<dependency>
+				<groupId>io.netty</groupId>
+				<artifactId>netty-all</artifactId>
+				<version>${netty.version}</version>
+			</dependency>
 		</dependencies>
 	</dependencyManagement>
 
@@ -768,7 +772,6 @@
 						<exclude>**/dolphinscheduler-ui/src/view/common/outro.inc</exclude>
 						<exclude>**/dolphinscheduler-ui/src/view/common/meta.inc</exclude>
 						<exclude>**/dolphinscheduler-ui/src/combo/1.0.0/3rd.css</exclude>
-						<exclude>**/dolphinscheduler-rpc/src/main/java/org/apache/dolphinscheduler/rpc/LogViewServiceGrpc.java</exclude>
 					</excludes>
 					<consoleOutput>true</consoleOutput>
 				</configuration>
@@ -860,8 +863,9 @@
 		<module>dolphinscheduler-api</module>
 		<module>dolphinscheduler-dao</module>
 		<module>dolphinscheduler-alert</module>
-		<module>dolphinscheduler-rpc</module>
 		<module>dolphinscheduler-dist</module>
-	</modules>
+		<module>dolphinscheduler-remote</module>
+        <module>dolphinscheduler-service</module>
+    </modules>
 
 </project>