You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by di...@apache.org on 2020/09/28 18:47:56 UTC
[tinkerpop] branch 3.4-dev updated: Simplify Netty reference counting (#1334)
This is an automated email from the ASF dual-hosted git repository.
divijv pushed a commit to branch 3.4-dev
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/3.4-dev by this push:
new 262b14d Simplify Netty reference counting (#1334)
262b14d is described below
commit 262b14de6a386ff769d9b8142382c3776be33cb3
Author: Divij Vaidya <di...@amazon.com>
AuthorDate: Mon Sep 28 11:47:38 2020 -0700
Simplify Netty reference counting (#1334)
---
.../apache/tinkerpop/gremlin/driver/Handler.java | 86 +++++++++++-----------
.../driver/handler/WebSocketClientHandler.java | 4 +-
.../handler/WebSocketGremlinResponseDecoder.java | 24 +++---
.../gremlin/driver/simple/AbstractClient.java | 7 +-
4 files changed, 53 insertions(+), 68 deletions(-)
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
index 74cf761..32e0f67 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
@@ -121,6 +121,8 @@ final class Handler {
}
channelHandlerContext.writeAndFlush(messageBuilder.create());
} else {
+ // SimpleChannelInboundHandler will release the frame if we don't retain it explicitly.
+ ReferenceCountUtil.retain(response);
channelHandlerContext.fireChannelRead(response);
}
}
@@ -216,58 +218,52 @@ final class Handler {
@Override
protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final ResponseMessage response) throws Exception {
- try {
- final ResponseStatusCode statusCode = response.getStatus().getCode();
- final ResultQueue queue = pending.get(response.getRequestId());
- if (statusCode == ResponseStatusCode.SUCCESS || statusCode == ResponseStatusCode.PARTIAL_CONTENT) {
- final Object data = response.getResult().getData();
- final Map<String,Object> meta = response.getResult().getMeta();
+ final ResponseStatusCode statusCode = response.getStatus().getCode();
+ final ResultQueue queue = pending.get(response.getRequestId());
+ if (statusCode == ResponseStatusCode.SUCCESS || statusCode == ResponseStatusCode.PARTIAL_CONTENT) {
+ final Object data = response.getResult().getData();
+ final Map<String,Object> meta = response.getResult().getMeta();
- if (!meta.containsKey(Tokens.ARGS_SIDE_EFFECT_KEY)) {
- // this is a "result" from the server which is either the result of a script or a
- // serialized traversal
- if (data instanceof List) {
- // unrolls the collection into individual results to be handled by the queue.
- final List<Object> listToUnroll = (List<Object>) data;
- listToUnroll.forEach(item -> queue.add(new Result(item)));
- } else {
- // since this is not a list it can just be added to the queue
- queue.add(new Result(response.getResult().getData()));
- }
+ if (!meta.containsKey(Tokens.ARGS_SIDE_EFFECT_KEY)) {
+ // this is a "result" from the server which is either the result of a script or a
+ // serialized traversal
+ if (data instanceof List) {
+ // unrolls the collection into individual results to be handled by the queue.
+ final List<Object> listToUnroll = (List<Object>) data;
+ listToUnroll.forEach(item -> queue.add(new Result(item)));
} else {
- // this is the side-effect from the server which is generated from a serialized traversal
- final String aggregateTo = meta.getOrDefault(Tokens.ARGS_AGGREGATE_TO, Tokens.VAL_AGGREGATE_TO_NONE).toString();
- if (data instanceof List) {
- // unrolls the collection into individual results to be handled by the queue.
- final List<Object> listOfSideEffects = (List<Object>) data;
- listOfSideEffects.forEach(sideEffect -> queue.addSideEffect(aggregateTo, sideEffect));
- } else {
- // since this is not a list it can just be added to the queue. this likely shouldn't occur
- // however as the protocol will typically push everything to list first.
- queue.addSideEffect(aggregateTo, data);
- }
+ // since this is not a list it can just be added to the queue
+ queue.add(new Result(response.getResult().getData()));
}
} else {
- // this is a "success" but represents no results otherwise it is an error
- if (statusCode != ResponseStatusCode.NO_CONTENT) {
- final Map<String,Object> attributes = response.getStatus().getAttributes();
- final String stackTrace = attributes.containsKey(Tokens.STATUS_ATTRIBUTE_STACK_TRACE) ?
- (String) attributes.get(Tokens.STATUS_ATTRIBUTE_STACK_TRACE) : null;
- final List<String> exceptions = attributes.containsKey(Tokens.STATUS_ATTRIBUTE_EXCEPTIONS) ?
- (List<String>) attributes.get(Tokens.STATUS_ATTRIBUTE_EXCEPTIONS) : null;
- queue.markError(new ResponseException(response.getStatus().getCode(), response.getStatus().getMessage(),
- exceptions, stackTrace, cleanStatusAttributes(attributes)));
+ // this is the side-effect from the server which is generated from a serialized traversal
+ final String aggregateTo = meta.getOrDefault(Tokens.ARGS_AGGREGATE_TO, Tokens.VAL_AGGREGATE_TO_NONE).toString();
+ if (data instanceof List) {
+ // unrolls the collection into individual results to be handled by the queue.
+ final List<Object> listOfSideEffects = (List<Object>) data;
+ listOfSideEffects.forEach(sideEffect -> queue.addSideEffect(aggregateTo, sideEffect));
+ } else {
+ // since this is not a list it can just be added to the queue. this likely shouldn't occur
+ // however as the protocol will typically push everything to list first.
+ queue.addSideEffect(aggregateTo, data);
}
}
-
- // as this is a non-PARTIAL_CONTENT code - the stream is done.
- if (statusCode != ResponseStatusCode.PARTIAL_CONTENT) {
- pending.remove(response.getRequestId()).markComplete(response.getStatus().getAttributes());
+ } else {
+ // this is a "success" but represents no results otherwise it is an error
+ if (statusCode != ResponseStatusCode.NO_CONTENT) {
+ final Map<String,Object> attributes = response.getStatus().getAttributes();
+ final String stackTrace = attributes.containsKey(Tokens.STATUS_ATTRIBUTE_STACK_TRACE) ?
+ (String) attributes.get(Tokens.STATUS_ATTRIBUTE_STACK_TRACE) : null;
+ final List<String> exceptions = attributes.containsKey(Tokens.STATUS_ATTRIBUTE_EXCEPTIONS) ?
+ (List<String>) attributes.get(Tokens.STATUS_ATTRIBUTE_EXCEPTIONS) : null;
+ queue.markError(new ResponseException(response.getStatus().getCode(), response.getStatus().getMessage(),
+ exceptions, stackTrace, cleanStatusAttributes(attributes)));
}
- } finally {
- // in the event of an exception above the exception is tossed and handled by whatever channelpipeline
- // error handling is at play.
- ReferenceCountUtil.release(response);
+ }
+
+ // as this is a non-PARTIAL_CONTENT code - the stream is done.
+ if (statusCode != ResponseStatusCode.PARTIAL_CONTENT) {
+ pending.remove(response.getRequestId()).markComplete(response.getStatus().getAttributes());
}
}
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
index 4f43afe..b5c306e 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
@@ -80,13 +80,13 @@ public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Ob
// a close frame doesn't mean much here. errors raised from closed channels will mark the host as dead
final WebSocketFrame frame = (WebSocketFrame) msg;
if (frame instanceof TextWebSocketFrame) {
- ctx.fireChannelRead(frame.retain(2));
+ ctx.fireChannelRead(frame.retain());
} else if (frame instanceof PingWebSocketFrame) {
ctx.writeAndFlush(new PongWebSocketFrame());
}else if (frame instanceof PongWebSocketFrame) {
logger.debug("Received response from keep-alive request");
} else if (frame instanceof BinaryWebSocketFrame) {
- ctx.fireChannelRead(frame.retain(2));
+ ctx.fireChannelRead(frame.retain());
} else if (frame instanceof CloseWebSocketFrame)
ch.close();
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketGremlinResponseDecoder.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketGremlinResponseDecoder.java
index 383e5a5..3b292f8 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketGremlinResponseDecoder.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketGremlinResponseDecoder.java
@@ -18,7 +18,6 @@
*/
package org.apache.tinkerpop.gremlin.driver.handler;
-import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import org.apache.tinkerpop.gremlin.driver.MessageSerializer;
import org.apache.tinkerpop.gremlin.driver.ser.MessageTextSerializer;
import io.netty.channel.ChannelHandler;
@@ -27,7 +26,6 @@ import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
-import io.netty.util.ReferenceCountUtil;
import java.util.List;
@@ -44,19 +42,15 @@ public final class WebSocketGremlinResponseDecoder extends MessageToMessageDecod
@Override
protected void decode(final ChannelHandlerContext channelHandlerContext, final WebSocketFrame webSocketFrame, final List<Object> objects) throws Exception {
- try {
- if (webSocketFrame instanceof BinaryWebSocketFrame) {
- final BinaryWebSocketFrame tf = (BinaryWebSocketFrame) webSocketFrame;
- objects.add(serializer.deserializeResponse(tf.content()));
- } else if (webSocketFrame instanceof TextWebSocketFrame){
- final TextWebSocketFrame tf = (TextWebSocketFrame) webSocketFrame;
- final MessageTextSerializer textSerializer = (MessageTextSerializer) serializer;
- objects.add(textSerializer.deserializeResponse(tf.text()));
- } else {
- throw new RuntimeException(String.format("WebSocket channel does not handle this type of message: %s", webSocketFrame.getClass().getName()));
- }
- } finally {
- ReferenceCountUtil.release(webSocketFrame);
+ if (webSocketFrame instanceof BinaryWebSocketFrame) {
+ final BinaryWebSocketFrame tf = (BinaryWebSocketFrame) webSocketFrame;
+ objects.add(serializer.deserializeResponse(tf.content()));
+ } else if (webSocketFrame instanceof TextWebSocketFrame) {
+ final TextWebSocketFrame tf = (TextWebSocketFrame) webSocketFrame;
+ final MessageTextSerializer textSerializer = (MessageTextSerializer) serializer;
+ objects.add(textSerializer.deserializeResponse(tf.text()));
+ } else {
+ throw new RuntimeException(String.format("WebSocket channel does not handle this type of message: %s", webSocketFrame.getClass().getName()));
}
}
}
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java
index 4fb950c..e991e3e 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java
@@ -22,7 +22,6 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.util.ReferenceCountUtil;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
@@ -84,11 +83,7 @@ public abstract class AbstractClient implements SimpleClient {
@Override
protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final ResponseMessage response) throws Exception {
- try {
- callback.accept(response);
- } finally {
- ReferenceCountUtil.release(response);
- }
+ callback.accept(response);
}
}
}