You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by gu...@apache.org on 2021/03/02 02:17:32 UTC

[dubbo] branch 3.0 updated: Fix mem leak (#7301)

This is an automated email from the ASF dual-hosted git repository.

guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 3318ba2  Fix mem leak (#7301)
3318ba2 is described below

commit 3318ba26374d57468b421d9c5d45aad79b18f149
Author: GuoHao <gu...@gmail.com>
AuthorDate: Tue Mar 2 10:17:11 2021 +0800

    Fix mem leak (#7301)
---
 .../main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java  | 6 ++++--
 .../main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java  | 3 ++-
 .../org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java     | 4 ++++
 .../apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java  | 3 ++-
 .../dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java      | 2 ++
 .../org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java     | 7 ++++++-
 .../dubbo/rpc/protocol/tri/TripleServerConnectionHandler.java      | 2 ++
 .../apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java  | 3 ++-
 .../main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java    | 6 +++++-
 9 files changed, 29 insertions(+), 7 deletions(-)

diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
index 69ced1c..0062abd 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
@@ -47,6 +47,8 @@ import io.netty.handler.codec.http2.Http2NoMoreStreamIdsException;
 import io.netty.handler.codec.http2.Http2StreamChannel;
 import io.netty.handler.codec.http2.Http2StreamChannelBootstrap;
 import io.netty.util.AsciiString;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -162,8 +164,8 @@ public class ClientStream extends AbstractStream implements Stream {
         } finally {
             ClassLoadUtil.switchContextLoader(tccl);
         }
-        streamChannel.write(new DefaultHttp2DataFrame(out, true));
-
+        final DefaultHttp2DataFrame data = new DefaultHttp2DataFrame(out, true);
+        streamChannel.write(data);
     }
 
     public void halfClose() {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
index 3a18c4b..bd0bf8c 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
@@ -190,7 +190,8 @@ public class ServerStream extends AbstractStream implements Stream {
                     convertAttachment(trailers, attachments);
                 }
                 ctx.write(new DefaultHttp2HeadersFrame(http2Headers));
-                ctx.write(new DefaultHttp2DataFrame(buf));
+                final DefaultHttp2DataFrame data = new DefaultHttp2DataFrame(buf);
+                ctx.write(data);
                 ctx.writeAndFlush(new DefaultHttp2HeadersFrame(trailers, true));
             } catch (Throwable e) {
                 LOGGER.warn("Exception processing triple message", e);
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
index ff61409..3254e64 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
@@ -26,6 +26,7 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
 import io.netty.handler.codec.http2.Http2GoAwayFrame;
 import io.netty.handler.codec.http2.Http2SettingsFrame;
+import io.netty.util.ReferenceCountUtil;
 
 import java.io.IOException;
 
@@ -52,6 +53,9 @@ public class TripleClientHandler extends ChannelDuplexHandler {
         } else if (msg instanceof Http2GoAwayFrame) {
             final ConnectionHandler connectionHandler = ctx.pipeline().get(ConnectionHandler.class);
             connectionHandler.onGoAway(ctx.channel());
+            ReferenceCountUtil.release(msg);
+        } else {
+            ReferenceCountUtil.release(msg);
         }
     }
 
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java
index d4ea1a7..2734014 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientInboundHandler.java
@@ -25,8 +25,9 @@ public class TripleClientInboundHandler extends ChannelInboundHandlerAdapter {
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         final ClientStream invoker = TripleUtil.getClientStream(ctx);
+        final ByteBuf buffer = (ByteBuf) msg;
         if (invoker != null) {
-            invoker.onData(new ByteBufInputStream((ByteBuf) msg));
+            invoker.onData(new ByteBufInputStream(buffer, buffer.readableBytes(),true));
         }
     }
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
index 569a40b..b261b9a 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
@@ -36,6 +36,7 @@ import io.netty.handler.codec.http2.Http2DataFrame;
 import io.netty.handler.codec.http2.Http2Frame;
 import io.netty.handler.codec.http2.Http2Headers;
 import io.netty.handler.codec.http2.Http2HeadersFrame;
+import io.netty.util.ReferenceCountUtil;
 
 import static org.apache.dubbo.rpc.protocol.tri.TripleUtil.responseErr;
 import static org.apache.dubbo.rpc.protocol.tri.TripleUtil.responsePlainTextError;
@@ -52,6 +53,7 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
             onDataRead(ctx, (Http2DataFrame) msg);
         } else if (msg instanceof Http2Frame) {
             // ignored
+            ReferenceCountUtil.release(msg);
         } else {
             super.channelRead(ctx, msg);
         }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
index 8d8766d..4574ae0 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
@@ -66,7 +66,12 @@ public class TripleHttp2Protocol extends Http2WireProtocol {
                 .gracefulShutdownTimeoutMillis(10000)
                 .frameLogger(CLIENT_LOGGER)
                 .build();
-        final Http2MultiplexHandler handler = new Http2MultiplexHandler(new ChannelInboundHandlerAdapter());
+        final Http2MultiplexHandler handler = new Http2MultiplexHandler(new SimpleChannelInboundHandler<Object>() {
+            @Override
+            protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
+                // empty
+            }
+        });
         pipeline.addLast(codec, handler, new TripleClientHandler());
     }
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerConnectionHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerConnectionHandler.java
index 8c9dd04..503b46b 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerConnectionHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerConnectionHandler.java
@@ -25,6 +25,7 @@ import io.netty.channel.ChannelPromise;
 import io.netty.handler.codec.http2.Http2ChannelDuplexHandler;
 import io.netty.handler.codec.http2.Http2GoAwayFrame;
 import io.netty.handler.codec.http2.Http2PingFrame;
+import io.netty.util.ReferenceCountUtil;
 
 import static org.apache.dubbo.rpc.protocol.tri.GracefulShutdown.GRACEFUL_SHUTDOWN_PING;
 
@@ -44,6 +45,7 @@ public class TripleServerConnectionHandler extends Http2ChannelDuplexHandler {
                 }
             }
         } else if (msg instanceof Http2GoAwayFrame) {
+            ReferenceCountUtil.release(msg);
         } else {
             super.channelRead(ctx, msg);
         }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java
index 03aac79..9a477c5 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java
@@ -25,8 +25,9 @@ public class TripleServerInboundHandler extends ChannelInboundHandlerAdapter {
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         final ServerStream serverStream = TripleUtil.getServerStream(ctx);
+        final ByteBuf buffer = (ByteBuf) msg;
         if (serverStream != null) {
-            serverStream.onData(new ByteBufInputStream((ByteBuf) msg));
+            serverStream.onData(new ByteBufInputStream(buffer,buffer.readableBytes(),true));
         }
     }
 }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
index 1bd816f..7222a4b 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
@@ -159,7 +159,9 @@ public class TripleUtil {
         String serializeType = convertHessianFromWrapper(wrap.getSerializeType());
         try {
             final ByteArrayInputStream bais = new ByteArrayInputStream(wrap.getData().toByteArray());
-            return serialization.deserialize(url, serializeType, wrap.getType(), bais);
+            final Object ret = serialization.deserialize(url, serializeType, wrap.getType(), bais);
+            bais.close();
+            return ret;
         } catch (Exception e) {
             throw new RuntimeException("Failed to unwrap resp", e);
         }
@@ -190,6 +192,7 @@ public class TripleUtil {
             ByteArrayOutputStream bos = new ByteArrayOutputStream();
             multipleSerialization.serialize(url, serializeType, desc.getReturnClass().getName(), resp, bos);
             builder.setData(ByteString.copyFrom(bos.toByteArray()));
+            bos.close();
             return builder.build();
         } catch (IOException e) {
             throw new RuntimeException("Failed to pack wrapper req", e);
@@ -199,6 +202,7 @@ public class TripleUtil {
     public static <T> T unpack(InputStream is, Class<T> clz) {
         try {
             final T req = (T) pbSerialization.deserialize(is, clz);
+            is.close();
             return req;
         } catch (IOException e) {
             throw new RuntimeException("Failed to unpack req", e);