You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by di...@apache.org on 2020/09/28 18:47:56 UTC

[tinkerpop] branch 3.4-dev updated: Simplify Netty reference counting (#1334)

This is an automated email from the ASF dual-hosted git repository.

divijv pushed a commit to branch 3.4-dev
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/3.4-dev by this push:
     new 262b14d  Simplify Netty reference counting (#1334)
262b14d is described below

commit 262b14de6a386ff769d9b8142382c3776be33cb3
Author: Divij Vaidya <di...@amazon.com>
AuthorDate: Mon Sep 28 11:47:38 2020 -0700

    Simplify Netty reference counting (#1334)
---
 .../apache/tinkerpop/gremlin/driver/Handler.java   | 86 +++++++++++-----------
 .../driver/handler/WebSocketClientHandler.java     |  4 +-
 .../handler/WebSocketGremlinResponseDecoder.java   | 24 +++---
 .../gremlin/driver/simple/AbstractClient.java      |  7 +-
 4 files changed, 53 insertions(+), 68 deletions(-)

diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
index 74cf761..32e0f67 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
@@ -121,6 +121,8 @@ final class Handler {
                 }
                 channelHandlerContext.writeAndFlush(messageBuilder.create());
             } else {
+                // SimpleChannelInboundHandler will release the frame if we don't retain it explicitely.
+                ReferenceCountUtil.retain(response);
                 channelHandlerContext.fireChannelRead(response);
             }
         }
@@ -216,58 +218,52 @@ final class Handler {
 
         @Override
         protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final ResponseMessage response) throws Exception {
-            try {
-                final ResponseStatusCode statusCode = response.getStatus().getCode();
-                final ResultQueue queue = pending.get(response.getRequestId());
-                if (statusCode == ResponseStatusCode.SUCCESS || statusCode == ResponseStatusCode.PARTIAL_CONTENT) {
-                    final Object data = response.getResult().getData();
-                    final Map<String,Object> meta = response.getResult().getMeta();
+            final ResponseStatusCode statusCode = response.getStatus().getCode();
+            final ResultQueue queue = pending.get(response.getRequestId());
+            if (statusCode == ResponseStatusCode.SUCCESS || statusCode == ResponseStatusCode.PARTIAL_CONTENT) {
+                final Object data = response.getResult().getData();
+                final Map<String,Object> meta = response.getResult().getMeta();
 
-                    if (!meta.containsKey(Tokens.ARGS_SIDE_EFFECT_KEY)) {
-                        // this is a "result" from the server which is either the result of a script or a
-                        // serialized traversal
-                        if (data instanceof List) {
-                            // unrolls the collection into individual results to be handled by the queue.
-                            final List<Object> listToUnroll = (List<Object>) data;
-                            listToUnroll.forEach(item -> queue.add(new Result(item)));
-                        } else {
-                            // since this is not a list it can just be added to the queue
-                            queue.add(new Result(response.getResult().getData()));
-                        }
+                if (!meta.containsKey(Tokens.ARGS_SIDE_EFFECT_KEY)) {
+                    // this is a "result" from the server which is either the result of a script or a
+                    // serialized traversal
+                    if (data instanceof List) {
+                        // unrolls the collection into individual results to be handled by the queue.
+                        final List<Object> listToUnroll = (List<Object>) data;
+                        listToUnroll.forEach(item -> queue.add(new Result(item)));
                     } else {
-                        // this is the side-effect from the server which is generated from a serialized traversal
-                        final String aggregateTo = meta.getOrDefault(Tokens.ARGS_AGGREGATE_TO, Tokens.VAL_AGGREGATE_TO_NONE).toString();
-                        if (data instanceof List) {
-                            // unrolls the collection into individual results to be handled by the queue.
-                            final List<Object> listOfSideEffects = (List<Object>) data;
-                            listOfSideEffects.forEach(sideEffect -> queue.addSideEffect(aggregateTo, sideEffect));
-                        } else {
-                            // since this is not a list it can just be added to the queue. this likely shouldn't occur
-                            // however as the protocol will typically push everything to list first.
-                            queue.addSideEffect(aggregateTo, data);
-                        }
+                        // since this is not a list it can just be added to the queue
+                        queue.add(new Result(response.getResult().getData()));
                     }
                 } else {
-                    // this is a "success" but represents no results otherwise it is an error
-                    if (statusCode != ResponseStatusCode.NO_CONTENT) {
-                        final Map<String,Object> attributes = response.getStatus().getAttributes();
-                        final String stackTrace = attributes.containsKey(Tokens.STATUS_ATTRIBUTE_STACK_TRACE) ?
-                                (String) attributes.get(Tokens.STATUS_ATTRIBUTE_STACK_TRACE) : null;
-                        final List<String> exceptions = attributes.containsKey(Tokens.STATUS_ATTRIBUTE_EXCEPTIONS) ?
-                                (List<String>) attributes.get(Tokens.STATUS_ATTRIBUTE_EXCEPTIONS) : null;
-                        queue.markError(new ResponseException(response.getStatus().getCode(), response.getStatus().getMessage(),
-                                exceptions, stackTrace, cleanStatusAttributes(attributes)));
+                    // this is the side-effect from the server which is generated from a serialized traversal
+                    final String aggregateTo = meta.getOrDefault(Tokens.ARGS_AGGREGATE_TO, Tokens.VAL_AGGREGATE_TO_NONE).toString();
+                    if (data instanceof List) {
+                        // unrolls the collection into individual results to be handled by the queue.
+                        final List<Object> listOfSideEffects = (List<Object>) data;
+                        listOfSideEffects.forEach(sideEffect -> queue.addSideEffect(aggregateTo, sideEffect));
+                    } else {
+                        // since this is not a list it can just be added to the queue. this likely shouldn't occur
+                        // however as the protocol will typically push everything to list first.
+                        queue.addSideEffect(aggregateTo, data);
                     }
                 }
-
-                // as this is a non-PARTIAL_CONTENT code - the stream is done.
-                if (statusCode != ResponseStatusCode.PARTIAL_CONTENT) {
-                    pending.remove(response.getRequestId()).markComplete(response.getStatus().getAttributes());
+            } else {
+                // this is a "success" but represents no results otherwise it is an error
+                if (statusCode != ResponseStatusCode.NO_CONTENT) {
+                    final Map<String,Object> attributes = response.getStatus().getAttributes();
+                    final String stackTrace = attributes.containsKey(Tokens.STATUS_ATTRIBUTE_STACK_TRACE) ?
+                            (String) attributes.get(Tokens.STATUS_ATTRIBUTE_STACK_TRACE) : null;
+                    final List<String> exceptions = attributes.containsKey(Tokens.STATUS_ATTRIBUTE_EXCEPTIONS) ?
+                            (List<String>) attributes.get(Tokens.STATUS_ATTRIBUTE_EXCEPTIONS) : null;
+                    queue.markError(new ResponseException(response.getStatus().getCode(), response.getStatus().getMessage(),
+                            exceptions, stackTrace, cleanStatusAttributes(attributes)));
                 }
-            } finally {
-                // in the event of an exception above the exception is tossed and handled by whatever channelpipeline
-                // error handling is at play.
-                ReferenceCountUtil.release(response);
+            }
+
+            // as this is a non-PARTIAL_CONTENT code - the stream is done.
+            if (statusCode != ResponseStatusCode.PARTIAL_CONTENT) {
+                pending.remove(response.getRequestId()).markComplete(response.getStatus().getAttributes());
             }
         }
 
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
index 4f43afe..b5c306e 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
@@ -80,13 +80,13 @@ public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Ob
         // a close frame doesn't mean much here.  errors raised from closed channels will mark the host as dead
         final WebSocketFrame frame = (WebSocketFrame) msg;
         if (frame instanceof TextWebSocketFrame) {
-            ctx.fireChannelRead(frame.retain(2));
+            ctx.fireChannelRead(frame.retain());
         } else if (frame instanceof PingWebSocketFrame) {
             ctx.writeAndFlush(new PongWebSocketFrame());
         }else if (frame instanceof PongWebSocketFrame) {
             logger.debug("Received response from keep-alive request");
         } else if (frame instanceof BinaryWebSocketFrame) {
-            ctx.fireChannelRead(frame.retain(2));
+            ctx.fireChannelRead(frame.retain());
         } else if (frame instanceof CloseWebSocketFrame)
             ch.close();
 
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketGremlinResponseDecoder.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketGremlinResponseDecoder.java
index 383e5a5..3b292f8 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketGremlinResponseDecoder.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketGremlinResponseDecoder.java
@@ -18,7 +18,6 @@
  */
 package org.apache.tinkerpop.gremlin.driver.handler;
 
-import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
 import org.apache.tinkerpop.gremlin.driver.MessageSerializer;
 import org.apache.tinkerpop.gremlin.driver.ser.MessageTextSerializer;
 import io.netty.channel.ChannelHandler;
@@ -27,7 +26,6 @@ import io.netty.handler.codec.MessageToMessageDecoder;
 import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.WebSocketFrame;
-import io.netty.util.ReferenceCountUtil;
 
 import java.util.List;
 
@@ -44,19 +42,15 @@ public final class WebSocketGremlinResponseDecoder extends MessageToMessageDecod
 
     @Override
     protected void decode(final ChannelHandlerContext channelHandlerContext, final WebSocketFrame webSocketFrame, final List<Object> objects) throws Exception {
-        try {
-            if (webSocketFrame instanceof BinaryWebSocketFrame) {
-                final BinaryWebSocketFrame tf = (BinaryWebSocketFrame) webSocketFrame;
-                objects.add(serializer.deserializeResponse(tf.content()));
-            } else if (webSocketFrame instanceof TextWebSocketFrame){
-                final TextWebSocketFrame tf = (TextWebSocketFrame) webSocketFrame;
-                final MessageTextSerializer textSerializer = (MessageTextSerializer) serializer;
-                objects.add(textSerializer.deserializeResponse(tf.text()));
-            } else {
-                throw new RuntimeException(String.format("WebSocket channel does not handle this type of message: %s", webSocketFrame.getClass().getName()));
-            }
-        } finally {
-            ReferenceCountUtil.release(webSocketFrame);
+        if (webSocketFrame instanceof BinaryWebSocketFrame) {
+            final BinaryWebSocketFrame tf = (BinaryWebSocketFrame) webSocketFrame;
+            objects.add(serializer.deserializeResponse(tf.content()));
+        } else if (webSocketFrame instanceof TextWebSocketFrame) {
+            final TextWebSocketFrame tf = (TextWebSocketFrame) webSocketFrame;
+            final MessageTextSerializer textSerializer = (MessageTextSerializer) serializer;
+            objects.add(textSerializer.deserializeResponse(tf.text()));
+        } else {
+            throw new RuntimeException(String.format("WebSocket channel does not handle this type of message: %s", webSocketFrame.getClass().getName()));
         }
     }
 }
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java
index 4fb950c..e991e3e 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java
@@ -22,7 +22,6 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.util.ReferenceCountUtil;
 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
@@ -84,11 +83,7 @@ public abstract class AbstractClient implements SimpleClient {
 
         @Override
         protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final ResponseMessage response) throws Exception {
-            try {
-                callback.accept(response);
-            } finally {
-                ReferenceCountUtil.release(response);
-            }
+            callback.accept(response);
         }
     }
 }