You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by te...@apache.org on 2020/02/06 10:51:55 UTC
[incubator-dolphinscheduler] branch refactor-logger updated: 1,LoggerServer and LoggerClientService add comment 2,code optimization (#1901)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch refactor-logger
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-logger by this push:
new 57b830c 1,LoggerServer and LoggerClientService add comment 2,code optimization (#1901)
57b830c is described below
commit 57b830c22f6e7fd2308771a491eb800c40a08242
Author: qiaozhanwei <qi...@outlook.com>
AuthorDate: Thu Feb 6 18:51:47 2020 +0800
1,LoggerServer and LoggerClientService add comment 2,code optimization (#1901)
* refactor-logger branch login error modify
* refactor-logger branch rollback LogClientService
* refactor-logger branch api server ResourceServiceTest fetch from dev
* refactor-logger branch view log CommandType modify
* 1,LoggerServer and LoggerClientService add comment
2,code optimization
* GetLogBytesRequestCommand request commandType modify
---
.../remote/NettyRemotingClient.java | 7 ++
.../remote/NettyRemotingServer.java | 5 +-
.../remote/codec/NettyDecoder.java | 3 +
.../remote/codec/NettyEncoder.java | 4 ++
.../dolphinscheduler/remote/command/Command.java | 25 +++++--
.../remote/command/CommandHeader.java | 3 +
.../remote/command/CommandType.java | 2 +-
...Command.java => GetLogBytesRequestCommand.java} | 15 +++--
...ommand.java => GetLogBytesResponseCommand.java} | 11 ++--
.../command/log/RollViewLogRequestCommand.java | 5 +-
.../command/log/RollViewLogResponseCommand.java | 5 +-
.../remote/command/log/ViewLogRequestCommand.java | 5 +-
.../remote/command/log/ViewLogResponseCommand.java | 5 +-
.../remote/config/NettyClientConfig.java | 3 +
.../remote/config/NettyServerConfig.java | 3 +
.../remote/exceptions/RemotingException.java | 3 +
.../remote/handler/NettyClientHandler.java | 5 +-
.../remote/handler/NettyServerHandler.java | 3 +
.../remote/processor/NettyRequestProcessor.java | 3 +
.../dolphinscheduler/remote/utils/Address.java | 22 ++++---
.../remote/utils/ChannelUtils.java | 3 +
.../remote/utils/FastJsonSerializer.java | 3 +
dolphinscheduler-server/pom.xml | 34 ----------
.../server/log/LoggerRequestProcessor.java | 77 ++++++++++++----------
.../dolphinscheduler/server/log/LoggerServer.java | 18 ++++-
.../service/log/LogClientService.java | 29 ++++----
.../dolphinscheduler/service/log/LogPromise.java | 9 ++-
27 files changed, 199 insertions(+), 111 deletions(-)
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
index dda78f3..678fe84 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
@@ -43,6 +43,9 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+/**
+ * remoting netty client
+ */
public class NettyRemotingClient {
private final Logger logger = LoggerFactory.getLogger(NettyRemotingClient.class);
@@ -68,6 +71,7 @@ public class NettyRemotingClient {
this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
+ @Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
}
@@ -85,6 +89,7 @@ public class NettyRemotingClient {
.option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize())
.option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize())
.handler(new ChannelInitializer<SocketChannel>() {
+ @Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new NettyDecoder(),
@@ -111,6 +116,8 @@ public class NettyRemotingClient {
}
try {
channel.writeAndFlush(command).addListener(new ChannelFutureListener(){
+
+ @Override
public void operationComplete(ChannelFuture future) throws Exception {
if(future.isSuccess()){
logger.info("sent command {} to {}", command, address);
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
index 7fd7331..5823dbb 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
@@ -41,7 +41,9 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-
+/**
+ * remoting netty server
+ */
public class NettyRemotingServer {
private final Logger logger = LoggerFactory.getLogger(NettyRemotingServer.class);
@@ -99,6 +101,7 @@ public class NettyRemotingServer {
.childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize())
.childHandler(new ChannelInitializer<NioSocketChannel>() {
+ @Override
protected void initChannel(NioSocketChannel ch) throws Exception {
initNettyChannel(ch);
}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java
index dc37334..998f4ee 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java
@@ -27,6 +27,9 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
import java.util.List;
+/**
+ * netty decoder
+ */
public class NettyDecoder extends ReplayingDecoder<NettyDecoder.State> {
public NettyDecoder(){
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java
index fb5b36a..dd4e523 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java
@@ -22,9 +22,13 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.apache.dolphinscheduler.remote.command.Command;
+/**
+ * netty encoder
+ */
@Sharable
public class NettyEncoder extends MessageToByteEncoder<Command> {
+ @Override
protected void encode(ChannelHandlerContext ctx, Command msg, ByteBuf out) throws Exception {
if(msg == null){
throw new Exception("encode msg is null");
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java
index 3f0a394..4687db3 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java
@@ -18,6 +18,10 @@ package org.apache.dolphinscheduler.remote.command;
import java.io.Serializable;
+/**
+ * receive task log request command and content fill
+ * for netty data serializable transfer
+ */
public class Command implements Serializable {
private static final long serialVersionUID = 1L;
@@ -31,8 +35,14 @@ public class Command implements Serializable {
this.opaque = opaque;
}
+ /**
+ * command type
+ */
private CommandType type;
+ /**
+ * request unique identification
+ */
private long opaque;
private byte[] body;
@@ -61,6 +71,7 @@ public class Command implements Serializable {
this.body = body;
}
+ @Override
public int hashCode() {
final int prime = 31;
int result = 1;
@@ -68,17 +79,19 @@ public class Command implements Serializable {
return result;
}
+ @Override
public boolean equals(Object obj) {
- if (this == obj)
+ if (this == obj) {
return true;
- if (obj == null)
+ }
+ if (obj == null) {
return false;
- if (getClass() != obj.getClass())
+ }
+ if (getClass() != obj.getClass()) {
return false;
+ }
Command other = (Command) obj;
- if (opaque != other.opaque)
- return false;
- return true;
+ return opaque == other.opaque;
}
@Override
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java
index ac51d01..92f7ac3 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java
@@ -18,6 +18,9 @@ package org.apache.dolphinscheduler.remote.command;
import java.io.Serializable;
+/**
+ * command header
+ */
public class CommandHeader implements Serializable {
private byte type;
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index 468a5cc..54d2f8f 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -1 +1 @@
-/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
public enum CommandType {
ROLL_VIEW_LOG_REQ,
ROLL_VIEW_LOG_RES,
VIEW_LOG_REQ,
VIEW_LOG_RES,
GET_LOG_REQ,
GET_LOG_RES,
EXECUTE_TASK_REQUEST,
EXECUTE_TASK_RESPONSE,
PING,
PONG;
}
\ No newline at end of file
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
/**
 * Types of command that can be set on a Command via its type field.
 *
 * NOTE(review): CommandHeader stores the type as a single byte; if the codec
 * encodes the enum ordinal, the declaration order below is part of the wire
 * format and must not be changed — confirm in NettyEncoder/NettyDecoder.
 */
public enum CommandType {
/**
* roll view log request
*/
ROLL_VIEW_LOG_REQUEST,
/**
* roll view log response
*/
ROLL_VIEW_LOG_RESPONSE,
/**
* view whole log request
*/
VIEW_WHOLE_LOG_REQUEST,
/**
* view whole log response
*/
VIEW_WHOLE_LOG_RESPONSE,
/**
* get log bytes request
*/
GET_LOG_BYTES_REQUEST,
/**
* get log bytes response
*/
GET_LOG_BYTES_RESPONSE,
/**
* execute task request
*/
EXECUTE_TASK_REQUEST,
/**
* execute task response
*/
EXECUTE_TASK_RESPONSE,
/**
* ping (presumably a heartbeat probe; its handling is not shown here — TODO confirm)
*/
PING,
/**
* pong (presumably the reply to PING; its handling is not shown here — TODO confirm)
*/
PONG;
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java
similarity index 84%
rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogRequestCommand.java
rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java
index 72c5fb8..1a2e6e4 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogRequestCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java
@@ -24,16 +24,19 @@ import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;
-public class GetLogRequestCommand implements Serializable {
+/**
+ * get log bytes request command
+ */
+public class GetLogBytesRequestCommand implements Serializable {
private static final AtomicLong REQUEST = new AtomicLong(1);
private String path;
- public GetLogRequestCommand() {
+ public GetLogBytesRequestCommand() {
}
- public GetLogRequestCommand(String path) {
+ public GetLogBytesRequestCommand(String path) {
this.path = path;
}
@@ -45,9 +48,13 @@ public class GetLogRequestCommand implements Serializable {
this.path = path;
}
+ /**
+ *
+ * @return
+ */
public Command convert2Command(){
Command command = new Command(REQUEST.getAndIncrement());
- command.setType(CommandType.VIEW_LOG_REQ);
+ command.setType(CommandType.GET_LOG_BYTES_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesResponseCommand.java
similarity index 84%
rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogResponseCommand.java
rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesResponseCommand.java
index b1f950f..05692fb 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogResponseCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesResponseCommand.java
@@ -23,14 +23,17 @@ import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
-public class GetLogResponseCommand implements Serializable {
+/**
+ * get log bytes response command
+ */
+public class GetLogBytesResponseCommand implements Serializable {
private byte[] data;
- public GetLogResponseCommand() {
+ public GetLogBytesResponseCommand() {
}
- public GetLogResponseCommand(byte[] data) {
+ public GetLogBytesResponseCommand(byte[] data) {
this.data = data;
}
@@ -44,7 +47,7 @@ public class GetLogResponseCommand implements Serializable {
public Command convert2Command(long opaque){
Command command = new Command(opaque);
- command.setType(CommandType.GET_LOG_RES);
+ command.setType(CommandType.GET_LOG_BYTES_RESPONSE);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java
index f655a8c..49d19aa 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java
@@ -24,6 +24,9 @@ import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;
+/**
+ * roll view log request command
+ */
public class RollViewLogRequestCommand implements Serializable {
private static final AtomicLong REQUEST = new AtomicLong(1);
@@ -69,7 +72,7 @@ public class RollViewLogRequestCommand implements Serializable {
public Command convert2Command(){
Command command = new Command(REQUEST.getAndIncrement());
- command.setType(CommandType.ROLL_VIEW_LOG_REQ);
+ command.setType(CommandType.ROLL_VIEW_LOG_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogResponseCommand.java
index f356e72..def3257 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogResponseCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogResponseCommand.java
@@ -23,6 +23,9 @@ import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
+/**
+ * roll view log response command
+ */
public class RollViewLogResponseCommand implements Serializable {
private String msg;
@@ -44,7 +47,7 @@ public class RollViewLogResponseCommand implements Serializable {
public Command convert2Command(long opaque){
Command command = new Command(opaque);
- command.setType(CommandType.ROLL_VIEW_LOG_RES);
+ command.setType(CommandType.ROLL_VIEW_LOG_RESPONSE);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java
index f9e01fe..9ba9cd3 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java
@@ -24,6 +24,9 @@ import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;
+/**
+ * view log request command
+ */
public class ViewLogRequestCommand implements Serializable {
private static final AtomicLong REQUEST = new AtomicLong(1);
@@ -47,7 +50,7 @@ public class ViewLogRequestCommand implements Serializable {
public Command convert2Command(){
Command command = new Command(REQUEST.getAndIncrement());
- command.setType(CommandType.VIEW_LOG_REQ);
+ command.setType(CommandType.VIEW_WHOLE_LOG_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogResponseCommand.java
index d5a59c8..6e3c799 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogResponseCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogResponseCommand.java
@@ -23,6 +23,9 @@ import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
+/**
+ * view log response command
+ */
public class ViewLogResponseCommand implements Serializable {
private String msg;
@@ -44,7 +47,7 @@ public class ViewLogResponseCommand implements Serializable {
public Command convert2Command(long opaque){
Command command = new Command(opaque);
- command.setType(CommandType.VIEW_LOG_RES);
+ command.setType(CommandType.VIEW_WHOLE_LOG_RESPONSE);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java
index 6b1ea5b..56d2643 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java
@@ -18,6 +18,9 @@ package org.apache.dolphinscheduler.remote.config;
import org.apache.dolphinscheduler.remote.utils.Constants;
+/**
+ * netty client config
+ */
public class NettyClientConfig {
private int workerThreads = Constants.CPUS;
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyServerConfig.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyServerConfig.java
index 9afaeb3..847f316 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyServerConfig.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyServerConfig.java
@@ -18,6 +18,9 @@ package org.apache.dolphinscheduler.remote.config;
import org.apache.dolphinscheduler.remote.utils.Constants;
+/**
+ * netty server config
+ */
public class NettyServerConfig {
private int soBacklog = 1024;
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingException.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingException.java
index 62ab907..29d48db 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingException.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingException.java
@@ -17,6 +17,9 @@
package org.apache.dolphinscheduler.remote.exceptions;
+/**
+ * remote exception
+ */
public class RemotingException extends Exception {
public RemotingException() {
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
index 093614f..b063080 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
@@ -30,7 +30,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
-
+/**
+ * netty client request handler
+ */
@ChannelHandler.Sharable
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@@ -72,6 +74,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
final Pair<NettyRequestProcessor, ExecutorService> pair = processors.get(commandType);
if (pair != null) {
Runnable r = new Runnable() {
+ @Override
public void run() {
try {
pair.getLeft().process(channel, msg);
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java
index 433f8b0..8a7ee39 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java
@@ -31,6 +31,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
+/**
+ * netty server request handler
+ */
@ChannelHandler.Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/NettyRequestProcessor.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/NettyRequestProcessor.java
index 7b19b9c..10a8195 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/NettyRequestProcessor.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/NettyRequestProcessor.java
@@ -19,6 +19,9 @@ package org.apache.dolphinscheduler.remote.processor;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.remote.command.Command;
+/**
+ * netty request processor
+ */
public interface NettyRequestProcessor {
void process(final Channel channel, final Command command);
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Address.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Address.java
index 4d311be..221b895 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Address.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Address.java
@@ -18,6 +18,9 @@ package org.apache.dolphinscheduler.remote.utils;
import java.io.Serializable;
+/**
+ * server address
+ */
public class Address implements Serializable {
private String host;
@@ -60,21 +63,24 @@ public class Address implements Serializable {
@Override
public boolean equals(Object obj) {
- if (this == obj)
+ if (this == obj) {
return true;
- if (obj == null)
+ }
+ if (obj == null) {
return false;
- if (getClass() != obj.getClass())
+ }
+ if (getClass() != obj.getClass()) {
return false;
+ }
Address other = (Address) obj;
if (host == null) {
- if (other.host != null)
+ if (other.host != null) {
return false;
- } else if (!host.equals(other.host))
- return false;
- if (port != other.port)
+ }
+ } else if (!host.equals(other.host)) {
return false;
- return true;
+ }
+ return port == other.port;
}
@Override
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java
index aca2241..e9d93da 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java
@@ -20,6 +20,9 @@ import io.netty.channel.Channel;
import java.net.InetSocketAddress;
+/**
+ * channel utils
+ */
public class ChannelUtils {
public static String getLocalAddress(Channel channel){
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/FastJsonSerializer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/FastJsonSerializer.java
index 32569ed..a9b8546 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/FastJsonSerializer.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/FastJsonSerializer.java
@@ -18,6 +18,9 @@ package org.apache.dolphinscheduler.remote.utils;
import com.alibaba.fastjson.JSON;
+/**
+ * json serialize or deserialize
+ */
public class FastJsonSerializer {
public static <T> byte[] serialize(T obj) {
diff --git a/dolphinscheduler-server/pom.xml b/dolphinscheduler-server/pom.xml
index 808f5d1..e826b49 100644
--- a/dolphinscheduler-server/pom.xml
+++ b/dolphinscheduler-server/pom.xml
@@ -119,41 +119,7 @@
<build>
- <!--<resources>-->
- <!--<resource>-->
- <!--<directory>src/main/java</directory>-->
- <!--<includes>-->
- <!--<include>**/*.xml</include>-->
- <!--</includes>-->
- <!--<filtering>false</filtering>-->
- <!--</resource>-->
- <!--<resource>-->
- <!--<directory>src/main/resources</directory>-->
- <!--<includes>-->
- <!--<include>**/*.*</include>-->
- <!--</includes>-->
- <!--<filtering>false</filtering>-->
- <!--</resource>-->
- <!--</resources>-->
<plugins>
- <!--<plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <configuration>
- <descriptors>
- <descriptor>src/main/assembly/package.xml</descriptor>
- </descriptors>
- <appendAssemblyId>false</appendAssemblyId>
- </configuration>
- <executions>
- <execution>
- <id>make-assembly</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- </execution>
- </executions>
- </plugin>-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
index c30875b..4e4404e 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
@@ -37,7 +37,9 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-
+/**
+ * logger request process logic
+ */
public class LoggerRequestProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(LoggerRequestProcessor.class);
@@ -51,23 +53,31 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
@Override
public void process(Channel channel, Command command) {
logger.info("received command : {}", command);
+
+ /**
+ * request task log command type
+ */
final CommandType commandType = command.getType();
switch (commandType){
- case GET_LOG_REQ:
- GetLogRequestCommand getLogRequest = FastJsonSerializer.deserialize(command.getBody(), GetLogRequestCommand.class);
- byte[] bytes = getFileBytes(getLogRequest.getPath());
- GetLogResponseCommand getLogResponse = new GetLogResponseCommand(bytes);
+ case GET_LOG_BYTES_REQUEST:
+ GetLogBytesRequestCommand getLogRequest = FastJsonSerializer.deserialize(
+ command.getBody(), GetLogBytesRequestCommand.class);
+ byte[] bytes = getFileContentBytes(getLogRequest.getPath());
+ GetLogBytesResponseCommand getLogResponse = new GetLogBytesResponseCommand(bytes);
channel.writeAndFlush(getLogResponse.convert2Command(command.getOpaque()));
break;
- case VIEW_LOG_REQ:
- ViewLogRequestCommand viewLogRequest = FastJsonSerializer.deserialize(command.getBody(), ViewLogRequestCommand.class);
- String msg = readFile(viewLogRequest.getPath());
+ case VIEW_WHOLE_LOG_REQUEST:
+ ViewLogRequestCommand viewLogRequest = FastJsonSerializer.deserialize(
+ command.getBody(), ViewLogRequestCommand.class);
+ String msg = readWholeFileContent(viewLogRequest.getPath());
ViewLogResponseCommand viewLogResponse = new ViewLogResponseCommand(msg);
channel.writeAndFlush(viewLogResponse.convert2Command(command.getOpaque()));
break;
- case ROLL_VIEW_LOG_REQ:
- RollViewLogRequestCommand rollViewLogRequest = FastJsonSerializer.deserialize(command.getBody(), RollViewLogRequestCommand.class);
- List<String> lines = readFile(rollViewLogRequest.getPath(), rollViewLogRequest.getSkipLineNum(), rollViewLogRequest.getLimit());
+ case ROLL_VIEW_LOG_REQUEST:
+ RollViewLogRequestCommand rollViewLogRequest = FastJsonSerializer.deserialize(
+ command.getBody(), RollViewLogRequestCommand.class);
+ List<String> lines = readPartFileContent(rollViewLogRequest.getPath(),
+ rollViewLogRequest.getSkipLineNum(), rollViewLogRequest.getLimit());
StringBuilder builder = new StringBuilder();
for (String line : lines){
builder.append(line + "\r\n");
@@ -76,7 +86,7 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
channel.writeAndFlush(rollViewLogRequestResponse.convert2Command(command.getOpaque()));
break;
default:
- throw new IllegalArgumentException(String.format("unknown commandType : %s"));
+ throw new IllegalArgumentException("unknown commandType");
}
}
@@ -85,20 +95,20 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
}
/**
- * get files bytes
+ * get file content bytes, used for downloading the file
*
- * @param path path
+ * @param filePath file path
* @return byte array of file
* @throws Exception exception
*/
- private byte[] getFileBytes(String path){
+ private byte[] getFileContentBytes(String filePath){
InputStream in = null;
ByteArrayOutputStream bos = null;
try {
- in = new FileInputStream(path);
+ in = new FileInputStream(filePath);
bos = new ByteArrayOutputStream();
byte[] buf = new byte[1024];
- int len = 0;
+ int len;
while ((len = in.read(buf)) != -1) {
bos.write(buf, 0, len);
}
@@ -121,41 +131,42 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
}
/**
- * read file content
+ * read part of the file content: skip the first skipLine lines, then read at most limit lines
*
- * @param path
- * @param skipLine
- * @param limit
- * @return
+ * @param filePath file path
+ * @param skipLine number of leading lines to skip
+ * @param limit maximum number of lines to read
+ * @return part file content
*/
- private List<String> readFile(String path, int skipLine, int limit){
- try (Stream<String> stream = Files.lines(Paths.get(path))) {
+ private List<String> readPartFileContent(String filePath,
+ int skipLine,
+ int limit){
+ try (Stream<String> stream = Files.lines(Paths.get(filePath))) {
return stream.skip(skipLine).limit(limit).collect(Collectors.toList());
} catch (IOException e) {
- logger.error("read file failed",e);
+ logger.error("read file error",e);
}
return Collections.EMPTY_LIST;
}
/**
- * read file content
+ * read whole file content
*
- * @param path path
- * @return string of file content
- * @throws Exception exception
+ * @param filePath file path
+ * @return whole file content
*/
- private String readFile(String path){
+ private String readWholeFileContent(String filePath){
BufferedReader br = null;
- String line = null;
+ String line;
StringBuilder sb = new StringBuilder();
try {
- br = new BufferedReader(new InputStreamReader(new FileInputStream(path)));
+ br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)));
while ((line = br.readLine()) != null){
sb.append(line + "\r\n");
}
return sb.toString();
}catch (IOException e){
- logger.error("read file failed",e);
+ logger.error("read file error",e);
}finally {
try {
if (br != null){
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java
index 83b9499..3520fb0 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java
@@ -25,14 +25,26 @@ import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * logger server
+ */
public class LoggerServer {
private static final Logger logger = LoggerFactory.getLogger(LoggerServer.class);
+ /**
+ * netty server
+ */
private final NettyRemotingServer server;
+ /**
+ * netty server config
+ */
private final NettyServerConfig serverConfig;
+ /**
+ * logger request processor
+ */
private final LoggerRequestProcessor requestProcessor;
public LoggerServer(){
@@ -40,9 +52,9 @@ public class LoggerServer {
this.serverConfig.setListenPort(Constants.RPC_PORT);
this.server = new NettyRemotingServer(serverConfig);
this.requestProcessor = new LoggerRequestProcessor();
- this.server.registerProcessor(CommandType.GET_LOG_REQ, requestProcessor, requestProcessor.getExecutor());
- this.server.registerProcessor(CommandType.ROLL_VIEW_LOG_REQ, requestProcessor, requestProcessor.getExecutor());
- this.server.registerProcessor(CommandType.VIEW_LOG_REQ, requestProcessor, requestProcessor.getExecutor());
+ this.server.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, requestProcessor, requestProcessor.getExecutor());
+ this.server.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, requestProcessor, requestProcessor.getExecutor());
+ this.server.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, requestProcessor, requestProcessor.getExecutor());
}
/**
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
index d6e1b9b..575c514 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
@@ -42,7 +42,10 @@ public class LogClientService implements NettyRequestProcessor {
private final Address address;
- private final long logRequestTimeout = 10 * 1000; //10s
+ /**
+ * log request timeout (10s, presumably expressed in milliseconds)
+ */
+ private final long logRequestTimeout = 10 * 1000;
/**
* construct client
@@ -54,9 +57,9 @@ public class LogClientService implements NettyRequestProcessor {
this.clientConfig = new NettyClientConfig();
this.clientConfig.setWorkerThreads(1);
this.client = new NettyRemotingClient(clientConfig);
- this.client.registerProcessor(CommandType.ROLL_VIEW_LOG_RES,this);
- this.client.registerProcessor(CommandType.VIEW_LOG_RES, this);
- this.client.registerProcessor(CommandType.GET_LOG_RES, this);
+ this.client.registerProcessor(CommandType.ROLL_VIEW_LOG_RESPONSE,this);
+ this.client.registerProcessor(CommandType.VIEW_WHOLE_LOG_RESPONSE, this);
+ this.client.registerProcessor(CommandType.GET_LOG_BYTES_RESPONSE, this);
}
@@ -117,7 +120,7 @@ public class LogClientService implements NettyRequestProcessor {
*/
public byte[] getLogBytes(String path) {
logger.info("log path {}", path);
- GetLogRequestCommand request = new GetLogRequestCommand(path);
+ GetLogBytesRequestCommand request = new GetLogBytesRequestCommand(path);
byte[] result = null;
try {
Command command = request.convert2Command();
@@ -134,20 +137,24 @@ public class LogClientService implements NettyRequestProcessor {
public void process(Channel channel, Command command) {
logger.info("received log response : {}", command);
switch (command.getType()){
- case ROLL_VIEW_LOG_RES:
- RollViewLogResponseCommand rollReviewLog = FastJsonSerializer.deserialize(command.getBody(), RollViewLogResponseCommand.class);
+ case ROLL_VIEW_LOG_RESPONSE:
+ RollViewLogResponseCommand rollReviewLog = FastJsonSerializer.deserialize(
+ command.getBody(), RollViewLogResponseCommand.class);
LogPromise.notify(command.getOpaque(), rollReviewLog.getMsg());
break;
- case VIEW_LOG_RES:
- ViewLogResponseCommand viewLog = FastJsonSerializer.deserialize(command.getBody(), ViewLogResponseCommand.class);
+ case VIEW_WHOLE_LOG_RESPONSE:
+ ViewLogResponseCommand viewLog = FastJsonSerializer.deserialize(
+ command.getBody(), ViewLogResponseCommand.class);
LogPromise.notify(command.getOpaque(), viewLog.getMsg());
break;
- case GET_LOG_RES:
- GetLogResponseCommand getLog = FastJsonSerializer.deserialize(command.getBody(), GetLogResponseCommand.class);
+ case GET_LOG_BYTES_RESPONSE:
+ GetLogBytesResponseCommand getLog = FastJsonSerializer.deserialize(
+ command.getBody(), GetLogBytesResponseCommand.class);
LogPromise.notify(command.getOpaque(), getLog.getData());
break;
default:
throw new UnsupportedOperationException(String.format("command type : %s is not supported ", command.getType()));
}
}
+
}
\ No newline at end of file
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java
index ec9cac6..8920b8a 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java
@@ -21,7 +21,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-
+/**
+ * log async callback
+ */
public class LogPromise {
private static final ConcurrentHashMap<Long, LogPromise> PROMISES = new ConcurrentHashMap<>();
@@ -45,6 +47,11 @@ public class LogPromise {
}
+ /**
+ * notify client finish
+ * @param opaque unique identification
+ * @param result result
+ */
public static void notify(long opaque, Object result){
LogPromise promise = PROMISES.remove(opaque);
if(promise != null){