You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by va...@apache.org on 2024/04/08 21:04:23 UTC
(tinkerpop) 01/01: PoC
This is an automated email from the ASF dual-hosted git repository.
valentyn pushed a commit to branch valentyn/http-java-driver
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit af665f98178a4ad05835edfa08a0600bf092d129
Author: Valentyn Kahamlyk <Va...@improving.com>
AuthorDate: Mon Apr 8 14:04:12 2024 -0700
PoC
---
.../tinkerpop/gremlin/driver/Channelizer.java | 32 +--
.../apache/tinkerpop/gremlin/driver/Handler.java | 282 ---------------------
.../tinkerpop/gremlin/driver/ResultQueue.java | 9 +-
.../driver/handler/GremlinResponseHandler.java | 135 ++++++++++
.../driver/handler/HttpGremlinRequestEncoder.java | 11 +
.../HttpGremlinResponseDebugStreamDecoder.java | 40 +++
.../driver/handler/HttpGremlinResponseDecoder.java | 1 +
.../handler/HttpGremlinResponseStreamDecoder.java | 92 +++++++
.../gremlin/driver/simple/SimpleHttpClient.java | 22 +-
.../gremlin/server/HttpDriverIntegrateTest.java | 61 +----
gremlin-server/src/test/resources/logback-test.xml | 2 +-
11 files changed, 325 insertions(+), 362 deletions(-)
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index a626e0160b..fae760178d 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -18,28 +18,29 @@
*/
package org.apache.tinkerpop.gremlin.driver;
-import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
-import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinRequestEncoder;
-import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinResponseDecoder;
-import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler;
-import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinRequestEncoder;
-import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinResponseDecoder;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
+import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpObjectAggregator;
-import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleStateHandler;
-
+import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
+import org.apache.tinkerpop.gremlin.driver.handler.GremlinResponseHandler;
+import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinRequestEncoder;
+import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler;
+import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinRequestEncoder;
+import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinResponseDecoder;
+import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinResponseStreamDecoder;
+import org.apache.tinkerpop.gremlin.util.ser.MessageChunkSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -144,8 +145,7 @@ public interface Channelizer extends ChannelHandler {
}
configure(pipeline);
- pipeline.addLast(PIPELINE_GREMLIN_SASL_HANDLER, new Handler.GremlinSaslAuthenticationHandler(cluster.authProperties()));
- pipeline.addLast(PIPELINE_GREMLIN_HANDLER, new Handler.GremlinResponseHandler(pending));
+ pipeline.addLast(PIPELINE_GREMLIN_HANDLER, new GremlinResponseHandler(pending));
}
}
@@ -250,15 +250,15 @@ public interface Channelizer extends ChannelHandler {
/**
* Sends requests over the HTTP endpoint. Client functionality is governed by the limitations of the HTTP endpoint,
* meaning that sessions are not available and as such {@code tx()} (i.e. transactions) are not available over this
- * channelizer. Only sessionless requests are possible. Some driver configuration options may not be relevant when
- * using HTTP, such as {@link Tokens#ARGS_BATCH_SIZE} since HTTP does not stream results back in that fashion.
+ * channelizer. Only sessionless requests are possible.
*/
public final class HttpChannelizer extends AbstractChannelizer {
private HttpClientCodec handler;
private HttpGremlinRequestEncoder gremlinRequestEncoder;
- private HttpGremlinResponseDecoder gremlinResponseDecoder;
+ // private HttpGremlinResponseDecoder gremlinResponseDecoder;
+ private HttpGremlinResponseStreamDecoder gremlinResponseDecoder;
@Override
public void init(final Connection connection) {
@@ -269,7 +269,7 @@ public interface Channelizer extends ChannelHandler {
throw new IllegalStateException(String.format("Cannot use sessions or tx() with %s", HttpChannelizer.class.getSimpleName()));
gremlinRequestEncoder = new HttpGremlinRequestEncoder(cluster.getSerializer(), cluster.getRequestInterceptor(), cluster.isUserAgentOnConnectEnabled());
- gremlinResponseDecoder = new HttpGremlinResponseDecoder(cluster.getSerializer());
+ gremlinResponseDecoder = new HttpGremlinResponseStreamDecoder((MessageChunkSerializer<?>) cluster.getSerializer());
}
@Override
@@ -296,7 +296,7 @@ public interface Channelizer extends ChannelHandler {
handler = new HttpClientCodec();
pipeline.addLast("http-codec", handler);
- pipeline.addLast("aggregator", new HttpObjectAggregator(maxContentLength));
+ // pipeline.addLast("aggregator", new HttpObjectAggregator(maxContentLength));
pipeline.addLast("gremlin-encoder", gremlinRequestEncoder);
pipeline.addLast("gremlin-decoder", gremlinResponseDecoder);
}
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
deleted file mode 100644
index c76a20e6a7..0000000000
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
+++ /dev/null
@@ -1,282 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.driver;
-
-import io.netty.util.AttributeMap;
-import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
-import org.apache.tinkerpop.gremlin.util.Tokens;
-import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
-import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
-import org.apache.tinkerpop.gremlin.util.message.ResponseStatusCode;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.util.Attribute;
-import io.netty.util.AttributeKey;
-import io.netty.util.ReferenceCountUtil;
-import org.apache.tinkerpop.gremlin.util.ser.SerializationException;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetSocketAddress;
-import java.security.PrivilegedExceptionAction;
-import java.security.PrivilegedActionException;
-import java.util.Base64;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
-
-import javax.security.auth.Subject;
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.login.LoginContext;
-import javax.security.auth.login.LoginException;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
-
-/**
- * Holder for internal handler classes used in constructing the channel pipeline.
- *
- * @author Stephen Mallette (http://stephen.genoprime.com)
- */
-final class Handler {
-
- /**
- * Generic SASL handler that will authenticate against the gremlin server.
- */
- static class GremlinSaslAuthenticationHandler extends SimpleChannelInboundHandler<ResponseMessage> implements CallbackHandler {
- private static final Logger logger = LoggerFactory.getLogger(GremlinSaslAuthenticationHandler.class);
- private static final AttributeKey<Subject> subjectKey = AttributeKey.valueOf("subject");
- private static final AttributeKey<SaslClient> saslClientKey = AttributeKey.valueOf("saslclient");
- private static final Map<String, String> SASL_PROPERTIES = new HashMap<String, String>() {{ put(Sasl.SERVER_AUTH, "true"); }};
- private static final byte[] NULL_CHALLENGE = new byte[0];
-
- private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder();
- private static final Base64.Decoder BASE64_DECODER = Base64.getDecoder();
-
- private final AuthProperties authProps;
-
- public GremlinSaslAuthenticationHandler(final AuthProperties authProps) {
- this.authProps = authProps;
- }
-
- @Override
- protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final ResponseMessage response) throws Exception {
- // We are only interested in AUTHENTICATE responses here. Everything else can
- // get passed down the pipeline
- if (response.getStatus().getCode() == ResponseStatusCode.AUTHENTICATE) {
- final Attribute<SaslClient> saslClient = ((AttributeMap) channelHandlerContext).attr(saslClientKey);
- final Attribute<Subject> subject = ((AttributeMap) channelHandlerContext).attr(subjectKey);
- final RequestMessage.Builder messageBuilder = RequestMessage.build(Tokens.OPS_AUTHENTICATION);
- // First time through we don't have a sasl client
- if (saslClient.get() == null) {
- subject.set(login());
- try {
- saslClient.set(saslClient(getHostName(channelHandlerContext)));
- } catch (SaslException saslException) {
- // push the sasl error into a failure response from the server. this ensures that standard
- // processing for the ResultQueue is kept. without this SaslException trap and subsequent
- // conversion to an authentication failure, the close() of the connection might not
- // succeed as it will appear as though pending messages remain present in the queue on the
- // connection and the shutdown won't proceed
- final ResponseMessage clientSideError = ResponseMessage.build(response.getRequestId())
- .code(ResponseStatusCode.FORBIDDEN).statusMessage(saslException.getMessage()).create();
- channelHandlerContext.fireChannelRead(clientSideError);
- return;
- }
-
- messageBuilder.addArg(Tokens.ARGS_SASL_MECHANISM, getMechanism());
- messageBuilder.addArg(Tokens.ARGS_SASL, saslClient.get().hasInitialResponse() ?
- BASE64_ENCODER.encodeToString(evaluateChallenge(subject, saslClient, NULL_CHALLENGE)) : null);
- } else {
- // the server sends base64 encoded sasl as well as the byte array. the byte array will eventually be
- // phased out, but is present now for backward compatibility in 3.2.x
- final String base64sasl = response.getStatus().getAttributes().containsKey(Tokens.ARGS_SASL) ?
- response.getStatus().getAttributes().get(Tokens.ARGS_SASL).toString() :
- BASE64_ENCODER.encodeToString((byte[]) response.getResult().getData());
-
- messageBuilder.addArg(Tokens.ARGS_SASL, BASE64_ENCODER.encodeToString(
- evaluateChallenge(subject, saslClient, BASE64_DECODER.decode(base64sasl))));
- }
- channelHandlerContext.writeAndFlush(messageBuilder.create());
- } else {
- // SimpleChannelInboundHandler will release the frame if we don't retain it explicitly.
- ReferenceCountUtil.retain(response);
- channelHandlerContext.fireChannelRead(response);
- }
- }
-
- public void handle(final Callback[] callbacks) {
- for (Callback callback : callbacks) {
- if (callback instanceof NameCallback) {
- if (authProps.get(AuthProperties.Property.USERNAME) != null) {
- ((NameCallback)callback).setName(authProps.get(AuthProperties.Property.USERNAME));
- }
- } else if (callback instanceof PasswordCallback) {
- if (authProps.get(AuthProperties.Property.PASSWORD) != null) {
- ((PasswordCallback)callback).setPassword(authProps.get(AuthProperties.Property.PASSWORD).toCharArray());
- }
- } else {
- logger.warn("SASL handler got a callback of type " + callback.getClass().getCanonicalName());
- }
- }
- }
-
- private byte[] evaluateChallenge(final Attribute<Subject> subject, final Attribute<SaslClient> saslClient,
- final byte[] challenge) throws SaslException {
-
- if (subject.get() == null) {
- return saslClient.get().evaluateChallenge(challenge);
- } else {
- // If we have a subject then run this as a privileged action using the subject
- try {
- return Subject.doAs(subject.get(), (PrivilegedExceptionAction<byte[]>) () -> saslClient.get().evaluateChallenge(challenge));
- } catch (PrivilegedActionException e) {
- throw (SaslException)e.getException();
- }
- }
- }
-
- private Subject login() throws LoginException {
- // Login if the user provided us with an entry into the JAAS config file
- if (authProps.get(AuthProperties.Property.JAAS_ENTRY) != null) {
- final LoginContext login = new LoginContext(authProps.get(AuthProperties.Property.JAAS_ENTRY));
- login.login();
- return login.getSubject();
- }
- return null;
- }
-
- private SaslClient saslClient(final String hostname) throws SaslException {
- return Sasl.createSaslClient(new String[] { getMechanism() }, null, authProps.get(AuthProperties.Property.PROTOCOL),
- hostname, SASL_PROPERTIES, this);
- }
-
- private String getHostName(final ChannelHandlerContext channelHandlerContext) {
- return ((InetSocketAddress)channelHandlerContext.channel().remoteAddress()).getAddress().getHostName();
- }
-
- /**
- * Work out the Sasl mechanism based on the user supplied parameters.
- * If we have a username and password use PLAIN otherwise GSSAPI
- * ToDo: have gremlin-server provide the mechanism(s) it is configured with, so that additional mechanisms can
- * be supported in the driver and confusing GSSException messages from the driver are avoided
- */
- private String getMechanism() {
- if ((authProps.get(AuthProperties.Property.USERNAME) != null) &&
- (authProps.get(AuthProperties.Property.PASSWORD) != null)) {
- return "PLAIN";
- } else {
- return "GSSAPI";
- }
- }
- }
-
- /**
- * Takes a map of requests pending responses and writes responses to the {@link ResultQueue} of a request
- * as the {@link ResponseMessage} objects are deserialized.
- */
- static class GremlinResponseHandler extends SimpleChannelInboundHandler<ResponseMessage> {
- private static final Logger logger = LoggerFactory.getLogger(GremlinResponseHandler.class);
- private final ConcurrentMap<UUID, ResultQueue> pending;
-
- public GremlinResponseHandler(final ConcurrentMap<UUID, ResultQueue> pending) {
- this.pending = pending;
- }
-
- @Override
- public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
- // occurs when the server shuts down in a disorderly fashion, otherwise in an orderly shutdown the server
- // should fire off a close message which will properly release the driver.
- super.channelInactive(ctx);
-
- // the channel isn't going to get anymore results as it is closed so release all pending requests
- pending.values().forEach(val -> val.markError(new IllegalStateException("Connection to server is no longer active")));
- pending.clear();
- }
-
- @Override
- protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final ResponseMessage response) throws Exception {
- final ResponseStatusCode statusCode = response.getStatus().getCode();
- final ResultQueue queue = pending.get(response.getRequestId());
- if (statusCode == ResponseStatusCode.SUCCESS || statusCode == ResponseStatusCode.PARTIAL_CONTENT) {
- final Object data = response.getResult().getData();
-
- // this is a "result" from the server which is either the result of a script or a
- // serialized traversal
- if (data instanceof List) {
- // unrolls the collection into individual results to be handled by the queue.
- final List<Object> listToUnroll = (List<Object>) data;
- listToUnroll.forEach(item -> queue.add(new Result(item)));
- } else {
- // since this is not a list it can just be added to the queue
- queue.add(new Result(response.getResult().getData()));
- }
- } else {
- // this is a "success" but represents no results otherwise it is an error
- if (statusCode != ResponseStatusCode.NO_CONTENT) {
- final Map<String,Object> attributes = response.getStatus().getAttributes();
- final String stackTrace = attributes.containsKey(Tokens.STATUS_ATTRIBUTE_STACK_TRACE) ?
- (String) attributes.get(Tokens.STATUS_ATTRIBUTE_STACK_TRACE) : null;
- final List<String> exceptions = attributes.containsKey(Tokens.STATUS_ATTRIBUTE_EXCEPTIONS) ?
- (List<String>) attributes.get(Tokens.STATUS_ATTRIBUTE_EXCEPTIONS) : null;
- queue.markError(new ResponseException(response.getStatus().getCode(), response.getStatus().getMessage(),
- exceptions, stackTrace, cleanStatusAttributes(attributes)));
- }
- }
-
- // as this is a non-PARTIAL_CONTENT code - the stream is done.
- if (statusCode != ResponseStatusCode.PARTIAL_CONTENT) {
- pending.remove(response.getRequestId()).markComplete(response.getStatus().getAttributes());
- }
- }
-
- @Override
- public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception {
- // if this happens enough times (like the client is unable to deserialize a response) the pending
- // messages queue will not clear. wonder if there is some way to cope with that. of course, if
- // there are that many failures someone would take notice and hopefully stop the client.
- logger.error("Could not process the response", cause);
-
- // the channel took an error because of something pretty bad so release all the futures out there
- pending.values().forEach(val -> val.markError(cause));
- pending.clear();
-
- // serialization exceptions should not close the channel - that's worth a retry
- if (!IteratorUtils.anyMatch(ExceptionUtils.getThrowableList(cause).iterator(), t -> t instanceof SerializationException))
- if (ctx.channel().isActive()) ctx.close();
- }
-
- private Map<String,Object> cleanStatusAttributes(final Map<String,Object> statusAttributes) {
- final Map<String,Object> m = new HashMap<>();
- statusAttributes.forEach((k,v) -> {
- if (!k.equals(Tokens.STATUS_ATTRIBUTE_EXCEPTIONS) && !k.equals(Tokens.STATUS_ATTRIBUTE_STACK_TRACE))
- m.put(k,v);
- });
- return m;
- }
- }
-
-}
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
index 8ce4fba21c..c4e827503d 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
@@ -18,6 +18,7 @@
*/
package org.apache.tinkerpop.gremlin.driver;
+import org.apache.tinkerpop.gremlin.driver.handler.GremlinResponseHandler;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.javatuples.Pair;
@@ -33,13 +34,13 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
/**
- * A queue of incoming {@link Result} objects. The queue is updated by the {@link Handler.GremlinResponseHandler}
+ * A queue of incoming {@link Result} objects. The queue is updated by the {@link GremlinResponseHandler}
* until a response terminator is identified.
*
* @author Stephen Mallette (http://stephen.genoprime.com)
*/
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
-final class ResultQueue {
+public final class ResultQueue {
private final LinkedBlockingQueue<Result> resultLinkedBlockingQueue;
@@ -104,7 +105,7 @@ final class ResultQueue {
resultLinkedBlockingQueue.drainTo(collection);
}
- void markComplete(final Map<String,Object> statusAttributes) {
+ public void markComplete(final Map<String, Object> statusAttributes) {
// if there was some aggregation performed in the queue then the full object is hanging out waiting to be
// added to the ResultSet
if (aggregatedResult != null)
@@ -117,7 +118,7 @@ final class ResultQueue {
this.drainAllWaiting();
}
- void markError(final Throwable throwable) {
+ public void markError(final Throwable throwable) {
error.set(throwable);
this.readComplete.completeExceptionally(throwable);
this.drainAllWaiting();
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java
new file mode 100644
index 0000000000..daa9384b1c
--- /dev/null
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.util.AttributeMap;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.tinkerpop.gremlin.driver.Result;
+import org.apache.tinkerpop.gremlin.driver.ResultQueue;
+import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
+import org.apache.tinkerpop.gremlin.util.Tokens;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.util.message.ResponseStatusCode;
+import org.apache.tinkerpop.gremlin.util.ser.SerializationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinRequestEncoder.REQUEST_ID;
+
+/**
+ * Takes a map of requests pending responses and writes responses to the {@link ResultQueue} of a request
+ * as the {@link ResponseMessage} objects are deserialized.
+ */
+public class GremlinResponseHandler extends SimpleChannelInboundHandler<ResponseMessage> {
+ private static final Logger logger = LoggerFactory.getLogger(GremlinResponseHandler.class);
+ private final ConcurrentMap<UUID, ResultQueue> pending;
+
+ public GremlinResponseHandler(final ConcurrentMap<UUID, ResultQueue> pending) {
+ this.pending = pending;
+ }
+
+ @Override
+ public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
+ // occurs when the server shuts down in a disorderly fashion, otherwise in an orderly shutdown the server
+ // should fire off a close message which will properly release the driver.
+ super.channelInactive(ctx);
+
+ // the channel isn't going to get anymore results as it is closed so release all pending requests
+ pending.values().forEach(val -> val.markError(new IllegalStateException("Connection to server is no longer active")));
+ pending.clear();
+ }
+
+ @Override
+ protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final ResponseMessage response) throws Exception {
+ final UUID requestId = ((AttributeMap) channelHandlerContext).attr(REQUEST_ID).get();
+
+ final ResponseStatusCode statusCode = response.getStatus() == null ? ResponseStatusCode.PARTIAL_CONTENT : response.getStatus().getCode();
+ final ResultQueue queue = pending.get(requestId);
+ System.out.println("Handler.GremlinResponseHandler get requestId: " + requestId);
+ if (response.getResult().getData() != null) {
+ System.out.println("Handler.GremlinResponseHandler payload size: " + ((List) response.getResult().getData()).size());
+ }
+
+ if (statusCode == ResponseStatusCode.SUCCESS || statusCode == ResponseStatusCode.PARTIAL_CONTENT) {
+ final Object data = response.getResult().getData();
+
+ // this is a "result" from the server which is either the result of a script or a
+ // serialized traversal
+ if (data instanceof List) {
+ // unrolls the collection into individual results to be handled by the queue.
+ final List<Object> listToUnroll = (List<Object>) data;
+ listToUnroll.forEach(item -> queue.add(new Result(item)));
+ } else {
+ // since this is not a list it can just be added to the queue
+ queue.add(new Result(response.getResult().getData()));
+ }
+ } else {
+ // this is a "success" but represents no results otherwise it is an error
+ if (statusCode != ResponseStatusCode.NO_CONTENT) {
+ final Map<String, Object> attributes = response.getStatus().getAttributes();
+ final String stackTrace = attributes.containsKey(Tokens.STATUS_ATTRIBUTE_STACK_TRACE) ?
+ (String) attributes.get(Tokens.STATUS_ATTRIBUTE_STACK_TRACE) : null;
+ final List<String> exceptions = attributes.containsKey(Tokens.STATUS_ATTRIBUTE_EXCEPTIONS) ?
+ (List<String>) attributes.get(Tokens.STATUS_ATTRIBUTE_EXCEPTIONS) : null;
+ queue.markError(new ResponseException(response.getStatus().getCode(), response.getStatus().getMessage(),
+ exceptions, stackTrace, cleanStatusAttributes(attributes)));
+ }
+ }
+
+ // todo:
+ // as this is a non-PARTIAL_CONTENT code - the stream is done.
+ if (statusCode != ResponseStatusCode.PARTIAL_CONTENT) {
+ pending.remove(requestId).markComplete(response.getStatus().getAttributes());
+ }
+ }
+
+ @Override
+ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception {
+ // if this happens enough times (like the client is unable to deserialize a response) the pending
+ // messages queue will not clear. wonder if there is some way to cope with that. of course, if
+ // there are that many failures someone would take notice and hopefully stop the client.
+ logger.error("Could not process the response", cause);
+
+ // the channel took an error because of something pretty bad so release all the futures out there
+ pending.values().forEach(val -> val.markError(cause));
+ pending.clear();
+
+ // serialization exceptions should not close the channel - that's worth a retry
+ if (!IteratorUtils.anyMatch(ExceptionUtils.getThrowableList(cause).iterator(), t -> t instanceof SerializationException))
+ if (ctx.channel().isActive()) ctx.close();
+ }
+
+ private Map<String, Object> cleanStatusAttributes(final Map<String, Object> statusAttributes) {
+ final Map<String, Object> m = new HashMap<>();
+ statusAttributes.forEach((k, v) -> {
+ if (!k.equals(Tokens.STATUS_ATTRIBUTE_EXCEPTIONS) && !k.equals(Tokens.STATUS_ATTRIBUTE_STACK_TRACE))
+ m.put(k, v);
+ });
+ return m;
+ }
+}
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java
index 06377e31be..fc220c6819 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java
@@ -27,6 +27,9 @@ import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
+import io.netty.util.Attribute;
+import io.netty.util.AttributeKey;
+import io.netty.util.AttributeMap;
import org.apache.tinkerpop.gremlin.driver.UserAgent;
import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
@@ -37,6 +40,7 @@ import org.apache.tinkerpop.gremlin.util.ser.MessageTextSerializerV4;
import org.apache.tinkerpop.gremlin.util.ser.SerTokens;
import java.util.List;
+import java.util.UUID;
import java.util.function.UnaryOperator;
/**
@@ -44,6 +48,9 @@ import java.util.function.UnaryOperator;
*/
@ChannelHandler.Sharable
public final class HttpGremlinRequestEncoder extends MessageToMessageEncoder<RequestMessageV4> {
+
+ //todo: move
+ public static final AttributeKey<UUID> REQUEST_ID = AttributeKey.valueOf("requestId");
private final MessageSerializer<?> serializer;
private final boolean userAgentEnabled;
private final UnaryOperator<FullHttpRequest> interceptor;
@@ -63,6 +70,10 @@ public final class HttpGremlinRequestEncoder extends MessageToMessageEncoder<Req
@Override
protected void encode(final ChannelHandlerContext channelHandlerContext, final RequestMessageV4 requestMessage, final List<Object> objects) throws Exception {
+ final Attribute<UUID> requestIdAttribute = ((AttributeMap) channelHandlerContext).attr(REQUEST_ID);
+ requestIdAttribute.set(requestMessage.getRequestId());
+ System.out.println("HttpGremlinRequestEncoder set requestId: " + requestIdAttribute.get());
+
final String mimeType = serializer.mimeTypesSupported()[0];
// only GraphSON3 and GraphBinary recommended for serialization of Bytecode requests
if (requestMessage.getArg("gremlin") instanceof Bytecode &&
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseDebugStreamDecoder.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseDebugStreamDecoder.java
new file mode 100644
index 0000000000..35dc49ef30
--- /dev/null
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseDebugStreamDecoder.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.handler;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
+
+import java.util.List;
+
+/**
+ * Converts {@code HttpResponse} to a {@link ResponseMessage}.
+ */
+@ChannelHandler.Sharable
+public final class HttpGremlinResponseDebugStreamDecoder extends MessageToMessageDecoder<ResponseMessage> {
+ public HttpGremlinResponseDebugStreamDecoder() {}
+
+ @Override
+ protected void decode(final ChannelHandlerContext channelHandlerContext, final ResponseMessage response, final List<Object> objects) throws Exception {
+ System.out.println("HttpGremlinResponseStreamDecoder: ");
+ System.out.println(response.getResult());
+ }
+}
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseDecoder.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseDecoder.java
index 9b46db27ee..4d7d452239 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseDecoder.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseDecoder.java
@@ -37,6 +37,7 @@ import java.util.UUID;
/**
* Converts {@code HttpResponse} to a {@link ResponseMessage}.
+ * Can be used for GraphSON
*/
@ChannelHandler.Sharable
public final class HttpGremlinResponseDecoder extends MessageToMessageDecoder<FullHttpResponse> {
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseStreamDecoder.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseStreamDecoder.java
new file mode 100644
index 0000000000..1306f7fff0
--- /dev/null
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseStreamDecoder.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.handler.codec.http.DefaultHttpContent;
+import io.netty.handler.codec.http.DefaultHttpObject;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.DefaultLastHttpContent;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.util.Attribute;
+import io.netty.util.AttributeKey;
+import io.netty.util.AttributeMap;
+import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.util.ser.MessageChunkSerializer;
+import org.apache.tinkerpop.gremlin.util.ser.SerializationException;
+
+import java.util.List;
+import java.util.Objects;
+
+public class HttpGremlinResponseStreamDecoder extends MessageToMessageDecoder<DefaultHttpObject> {
+
+ // todo: move out
+ public static final AttributeKey<Boolean> IS_FIRST_CHUNK = AttributeKey.valueOf("isFirstChunk");
+
+ private final MessageChunkSerializer<?> serializer;
+
+ public HttpGremlinResponseStreamDecoder(MessageChunkSerializer<?> serializer) {
+
+ this.serializer = serializer;
+ }
+
+ @Override
+ protected void decode(ChannelHandlerContext ctx, DefaultHttpObject msg, List<Object> out) throws Exception {
+ final Attribute<Boolean> isFirstChunk = ((AttributeMap) ctx).attr(IS_FIRST_CHUNK);
+
+ if (msg instanceof DefaultHttpResponse) {
+ isFirstChunk.set(true);
+ }
+
+ if (msg instanceof DefaultHttpContent) {
+ try {
+ if (ctx.attr(IS_FIRST_CHUNK).get()) {
+ System.out.println("first chunk");
+ } else {
+ System.out.println("not first chunk");
+ }
+
+ final ResponseMessage chunk = serializer.readChunk(((DefaultHttpContent) msg).content(), isFirstChunk.get());
+
+ if (chunk.getResult().getData() != null) {
+ System.out.println("payload size: " + ((List) chunk.getResult().getData()).size());
+ }
+
+ if (msg instanceof DefaultLastHttpContent) {
+ final HttpHeaders trailingHeaders = ((DefaultLastHttpContent) msg).trailingHeaders();
+
+ System.out.println("final chunk, trailing headers:");
+ System.out.println(trailingHeaders);
+
+ if (!Objects.equals(trailingHeaders.get("code"), "200")) {
+ throw new Exception(trailingHeaders.get("message"));
+ }
+ }
+
+ isFirstChunk.set(false);
+
+ out.add(chunk);
+ } catch (SerializationException e) {
+ System.out.println("Ex: " + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleHttpClient.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleHttpClient.java
index fd97fa3d6e..8a37dfacb5 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleHttpClient.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleHttpClient.java
@@ -20,17 +20,13 @@ package org.apache.tinkerpop.gremlin.driver.simple;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelOption;
-import io.netty.handler.codec.http.EmptyHttpHeaders;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import org.apache.tinkerpop.gremlin.driver.HandshakeInterceptor;
+import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinResponseStreamDecoder;
import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinRequestEncoder;
-import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinResponseDecoder;
-import org.apache.tinkerpop.gremlin.util.MessageSerializer;
-import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler;
-import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinRequestEncoder;
-import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinResponseDecoder;
+import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinResponseDebugStreamDecoder;
import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
@@ -39,11 +35,9 @@ import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
-import io.netty.handler.codec.http.HttpObjectAggregator;
-import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
-import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4;
import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryMapper;
+import org.apache.tinkerpop.gremlin.util.ser.MessageChunkSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,7 +92,7 @@ public class SimpleHttpClient extends AbstractClient {
// final WebSocketClientHandler wsHandler = new WebSocketClientHandler(WebSocketClientHandshakerFactory.newHandshaker(
// uri, WebSocketVersion.V13, null, true, EmptyHttpHeaders.INSTANCE, 65536), 10000, false);
- final MessageSerializer<GraphBinaryMapper> serializer = new GraphBinaryMessageSerializerV4();
+ final MessageChunkSerializer<GraphBinaryMapper> serializer = new GraphBinaryMessageSerializerV4();
b.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
@@ -109,12 +103,16 @@ public class SimpleHttpClient extends AbstractClient {
}
p.addLast(
new HttpClientCodec(),
- new HttpObjectAggregator(65536),
+ new HttpGremlinResponseStreamDecoder(serializer),
+// new HttpObjectAggregator(65536),
+
// wsHandler,
// new WebSocketGremlinRequestEncoder(true, serializer),
// new WebSocketGremlinResponseDecoder(serializer),
new HttpGremlinRequestEncoder(serializer, HandshakeInterceptor.NO_OP, false),
- new HttpGremlinResponseDecoder(serializer),
+ // new HttpGremlinResponseDecoder(serializer),
+ new HttpGremlinResponseDebugStreamDecoder(),
+
callbackResponseHandler);
}
});
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/HttpDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/HttpDriverIntegrateTest.java
index 2d08b7722f..6c036da7e5 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/HttpDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/HttpDriverIntegrateTest.java
@@ -30,6 +30,8 @@ import org.apache.tinkerpop.gremlin.structure.Transaction;
import org.apache.tinkerpop.gremlin.util.ser.Serializers;
import org.junit.Test;
+import java.util.List;
+
import static org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource.traversal;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.AllOf.allOf;
@@ -49,7 +51,7 @@ public class HttpDriverIntegrateTest extends AbstractGremlinServerIntegrationTes
public void shouldSubmitScriptWithGraphSON() throws Exception {
final Cluster cluster = TestClientFactory.build()
.channelizer(Channelizer.HttpChannelizer.class)
- .serializer(Serializers.GRAPHSON_V3)
+ .serializer(Serializers.GRAPHSON_V4)
.create();
try {
final Client client = cluster.connect();
@@ -65,11 +67,12 @@ public class HttpDriverIntegrateTest extends AbstractGremlinServerIntegrationTes
public void shouldSubmitScriptWithGraphBinary() throws Exception {
final Cluster cluster = TestClientFactory.build()
.channelizer(Channelizer.HttpChannelizer.class)
- .serializer(Serializers.GRAPHBINARY_V1)
+ .serializer(Serializers.GRAPHBINARY_V4)
.create();
try {
final Client client = cluster.connect();
- assertEquals(2, client.submit("1+1").all().get().get(0).getInt());
+ // default chunk size is 64
+ assertEquals(100, client.submit("new int[100]").all().get().size());
} catch (Exception ex) {
throw ex;
} finally {
@@ -81,7 +84,7 @@ public class HttpDriverIntegrateTest extends AbstractGremlinServerIntegrationTes
public void shouldSubmitBytecodeWithGraphSON() throws Exception {
final Cluster cluster = TestClientFactory.build()
.channelizer(Channelizer.HttpChannelizer.class)
- .serializer(Serializers.GRAPHSON_V3)
+ .serializer(Serializers.GRAPHSON_V4)
.create();
try {
final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster));
@@ -98,7 +101,7 @@ public class HttpDriverIntegrateTest extends AbstractGremlinServerIntegrationTes
public void shouldGetErrorForBytecodeWithUntypedGraphSON() throws Exception {
final Cluster cluster = TestClientFactory.build()
.channelizer(Channelizer.HttpChannelizer.class)
- .serializer(Serializers.GRAPHSON_V2_UNTYPED)
+ .serializer(Serializers.GRAPHSON_V4_UNTYPED)
.create();
try {
final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster));
@@ -116,7 +119,7 @@ public class HttpDriverIntegrateTest extends AbstractGremlinServerIntegrationTes
public void shouldSubmitBytecodeWithGraphBinary() throws Exception {
final Cluster cluster = TestClientFactory.build()
.channelizer(Channelizer.HttpChannelizer.class)
- .serializer(Serializers.GRAPHBINARY_V1)
+ .serializer(Serializers.GRAPHBINARY_V4)
.create();
try {
final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster));
@@ -129,44 +132,6 @@ public class HttpDriverIntegrateTest extends AbstractGremlinServerIntegrationTes
}
}
- @Test
- public void shouldSubmitMultipleRequestsOverSingleConnection() throws Exception {
- final Cluster cluster = TestClientFactory.build()
- .channelizer(Channelizer.HttpChannelizer.class)
- .minConnectionPoolSize(1).maxConnectionPoolSize(1)
- .serializer(Serializers.GRAPHBINARY_V1)
- .create();
- try {
- for (int ix = 0; ix < 100; ix++) {
- final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster));
- assertEquals(ix, g.inject(ix).toList().get(0).intValue());
- }
- } catch (Exception ex) {
- throw ex;
- } finally {
- cluster.close();
- }
- }
-
- @Test
- public void shouldSubmitMultipleRequestsOverMultiConnection() throws Exception {
- final Cluster cluster = TestClientFactory.build()
- .channelizer(Channelizer.HttpChannelizer.class)
- .minConnectionPoolSize(1).maxConnectionPoolSize(8)
- .serializer(Serializers.GRAPHBINARY_V1)
- .create();
- try {
- for (int ix = 0; ix < 100; ix++) {
- final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster));
- assertEquals(ix, g.inject(ix).toList().get(0).intValue());
- }
- } catch (Exception ex) {
- throw ex;
- } finally {
- cluster.close();
- }
- }
-
@Test
public void shouldFailToUseSession() throws Exception {
final Cluster cluster = TestClientFactory.build()
@@ -205,28 +170,30 @@ public class HttpDriverIntegrateTest extends AbstractGremlinServerIntegrationTes
}
}
+ // !!!
@Test
public void shouldDeserializeErrorWithGraphBinary() throws Exception {
final Cluster cluster = TestClientFactory.build()
.channelizer(Channelizer.HttpChannelizer.class)
- .serializer(Serializers.GRAPHBINARY_V1)
+ .serializer(Serializers.GRAPHBINARY_V4)
.create();
try {
final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster, "doesNotExist"));
g.V().next();
fail("Expected exception to be thrown.");
} catch (Exception ex) {
- assert ex.getMessage().contains("Could not rebind");
+ assert ex.getMessage().contains("The traversal source [doesNotExist] for alias [g] is not configured on the server.");
} finally {
cluster.close();
}
}
+ // !!!
@Test
public void shouldDeserializeErrorWithGraphSON() throws Exception {
final Cluster cluster = TestClientFactory.build()
.channelizer(Channelizer.HttpChannelizer.class)
- .serializer(Serializers.GRAPHSON_V3)
+ .serializer(Serializers.GRAPHSON_V4)
.create();
try {
final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster, "doesNotExist"));
diff --git a/gremlin-server/src/test/resources/logback-test.xml b/gremlin-server/src/test/resources/logback-test.xml
index 354e97cb98..647f9674e9 100644
--- a/gremlin-server/src/test/resources/logback-test.xml
+++ b/gremlin-server/src/test/resources/logback-test.xml
@@ -25,7 +25,7 @@ limitations under the License.
<logger name="org.apache.tinkerpop.gremlin.server.AbstractChannelizer" level="ERROR"/>
<!-- this logger is noisy and we don't assert anything and the error is already tracked on the server so we can
trim the logs a bit with this. -->
- <logger name="org.apache.tinkerpop.gremlin.driver.Handler$GremlinResponseHandler" level="OFF"/>
+ <logger name="org.apache.tinkerpop.gremlin.driver.handler.GremlinResponseHandler" level="OFF"/>
<root level="WARN">
<appender-ref ref="stdout"/>
</root>