You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/04/22 16:22:56 UTC
incubator-tinkerpop git commit: Removed all usage of voidPromise.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master 9921a6ce5 -> 457e26ca4
Removed all usage of voidPromise.
Ever since voidPromise was introduced, tests have failed somewhat randomly. Something, somewhere, must depend on a future from the channel.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/457e26ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/457e26ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/457e26ca
Branch: refs/heads/master
Commit: 457e26ca42e499607be90cc33a30a491f61e5eac
Parents: 9921a6c
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Wed Apr 22 10:21:57 2015 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Wed Apr 22 10:21:57 2015 -0400
----------------------------------------------------------------------
.../server/handler/HttpGremlinEndpointHandler.java | 4 ++--
.../gremlin/server/handler/IteratorHandler.java | 6 +++---
.../server/handler/NioGremlinResponseEncoder.java | 8 ++++----
.../gremlin/server/handler/OpExecutorHandler.java | 2 +-
.../gremlin/server/handler/OpSelectorHandler.java | 2 +-
.../gremlin/server/op/AbstractEvalOpProcessor.java | 13 ++++++++++---
6 files changed, 21 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/457e26ca/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
index 3d6368b..67e81c2 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
@@ -113,7 +113,7 @@ public class HttpGremlinEndpointHandler extends ChannelInboundHandlerAdapter {
final FullHttpRequest req = (FullHttpRequest) msg;
if (is100ContinueExpected(req)) {
- ctx.write(new DefaultFullHttpResponse(HTTP_1_1, CONTINUE), ctx.voidPromise());
+ ctx.write(new DefaultFullHttpResponse(HTTP_1_1, CONTINUE));
}
if (req.getMethod() != GET && req.getMethod() != POST) {
@@ -168,7 +168,7 @@ public class HttpGremlinEndpointHandler extends ChannelInboundHandlerAdapter {
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
} else {
response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
- ctx.writeAndFlush(response, ctx.voidPromise());
+ ctx.writeAndFlush(response);
}
}
});
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/457e26ca/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/IteratorHandler.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/IteratorHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/IteratorHandler.java
index b957bae..cdb7975 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/IteratorHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/IteratorHandler.java
@@ -87,7 +87,7 @@ public class IteratorHandler extends ChannelOutboundHandlerAdapter {
if (aggregate.size() == resultIterationBatchSize || !itty.hasNext()) {
ctx.writeAndFlush(ResponseMessage.build(requestMessage)
.code(ResponseStatusCode.SUCCESS)
- .result(aggregate).create(), ctx.voidPromise());
+ .result(aggregate).create());
aggregate = new ArrayList<>(resultIterationBatchSize);
}
@@ -107,10 +107,10 @@ public class IteratorHandler extends ChannelOutboundHandlerAdapter {
if (!f.isSuccess()) {
final String errorMessage = String.format("Response iteration and serialization exceeded the configured threshold for request [%s] - %s", msg, f.cause().getMessage());
logger.warn(errorMessage);
- ctx.writeAndFlush(ResponseMessage.build(requestMessage).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT).statusMessage(errorMessage).create(), ctx.voidPromise());
+ ctx.writeAndFlush(ResponseMessage.build(requestMessage).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT).statusMessage(errorMessage).create());
}
- ctx.writeAndFlush(ResponseMessage.build(requestMessage).code(ResponseStatusCode.SUCCESS_TERMINATOR).create(), ctx.voidPromise());
+ ctx.writeAndFlush(ResponseMessage.build(requestMessage).code(ResponseStatusCode.SUCCESS_TERMINATOR).create());
});
} finally {
ReferenceCountUtil.release(msg);
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/457e26ca/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/NioGremlinResponseEncoder.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/NioGremlinResponseEncoder.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/NioGremlinResponseEncoder.java
index 0e6a9c0..987217e 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/NioGremlinResponseEncoder.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/NioGremlinResponseEncoder.java
@@ -82,14 +82,14 @@ public class NioGremlinResponseEncoder extends MessageToByteEncoder<ResponseMess
.statusMessage(errorMessage)
.code(ResponseStatusCode.SERVER_ERROR_SERIALIZATION).create();
if (useBinary) {
- ctx.write(serializer.serializeResponseAsBinary(error, ctx.alloc()), ctx.voidPromise());
+ byteBuf.writeBytes(serializer.serializeResponseAsBinary(error, ctx.alloc()));
final ResponseMessage terminator = ResponseMessage.build(responseMessage.getRequestId()).code(ResponseStatusCode.SUCCESS_TERMINATOR).create();
- ctx.writeAndFlush(serializer.serializeResponseAsBinary(terminator, ctx.alloc()), ctx.voidPromise());
+ byteBuf.writeBytes(serializer.serializeResponseAsBinary(terminator, ctx.alloc()));
} else {
final MessageTextSerializer textSerializer = (MessageTextSerializer) serializer;
- ctx.write(textSerializer.serializeResponseAsString(error), ctx.voidPromise());
+ byteBuf.writeBytes(textSerializer.serializeResponseAsString(error).getBytes(UTF8));
final ResponseMessage terminator = ResponseMessage.build(responseMessage.getRequestId()).code(ResponseStatusCode.SUCCESS_TERMINATOR).create();
- ctx.writeAndFlush(textSerializer.serializeResponseAsString(terminator), ctx.voidPromise());
+ byteBuf.writeBytes(textSerializer.serializeResponseAsString(terminator).getBytes(UTF8));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/457e26ca/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpExecutorHandler.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpExecutorHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpExecutorHandler.java
index 2850aa2..9b206bd 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpExecutorHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpExecutorHandler.java
@@ -68,7 +68,7 @@ public class OpExecutorHandler extends SimpleChannelInboundHandler<Pair<RequestM
// Ops may choose to throw OpProcessorException or write the error ResponseMessage down the line
// themselves
logger.warn(ope.getMessage(), ope);
- ctx.writeAndFlush(ope.getResponseMessage(), ctx.voidPromise());
+ ctx.writeAndFlush(ope.getResponseMessage());
} finally {
ReferenceCountUtil.release(objects);
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/457e26ca/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpSelectorHandler.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpSelectorHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpSelectorHandler.java
index 7323623..98d56bb 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpSelectorHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpSelectorHandler.java
@@ -86,7 +86,7 @@ public class OpSelectorHandler extends MessageToMessageDecoder<RequestMessage> {
} catch (OpProcessorException ope) {
errorMeter.mark();
logger.warn(ope.getMessage(), ope);
- ctx.writeAndFlush(ope.getResponseMessage(), ctx.voidPromise());
+ ctx.writeAndFlush(ope.getResponseMessage());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/457e26ca/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java
index 3ee6681..97d5ddd 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java
@@ -47,6 +47,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
@@ -157,7 +158,7 @@ public abstract class AbstractEvalOpProcessor implements OpProcessor {
try {
handleIterator(context, itty);
- } catch (TimeoutException te) {
+ } catch (Exception te) {
throw new RuntimeException(te);
}
}, executor);
@@ -193,7 +194,7 @@ public abstract class AbstractEvalOpProcessor implements OpProcessor {
* @param itty The result to iterator
* @throws TimeoutException if the time taken to serialize the entire result set exceeds the allowable time.
*/
- protected void handleIterator(final Context context, final Iterator itty) throws TimeoutException {
+ protected void handleIterator(final Context context, final Iterator itty) throws TimeoutException, InterruptedException {
final ChannelHandlerContext ctx = context.getChannelHandlerContext();
final RequestMessage msg = context.getRequestMessage();
final Settings settings = context.getSettings();
@@ -208,6 +209,8 @@ public abstract class AbstractEvalOpProcessor implements OpProcessor {
.orElse(settings.resultIterationBatchSize);
List<Object> aggregate = new ArrayList<>(resultIterationBatchSize);
while (itty.hasNext()) {
+ if (Thread.interrupted()) throw new InterruptedException();
+
// have to check the aggregate size because it is possible that the channel is not writeable (below)
// so iterating next() if the message is not written and flushed would bump the aggregate size beyond
// the expected resultIterationBatchSize. Total serialization time for the response remains in
@@ -220,7 +223,7 @@ public abstract class AbstractEvalOpProcessor implements OpProcessor {
if (aggregate.size() == resultIterationBatchSize || !itty.hasNext()) {
ctx.writeAndFlush(ResponseMessage.build(msg)
.code(ResponseStatusCode.SUCCESS)
- .result(aggregate).create(), ctx.voidPromise());
+ .result(aggregate).create());
aggregate = new ArrayList<>(resultIterationBatchSize);
}
@@ -230,6 +233,10 @@ public abstract class AbstractEvalOpProcessor implements OpProcessor {
logger.warn("Pausing response writing as writeBufferHighWaterMark exceeded on {} - writing will continue once client has caught up", msg);
warnOnce = true;
}
+
+ // since the client is lagging we can hold here for a period of time for the client to catch up.
+ // this isn't blocking the IO thread - just a worker.
+ TimeUnit.MILLISECONDS.sleep(10);
}
stopWatch.split();