You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by te...@apache.org on 2020/02/14 13:56:12 UTC

[incubator-dolphinscheduler] 01/03: refactor log client service

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch refactor-architecture
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git

commit 8e154ecbe46035d9338371d4a8d8bea9fccc81d3
Author: Technoboy- <te...@yeah.net>
AuthorDate: Thu Feb 13 22:06:34 2020 +0800

    refactor log client service
---
 .../api/service/LoggerService.java                 | 38 +++++++++-----------
 .../remote/NettyRemotingClient.java                | 17 ++++++++-
 .../remote/future/InvokeCallback.java              | 10 ++++++
 .../remote/future/ResponseFuture.java              | 29 ++++++++++++++++
 .../remote/handler/NettyClientHandler.java         |  4 +--
 .../server/utils/ProcessUtils.java                 |  4 +--
 .../service/log/LogClientService.java              | 40 ++++++++++++----------
 7 files changed, 97 insertions(+), 45 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java
index bff54b6..f20f657 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java
@@ -28,6 +28,8 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import javax.annotation.PreDestroy;
+
 /**
  * log service
  */
@@ -39,6 +41,17 @@ public class LoggerService {
   @Autowired
   private ProcessService processService;
 
+  private final LogClientService logClient;
+  // the constructor creates one LogClientService and stores it in the final logClient field
+  public LoggerService(){
+    logClient = new LogClientService();
+  }
+  // @PreDestroy hook: closes the log client that the constructor created
+  @PreDestroy
+  public void close(){
+    logClient.close();
+  }
+
   /**
    * view log
    *
@@ -64,17 +77,9 @@ public class LoggerService {
     Result result = new Result(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg());
 
     logger.info("log host : {} , logPath : {} , logServer port : {}",host,taskInstance.getLogPath(),Constants.RPC_PORT);
-    LogClientService logClient = null;
-    try {
-      logClient = new LogClientService(host, Constants.RPC_PORT);
-      String log = logClient.rollViewLog(taskInstance.getLogPath(),skipLineNum,limit);
-      result.setData(log);
-      logger.info(log);
-    } finally {
-      if(logClient != null){
-        logClient.close();
-      }
-    }
+    String log = logClient.rollViewLog(host, Constants.RPC_PORT, taskInstance.getLogPath(),skipLineNum,limit);
+    result.setData(log);
+    logger.info(log);
 
     return result;
   }
@@ -90,16 +95,7 @@ public class LoggerService {
     if (taskInstance == null){
       throw new RuntimeException("task instance is null");
     }
-
     String host = taskInstance.getHost();
-    LogClientService logClient = null;
-    try {
-      logClient = new LogClientService(host, Constants.RPC_PORT);
-      return logClient.getLogBytes(taskInstance.getLogPath());
-    } finally {
-      if(logClient != null){
-        logClient.close();
-      }
-    }
+    return logClient.getLogBytes(host, Constants.RPC_PORT, taskInstance.getLogPath());
   }
 }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
index 678fe84..f8357a3 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
@@ -132,6 +132,21 @@ public class NettyRemotingClient {
         }
     }
 
+    // TODO: synchronous send is not implemented yet. The method currently looks up the channel for the address (throwing RemotingException("network error") when getChannel returns null), reads the command's opaque value, and sends nothing.
+    public void sendSync(final Address address, final Command command, final long timeoutMillis) throws RemotingException {
+        final Channel channel = getChannel(address);
+        if (channel == null) {
+            throw new RemotingException("network error");
+        }
+        final long opaque = command.getOpaque();
+        try {
+            // TODO: the try body is still empty; channel, opaque and timeoutMillis are not used yet
+        } catch (Exception ex) {
+            String msg = String.format("send command %s to address %s encounter error", command, address);
+            throw new RemotingException(msg, ex);
+        }
+    }
+
     public Channel getChannel(Address address) {
         Channel channel = channels.get(address);
         if(channel != null && channel.isActive()){
@@ -188,7 +203,7 @@ public class NettyRemotingClient {
         this.channels.clear();
     }
 
-    public void removeChannel(Address address){
+    public void closeChannel(Address address){
         Channel channel = this.channels.remove(address);
         if(channel != null){
             channel.close();
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/InvokeCallback.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/InvokeCallback.java
new file mode 100644
index 0000000..6ad6a7c
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/InvokeCallback.java
@@ -0,0 +1,10 @@
+package org.apache.dolphinscheduler.remote.future;
+
+/** Callback interface with a single method that is handed a {@link ResponseFuture}.
+ * @Author: Tboy
+ */
+public interface InvokeCallback {
+    // NOTE(review): presumably invoked once the request tied to the future has finished — no caller is visible in this commit, confirm.
+    void operationComplete(final ResponseFuture responseFuture);
+
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java
new file mode 100644
index 0000000..036f990
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java
@@ -0,0 +1,29 @@
+package org.apache.dolphinscheduler.remote.future;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+
+import java.util.concurrent.CountDownLatch;
+
+/** Holds the state of one pending request: an opaque id, a timeout, a callback, a creation timestamp and a one-count latch.
+ * @Author: Tboy
+ */
+public class ResponseFuture {
+    // request identifier supplied by the creator; NOTE(review): NettyRemotingClient.sendSync keeps command.getOpaque() in a long — confirm the intended width
+    private final int opaque;
+    // timeout supplied by the creator, in milliseconds per the name
+    private final long timeoutMillis;
+    // callback supplied by the creator; nothing in this class calls it yet
+    private final InvokeCallback invokeCallback;
+    // wall-clock time (System.currentTimeMillis) captured when this object is constructed
+    private final long beginTimestamp = System.currentTimeMillis();
+    // latch with a count of 1; NOTE(review): presumably released when the response arrives — no await/countDown exists yet
+    private final CountDownLatch latch = new CountDownLatch(1);
+
+    public ResponseFuture(int opaque, long timeoutMillis, InvokeCallback invokeCallback) {
+        this.opaque = opaque;
+        this.timeoutMillis = timeoutMillis;
+        this.invokeCallback = invokeCallback;
+    }
+
+    // NOTE(review): the class exposes no accessors or completion methods; the fields are only assigned.
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
index b063080..115f6e4 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
@@ -48,7 +48,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
 
     @Override
     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-        nettyRemotingClient.removeChannel(ChannelUtils.toAddress(ctx.channel()));
+        nettyRemotingClient.closeChannel(ChannelUtils.toAddress(ctx.channel()));
         ctx.channel().close();
     }
 
@@ -96,7 +96,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
         logger.error("exceptionCaught : {}", cause);
-        nettyRemotingClient.removeChannel(ChannelUtils.toAddress(ctx.channel()));
+        nettyRemotingClient.closeChannel(ChannelUtils.toAddress(ctx.channel()));
         ctx.channel().close();
     }
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
index 90711e1..e0c00c5 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
@@ -378,8 +378,8 @@ public class ProcessUtils {
       LogClientService logClient = null;
       String log = null;
       try {
-        logClient = new LogClientService(taskInstance.getHost(), Constants.RPC_PORT);
-        log = logClient.viewLog(taskInstance.getLogPath());
+        logClient = new LogClientService();
+        log = logClient.viewLog(taskInstance.getHost(), Constants.RPC_PORT, taskInstance.getLogPath());
       } finally {
         if(logClient != null){
           logClient.close();
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
index aa6999e..4faff33 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
@@ -40,8 +40,6 @@ public class LogClientService implements NettyRequestProcessor {
 
     private final NettyRemotingClient client;
 
-    private final Address address;
-
     /**
      *  request time out
      */
@@ -49,18 +47,14 @@ public class LogClientService implements NettyRequestProcessor {
 
     /**
      * construct client
-     * @param host host
-     * @param port port
      */
-    public LogClientService(String host, int port) {
-        this.address = new Address(host, port);
+    public LogClientService() {
         this.clientConfig = new NettyClientConfig();
-        this.clientConfig.setWorkerThreads(1);
+        this.clientConfig.setWorkerThreads(4);
         this.client = new NettyRemotingClient(clientConfig);
         this.client.registerProcessor(CommandType.ROLL_VIEW_LOG_RESPONSE,this);
         this.client.registerProcessor(CommandType.VIEW_WHOLE_LOG_RESPONSE, this);
         this.client.registerProcessor(CommandType.GET_LOG_BYTES_RESPONSE, this);
-
     }
 
     /**
@@ -73,15 +67,18 @@ public class LogClientService implements NettyRequestProcessor {
 
     /**
      * roll view log
+     * @param host host
+     * @param port port
      * @param path path
      * @param skipLineNum skip line number
      * @param limit limit
      * @return log content
      */
-    public String rollViewLog(String path,int skipLineNum,int limit) {
-        logger.info("roll view log, path {}, skipLineNum {} ,limit {}", path, skipLineNum, limit);
+    public String rollViewLog(String host, int port, String path,int skipLineNum,int limit) {
+        logger.info("roll view log, host : {}, port : {}, path {}, skipLineNum {} ,limit {}", host, port, path, skipLineNum, limit);
         RollViewLogRequestCommand request = new RollViewLogRequestCommand(path, skipLineNum, limit);
         String result = "";
+        final Address address = new Address(host, port);
         try {
             Command command = request.convert2Command();
             this.client.send(address, command);
@@ -89,19 +86,24 @@ public class LogClientService implements NettyRequestProcessor {
             result = ((String)promise.getResult());
         } catch (Exception e) {
             logger.error("roll view log error", e);
+        } finally {
+            this.client.closeChannel(address);
         }
         return result;
     }
 
     /**
      * view log
+     * @param host host
+     * @param port port
      * @param path path
      * @return log content
      */
-    public String viewLog(String path) {
+    public String viewLog(String host, int port, String path) {
         logger.info("view log path {}", path);
         ViewLogRequestCommand request = new ViewLogRequestCommand(path);
         String result = "";
+        final Address address = new Address(host, port);
         try {
             Command command = request.convert2Command();
             this.client.send(address, command);
@@ -109,19 +111,24 @@ public class LogClientService implements NettyRequestProcessor {
             result = ((String)promise.getResult());
         } catch (Exception e) {
             logger.error("view log error", e);
+        } finally {
+            this.client.closeChannel(address);
         }
         return result;
     }
 
     /**
      * get log size
+     * @param host host
+     * @param port port
      * @param path log path
      * @return log content bytes
      */
-    public byte[] getLogBytes(String path) {
+    public byte[] getLogBytes(String host, int port, String path) {
         logger.info("log path {}", path);
         GetLogBytesRequestCommand request = new GetLogBytesRequestCommand(path);
         byte[] result = null;
+        final Address address = new Address(host, port);
         try {
             Command command = request.convert2Command();
             this.client.send(address, command);
@@ -129,6 +136,8 @@ public class LogClientService implements NettyRequestProcessor {
             result = (byte[])promise.getResult();
         } catch (Exception e) {
             logger.error("get log size error", e);
+        } finally {
+            this.client.closeChannel(address);
         }
         return result;
     }
@@ -156,11 +165,4 @@ public class LogClientService implements NettyRequestProcessor {
                 throw new UnsupportedOperationException(String.format("command type : %s is not supported ", command.getType()));
         }
     }
-
-    public static void main(String[] args) throws Exception{
-        LogClientService logClient = new LogClientService("192.168.220.247", 50051);
-        byte[] logBytes = logClient.getLogBytes("/opt/program/incubator-dolphinscheduler/logs/1/463/540.log");
-        System.out.println(new String(logBytes));
-    }
-
 }
\ No newline at end of file