You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@dubbo.apache.org by gu...@apache.org on 2021/03/02 02:17:32 UTC
[dubbo] branch 3.0 updated: Fix mem leak (#7301)
This is an automated email from the ASF dual-hosted git repository.
guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 3318ba2 Fix mem leak (#7301)
3318ba2 is described below
commit 3318ba26374d57468b421d9c5d45aad79b18f149
Author: GuoHao <gu...@gmail.com>
AuthorDate: Tue Mar 2 10:17:11 2021 +0800
Fix mem leak (#7301)
---
.../main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java | 6 ++++--
.../main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java | 3 ++-
.../org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java | 4 ++++
.../apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java | 3 ++-
.../dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java | 2 ++
.../org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java | 7 ++++++-
.../dubbo/rpc/protocol/tri/TripleServerConnectionHandler.java | 2 ++
.../apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java | 3 ++-
.../main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java | 6 +++++-
9 files changed, 29 insertions(+), 7 deletions(-)
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
index 69ced1c..0062abd 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
@@ -47,6 +47,8 @@ import io.netty.handler.codec.http2.Http2NoMoreStreamIdsException;
import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.handler.codec.http2.Http2StreamChannelBootstrap;
import io.netty.util.AsciiString;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.io.InputStream;
@@ -162,8 +164,8 @@ public class ClientStream extends AbstractStream implements Stream {
} finally {
ClassLoadUtil.switchContextLoader(tccl);
}
- streamChannel.write(new DefaultHttp2DataFrame(out, true));
-
+ final DefaultHttp2DataFrame data = new DefaultHttp2DataFrame(out, true);
+ streamChannel.write(data);
}
public void halfClose() {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
index 3a18c4b..bd0bf8c 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
@@ -190,7 +190,8 @@ public class ServerStream extends AbstractStream implements Stream {
convertAttachment(trailers, attachments);
}
ctx.write(new DefaultHttp2HeadersFrame(http2Headers));
- ctx.write(new DefaultHttp2DataFrame(buf));
+ final DefaultHttp2DataFrame data = new DefaultHttp2DataFrame(buf);
+ ctx.write(data);
ctx.writeAndFlush(new DefaultHttp2HeadersFrame(trailers, true));
} catch (Throwable e) {
LOGGER.warn("Exception processing triple message", e);
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
index ff61409..3254e64 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
@@ -26,6 +26,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.Http2GoAwayFrame;
import io.netty.handler.codec.http2.Http2SettingsFrame;
+import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
@@ -52,6 +53,9 @@ public class TripleClientHandler extends ChannelDuplexHandler {
} else if (msg instanceof Http2GoAwayFrame) {
final ConnectionHandler connectionHandler = ctx.pipeline().get(ConnectionHandler.class);
connectionHandler.onGoAway(ctx.channel());
+ ReferenceCountUtil.release(msg);
+ } else {
+ ReferenceCountUtil.release(msg);
}
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java
index d4ea1a7..2734014 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java
@@ -25,8 +25,9 @@ public class TripleClientInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
final ClientStream invoker = TripleUtil.getClientStream(ctx);
+ final ByteBuf buffer = (ByteBuf) msg;
if (invoker != null) {
- invoker.onData(new ByteBufInputStream((ByteBuf) msg));
+ invoker.onData(new ByteBufInputStream(buffer, buffer.readableBytes(),true));
}
}
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
index 569a40b..b261b9a 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
@@ -36,6 +36,7 @@ import io.netty.handler.codec.http2.Http2DataFrame;
import io.netty.handler.codec.http2.Http2Frame;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2HeadersFrame;
+import io.netty.util.ReferenceCountUtil;
import static org.apache.dubbo.rpc.protocol.tri.TripleUtil.responseErr;
import static org.apache.dubbo.rpc.protocol.tri.TripleUtil.responsePlainTextError;
@@ -52,6 +53,7 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
onDataRead(ctx, (Http2DataFrame) msg);
} else if (msg instanceof Http2Frame) {
// ignored
+ ReferenceCountUtil.release(msg);
} else {
super.channelRead(ctx, msg);
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
index 8d8766d..4574ae0 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
@@ -66,7 +66,12 @@ public class TripleHttp2Protocol extends Http2WireProtocol {
.gracefulShutdownTimeoutMillis(10000)
.frameLogger(CLIENT_LOGGER)
.build();
- final Http2MultiplexHandler handler = new Http2MultiplexHandler(new ChannelInboundHandlerAdapter());
+ final Http2MultiplexHandler handler = new Http2MultiplexHandler(new SimpleChannelInboundHandler<Object>() {
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
+ // empty
+ }
+ });
pipeline.addLast(codec, handler, new TripleClientHandler());
}
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerConnectionHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerConnectionHandler.java
index 8c9dd04..503b46b 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerConnectionHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerConnectionHandler.java
@@ -25,6 +25,7 @@ import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.Http2ChannelDuplexHandler;
import io.netty.handler.codec.http2.Http2GoAwayFrame;
import io.netty.handler.codec.http2.Http2PingFrame;
+import io.netty.util.ReferenceCountUtil;
import static org.apache.dubbo.rpc.protocol.tri.GracefulShutdown.GRACEFUL_SHUTDOWN_PING;
@@ -44,6 +45,7 @@ public class TripleServerConnectionHandler extends Http2ChannelDuplexHandler {
}
}
} else if (msg instanceof Http2GoAwayFrame) {
+ ReferenceCountUtil.release(msg);
} else {
super.channelRead(ctx, msg);
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java
index 03aac79..9a477c5 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java
@@ -25,8 +25,9 @@ public class TripleServerInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
final ServerStream serverStream = TripleUtil.getServerStream(ctx);
+ final ByteBuf buffer = (ByteBuf) msg;
if (serverStream != null) {
- serverStream.onData(new ByteBufInputStream((ByteBuf) msg));
+ serverStream.onData(new ByteBufInputStream(buffer,buffer.readableBytes(),true));
}
}
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
index 1bd816f..7222a4b 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
@@ -159,7 +159,9 @@ public class TripleUtil {
String serializeType = convertHessianFromWrapper(wrap.getSerializeType());
try {
final ByteArrayInputStream bais = new ByteArrayInputStream(wrap.getData().toByteArray());
- return serialization.deserialize(url, serializeType, wrap.getType(), bais);
+ final Object ret = serialization.deserialize(url, serializeType, wrap.getType(), bais);
+ bais.close();
+ return ret;
} catch (Exception e) {
throw new RuntimeException("Failed to unwrap resp", e);
}
@@ -190,6 +192,7 @@ public class TripleUtil {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
multipleSerialization.serialize(url, serializeType, desc.getReturnClass().getName(), resp, bos);
builder.setData(ByteString.copyFrom(bos.toByteArray()));
+ bos.close();
return builder.build();
} catch (IOException e) {
throw new RuntimeException("Failed to pack wrapper req", e);
@@ -199,6 +202,7 @@ public class TripleUtil {
public static <T> T unpack(InputStream is, Class<T> clz) {
try {
final T req = (T) pbSerialization.deserialize(is, clz);
+ is.close();
return req;
} catch (IOException e) {
throw new RuntimeException("Failed to unpack req", e);