You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/04/22 16:22:56 UTC

incubator-tinkerpop git commit: Removed all usage of voidPromise.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 9921a6ce5 -> 457e26ca4


Removed all usage of voidPromise.

Ever since voidPromise was introduced, tests fail somewhat randomly.  Something must depend on a future on the channel somewhere.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/457e26ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/457e26ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/457e26ca

Branch: refs/heads/master
Commit: 457e26ca42e499607be90cc33a30a491f61e5eac
Parents: 9921a6c
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Wed Apr 22 10:21:57 2015 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Wed Apr 22 10:21:57 2015 -0400

----------------------------------------------------------------------
 .../server/handler/HttpGremlinEndpointHandler.java     |  4 ++--
 .../gremlin/server/handler/IteratorHandler.java        |  6 +++---
 .../server/handler/NioGremlinResponseEncoder.java      |  8 ++++----
 .../gremlin/server/handler/OpExecutorHandler.java      |  2 +-
 .../gremlin/server/handler/OpSelectorHandler.java      |  2 +-
 .../gremlin/server/op/AbstractEvalOpProcessor.java     | 13 ++++++++++---
 6 files changed, 21 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/457e26ca/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
index 3d6368b..67e81c2 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
@@ -113,7 +113,7 @@ public class HttpGremlinEndpointHandler extends ChannelInboundHandlerAdapter {
             final FullHttpRequest req = (FullHttpRequest) msg;
 
             if (is100ContinueExpected(req)) {
-                ctx.write(new DefaultFullHttpResponse(HTTP_1_1, CONTINUE), ctx.voidPromise());
+                ctx.write(new DefaultFullHttpResponse(HTTP_1_1, CONTINUE));
             }
 
             if (req.getMethod() != GET && req.getMethod() != POST) {
@@ -168,7 +168,7 @@ public class HttpGremlinEndpointHandler extends ChannelInboundHandlerAdapter {
                             ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
                         } else {
                             response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
-                            ctx.writeAndFlush(response, ctx.voidPromise());
+                            ctx.writeAndFlush(response);
                         }
                     }
                 });

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/457e26ca/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/IteratorHandler.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/IteratorHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/IteratorHandler.java
index b957bae..cdb7975 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/IteratorHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/IteratorHandler.java
@@ -87,7 +87,7 @@ public class IteratorHandler extends ChannelOutboundHandlerAdapter {
                         if (aggregate.size() == resultIterationBatchSize || !itty.hasNext()) {
                             ctx.writeAndFlush(ResponseMessage.build(requestMessage)
                                     .code(ResponseStatusCode.SUCCESS)
-                                    .result(aggregate).create(), ctx.voidPromise());
+                                    .result(aggregate).create());
                             aggregate = new ArrayList<>(resultIterationBatchSize);
                         }
 
@@ -107,10 +107,10 @@ public class IteratorHandler extends ChannelOutboundHandlerAdapter {
                     if (!f.isSuccess()) {
                         final String errorMessage = String.format("Response iteration and serialization exceeded the configured threshold for request [%s] - %s", msg, f.cause().getMessage());
                         logger.warn(errorMessage);
-                        ctx.writeAndFlush(ResponseMessage.build(requestMessage).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT).statusMessage(errorMessage).create(), ctx.voidPromise());
+                        ctx.writeAndFlush(ResponseMessage.build(requestMessage).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT).statusMessage(errorMessage).create());
                     }
 
-                    ctx.writeAndFlush(ResponseMessage.build(requestMessage).code(ResponseStatusCode.SUCCESS_TERMINATOR).create(), ctx.voidPromise());
+                    ctx.writeAndFlush(ResponseMessage.build(requestMessage).code(ResponseStatusCode.SUCCESS_TERMINATOR).create());
                 });
             } finally {
                 ReferenceCountUtil.release(msg);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/457e26ca/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/NioGremlinResponseEncoder.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/NioGremlinResponseEncoder.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/NioGremlinResponseEncoder.java
index 0e6a9c0..987217e 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/NioGremlinResponseEncoder.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/NioGremlinResponseEncoder.java
@@ -82,14 +82,14 @@ public class NioGremlinResponseEncoder extends MessageToByteEncoder<ResponseMess
                     .statusMessage(errorMessage)
                     .code(ResponseStatusCode.SERVER_ERROR_SERIALIZATION).create();
             if (useBinary) {
-                ctx.write(serializer.serializeResponseAsBinary(error, ctx.alloc()), ctx.voidPromise());
+                byteBuf.writeBytes(serializer.serializeResponseAsBinary(error, ctx.alloc()));
                 final ResponseMessage terminator = ResponseMessage.build(responseMessage.getRequestId()).code(ResponseStatusCode.SUCCESS_TERMINATOR).create();
-                ctx.writeAndFlush(serializer.serializeResponseAsBinary(terminator, ctx.alloc()), ctx.voidPromise());
+                byteBuf.writeBytes(serializer.serializeResponseAsBinary(terminator, ctx.alloc()));
             } else {
                 final MessageTextSerializer textSerializer = (MessageTextSerializer) serializer;
-                ctx.write(textSerializer.serializeResponseAsString(error), ctx.voidPromise());
+                byteBuf.writeBytes(textSerializer.serializeResponseAsString(error).getBytes(UTF8));
                 final ResponseMessage terminator = ResponseMessage.build(responseMessage.getRequestId()).code(ResponseStatusCode.SUCCESS_TERMINATOR).create();
-                ctx.writeAndFlush(textSerializer.serializeResponseAsString(terminator), ctx.voidPromise());
+                byteBuf.writeBytes(textSerializer.serializeResponseAsString(terminator).getBytes(UTF8));
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/457e26ca/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpExecutorHandler.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpExecutorHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpExecutorHandler.java
index 2850aa2..9b206bd 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpExecutorHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpExecutorHandler.java
@@ -68,7 +68,7 @@ public class OpExecutorHandler extends SimpleChannelInboundHandler<Pair<RequestM
             // Ops may choose to throw OpProcessorException or write the error ResponseMessage down the line
             // themselves
             logger.warn(ope.getMessage(), ope);
-            ctx.writeAndFlush(ope.getResponseMessage(), ctx.voidPromise());
+            ctx.writeAndFlush(ope.getResponseMessage());
         } finally {
             ReferenceCountUtil.release(objects);
         }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/457e26ca/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpSelectorHandler.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpSelectorHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpSelectorHandler.java
index 7323623..98d56bb 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpSelectorHandler.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpSelectorHandler.java
@@ -86,7 +86,7 @@ public class OpSelectorHandler extends MessageToMessageDecoder<RequestMessage> {
         } catch (OpProcessorException ope) {
             errorMeter.mark();
             logger.warn(ope.getMessage(), ope);
-            ctx.writeAndFlush(ope.getResponseMessage(), ctx.voidPromise());
+            ctx.writeAndFlush(ope.getResponseMessage());
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/457e26ca/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java
index 3ee6681..97d5ddd 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractEvalOpProcessor.java
@@ -47,6 +47,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
 
@@ -157,7 +158,7 @@ public abstract class AbstractEvalOpProcessor implements OpProcessor {
 
             try {
                 handleIterator(context, itty);
-            } catch (TimeoutException te) {
+            } catch (Exception te) {
                 throw new RuntimeException(te);
             }
         }, executor);
@@ -193,7 +194,7 @@ public abstract class AbstractEvalOpProcessor implements OpProcessor {
      * @param itty The result to iterator
      * @throws TimeoutException if the time taken to serialize the entire result set exceeds the allowable time.
      */
-    protected void handleIterator(final Context context, final Iterator itty) throws TimeoutException {
+    protected void handleIterator(final Context context, final Iterator itty) throws TimeoutException, InterruptedException {
         final ChannelHandlerContext ctx = context.getChannelHandlerContext();
         final RequestMessage msg = context.getRequestMessage();
         final Settings settings = context.getSettings();
@@ -208,6 +209,8 @@ public abstract class AbstractEvalOpProcessor implements OpProcessor {
                 .orElse(settings.resultIterationBatchSize);
         List<Object> aggregate = new ArrayList<>(resultIterationBatchSize);
         while (itty.hasNext()) {
+            if (Thread.interrupted()) throw new InterruptedException();
+
             // have to check the aggregate size because it is possible that the channel is not writeable (below)
             // so iterating next() if the message is not written and flushed would bump the aggregate size beyond
             // the expected resultIterationBatchSize.  Total serialization time for the response remains in
@@ -220,7 +223,7 @@ public abstract class AbstractEvalOpProcessor implements OpProcessor {
                 if  (aggregate.size() == resultIterationBatchSize || !itty.hasNext()) {
                     ctx.writeAndFlush(ResponseMessage.build(msg)
                             .code(ResponseStatusCode.SUCCESS)
-                            .result(aggregate).create(), ctx.voidPromise());
+                            .result(aggregate).create());
 
                     aggregate = new ArrayList<>(resultIterationBatchSize);
                 }
@@ -230,6 +233,10 @@ public abstract class AbstractEvalOpProcessor implements OpProcessor {
                     logger.warn("Pausing response writing as writeBufferHighWaterMark exceeded on {} - writing will continue once client has caught up", msg);
                     warnOnce = true;
                 }
+
+                // since the client is lagging we can hold here for a period of time for the client to catch up.
+                // this isn't blocking the IO thread - just a worker.
+                TimeUnit.MILLISECONDS.sleep(10);
             }
 
             stopWatch.split();