You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/08/27 15:11:36 UTC

[1/2] incubator-tinkerpop git commit: Fixed bugs in NIO protocol for Gremlin Server.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 784d657af -> 7f8495cf5


Fixed bugs in NIO protocol for Gremlin Server.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/1a68f6f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/1a68f6f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/1a68f6f9

Branch: refs/heads/master
Commit: 1a68f6f9fe2bae274a88bf637510465575f71d1f
Parents: 95ff755
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Thu Aug 27 09:07:21 2015 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Thu Aug 27 09:07:21 2015 -0400

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |  1 +
 .../tinkerpop/gremlin/driver/Channelizer.java   | 10 +---
 .../tinkerpop/gremlin/driver/Cluster.java       | 17 ++++++
 .../tinkerpop/gremlin/driver/Connection.java    | 14 +++--
 .../tinkerpop/gremlin/driver/Settings.java      |  1 +
 .../handler/NioGremlinRequestEncoder.java       | 14 ++++-
 .../handler/NioGremlinResponseDecoder.java      | 25 ++++++--
 .../driver/util/ProfilingApplication.java       |  3 +
 .../gremlin/server/channel/NioChannelizer.java  |  4 +-
 .../handler/NioGremlinBinaryRequestDecoder.java | 62 +++++++++++---------
 .../handler/NioGremlinResponseEncoder.java      | 26 +++++---
 .../server/GremlinDriverIntegrateTest.java      | 28 +++++++--
 12 files changed, 144 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1a68f6f9/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index be25cf3..5d99eb8 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -26,6 +26,7 @@ image::https://raw.githubusercontent.com/apache/incubator-tinkerpop/master/docs/
 TinkerPop 3.1.0 (NOT OFFICIALLY RELEASED YET)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
+* Fixed bugs in the Gremlin Server's NIO protocol both on the server and driver side.
 * `GraphStep` can now take a single argument `Collection` which is either elements or element ids (i.e. `g.V([1,2,3])` is supported now).
 * Added `LoopsStep` to make the loop counter accessible within `repeat()`, `until()` and `emit()`.
 * Gephi Plugin no longer requires manual insert of `store` steps to visualize a traversal.

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1a68f6f9/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index ccf0f9b..1b77ea4 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -35,7 +35,6 @@ import io.netty.handler.codec.http.HttpObjectAggregator;
 import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
 import io.netty.handler.codec.http.websocketx.WebSocketVersion;
 import io.netty.handler.ssl.SslContext;
-import io.netty.handler.ssl.util.SelfSignedCertificate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -129,7 +128,7 @@ public interface Channelizer extends ChannelHandler {
     /**
      * WebSocket {@link Channelizer} implementation.
      */
-    final class WebSocketChannelizer extends AbstractChannelizer {
+    public final class WebSocketChannelizer extends AbstractChannelizer {
         private WebSocketClientHandler handler;
 
         private WebSocketGremlinRequestEncoder webSocketGremlinRequestEncoder;
@@ -182,19 +181,16 @@ public interface Channelizer extends ChannelHandler {
     /**
      * NIO {@link Channelizer} implementation.
      */
-    final class NioChannelizer extends AbstractChannelizer {
-        private NioGremlinRequestEncoder nioGremlinRequestEncoder;
-
+    public final class NioChannelizer extends AbstractChannelizer {
         @Override
         public void init(final Connection connection) {
             super.init(connection);
-            nioGremlinRequestEncoder = new NioGremlinRequestEncoder(true, cluster.getSerializer());
         }
 
         @Override
         public void configure(ChannelPipeline pipeline) {
             pipeline.addLast("gremlin-decoder", new NioGremlinResponseDecoder(cluster.getSerializer()));
-            pipeline.addLast("gremlin-encoder", nioGremlinRequestEncoder);
+            pipeline.addLast("gremlin-encoder", new NioGremlinRequestEncoder(true, cluster.getSerializer()));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1a68f6f9/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
index a732ad5..11de54e 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
@@ -230,6 +230,7 @@ public final class Cluster {
         private int reconnectInitialDelay = Connection.RECONNECT_INITIAL_DELAY;
         private int reconnectInterval = Connection.RECONNECT_INTERVAL;
         private int resultIterationBatchSize = Connection.RESULT_ITERATION_BATCH_SIZE;
+        private String channelizer = Channelizer.WebSocketChannelizer.class.getName();
         private boolean enableSsl = false;
         private LoadBalancingStrategy loadBalancingStrategy = new LoadBalancingStrategy.RoundRobin();
         private AuthProperties authProps = new AuthProperties();
@@ -387,6 +388,21 @@ public final class Cluster {
         }
 
         /**
+         * Specify, by class name as accepted by {@code Class.forName}, the {@link Channelizer} implementation to use on the client when creating a {@link Connection}.
+         */
+        public Builder channelizer(final String channelizerClass) {
+            this.channelizer = channelizerClass;
+            return this;
+        }
+
+        /**
+         * Specify the {@link Channelizer} implementation to use on the client when creating a {@link Connection}.
+         */
+        public Builder channelizer(final Class channelizerClass) {
+            return channelizer(channelizerClass.getCanonicalName());
+        }
+
+        /**
          * Time in milliseconds to wait before attempting to reconnect to a dead host after it has been marked dead.
          */
         public Builder reconnectIntialDelay(final int initialDelay) {
@@ -466,6 +482,7 @@ public final class Cluster {
             connectionPoolSettings.reconnectInterval = this.reconnectInterval;
             connectionPoolSettings.resultIterationBatchSize = this.resultIterationBatchSize;
             connectionPoolSettings.enableSsl = this.enableSsl;
+            connectionPoolSettings.channelizer = this.channelizer;
             return new Cluster(getContactPoints(), serializer, this.nioPoolSize, this.workerPoolSize,
                     connectionPoolSettings, loadBalancingStrategy, authProps);
         }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1a68f6f9/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index 0f2f62a..3a76019 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -65,6 +65,8 @@ final class Connection {
      * busy a particular {@code Connection} is.
      */
     public final AtomicInteger borrowed = new AtomicInteger(0);
+    private final AtomicReference<Class<Channelizer>> channelizerClass = new AtomicReference<>(null);
+
     private volatile boolean isDead = false;
     private final int maxInProcess;
 
@@ -83,11 +85,15 @@ final class Connection {
         if (cluster.isClosing()) throw new IllegalStateException("Cannot open a connection while the cluster after close() is called");
 
         final Bootstrap b = this.cluster.getFactory().createBootstrap();
-        final Channelizer channelizer = new Channelizer.WebSocketChannelizer();
-        channelizer.init(this);
-        b.channel(NioSocketChannel.class).handler(channelizer);
-
         try {
+            if (channelizerClass.get() == null) {
+                channelizerClass.compareAndSet(null, (Class<Channelizer>) Class.forName(cluster.connectionPoolSettings().channelizer));
+            }
+
+            final Channelizer channelizer = channelizerClass.get().newInstance();
+            channelizer.init(this);
+            b.channel(NioSocketChannel.class).handler(channelizer);
+
             channel = b.connect(uri.getHost(), uri.getPort()).sync().channel();
             channelizer.connected();
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1a68f6f9/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
index 0e91979..07f3294 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
@@ -83,6 +83,7 @@ final class Settings {
         public int reconnectInterval = Connection.RECONNECT_INTERVAL;
         public int reconnectInitialDelay = Connection.RECONNECT_INITIAL_DELAY;
         public int resultIterationBatchSize = Connection.RESULT_ITERATION_BATCH_SIZE;
+        public String channelizer = Channelizer.WebSocketChannelizer.class.getName();
         public String sessionId = null;
 
         public Optional<String> optionalSessionId() {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1a68f6f9/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/NioGremlinRequestEncoder.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/NioGremlinRequestEncoder.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/NioGremlinRequestEncoder.java
index f80e195..4a7a436 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/NioGremlinRequestEncoder.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/NioGremlinRequestEncoder.java
@@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory;
 /**
  * @author Stephen Mallette (http://stephen.genoprime.com)
  */
-@ChannelHandler.Sharable
 public final class NioGremlinRequestEncoder extends MessageToByteEncoder<Object> {
     private static final Logger logger = LoggerFactory.getLogger(WebSocketGremlinRequestEncoder.class);
     private boolean binaryEncoding = false;
@@ -49,10 +48,19 @@ public final class NioGremlinRequestEncoder extends MessageToByteEncoder<Object>
         final RequestMessage requestMessage = (RequestMessage) msg;
         try {
             if (binaryEncoding) {
-                byteBuf.writeBytes(serializer.serializeRequestAsBinary(requestMessage, channelHandlerContext.alloc()));
+                // wrap the serialized message/payload inside of a "frame".  this works around the problem where
+                // the length of the payload is not encoded into the general protocol.  that length isn't needed
+                // for websockets because under that protocol, the message is wrapped in a "websocket frame". this
+                // is not the optimal way to deal with the missing length, but it avoids a protocol change for
+                // now while getting the NioChannelizer working.
+                final ByteBuf bytes = serializer.serializeRequestAsBinary(requestMessage, channelHandlerContext.alloc());
+                byteBuf.writeInt(bytes.capacity());
+                byteBuf.writeBytes(bytes);
             } else {
                 final MessageTextSerializer textSerializer = (MessageTextSerializer) serializer;
-                byteBuf.writeBytes(textSerializer.serializeRequestAsString(requestMessage).getBytes(CharsetUtil.UTF_8));
+                final byte [] bytes = textSerializer.serializeRequestAsString(requestMessage).getBytes(CharsetUtil.UTF_8);
+                byteBuf.writeInt(bytes.length);
+                byteBuf.writeBytes(bytes);
             }
         } catch (Exception ex) {
             logger.warn(String.format("An error occurred during serialization of this request [%s] - it could not be sent to the server.", requestMessage), ex);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1a68f6f9/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/NioGremlinResponseDecoder.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/NioGremlinResponseDecoder.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/NioGremlinResponseDecoder.java
index 6f4a44d..d61b809 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/NioGremlinResponseDecoder.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/NioGremlinResponseDecoder.java
@@ -18,26 +18,43 @@
  */
 package org.apache.tinkerpop.gremlin.driver.handler;
 
+import io.netty.handler.codec.ReplayingDecoder;
 import org.apache.tinkerpop.gremlin.driver.MessageSerializer;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.ByteToMessageDecoder;
 
 import java.util.List;
 
 /**
  * @author Stephen Mallette (http://stephen.genoprime.com)
  */
-public final class NioGremlinResponseDecoder extends ByteToMessageDecoder {
+public final class NioGremlinResponseDecoder extends ReplayingDecoder<NioGremlinResponseDecoder.DecoderState> {
     private final MessageSerializer serializer;
+    private int messageLength;
 
     public NioGremlinResponseDecoder(final MessageSerializer serializer) {
+        super(DecoderState.MESSAGE_LENGTH);
         this.serializer = serializer;
     }
 
     @Override
     protected void decode(final ChannelHandlerContext channelHandlerContext, final ByteBuf byteBuf, final List<Object> objects) throws Exception {
-        if (byteBuf.readableBytes() < 1) return;
-        objects.add(serializer.deserializeResponse(byteBuf));
+        switch (state()) {
+            case MESSAGE_LENGTH:
+                messageLength = byteBuf.readInt();
+                checkpoint(DecoderState.MESSAGE);
+            case MESSAGE:
+                final ByteBuf messageFrame = byteBuf.readBytes(messageLength);
+                objects.add(serializer.deserializeResponse(messageFrame));
+                checkpoint(DecoderState.MESSAGE_LENGTH);
+                break;
+            default:
+                throw new Error("Invalid message received from Gremlin Server");
+        }
+    }
+
+    public enum DecoderState {
+        MESSAGE_LENGTH,
+        MESSAGE
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1a68f6f9/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java
index 000ac29..5e640c9 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java
@@ -20,6 +20,7 @@ package org.apache.tinkerpop.gremlin.driver.util;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.tinkerpop.gremlin.driver.Channelizer;
 import org.apache.tinkerpop.gremlin.driver.Client;
 import org.apache.tinkerpop.gremlin.driver.Cluster;
 import org.apache.tinkerpop.gremlin.driver.ser.Serializers;
@@ -129,6 +130,7 @@ public class ProfilingApplication {
         final int maxWaitForConnection = Integer.parseInt(options.getOrDefault("maxWaitForConnection", "3000").toString());
         final int workerPoolSize = Integer.parseInt(options.getOrDefault("workerPoolSize", "2").toString());
         final int tooSlowThreshold = Integer.parseInt(options.getOrDefault("tooSlowThreshold", "125").toString());
+        final String channelizer = options.getOrDefault("channelizer", Channelizer.WebSocketChannelizer.class.getName()).toString();
         final String serializer = options.getOrDefault("serializer", Serializers.GRYO_V1D0.name()).toString();
 
         final String script = options.getOrDefault("script", "1+1").toString();
@@ -141,6 +143,7 @@ public class ProfilingApplication {
                 .minInProcessPerConnection(minInProcessPerConnection)
                 .maxInProcessPerConnection(maxInProcessPerConnection)
                 .nioPoolSize(nioPoolSize)
+                .channelizer(channelizer)
                 .maxWaitForConnection(maxWaitForConnection)
                 .serializer(Serializers.valueOf(serializer))
                 .workerPoolSize(workerPoolSize).create();

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1a68f6f9/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/NioChannelizer.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/NioChannelizer.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/NioChannelizer.java
index fba75f9..ec0eeaf 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/NioChannelizer.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/NioChannelizer.java
@@ -40,13 +40,11 @@ import org.slf4j.LoggerFactory;
 public class NioChannelizer extends AbstractChannelizer {
     private static final Logger logger = LoggerFactory.getLogger(NioChannelizer.class);
 
-    private NioGremlinBinaryRequestDecoder nioGremlinBinaryRequestDecoder;
     private SaslAuthenticationHandler authenticationHandler;
 
     @Override
     public void init(final ServerGremlinExecutor<EventLoopGroup> serverGremlinExecutor) {
         super.init(serverGremlinExecutor);
-        nioGremlinBinaryRequestDecoder = new NioGremlinBinaryRequestDecoder(serializers);
 
         // configure authentication - null means don't bother to add authentication to the pipeline
         if (authenticator != null)
@@ -60,7 +58,7 @@ public class NioChannelizer extends AbstractChannelizer {
             pipeline.addLast(new LoggingHandler("log-io", LogLevel.DEBUG));
 
         pipeline.addLast("response-encoder", new NioGremlinResponseEncoder());
-        pipeline.addLast("request-binary-decoder", nioGremlinBinaryRequestDecoder);
+        pipeline.addLast("request-binary-decoder", new NioGremlinBinaryRequestDecoder(serializers));
 
         if (logger.isDebugEnabled())
             pipeline.addLast(new LoggingHandler("log-codec", LogLevel.DEBUG));

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1a68f6f9/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/NioGremlinBinaryRequestDecoder.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/NioGremlinBinaryRequestDecoder.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/NioGremlinBinaryRequestDecoder.java
index 72b88a7..c6996a4 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/NioGremlinBinaryRequestDecoder.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/NioGremlinBinaryRequestDecoder.java
@@ -18,60 +18,63 @@
  */
 package org.apache.tinkerpop.gremlin.server.handler;
 
+import io.netty.handler.codec.ReplayingDecoder;
+import io.netty.util.CharsetUtil;
 import org.apache.tinkerpop.gremlin.driver.MessageSerializer;
 import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
 import org.apache.tinkerpop.gremlin.driver.ser.SerializationException;
 import org.apache.tinkerpop.gremlin.driver.ser.Serializers;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.ByteToMessageDecoder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.charset.Charset;
 import java.util.List;
 import java.util.Map;
 
 /**
  * @author Stephen Mallette (http://stephen.genoprime.com)
  */
-public class NioGremlinBinaryRequestDecoder extends ByteToMessageDecoder {
-    private static final Logger logger = LoggerFactory.getLogger(WsGremlinBinaryRequestDecoder.class);
+public class NioGremlinBinaryRequestDecoder extends ReplayingDecoder<NioGremlinBinaryRequestDecoder.DecoderState> {
+    private static final Logger logger = LoggerFactory.getLogger(NioGremlinBinaryRequestDecoder.class);
 
-    private static final Charset UTF8 = Charset.forName("UTF-8");
     private final Map<String, MessageSerializer> serializers;
+    private int messageLength;
 
     public NioGremlinBinaryRequestDecoder(final Map<String, MessageSerializer> serializers) {
+        super(DecoderState.MESSAGE_LENGTH);
         this.serializers = serializers;
     }
 
     @Override
     protected void decode(final ChannelHandlerContext channelHandlerContext, final ByteBuf byteBuf, final List<Object> objects) throws Exception {
-        if (byteBuf.readableBytes() < 1) {
-            return;
-        }
+        switch (state()) {
+            case MESSAGE_LENGTH:
+                messageLength = byteBuf.readInt();
+                checkpoint(DecoderState.MESSAGE);
+            case MESSAGE:
+                try {
+                    final ByteBuf messageFrame = byteBuf.readBytes(messageLength);
+                    final int contentTypeLength = messageFrame.readByte();
+                    final ByteBuf contentTypeFrame = messageFrame.readBytes(contentTypeLength);
+                    final String contentType = contentTypeFrame.toString(CharsetUtil.UTF_8);
 
-        final byte lenOfContentType = byteBuf.readByte();
-        if (byteBuf.readableBytes() < lenOfContentType) {
-            byteBuf.resetReaderIndex();
-            return;
-        }
+                    final MessageSerializer serializer = select(contentType, Serializers.DEFAULT_REQUEST_SERIALIZER);
+                    channelHandlerContext.channel().attr(StateKey.SERIALIZER).set(serializer);
+                    channelHandlerContext.channel().attr(StateKey.USE_BINARY).set(true);
 
-        final ByteBuf contentTypeBytes = channelHandlerContext.alloc().buffer(lenOfContentType);
-        try {
-            byteBuf.readBytes(contentTypeBytes);
-            final String contentType = contentTypeBytes.toString(UTF8);
-            final MessageSerializer serializer = select(contentType, Serializers.DEFAULT_REQUEST_SERIALIZER);
+                    // subtract the contentTypeLength and the byte that held it from the full message length to
+                    // figure out how long the rest of the message is
+                    final int payloadLength = messageLength - 1 - contentTypeLength;
+                    objects.add(serializer.deserializeRequest(messageFrame.readBytes(payloadLength)));
+                } catch (SerializationException se) {
+                    objects.add(RequestMessage.INVALID);
+                }
 
-            channelHandlerContext.channel().attr(StateKey.SERIALIZER).set(serializer);
-            channelHandlerContext.channel().attr(StateKey.USE_BINARY).set(true);
-            try {
-                objects.add(serializer.deserializeRequest(byteBuf.discardReadBytes()));
-            } catch (SerializationException se) {
-                objects.add(RequestMessage.INVALID);
-            }
-        } finally {
-            contentTypeBytes.release();
+                checkpoint(DecoderState.MESSAGE_LENGTH);
+                break;
+            default:
+                throw new Error("Invalid message sent to Gremlin Server");
         }
     }
 
@@ -82,4 +85,9 @@ public class NioGremlinBinaryRequestDecoder extends ByteToMessageDecoder {
 
         return serializers.getOrDefault(mimeType, defaultSerializer);
     }
+
+    public enum DecoderState {
+        MESSAGE_LENGTH,
+        MESSAGE
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1a68f6f9/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/NioGremlinResponseEncoder.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/NioGremlinResponseEncoder.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/NioGremlinResponseEncoder.java
index 99a4167..e62703f 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/NioGremlinResponseEncoder.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/NioGremlinResponseEncoder.java
@@ -19,6 +19,7 @@
 package org.apache.tinkerpop.gremlin.server.handler;
 
 import com.codahale.metrics.Meter;
+import io.netty.util.CharsetUtil;
 import org.apache.tinkerpop.gremlin.driver.MessageSerializer;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
@@ -32,8 +33,6 @@ import io.netty.handler.codec.MessageToByteEncoder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.charset.Charset;
-
 import static com.codahale.metrics.MetricRegistry.name;
 
 /**
@@ -43,7 +42,6 @@ import static com.codahale.metrics.MetricRegistry.name;
 public class NioGremlinResponseEncoder extends MessageToByteEncoder<ResponseMessage> {
     private static final Logger logger = LoggerFactory.getLogger(NioGremlinResponseEncoder.class);
     static final Meter errorMeter = MetricManager.INSTANCE.getMeter(name(GremlinServer.class, "errors"));
-    private static final Charset UTF8 = Charset.forName("UTF-8");
 
     @Override
     protected void encode(final ChannelHandlerContext ctx, final ResponseMessage responseMessage, final ByteBuf byteBuf) throws Exception {
@@ -54,13 +52,18 @@ public class NioGremlinResponseEncoder extends MessageToByteEncoder<ResponseMess
             if (!responseMessage.getStatus().getCode().isSuccess())
                 errorMeter.mark();
 
-            if (useBinary)
-                byteBuf.writeBytes(serializer.serializeResponseAsBinary(responseMessage, ctx.alloc()));
-            else {
+            if (useBinary) {
+                final ByteBuf bytes = serializer.serializeResponseAsBinary(responseMessage, ctx.alloc());
+                byteBuf.writeInt(bytes.capacity());
+                byteBuf.writeBytes(bytes);
+                bytes.release();
+            } else {
                 // the expectation is that the GremlinTextRequestDecoder will have placed a MessageTextSerializer
                 // instance on the channel.
                 final MessageTextSerializer textSerializer = (MessageTextSerializer) serializer;
-                byteBuf.writeBytes(textSerializer.serializeResponseAsString(responseMessage).getBytes(UTF8));
+                final byte [] bytes = textSerializer.serializeResponseAsString(responseMessage).getBytes(CharsetUtil.UTF_8);
+                byteBuf.writeInt(bytes.length);
+                byteBuf.writeBytes(bytes);
             }
         } catch (Exception ex) {
             errorMeter.mark();
@@ -71,10 +74,15 @@ public class NioGremlinResponseEncoder extends MessageToByteEncoder<ResponseMess
                     .statusMessage(errorMessage)
                     .code(ResponseStatusCode.SERVER_ERROR_SERIALIZATION).create();
             if (useBinary) {
-                byteBuf.writeBytes(serializer.serializeResponseAsBinary(error, ctx.alloc()));
+                final ByteBuf bytes = serializer.serializeResponseAsBinary(error, ctx.alloc());
+                byteBuf.writeInt(bytes.capacity());
+                byteBuf.writeBytes(bytes);
+                bytes.release();
             } else {
                 final MessageTextSerializer textSerializer = (MessageTextSerializer) serializer;
-                byteBuf.writeBytes(textSerializer.serializeResponseAsString(error).getBytes(UTF8));
+                final byte [] bytes = textSerializer.serializeResponseAsString(error).getBytes(CharsetUtil.UTF_8);
+                byteBuf.writeInt(bytes.length);
+                byteBuf.writeBytes(bytes);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1a68f6f9/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index 2437e16..cefaec4 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -20,15 +20,14 @@ package org.apache.tinkerpop.gremlin.server;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.tinkerpop.gremlin.TestHelper;
-import org.apache.tinkerpop.gremlin.driver.Client;
-import org.apache.tinkerpop.gremlin.driver.Cluster;
-import org.apache.tinkerpop.gremlin.driver.Result;
-import org.apache.tinkerpop.gremlin.driver.ResultSet;
+import org.apache.tinkerpop.gremlin.driver.*;
+import org.apache.tinkerpop.gremlin.driver.Channelizer;
 import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
 import org.apache.tinkerpop.gremlin.driver.ser.JsonBuilderGryoSerializer;
 import org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV1d0;
 import org.apache.tinkerpop.gremlin.driver.ser.Serializers;
+import org.apache.tinkerpop.gremlin.server.channel.NioChannelizer;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory;
 import org.apache.tinkerpop.gremlin.util.TimeUtil;
@@ -86,6 +85,9 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
                     throw new RuntimeException(ex);
                 }
                 break;
+            case "shouldWorkOverNioTransport":
+                settings.channelizer = NioChannelizer.class.getName();
+                break;
             case "shouldFailWithBadClientSideSerialization":
                 final List<String> custom = Arrays.asList(
                         JsonBuilder.class.getName() + ";" + JsonBuilderGryoSerializer.class.getName(),
@@ -190,6 +192,24 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
     }
 
     @Test
+    public void shouldWorkOverNioTransport() throws Exception {
+        final Cluster cluster = Cluster.build().channelizer(Channelizer.NioChannelizer.class.getName()).create();
+        final Client client = cluster.connect();
+
+        final AtomicInteger checked = new AtomicInteger(0);
+        final ResultSet results = client.submit("[1,2,3,4,5,6,7,8,9]");
+        while (!results.allItemsAvailable()) {
+            assertTrue(results.getAvailableItemCount() < 10);
+            checked.incrementAndGet();
+            Thread.sleep(100);
+        }
+
+        assertTrue(checked.get() > 0);
+        assertEquals(9, results.getAvailableItemCount());
+        cluster.close();
+    }
+
+    @Test
     public void shouldStream() throws Exception {
         final Cluster cluster = Cluster.open();
         final Client client = cluster.connect();


[2/2] incubator-tinkerpop git commit: Merge remote-tracking branch 'origin/master'

Posted by sp...@apache.org.
Merge remote-tracking branch 'origin/master'

Conflicts:
	CHANGELOG.asciidoc


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/7f8495cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/7f8495cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/7f8495cf

Branch: refs/heads/master
Commit: 7f8495cf501157ad4411489af1201173838e76b0
Parents: 1a68f6f 784d657
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Thu Aug 27 09:11:07 2015 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Thu Aug 27 09:11:07 2015 -0400

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |   7 +
 docs/src/the-traversal.asciidoc                 |  57 +++--
 .../gremlin/process/traversal/Operator.java     |  79 +++---
 .../process/traversal/Parameterizing.java       |  32 +++
 .../gremlin/process/traversal/Path.java         |  19 +-
 .../traversal/dsl/graph/GraphTraversal.java     | 249 ++++++++++++++++---
 .../dsl/graph/GraphTraversalSource.java         |  44 +++-
 .../gremlin/process/traversal/dsl/graph/__.java |  58 ++++-
 .../process/traversal/step/Mutating.java        |   3 +
 .../process/traversal/step/filter/DropStep.java |   4 +
 .../process/traversal/step/map/AddEdgeStep.java | 123 ++++-----
 .../traversal/step/map/AddVertexStartStep.java  |  73 ++++--
 .../traversal/step/map/AddVertexStep.java       |  69 +++--
 .../step/sideEffect/AddPropertyStep.java        | 130 +++++-----
 .../step/sideEffect/SackElementValueStep.java   |  71 ------
 .../step/sideEffect/SackObjectStep.java         |  60 -----
 .../step/sideEffect/SackValueStep.java          |  85 +++++++
 .../process/traversal/step/util/Parameters.java | 118 +++++++++
 .../strategy/decoration/ElementIdStrategy.java  |  44 +---
 .../strategy/decoration/PartitionStrategy.java  |  30 +--
 .../traverser/B_LP_O_S_SE_SL_Traverser.java     |   3 +-
 .../tinkerpop/gremlin/structure/Graph.java      |   2 +-
 .../gremlin/process/traversal/PathTest.java     |  79 ++++++
 .../traversal/dsl/graph/GraphTraversalTest.java |   2 +-
 .../traversal/step/map/AddEdgeStepTest.java     |  10 +-
 .../step/sideEffect/AddPropertyStepTest.java    |   4 +-
 .../sideEffect/SackElementValueStepTest.java    |  45 ----
 .../step/sideEffect/SackObjectStepTest.java     |  50 ----
 .../step/sideEffect/SackValueStepTest.java      |  55 ++++
 .../ElementIdStrategyTraverseTest.java          |  38 ++-
 .../strategy/decoration/EventStrategyTest.java  |  15 +-
 .../PartitionStrategyTraverseTest.java          |  64 ++---
 .../SubgraphStrategyTraverseTest.java           |  20 +-
 .../verification/ReadOnlyStrategyTest.java      |  17 +-
 .../traversal/step/map/GroovyAddEdgeTest.groovy |  26 ++
 .../step/map/GroovyAddVertexTest.groovy         |  29 ++-
 .../step/sideEffect/GroovySackTest.groovy       |  17 +-
 .../process/traversal/step/map/AddEdgeTest.java | 196 +++++++++++++--
 .../traversal/step/map/AddVertexTest.java       | 106 +++++++-
 .../traversal/step/sideEffect/SackTest.java     |  59 ++++-
 .../ElementIdStrategyProcessTest.java           |  29 ++-
 .../decoration/EventStrategyProcessTest.java    |  42 ++--
 .../PartitionStrategyProcessTest.java           |  34 +--
 .../ReadOnlyStrategyProcessTest.java            |  12 +-
 .../tinkergraph/structure/TinkerGraphTest.java  |   8 +-
 45 files changed, 1567 insertions(+), 750 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/7f8495cf/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --cc CHANGELOG.asciidoc
index 5d99eb8,19ece09..c6fce38
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@@ -26,7 -26,13 +26,14 @@@ image::https://raw.githubusercontent.co
  TinkerPop 3.1.0 (NOT OFFICIALLY RELEASED YET)
  ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  
 +* Fixed bugs in the Gremlin Server's NIO protocol both on the server and driver side.
+ * Added `Path.popEquals(Pop,Object)` to check for path equality based on `Pop` (useful for `TraverserRequirement.LABELED_PATH`).
+ * Added `Operator.assign` to allow setting a direct value.
+ * `Operator` is now a `BinaryOperator<Object>` with appropriate typecasting for respective number operators.
+ * Simplified `SackValueStep` so it now supports both `sack(function)` and `sack(function).by()`. Deprecated `sack(function,string)`.
+ * Added `Parameters` object to allow for the parameters of a step to be retrieved at runtime via a traversal.
+ * Redesigned (though backwards compatible) `AddEdgeStep`, `AddVertexStep`, and `AddPropertyStep` (and respective `GraphTraversal` API).
+ * Added `GraphTraversalSource.inject()` so users can spawn a traverser with non-graph objects.
  * `GraphStep` can now take a single argument `Collection` which is either elements or element ids (i.e. `g.V([1,2,3])` is supported now).
  * Added `LoopsStep` to make the loop counter accessible within `repeat()`, `until()` and `emit()`.
  * Gephi Plugin no longer requires manual insert of `store` steps to visualize a traversal.