You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/08/27 15:11:36 UTC
[1/2] incubator-tinkerpop git commit: Fixed bugs in NIO protocol for
Gremlin Server.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master 784d657af -> 7f8495cf5
Fixed bugs in NIO protocol for Gremlin Server.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/1a68f6f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/1a68f6f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/1a68f6f9
Branch: refs/heads/master
Commit: 1a68f6f9fe2bae274a88bf637510465575f71d1f
Parents: 95ff755
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Thu Aug 27 09:07:21 2015 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Thu Aug 27 09:07:21 2015 -0400
----------------------------------------------------------------------
CHANGELOG.asciidoc | 1 +
.../tinkerpop/gremlin/driver/Channelizer.java | 10 +---
.../tinkerpop/gremlin/driver/Cluster.java | 17 ++++++
.../tinkerpop/gremlin/driver/Connection.java | 14 +++--
.../tinkerpop/gremlin/driver/Settings.java | 1 +
.../handler/NioGremlinRequestEncoder.java | 14 ++++-
.../handler/NioGremlinResponseDecoder.java | 25 ++++++--
.../driver/util/ProfilingApplication.java | 3 +
.../gremlin/server/channel/NioChannelizer.java | 4 +-
.../handler/NioGremlinBinaryRequestDecoder.java | 62 +++++++++++---------
.../handler/NioGremlinResponseEncoder.java | 26 +++++---
.../server/GremlinDriverIntegrateTest.java | 28 +++++++--
12 files changed, 144 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1a68f6f9/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index be25cf3..5d99eb8 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -26,6 +26,7 @@ image::https://raw.githubusercontent.com/apache/incubator-tinkerpop/master/docs/
TinkerPop 3.1.0 (NOT OFFICIALLY RELEASED YET)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+* Fixed bugs in the Gremlin Server's NIO protocol both on the server and driver side.
* `GraphStep` can now take a single argument `Collection` which is either elements or element ids (i.e. `g.V([1,2,3])` is supported now).
* Added `LoopsStep` to make the loop counter accessible within `repeat()`, `until()` and `emit()`.
* Gephi Plugin no longer requires manual insert of `store` steps to visualize a traversal.
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1a68f6f9/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index ccf0f9b..1b77ea4 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -35,7 +35,6 @@ import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.ssl.SslContext;
-import io.netty.handler.ssl.util.SelfSignedCertificate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -129,7 +128,7 @@ public interface Channelizer extends ChannelHandler {
/**
* WebSocket {@link Channelizer} implementation.
*/
- final class WebSocketChannelizer extends AbstractChannelizer {
+ public final class WebSocketChannelizer extends AbstractChannelizer {
private WebSocketClientHandler handler;
private WebSocketGremlinRequestEncoder webSocketGremlinRequestEncoder;
@@ -182,19 +181,16 @@ public interface Channelizer extends ChannelHandler {
/**
* NIO {@link Channelizer} implementation.
*/
- final class NioChannelizer extends AbstractChannelizer {
- private NioGremlinRequestEncoder nioGremlinRequestEncoder;
-
+ public final class NioChannelizer extends AbstractChannelizer {
@Override
public void init(final Connection connection) {
super.init(connection);
- nioGremlinRequestEncoder = new NioGremlinRequestEncoder(true, cluster.getSerializer());
}
@Override
public void configure(ChannelPipeline pipeline) {
pipeline.addLast("gremlin-decoder", new NioGremlinResponseDecoder(cluster.getSerializer()));
- pipeline.addLast("gremlin-encoder", nioGremlinRequestEncoder);
+ pipeline.addLast("gremlin-encoder", new NioGremlinRequestEncoder(true, cluster.getSerializer()));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1a68f6f9/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
index a732ad5..11de54e 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
@@ -230,6 +230,7 @@ public final class Cluster {
private int reconnectInitialDelay = Connection.RECONNECT_INITIAL_DELAY;
private int reconnectInterval = Connection.RECONNECT_INTERVAL;
private int resultIterationBatchSize = Connection.RESULT_ITERATION_BATCH_SIZE;
+ private String channelizer = Channelizer.WebSocketChannelizer.class.getName();
private boolean enableSsl = false;
private LoadBalancingStrategy loadBalancingStrategy = new LoadBalancingStrategy.RoundRobin();
private AuthProperties authProps = new AuthProperties();
@@ -387,6 +388,21 @@ public final class Cluster {
}
/**
+ * Specify the {@link Channelizer} implementation to use on the client when creating a {@link Connection}.
+ */
+ public Builder channelizer(final String channelizerClass) {
+ this.channelizer = channelizerClass;
+ return this;
+ }
+
+ /**
+ * Specify the {@link Channelizer} implementation to use on the client when creating a {@link Connection}.
+ */
+ public Builder channelizer(final Class channelizerClass) {
+ return channelizer(channelizerClass.getCanonicalName());
+ }
+
+ /**
* Time in milliseconds to wait before attempting to reconnect to a dead host after it has been marked dead.
*/
public Builder reconnectIntialDelay(final int initialDelay) {
@@ -466,6 +482,7 @@ public final class Cluster {
connectionPoolSettings.reconnectInterval = this.reconnectInterval;
connectionPoolSettings.resultIterationBatchSize = this.resultIterationBatchSize;
connectionPoolSettings.enableSsl = this.enableSsl;
+ connectionPoolSettings.channelizer = this.channelizer;
return new Cluster(getContactPoints(), serializer, this.nioPoolSize, this.workerPoolSize,
connectionPoolSettings, loadBalancingStrategy, authProps);
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1a68f6f9/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index 0f2f62a..3a76019 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -65,6 +65,8 @@ final class Connection {
* busy a particular {@code Connection} is.
*/
public final AtomicInteger borrowed = new AtomicInteger(0);
+ private final AtomicReference<Class<Channelizer>> channelizerClass = new AtomicReference<>(null);
+
private volatile boolean isDead = false;
private final int maxInProcess;
@@ -83,11 +85,15 @@ final class Connection {
if (cluster.isClosing()) throw new IllegalStateException("Cannot open a connection while the cluster after close() is called");
final Bootstrap b = this.cluster.getFactory().createBootstrap();
- final Channelizer channelizer = new Channelizer.WebSocketChannelizer();
- channelizer.init(this);
- b.channel(NioSocketChannel.class).handler(channelizer);
-
try {
+ if (channelizerClass.get() == null) {
+ channelizerClass.compareAndSet(null, (Class<Channelizer>) Class.forName(cluster.connectionPoolSettings().channelizer));
+ }
+
+ final Channelizer channelizer = channelizerClass.get().newInstance();
+ channelizer.init(this);
+ b.channel(NioSocketChannel.class).handler(channelizer);
+
channel = b.connect(uri.getHost(), uri.getPort()).sync().channel();
channelizer.connected();
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1a68f6f9/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
index 0e91979..07f3294 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
@@ -83,6 +83,7 @@ final class Settings {
public int reconnectInterval = Connection.RECONNECT_INTERVAL;
public int reconnectInitialDelay = Connection.RECONNECT_INITIAL_DELAY;
public int resultIterationBatchSize = Connection.RESULT_ITERATION_BATCH_SIZE;
+ public String channelizer = Channelizer.WebSocketChannelizer.class.getName();
public String sessionId = null;
public Optional<String> optionalSessionId() {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1a68f6f9/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/NioGremlinRequestEncoder.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/NioGremlinRequestEncoder.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/NioGremlinRequestEncoder.java
index f80e195..4a7a436 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/NioGremlinRequestEncoder.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/NioGremlinRequestEncoder.java
@@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory;
/**
* @author Stephen Mallette (http://stephen.genoprime.com)
*/
-@ChannelHandler.Sharable
public final class NioGremlinRequestEncoder extends MessageToByteEncoder<Object> {
private static final Logger logger = LoggerFactory.getLogger(WebSocketGremlinRequestEncoder.class);
private boolean binaryEncoding = false;
@@ -49,10 +48,19 @@ public final class NioGremlinRequestEncoder extends MessageToByteEncoder<Object>
final RequestMessage requestMessage = (RequestMessage) msg;
try {
if (binaryEncoding) {
- byteBuf.writeBytes(serializer.serializeRequestAsBinary(requestMessage, channelHandlerContext.alloc()));
+ // wrap the serialized message/payload inside of a "frame". this works around the problem where
+ // the length of the payload is not encoded into the general protocol. that length isn't needed
+ // for websockets because under that protocol, the message is wrapped in a "websocket frame". this
+ // is not the optimal way to deal with this really, but it does prevent a protocol change in this
+ // immediate moment trying to get the NioChannelizer working.
+ final ByteBuf bytes = serializer.serializeRequestAsBinary(requestMessage, channelHandlerContext.alloc());
+ byteBuf.writeInt(bytes.capacity());
+ byteBuf.writeBytes(bytes);
} else {
final MessageTextSerializer textSerializer = (MessageTextSerializer) serializer;
- byteBuf.writeBytes(textSerializer.serializeRequestAsString(requestMessage).getBytes(CharsetUtil.UTF_8));
+ final byte [] bytes = textSerializer.serializeRequestAsString(requestMessage).getBytes(CharsetUtil.UTF_8);
+ byteBuf.writeInt(bytes.length);
+ byteBuf.writeBytes(bytes);
}
} catch (Exception ex) {
logger.warn(String.format("An error occurred during serialization of this request [%s] - it could not be sent to the server.", requestMessage), ex);
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1a68f6f9/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/NioGremlinResponseDecoder.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/NioGremlinResponseDecoder.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/NioGremlinResponseDecoder.java
index 6f4a44d..d61b809 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/NioGremlinResponseDecoder.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/NioGremlinResponseDecoder.java
@@ -18,26 +18,43 @@
*/
package org.apache.tinkerpop.gremlin.driver.handler;
+import io.netty.handler.codec.ReplayingDecoder;
import org.apache.tinkerpop.gremlin.driver.MessageSerializer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
/**
* @author Stephen Mallette (http://stephen.genoprime.com)
*/
-public final class NioGremlinResponseDecoder extends ByteToMessageDecoder {
+public final class NioGremlinResponseDecoder extends ReplayingDecoder<NioGremlinResponseDecoder.DecoderState> {
private final MessageSerializer serializer;
+ private int messageLength;
public NioGremlinResponseDecoder(final MessageSerializer serializer) {
+ super(DecoderState.MESSAGE_LENGTH);
this.serializer = serializer;
}
@Override
protected void decode(final ChannelHandlerContext channelHandlerContext, final ByteBuf byteBuf, final List<Object> objects) throws Exception {
- if (byteBuf.readableBytes() < 1) return;
- objects.add(serializer.deserializeResponse(byteBuf));
+ switch (state()) {
+ case MESSAGE_LENGTH:
+ messageLength = byteBuf.readInt();
+ checkpoint(DecoderState.MESSAGE);
+ case MESSAGE:
+ final ByteBuf messageFrame = byteBuf.readBytes(messageLength);
+ objects.add(serializer.deserializeResponse(messageFrame));
+ checkpoint(DecoderState.MESSAGE_LENGTH);
+ break;
+ default:
+ throw new Error("Invalid message received from Gremlin Server");
+ }
+ }
+
+ public enum DecoderState {
+ MESSAGE_LENGTH,
+ MESSAGE
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1a68f6f9/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java
index 000ac29..5e640c9 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java
@@ -20,6 +20,7 @@ package org.apache.tinkerpop.gremlin.driver.util;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.tinkerpop.gremlin.driver.Channelizer;
import org.apache.tinkerpop.gremlin.driver.Client;
import org.apache.tinkerpop.gremlin.driver.Cluster;
import org.apache.tinkerpop.gremlin.driver.ser.Serializers;
@@ -129,6 +130,7 @@ public class ProfilingApplication {
final int maxWaitForConnection = Integer.parseInt(options.getOrDefault("maxWaitForConnection", "3000").toString());
final int workerPoolSize = Integer.parseInt(options.getOrDefault("workerPoolSize", "2").toString());
final int tooSlowThreshold = Integer.parseInt(options.getOrDefault("tooSlowThreshold", "125").toString());
+ final String channelizer = options.getOrDefault("channelizer", Channelizer.WebSocketChannelizer.class.getName()).toString();
final String serializer = options.getOrDefault("serializer", Serializers.GRYO_V1D0.name()).toString();
final String script = options.getOrDefault("script", "1+1").toString();
@@ -141,6 +143,7 @@ public class ProfilingApplication {
.minInProcessPerConnection(minInProcessPerConnection)
.maxInProcessPerConnection(maxInProcessPerConnection)
.nioPoolSize(nioPoolSize)
+ .channelizer(channelizer)
.maxWaitForConnection(maxWaitForConnection)
.serializer(Serializers.valueOf(serializer))
.workerPoolSize(workerPoolSize).create();
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1a68f6f9/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/NioChannelizer.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/NioChannelizer.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/NioChannelizer.java
index fba75f9..ec0eeaf 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/NioChannelizer.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/channel/NioChannelizer.java
@@ -40,13 +40,11 @@ import org.slf4j.LoggerFactory;
public class NioChannelizer extends AbstractChannelizer {
private static final Logger logger = LoggerFactory.getLogger(NioChannelizer.class);
- private NioGremlinBinaryRequestDecoder nioGremlinBinaryRequestDecoder;
private SaslAuthenticationHandler authenticationHandler;
@Override
public void init(final ServerGremlinExecutor<EventLoopGroup> serverGremlinExecutor) {
super.init(serverGremlinExecutor);
- nioGremlinBinaryRequestDecoder = new NioGremlinBinaryRequestDecoder(serializers);
// configure authentication - null means don't bother to add authentication to the pipeline
if (authenticator != null)
@@ -60,7 +58,7 @@ public class NioChannelizer extends AbstractChannelizer {
pipeline.addLast(new LoggingHandler("log-io", LogLevel.DEBUG));
pipeline.addLast("response-encoder", new NioGremlinResponseEncoder());
- pipeline.addLast("request-binary-decoder", nioGremlinBinaryRequestDecoder);
+ pipeline.addLast("request-binary-decoder", new NioGremlinBinaryRequestDecoder(serializers));
if (logger.isDebugEnabled())
pipeline.addLast(new LoggingHandler("log-codec", LogLevel.DEBUG));
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1a68f6f9/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/NioGremlinBinaryRequestDecoder.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/NioGremlinBinaryRequestDecoder.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/NioGremlinBinaryRequestDecoder.java
index 72b88a7..c6996a4 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/NioGremlinBinaryRequestDecoder.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/NioGremlinBinaryRequestDecoder.java
@@ -18,60 +18,63 @@
*/
package org.apache.tinkerpop.gremlin.server.handler;
+import io.netty.handler.codec.ReplayingDecoder;
+import io.netty.util.CharsetUtil;
import org.apache.tinkerpop.gremlin.driver.MessageSerializer;
import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
import org.apache.tinkerpop.gremlin.driver.ser.SerializationException;
import org.apache.tinkerpop.gremlin.driver.ser.Serializers;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.ByteToMessageDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
/**
* @author Stephen Mallette (http://stephen.genoprime.com)
*/
-public class NioGremlinBinaryRequestDecoder extends ByteToMessageDecoder {
- private static final Logger logger = LoggerFactory.getLogger(WsGremlinBinaryRequestDecoder.class);
+public class NioGremlinBinaryRequestDecoder extends ReplayingDecoder<NioGremlinBinaryRequestDecoder.DecoderState> {
+ private static final Logger logger = LoggerFactory.getLogger(NioGremlinBinaryRequestDecoder.class);
- private static final Charset UTF8 = Charset.forName("UTF-8");
private final Map<String, MessageSerializer> serializers;
+ private int messageLength;
public NioGremlinBinaryRequestDecoder(final Map<String, MessageSerializer> serializers) {
+ super(DecoderState.MESSAGE_LENGTH);
this.serializers = serializers;
}
@Override
protected void decode(final ChannelHandlerContext channelHandlerContext, final ByteBuf byteBuf, final List<Object> objects) throws Exception {
- if (byteBuf.readableBytes() < 1) {
- return;
- }
+ switch (state()) {
+ case MESSAGE_LENGTH:
+ messageLength = byteBuf.readInt();
+ checkpoint(DecoderState.MESSAGE);
+ case MESSAGE:
+ try {
+ final ByteBuf messageFrame = byteBuf.readBytes(messageLength);
+ final int contentTypeLength = messageFrame.readByte();
+ final ByteBuf contentTypeFrame = messageFrame.readBytes(contentTypeLength);
+ final String contentType = contentTypeFrame.toString(CharsetUtil.UTF_8);
- final byte lenOfContentType = byteBuf.readByte();
- if (byteBuf.readableBytes() < lenOfContentType) {
- byteBuf.resetReaderIndex();
- return;
- }
+ final MessageSerializer serializer = select(contentType, Serializers.DEFAULT_REQUEST_SERIALIZER);
+ channelHandlerContext.channel().attr(StateKey.SERIALIZER).set(serializer);
+ channelHandlerContext.channel().attr(StateKey.USE_BINARY).set(true);
- final ByteBuf contentTypeBytes = channelHandlerContext.alloc().buffer(lenOfContentType);
- try {
- byteBuf.readBytes(contentTypeBytes);
- final String contentType = contentTypeBytes.toString(UTF8);
- final MessageSerializer serializer = select(contentType, Serializers.DEFAULT_REQUEST_SERIALIZER);
+ // subtract the contentTypeLength and the byte that held it from the full message length to
+ // figure out how long the rest of the message is
+ final int payloadLength = messageLength - 1 - contentTypeLength;
+ objects.add(serializer.deserializeRequest(messageFrame.readBytes(payloadLength)));
+ } catch (SerializationException se) {
+ objects.add(RequestMessage.INVALID);
+ }
- channelHandlerContext.channel().attr(StateKey.SERIALIZER).set(serializer);
- channelHandlerContext.channel().attr(StateKey.USE_BINARY).set(true);
- try {
- objects.add(serializer.deserializeRequest(byteBuf.discardReadBytes()));
- } catch (SerializationException se) {
- objects.add(RequestMessage.INVALID);
- }
- } finally {
- contentTypeBytes.release();
+ checkpoint(DecoderState.MESSAGE_LENGTH);
+ break;
+ default:
+ throw new Error("Invalid message sent to Gremlin Server");
}
}
@@ -82,4 +85,9 @@ public class NioGremlinBinaryRequestDecoder extends ByteToMessageDecoder {
return serializers.getOrDefault(mimeType, defaultSerializer);
}
+
+ public enum DecoderState {
+ MESSAGE_LENGTH,
+ MESSAGE
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1a68f6f9/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/NioGremlinResponseEncoder.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/NioGremlinResponseEncoder.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/NioGremlinResponseEncoder.java
index 99a4167..e62703f 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/NioGremlinResponseEncoder.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/NioGremlinResponseEncoder.java
@@ -19,6 +19,7 @@
package org.apache.tinkerpop.gremlin.server.handler;
import com.codahale.metrics.Meter;
+import io.netty.util.CharsetUtil;
import org.apache.tinkerpop.gremlin.driver.MessageSerializer;
import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
@@ -32,8 +33,6 @@ import io.netty.handler.codec.MessageToByteEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.charset.Charset;
-
import static com.codahale.metrics.MetricRegistry.name;
/**
@@ -43,7 +42,6 @@ import static com.codahale.metrics.MetricRegistry.name;
public class NioGremlinResponseEncoder extends MessageToByteEncoder<ResponseMessage> {
private static final Logger logger = LoggerFactory.getLogger(NioGremlinResponseEncoder.class);
static final Meter errorMeter = MetricManager.INSTANCE.getMeter(name(GremlinServer.class, "errors"));
- private static final Charset UTF8 = Charset.forName("UTF-8");
@Override
protected void encode(final ChannelHandlerContext ctx, final ResponseMessage responseMessage, final ByteBuf byteBuf) throws Exception {
@@ -54,13 +52,18 @@ public class NioGremlinResponseEncoder extends MessageToByteEncoder<ResponseMess
if (!responseMessage.getStatus().getCode().isSuccess())
errorMeter.mark();
- if (useBinary)
- byteBuf.writeBytes(serializer.serializeResponseAsBinary(responseMessage, ctx.alloc()));
- else {
+ if (useBinary) {
+ final ByteBuf bytes = serializer.serializeResponseAsBinary(responseMessage, ctx.alloc());
+ byteBuf.writeInt(bytes.capacity());
+ byteBuf.writeBytes(bytes);
+ bytes.release();
+ } else {
// the expectation is that the GremlinTextRequestDecoder will have placed a MessageTextSerializer
// instance on the channel.
final MessageTextSerializer textSerializer = (MessageTextSerializer) serializer;
- byteBuf.writeBytes(textSerializer.serializeResponseAsString(responseMessage).getBytes(UTF8));
+ final byte [] bytes = textSerializer.serializeResponseAsString(responseMessage).getBytes(CharsetUtil.UTF_8);
+ byteBuf.writeInt(bytes.length);
+ byteBuf.writeBytes(bytes);
}
} catch (Exception ex) {
errorMeter.mark();
@@ -71,10 +74,15 @@ public class NioGremlinResponseEncoder extends MessageToByteEncoder<ResponseMess
.statusMessage(errorMessage)
.code(ResponseStatusCode.SERVER_ERROR_SERIALIZATION).create();
if (useBinary) {
- byteBuf.writeBytes(serializer.serializeResponseAsBinary(error, ctx.alloc()));
+ final ByteBuf bytes = serializer.serializeResponseAsBinary(error, ctx.alloc());
+ byteBuf.writeInt(bytes.capacity());
+ byteBuf.writeBytes(bytes);
+ bytes.release();
} else {
final MessageTextSerializer textSerializer = (MessageTextSerializer) serializer;
- byteBuf.writeBytes(textSerializer.serializeResponseAsString(error).getBytes(UTF8));
+ final byte [] bytes = textSerializer.serializeResponseAsString(error).getBytes(CharsetUtil.UTF_8);
+ byteBuf.writeInt(bytes.length);
+ byteBuf.writeBytes(bytes);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1a68f6f9/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index 2437e16..cefaec4 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -20,15 +20,14 @@ package org.apache.tinkerpop.gremlin.server;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.tinkerpop.gremlin.TestHelper;
-import org.apache.tinkerpop.gremlin.driver.Client;
-import org.apache.tinkerpop.gremlin.driver.Cluster;
-import org.apache.tinkerpop.gremlin.driver.Result;
-import org.apache.tinkerpop.gremlin.driver.ResultSet;
+import org.apache.tinkerpop.gremlin.driver.*;
+import org.apache.tinkerpop.gremlin.driver.Channelizer;
import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
import org.apache.tinkerpop.gremlin.driver.ser.JsonBuilderGryoSerializer;
import org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV1d0;
import org.apache.tinkerpop.gremlin.driver.ser.Serializers;
+import org.apache.tinkerpop.gremlin.server.channel.NioChannelizer;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory;
import org.apache.tinkerpop.gremlin.util.TimeUtil;
@@ -86,6 +85,9 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
throw new RuntimeException(ex);
}
break;
+ case "shouldWorkOverNioTransport":
+ settings.channelizer = NioChannelizer.class.getName();
+ break;
case "shouldFailWithBadClientSideSerialization":
final List<String> custom = Arrays.asList(
JsonBuilder.class.getName() + ";" + JsonBuilderGryoSerializer.class.getName(),
@@ -190,6 +192,24 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
}
@Test
+ public void shouldWorkOverNioTransport() throws Exception {
+ final Cluster cluster = Cluster.build().channelizer(Channelizer.NioChannelizer.class.getName()).create();
+ final Client client = cluster.connect();
+
+ final AtomicInteger checked = new AtomicInteger(0);
+ final ResultSet results = client.submit("[1,2,3,4,5,6,7,8,9]");
+ while (!results.allItemsAvailable()) {
+ assertTrue(results.getAvailableItemCount() < 10);
+ checked.incrementAndGet();
+ Thread.sleep(100);
+ }
+
+ assertTrue(checked.get() > 0);
+ assertEquals(9, results.getAvailableItemCount());
+ cluster.close();
+ }
+
+ @Test
public void shouldStream() throws Exception {
final Cluster cluster = Cluster.open();
final Client client = cluster.connect();
[2/2] incubator-tinkerpop git commit: Merge remote-tracking branch
'origin/master'
Posted by sp...@apache.org.
Merge remote-tracking branch 'origin/master'
Conflicts:
CHANGELOG.asciidoc
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/7f8495cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/7f8495cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/7f8495cf
Branch: refs/heads/master
Commit: 7f8495cf501157ad4411489af1201173838e76b0
Parents: 1a68f6f 784d657
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Thu Aug 27 09:11:07 2015 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Thu Aug 27 09:11:07 2015 -0400
----------------------------------------------------------------------
CHANGELOG.asciidoc | 7 +
docs/src/the-traversal.asciidoc | 57 +++--
.../gremlin/process/traversal/Operator.java | 79 +++---
.../process/traversal/Parameterizing.java | 32 +++
.../gremlin/process/traversal/Path.java | 19 +-
.../traversal/dsl/graph/GraphTraversal.java | 249 ++++++++++++++++---
.../dsl/graph/GraphTraversalSource.java | 44 +++-
.../gremlin/process/traversal/dsl/graph/__.java | 58 ++++-
.../process/traversal/step/Mutating.java | 3 +
.../process/traversal/step/filter/DropStep.java | 4 +
.../process/traversal/step/map/AddEdgeStep.java | 123 ++++-----
.../traversal/step/map/AddVertexStartStep.java | 73 ++++--
.../traversal/step/map/AddVertexStep.java | 69 +++--
.../step/sideEffect/AddPropertyStep.java | 130 +++++-----
.../step/sideEffect/SackElementValueStep.java | 71 ------
.../step/sideEffect/SackObjectStep.java | 60 -----
.../step/sideEffect/SackValueStep.java | 85 +++++++
.../process/traversal/step/util/Parameters.java | 118 +++++++++
.../strategy/decoration/ElementIdStrategy.java | 44 +---
.../strategy/decoration/PartitionStrategy.java | 30 +--
.../traverser/B_LP_O_S_SE_SL_Traverser.java | 3 +-
.../tinkerpop/gremlin/structure/Graph.java | 2 +-
.../gremlin/process/traversal/PathTest.java | 79 ++++++
.../traversal/dsl/graph/GraphTraversalTest.java | 2 +-
.../traversal/step/map/AddEdgeStepTest.java | 10 +-
.../step/sideEffect/AddPropertyStepTest.java | 4 +-
.../sideEffect/SackElementValueStepTest.java | 45 ----
.../step/sideEffect/SackObjectStepTest.java | 50 ----
.../step/sideEffect/SackValueStepTest.java | 55 ++++
.../ElementIdStrategyTraverseTest.java | 38 ++-
.../strategy/decoration/EventStrategyTest.java | 15 +-
.../PartitionStrategyTraverseTest.java | 64 ++---
.../SubgraphStrategyTraverseTest.java | 20 +-
.../verification/ReadOnlyStrategyTest.java | 17 +-
.../traversal/step/map/GroovyAddEdgeTest.groovy | 26 ++
.../step/map/GroovyAddVertexTest.groovy | 29 ++-
.../step/sideEffect/GroovySackTest.groovy | 17 +-
.../process/traversal/step/map/AddEdgeTest.java | 196 +++++++++++++--
.../traversal/step/map/AddVertexTest.java | 106 +++++++-
.../traversal/step/sideEffect/SackTest.java | 59 ++++-
.../ElementIdStrategyProcessTest.java | 29 ++-
.../decoration/EventStrategyProcessTest.java | 42 ++--
.../PartitionStrategyProcessTest.java | 34 +--
.../ReadOnlyStrategyProcessTest.java | 12 +-
.../tinkergraph/structure/TinkerGraphTest.java | 8 +-
45 files changed, 1567 insertions(+), 750 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/7f8495cf/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --cc CHANGELOG.asciidoc
index 5d99eb8,19ece09..c6fce38
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@@ -26,7 -26,13 +26,14 @@@ image::https://raw.githubusercontent.co
TinkerPop 3.1.0 (NOT OFFICIALLY RELEASED YET)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+* Fixed bugs in the Gremlin Server's NIO protocol both on the server and driver side.
+ * Added `Path.popEquals(Pop,Object)` to check for path equality based on `Pop` (useful for `TraverserRequirement.LABELED_PATH`).
+ * Added `Operator.assign` to allow setting a direct value.
+ * `Operator` is now a `BinaryOperator<Object>` with appropriate typecasting for respective number operators.
+ * Simplified `SackValueStep` so it now supports both `sack(function)` and `sack(function).by()`. Deprecated `sack(function,string)`.
+ * Added `Parameters` object to allow for the parameters of a step to be retrieved at runtime via a traversal.
+ * Redesigned (though backwards compatible) `AddEdgeStep`, `AddVertexStep`, and `AddPropertyStep` (and respective `GraphTraversal` API).
+ * Added `GraphTraversalSource.inject()` so users can spawn a traverser with non-graph objects.
* `GraphStep` can now take a single argument `Collection` which is either elements or element ids (i.e. `g.V([1,2,3])` is supported now).
* Added `LoopsStep` to make the loop counter accessible within `repeat()`, `until()` and `emit()`.
* Gephi Plugin no longer requires manual insert of `store` steps to visualize a traversal.