You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ke...@apache.org on 2024/04/04 19:17:41 UTC
(tinkerpop) 01/01: Minimal changes to get gremlin-driver working with integration tests
This is an automated email from the ASF dual-hosted git repository.
kenhuuu pushed a commit to branch java-driver-minimal-http
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 40994aea004c846cc73d51958114f0ddf0c1d132
Author: Ken Hu <10...@users.noreply.github.com>
AuthorDate: Thu Apr 4 12:07:01 2024 -0700
Minimal changes to get gremlin-driver working with integration tests
---
.../apache/tinkerpop/gremlin/driver/Client.java | 107 ++++++-------
.../apache/tinkerpop/gremlin/driver/Cluster.java | 12 +-
.../tinkerpop/gremlin/driver/Connection.java | 17 ++-
.../tinkerpop/gremlin/driver/ConnectionPool.java | 8 +-
.../gremlin/driver/LoadBalancingStrategy.java | 10 +-
.../tinkerpop/gremlin/driver/RequestOptions.java | 6 +-
.../apache/tinkerpop/gremlin/driver/ResultSet.java | 8 +-
.../apache/tinkerpop/gremlin/driver/Settings.java | 10 +-
.../driver/handler/HttpGremlinRequestEncoder.java | 11 +-
.../gremlin/driver/simple/AbstractClient.java | 10 +-
.../gremlin/driver/simple/SimpleClient.java | 14 +-
...{WebSocketClient.java => SimpleHttpClient.java} | 46 +++---
.../gremlin/driver/simple/WebSocketClient.java | 170 ++++++++++-----------
.../tinkerpop/gremlin/driver/ResultSetTest.java | 4 +-
14 files changed, 221 insertions(+), 212 deletions(-)
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
index 303269c5b4..88f935fd20 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
@@ -20,11 +20,11 @@ package org.apache.tinkerpop.gremlin.driver;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.tinkerpop.gremlin.util.Tokens;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
-import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
@@ -81,11 +81,11 @@ public abstract class Client {
}
/**
- * Makes any initial changes to the builder and returns the constructed {@link RequestMessage}. Implementers
+ * Makes any initial changes to the builder and returns the constructed {@link RequestMessageV4}. Implementers
* may choose to override this message to append data to the request before sending. By default, this method
* will simply return the {@code builder} passed in by the caller.
*/
- public RequestMessage.Builder buildMessage(final RequestMessage.Builder builder) {
+ public RequestMessageV4.Builder buildMessage(final RequestMessageV4.Builder builder) {
return builder;
}
@@ -97,7 +97,7 @@ public abstract class Client {
/**
* Chooses a {@link Connection} to write the message to.
*/
- protected abstract Connection chooseConnection(final RequestMessage msg) throws TimeoutException, ConnectionException;
+ protected abstract Connection chooseConnection(final RequestMessageV4 msg) throws TimeoutException, ConnectionException;
/**
* Asynchronous close of the {@code Client}.
@@ -352,26 +352,25 @@ public abstract class Client {
// need to call buildMessage() right away to get client specific configurations, that way request specific
// ones can override as needed
- final RequestMessage.Builder request = buildMessage(RequestMessage.build(Tokens.OPS_EVAL))
- .add(Tokens.ARGS_GREMLIN, gremlin)
- .add(Tokens.ARGS_BATCH_SIZE, batchSize);
+ final RequestMessageV4.Builder request = buildMessage(RequestMessageV4.build(gremlin))
+ .addChunkSize(batchSize);
// apply settings if they were made available
- options.getTimeout().ifPresent(timeout -> request.add(Tokens.ARGS_EVAL_TIMEOUT, timeout));
- options.getParameters().ifPresent(params -> request.addArg(Tokens.ARGS_BINDINGS, params));
- options.getAliases().ifPresent(aliases -> request.addArg(Tokens.ARGS_ALIASES, aliases));
+// options.getTimeout().ifPresent(timeout -> request.add(Tokens.ARGS_EVAL_TIMEOUT, timeout));
+ options.getParameters().ifPresent(params -> request.addBindings(params));
+ options.getAliases().ifPresent(aliases -> {if (aliases.get("g") != null) request.addG(aliases.get("g")); });
options.getOverrideRequestId().ifPresent(request::overrideRequestId);
- options.getUserAgent().ifPresent(userAgent -> request.addArg(Tokens.ARGS_USER_AGENT, userAgent));
- options.getLanguage().ifPresent(lang -> request.addArg(Tokens.ARGS_LANGUAGE, lang));
- options.getMaterializeProperties().ifPresent(mp -> request.addArg(Tokens.ARGS_MATERIALIZE_PROPERTIES, mp));
+// options.getUserAgent().ifPresent(userAgent -> request.addArg(Tokens.ARGS_USER_AGENT, userAgent));
+ options.getLanguage().ifPresent(lang -> request.addLanguage(lang));
+// options.getMaterializeProperties().ifPresent(mp -> request.addArg(Tokens.ARGS_MATERIALIZE_PROPERTIES, mp));
return submitAsync(request.create());
}
/**
- * A low-level method that allows the submission of a manually constructed {@link RequestMessage}.
+ * A low-level method that allows the submission of a manually constructed {@link RequestMessageV4}.
*/
- public CompletableFuture<ResultSet> submitAsync(final RequestMessage msg) {
+ public CompletableFuture<ResultSet> submitAsync(final RequestMessageV4 msg) {
if (isClosing()) throw new IllegalStateException("Client is closed");
if (!initialized)
@@ -497,20 +496,20 @@ public abstract class Client {
* from that host's connection pool.
*/
@Override
- protected Connection chooseConnection(final RequestMessage msg) throws TimeoutException, ConnectionException {
+ protected Connection chooseConnection(final RequestMessageV4 msg) throws TimeoutException, ConnectionException {
final Iterator<Host> possibleHosts;
- if (msg.optionalArgs(Tokens.ARGS_HOST).isPresent()) {
- // looking at this code about putting the Host on the RequestMessage in light of 3.5.4, not sure
- // this is being used as intended here. server side usage is to place the channel.remoteAddress
- // in this token in the status metadata for the response. can't remember why it is being used this
- // way here exactly. created TINKERPOP-2821 to examine this more carefully to clean this up in a
- // future version.
- final Host host = (Host) msg.getArgs().get(Tokens.ARGS_HOST);
- msg.getArgs().remove(Tokens.ARGS_HOST);
- possibleHosts = IteratorUtils.of(host);
- } else {
+// if (msg.optionalArgs(Tokens.ARGS_HOST).isPresent()) {
+// // looking at this code about putting the Host on the RequestMessage in light of 3.5.4, not sure
+// // this is being used as intended here. server side usage is to place the channel.remoteAddress
+// // in this token in the status metadata for the response. can't remember why it is being used this
+// // way here exactly. created TINKERPOP-2821 to examine this more carefully to clean this up in a
+// // future version.
+// final Host host = (Host) msg.getArgs().get(Tokens.ARGS_HOST);
+// msg.getArgs().remove(Tokens.ARGS_HOST);
+// possibleHosts = IteratorUtils.of(host);
+// } else {
possibleHosts = this.cluster.loadBalancingStrategy().select(msg);
- }
+// }
// try a random host if none are marked available. maybe it will reconnect in the meantime. better than
// going straight to a fast NoHostAvailableException as was the case in versions 3.5.4 and earlier
@@ -652,16 +651,14 @@ public abstract class Client {
try {
// need to call buildMessage() right away to get client specific configurations, that way request specific
// ones can override as needed
- final RequestMessage.Builder request = buildMessage(RequestMessage.build(Tokens.OPS_BYTECODE)
- .processor("traversal")
- .addArg(Tokens.ARGS_GREMLIN, bytecode));
+ final RequestMessageV4.Builder request = buildMessage(RequestMessageV4.build(bytecode));
// apply settings if they were made available
- options.getBatchSize().ifPresent(batchSize -> request.add(Tokens.ARGS_BATCH_SIZE, batchSize));
- options.getTimeout().ifPresent(timeout -> request.add(Tokens.ARGS_EVAL_TIMEOUT, timeout));
+ options.getBatchSize().ifPresent(batchSize -> request.addChunkSize(batchSize));
+// options.getTimeout().ifPresent(timeout -> request.add(Tokens.ARGS_EVAL_TIMEOUT, timeout));
options.getOverrideRequestId().ifPresent(request::overrideRequestId);
- options.getUserAgent().ifPresent(userAgent -> request.add(Tokens.ARGS_USER_AGENT, userAgent));
- options.getMaterializeProperties().ifPresent(mp -> request.addArg(Tokens.ARGS_MATERIALIZE_PROPERTIES, mp));
+// options.getUserAgent().ifPresent(userAgent -> request.add(Tokens.ARGS_USER_AGENT, userAgent));
+// options.getMaterializeProperties().ifPresent(mp -> request.addArg(Tokens.ARGS_MATERIALIZE_PROPERTIES, mp));
return submitAsync(request.create());
} catch (RuntimeException re) {
@@ -672,18 +669,20 @@ public abstract class Client {
}
@Override
- public CompletableFuture<ResultSet> submitAsync(final RequestMessage msg) {
- final RequestMessage.Builder builder = RequestMessage.from(msg);
+ public CompletableFuture<ResultSet> submitAsync(final RequestMessageV4 msg) {
+ final RequestMessageV4.Builder builder = RequestMessageV4.from(msg);
// only add aliases which aren't already present. if they are present then they represent request level
// overrides which should be mucked with
- if (!aliases.isEmpty()) {
- final Map original = (Map) msg.getArgs().getOrDefault(Tokens.ARGS_ALIASES, Collections.emptyMap());
- aliases.forEach((k, v) -> {
- if (!original.containsKey(k))
- builder.addArg(Tokens.ARGS_ALIASES, aliases);
- });
- }
+ // TODO: replaced this with ARGS_G as we don't allow a map of aliases anymore.
+// if (!aliases.isEmpty()) {
+// final Map original = (Map) msg.getArgs().getOrDefault(Tokens.ARGS_ALIASES, Collections.emptyMap());
+// aliases.forEach((k, v) -> {
+// if (!original.containsKey(k))
+// builder.addArg(Tokens.ARGS_ALIASES, aliases);
+// });
+// }
+ builder.addG(aliases.get("g"));
return super.submitAsync(builder.create());
}
@@ -704,10 +703,11 @@ public abstract class Client {
}
@Override
- public RequestMessage.Builder buildMessage(final RequestMessage.Builder builder) {
+ public RequestMessageV4.Builder buildMessage(final RequestMessageV4.Builder builder) {
if (close.isDone()) throw new IllegalStateException("Client is closed");
- if (!aliases.isEmpty())
- builder.addArg(Tokens.ARGS_ALIASES, aliases);
+// TODO: aliases not supported. replace with ARG_G.
+// if (!aliases.isEmpty())
+// builder.addArg(Tokens.ARGS_ALIASES, aliases);
return client.buildMessage(builder);
}
@@ -726,7 +726,7 @@ public abstract class Client {
* Delegates to the underlying {@link Client.ClusteredClient}.
*/
@Override
- protected Connection chooseConnection(final RequestMessage msg) throws TimeoutException, ConnectionException {
+ protected Connection chooseConnection(final RequestMessageV4 msg) throws TimeoutException, ConnectionException {
if (close.isDone()) throw new IllegalStateException("Client is closed");
return client.chooseConnection(msg);
}
@@ -783,13 +783,14 @@ public abstract class Client {
}
/**
- * Adds the {@link Tokens#ARGS_SESSION} value to every {@link RequestMessage}.
+ * Adds the {@link Tokens#ARGS_SESSION} value to every {@link RequestMessageV4}.
*/
@Override
- public RequestMessage.Builder buildMessage(final RequestMessage.Builder builder) {
- builder.processor("session");
- builder.addArg(Tokens.ARGS_SESSION, sessionId);
- builder.addArg(Tokens.ARGS_MANAGE_TRANSACTION, manageTransactions);
+ public RequestMessageV4.Builder buildMessage(final RequestMessageV4.Builder builder) {
+// TODO: replace this with new Transaction API later.
+// builder.processor("session");
+// builder.addArg(Tokens.ARGS_SESSION, sessionId);
+// builder.addArg(Tokens.ARGS_MANAGE_TRANSACTION, manageTransactions);
return builder;
}
@@ -797,7 +798,7 @@ public abstract class Client {
* Since the session is bound to a single host, simply borrow a connection from that pool.
*/
@Override
- protected Connection chooseConnection(final RequestMessage msg) throws TimeoutException, ConnectionException {
+ protected Connection chooseConnection(final RequestMessageV4 msg) throws TimeoutException, ConnectionException {
return connectionPool.borrowConnection(cluster.connectionPoolSettings().maxWaitForConnection, TimeUnit.MILLISECONDS);
}
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
index 870bb8bdcc..c5dc54ca60 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
@@ -29,7 +29,7 @@ import io.netty.util.concurrent.Future;
import org.apache.commons.configuration2.Configuration;
import org.apache.tinkerpop.gremlin.util.MessageSerializer;
import org.apache.tinkerpop.gremlin.util.Tokens;
-import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4;
import org.apache.tinkerpop.gremlin.util.ser.Serializers;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.nio.NioEventLoopGroup;
@@ -501,7 +501,7 @@ public final class Cluster {
return manager.authProps;
}
- RequestMessage.Builder validationRequest() {
+ RequestMessageV4.Builder validationRequest() {
return manager.validationRequest.get();
}
@@ -597,7 +597,7 @@ public final class Cluster {
private int reconnectInterval = Connection.RECONNECT_INTERVAL;
private int resultIterationBatchSize = Connection.RESULT_ITERATION_BATCH_SIZE;
private long keepAliveInterval = Connection.KEEP_ALIVE_INTERVAL;
- private String channelizer = Channelizer.WebSocketChannelizer.class.getName();
+ private String channelizer = Channelizer.HttpChannelizer.class.getName();
private boolean enableSsl = false;
private String keyStore = null;
private String keyStorePassword = null;
@@ -1037,7 +1037,7 @@ public final class Cluster {
public Cluster create() {
if (addresses.size() == 0) addContactPoint("localhost");
- if (null == serializer) serializer = Serializers.GRAPHBINARY_V1.simpleInstance();
+ if (null == serializer) serializer = Serializers.GRAPHBINARY_V4.simpleInstance();
return new Cluster(this);
}
}
@@ -1077,7 +1077,7 @@ public final class Cluster {
private final LoadBalancingStrategy loadBalancingStrategy;
private final AuthProperties authProps;
private final Optional<SslContext> sslContextOptional;
- private final Supplier<RequestMessage.Builder> validationRequest;
+ private final Supplier<RequestMessageV4.Builder> validationRequest;
private final UnaryOperator<FullHttpRequest> interceptor;
/**
@@ -1167,7 +1167,7 @@ public final class Cluster {
this.connectionScheduler = new ScheduledThreadPoolExecutor(contactPoints.size() + 1,
new BasicThreadFactory.Builder().namingPattern("gremlin-driver-conn-scheduler-%d").build());
- validationRequest = () -> RequestMessage.build(Tokens.OPS_EVAL).add(Tokens.ARGS_GREMLIN, builder.validationRequest);
+ validationRequest = () -> RequestMessageV4.build(builder.validationRequest);
}
private void validateBuilder(final Builder builder) {
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index db79fe065f..c5c76e83b3 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -22,7 +22,7 @@ import org.apache.tinkerpop.gremlin.util.Tokens;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
-import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
@@ -61,8 +61,8 @@ final class Connection {
private final String creatingThread;
private final String createdTimestamp;
- public static final int MAX_IN_PROCESS = 4;
- public static final int MIN_IN_PROCESS = 1;
+ public static final int MAX_IN_PROCESS = 1;
+ public static final int MIN_IN_PROCESS = 0;
public static final int MAX_WAIT_FOR_CONNECTION = 16000;
public static final int MAX_WAIT_FOR_CLOSE = 3000;
public static final int MAX_CONTENT_LENGTH = 10 * 1024 * 1024;
@@ -209,7 +209,7 @@ final class Connection {
return future;
}
- public ChannelPromise write(final RequestMessage requestMessage, final CompletableFuture<ResultSet> resultQueueSetup) {
+ public ChannelPromise write(final RequestMessageV4 requestMessage, final CompletableFuture<ResultSet> resultQueueSetup) {
// dont allow the same request id to be used as one that is already in the queue
if (pending.containsKey(requestMessage.getRequestId()))
throw new IllegalStateException(String.format("There is already a request pending with an id of: %s", requestMessage.getRequestId()));
@@ -310,10 +310,11 @@ final class Connection {
// the session close message was removed in 3.5.0 after deprecation at 3.3.11. That removal was perhaps
// a bit hasty as session semantics may still require this message in certain cases. Until we can look
// at this in more detail, it seems best to bring back the old functionality to the driver.
- if (client instanceof Client.SessionedClient) {
+ // TODO: commented due to not supporting sessions with HTTP.
+ /*if (client instanceof Client.SessionedClient) {
final boolean forceClose = client.getSettings().getSession().get().isForceClosed();
- final RequestMessage closeMessage = client.buildMessage(
- RequestMessage.build(Tokens.OPS_CLOSE).addArg(Tokens.ARGS_FORCE, forceClose)).create();
+ final RequestMessageV4 closeMessage = client.buildMessage(
+ RequestMessageV4.build(Tokens.OPS_CLOSE).addArg(Tokens.ARGS_FORCE, forceClose)).create();
final CompletableFuture<ResultSet> closed = new CompletableFuture<>();
@@ -337,7 +338,7 @@ final class Connection {
((Client.SessionedClient) client).getSessionId());
logger.warn(msg, ex);
}
- }
+ }*/
// take a defensive posture here in the event the channelizer didn't get initialized somehow and a
// close() on the Connection is still called
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
index ee31353f4b..78b1c81391 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
@@ -22,7 +22,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
import org.apache.tinkerpop.gremlin.util.ExceptionHelper;
-import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4;
import org.apache.tinkerpop.gremlin.util.TimeUtil;
import java.util.ArrayList;
@@ -51,8 +51,8 @@ final class ConnectionPool {
public static final int MIN_POOL_SIZE = 2;
public static final int MAX_POOL_SIZE = 8;
- public static final int MIN_SIMULTANEOUS_USAGE_PER_CONNECTION = 8;
- public static final int MAX_SIMULTANEOUS_USAGE_PER_CONNECTION = 16;
+ public static final int MIN_SIMULTANEOUS_USAGE_PER_CONNECTION = 0;
+ public static final int MAX_SIMULTANEOUS_USAGE_PER_CONNECTION = 1;
// A small buffer in millis used for comparing if a connection was created within a certain amount of time.
private static final int CONNECTION_SETUP_TIME_DELTA = 25;
@@ -523,7 +523,7 @@ final class ConnectionPool {
// pool needs it. for now that seems like an unnecessary added bit of complexity for dealing with this
// error state
connection = connectionFactory.create(this);
- final RequestMessage ping = client.buildMessage(cluster.validationRequest()).create();
+ final RequestMessageV4 ping = client.buildMessage(cluster.validationRequest()).create();
final CompletableFuture<ResultSet> f = new CompletableFuture<>();
connection.write(ping, f);
f.get().all().get();
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/LoadBalancingStrategy.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/LoadBalancingStrategy.java
index 4455a4ed6e..153ac65fd7 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/LoadBalancingStrategy.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/LoadBalancingStrategy.java
@@ -18,7 +18,7 @@
*/
package org.apache.tinkerpop.gremlin.driver;
-import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4;
import java.util.ArrayList;
import java.util.Collection;
@@ -41,13 +41,13 @@ public interface LoadBalancingStrategy extends Host.Listener {
public void initialize(final Cluster cluster, final Collection<Host> hosts);
/**
- * Provide an ordered list of hosts to send the given {@link RequestMessage} to.
+ * Provide an ordered list of hosts to send the given {@link RequestMessageV4} to.
*/
- public Iterator<Host> select(final RequestMessage msg);
+ public Iterator<Host> select(final RequestMessageV4 msg);
/**
* A simple round-robin strategy that simply selects the next host in the {@link Cluster} to send the
- * {@link RequestMessage} to.
+ * {@link RequestMessageV4} to.
*/
public static class RoundRobin implements LoadBalancingStrategy {
@@ -61,7 +61,7 @@ public interface LoadBalancingStrategy extends Host.Listener {
}
@Override
- public Iterator<Host> select(final RequestMessage msg) {
+ public Iterator<Host> select(final RequestMessageV4 msg) {
final List<Host> hosts = new ArrayList<>();
// a host could be marked as dead in which case we dont need to send messages to it - just skip it for
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestOptions.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestOptions.java
index 63d7dad91a..3f66e2fbb0 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestOptions.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestOptions.java
@@ -41,7 +41,7 @@ public final class RequestOptions {
private final UUID overrideRequestId;
private final String userAgent;
private final String language;
- private final String materializeProperties;
+// private final String materializeProperties;
private RequestOptions(final Builder builder) {
this.aliases = builder.aliases;
@@ -51,7 +51,7 @@ public final class RequestOptions {
this.overrideRequestId = builder.overrideRequestId;
this.userAgent = builder.userAgent;
this.language = builder.language;
- this.materializeProperties = builder.materializeProperties;
+// this.materializeProperties = builder.materializeProperties;
}
public Optional<UUID> getOverrideRequestId() {
@@ -82,7 +82,7 @@ public final class RequestOptions {
return Optional.ofNullable(language);
}
- public Optional<String> getMaterializeProperties() { return Optional.ofNullable(materializeProperties); }
+// public Optional<String> getMaterializeProperties() { return Optional.ofNullable(materializeProperties); }
public static Builder build() {
return new Builder();
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
index 4d60b8dfb9..b41e416da0 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
@@ -18,7 +18,7 @@
*/
package org.apache.tinkerpop.gremlin.driver;
-import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4;
import java.util.ArrayList;
import java.util.Collections;
@@ -53,13 +53,13 @@ import java.util.stream.StreamSupport;
public final class ResultSet implements Iterable<Result> {
private final ResultQueue resultQueue;
private final ExecutorService executor;
- private final RequestMessage originalRequestMessage;
+ private final RequestMessageV4 originalRequestMessage;
private final Host host;
private final CompletableFuture<Void> readCompleted;
public ResultSet(final ResultQueue resultQueue, final ExecutorService executor,
- final CompletableFuture<Void> readCompleted, final RequestMessage originalRequestMessage,
+ final CompletableFuture<Void> readCompleted, final RequestMessageV4 originalRequestMessage,
final Host host) {
this.executor = executor;
this.host = host;
@@ -68,7 +68,7 @@ public final class ResultSet implements Iterable<Result> {
this.originalRequestMessage = originalRequestMessage;
}
- public RequestMessage getOriginalRequestMessage() {
+ public RequestMessageV4 getOriginalRequestMessage() {
return originalRequestMessage;
}
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
index 0845dbf547..9c0a72ceb6 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
@@ -20,7 +20,7 @@ package org.apache.tinkerpop.gremlin.driver;
import org.apache.commons.configuration2.Configuration;
import org.apache.tinkerpop.gremlin.util.MessageSerializer;
-import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV1;
+import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import org.yaml.snakeyaml.LoaderOptions;
import org.yaml.snakeyaml.TypeDescription;
@@ -396,9 +396,9 @@ final class Settings {
/**
* The constructor for the channel that connects to the server. This value should be the fully qualified
* class name of a Gremlin Driver {@link Channelizer} implementation. By default this value is set to
- * {@link Channelizer.WebSocketChannelizer}.
+ * {@link Channelizer.HttpChannelizer}.
*/
- public String channelizer = Channelizer.WebSocketChannelizer.class.getName();
+ public String channelizer = Channelizer.HttpChannelizer.class.getName();
/**
* A valid Gremlin script that can be used to test remote operations.
@@ -417,9 +417,9 @@ final class Settings {
/**
* The fully qualified class name of the {@link MessageSerializer} that will be used to communicate with the
* server. Note that the serializer configured on the client should be supported by the server configuration.
- * By default the setting is configured to {@link GraphBinaryMessageSerializerV1}.
+ * By default the setting is configured to {@link GraphBinaryMessageSerializerV4}.
*/
- public String className = GraphBinaryMessageSerializerV1.class.getCanonicalName();
+ public String className = GraphBinaryMessageSerializerV4.class.getCanonicalName();
/**
* The configuration for the specified serializer with the {@link #className}.
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java
index 67f7e3d95c..06377e31be 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java
@@ -31,18 +31,19 @@ import org.apache.tinkerpop.gremlin.driver.UserAgent;
import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
import org.apache.tinkerpop.gremlin.util.MessageSerializer;
-import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4;
import org.apache.tinkerpop.gremlin.util.message.ResponseStatusCode;
+import org.apache.tinkerpop.gremlin.util.ser.MessageTextSerializerV4;
import org.apache.tinkerpop.gremlin.util.ser.SerTokens;
import java.util.List;
import java.util.function.UnaryOperator;
/**
- * Converts {@link RequestMessage} to a {@code HttpRequest}.
+ * Converts {@link RequestMessageV4} to a {@code HttpRequest}.
*/
@ChannelHandler.Sharable
-public final class HttpGremlinRequestEncoder extends MessageToMessageEncoder<RequestMessage> {
+public final class HttpGremlinRequestEncoder extends MessageToMessageEncoder<RequestMessageV4> {
private final MessageSerializer<?> serializer;
private final boolean userAgentEnabled;
private final UnaryOperator<FullHttpRequest> interceptor;
@@ -61,7 +62,7 @@ public final class HttpGremlinRequestEncoder extends MessageToMessageEncoder<Req
}
@Override
- protected void encode(final ChannelHandlerContext channelHandlerContext, final RequestMessage requestMessage, final List<Object> objects) throws Exception {
+ protected void encode(final ChannelHandlerContext channelHandlerContext, final RequestMessageV4 requestMessage, final List<Object> objects) throws Exception {
final String mimeType = serializer.mimeTypesSupported()[0];
// only GraphSON3 and GraphBinary recommended for serialization of Bytecode requests
if (requestMessage.getArg("gremlin") instanceof Bytecode &&
@@ -73,7 +74,7 @@ public final class HttpGremlinRequestEncoder extends MessageToMessageEncoder<Req
}
try {
- final ByteBuf buffer = serializer.serializeRequestAsBinary(requestMessage, channelHandlerContext.alloc());
+ final ByteBuf buffer = ((MessageTextSerializerV4)serializer).serializeRequestMessageV4(requestMessage, channelHandlerContext.alloc());
final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/", buffer);
request.headers().add(HttpHeaderNames.CONTENT_TYPE, mimeType);
request.headers().add(HttpHeaderNames.CONTENT_LENGTH, buffer.readableBytes());
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java
index 00a67b42eb..3574cd2c91 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java
@@ -23,7 +23,7 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
-import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4;
import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
import org.apache.tinkerpop.gremlin.util.message.ResponseStatusCode;
@@ -45,16 +45,16 @@ public abstract class AbstractClient implements SimpleClient {
group = new NioEventLoopGroup(1, threadFactory);
}
- public abstract void writeAndFlush(final RequestMessage requestMessage) throws Exception;
+ public abstract void writeAndFlush(final RequestMessageV4 requestMessage) throws Exception;
@Override
- public void submit(final RequestMessage requestMessage, final Consumer<ResponseMessage> callback) throws Exception {
+ public void submit(final RequestMessageV4 requestMessage, final Consumer<ResponseMessage> callback) throws Exception {
callbackResponseHandler.callback = callback;
writeAndFlush(requestMessage);
}
@Override
- public List<ResponseMessage> submit(final RequestMessage requestMessage) throws Exception {
+ public List<ResponseMessage> submit(final RequestMessageV4 requestMessage) throws Exception {
// this is just a test client to force certain behaviors of the server. hanging tests are a pain to deal with
// especially in travis as it's not always clear where the hang is. a few reasonable timeouts might help
// make debugging easier when we look at logs
@@ -62,7 +62,7 @@ public abstract class AbstractClient implements SimpleClient {
}
@Override
- public CompletableFuture<List<ResponseMessage>> submitAsync(final RequestMessage requestMessage) throws Exception {
+ public CompletableFuture<List<ResponseMessage>> submitAsync(final RequestMessageV4 requestMessage) throws Exception {
final List<ResponseMessage> results = new ArrayList<>();
final CompletableFuture<List<ResponseMessage>> f = new CompletableFuture<>();
callbackResponseHandler.callback = response -> {
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleClient.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleClient.java
index 43ace8237e..c6edb20d33 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleClient.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleClient.java
@@ -19,7 +19,7 @@
package org.apache.tinkerpop.gremlin.driver.simple;
import org.apache.tinkerpop.gremlin.util.Tokens;
-import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4;
import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
import java.io.Closeable;
@@ -33,20 +33,20 @@ import java.util.function.Consumer;
public interface SimpleClient extends Closeable {
public default void submit(final String gremlin, final Consumer<ResponseMessage> callback) throws Exception {
- submit(RequestMessage.build(Tokens.OPS_EVAL).addArg(Tokens.ARGS_GREMLIN, gremlin).create(), callback);
+ submit(RequestMessageV4.build(gremlin).create(), callback);
}
- public void submit(final RequestMessage requestMessage, final Consumer<ResponseMessage> callback) throws Exception;
+ public void submit(final RequestMessageV4 requestMessage, final Consumer<ResponseMessage> callback) throws Exception;
public default List<ResponseMessage> submit(final String gremlin) throws Exception {
- return submit(RequestMessage.build(Tokens.OPS_EVAL).addArg(Tokens.ARGS_GREMLIN, gremlin).create());
+ return submit(RequestMessageV4.build(gremlin).create());
}
- public List<ResponseMessage> submit(final RequestMessage requestMessage) throws Exception;
+ public List<ResponseMessage> submit(final RequestMessageV4 requestMessage) throws Exception;
public default CompletableFuture<List<ResponseMessage>> submitAsync(final String gremlin) throws Exception {
- return submitAsync(RequestMessage.build(Tokens.OPS_EVAL).addArg(Tokens.ARGS_GREMLIN, gremlin).create());
+ return submitAsync(RequestMessageV4.build(gremlin).create());
}
- public CompletableFuture<List<ResponseMessage>> submitAsync(final RequestMessage requestMessage) throws Exception;
+ public CompletableFuture<List<ResponseMessage>> submitAsync(final RequestMessageV4 requestMessage) throws Exception;
}
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleHttpClient.java
similarity index 74%
copy from gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java
copy to gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleHttpClient.java
index a5de7fb82e..fd97fa3d6e 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleHttpClient.java
@@ -24,11 +24,14 @@ import io.netty.handler.codec.http.EmptyHttpHeaders;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import org.apache.tinkerpop.gremlin.driver.HandshakeInterceptor;
+import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinRequestEncoder;
+import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinResponseDecoder;
import org.apache.tinkerpop.gremlin.util.MessageSerializer;
import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler;
import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinRequestEncoder;
import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinResponseDecoder;
-import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
@@ -39,7 +42,7 @@ import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
-import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV1;
+import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4;
import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,28 +56,28 @@ import java.util.concurrent.TimeUnit;
*
* @author Stephen Mallette (http://stephen.genoprime.com)
*/
-public class WebSocketClient extends AbstractClient {
- private static final Logger logger = LoggerFactory.getLogger(WebSocketClient.class);
+public class SimpleHttpClient extends AbstractClient {
+ private static final Logger logger = LoggerFactory.getLogger(SimpleHttpClient.class);
private final Channel channel;
- public WebSocketClient() {
- this(URI.create("ws://localhost:8182/gremlin"));
+ public SimpleHttpClient() {
+ this(URI.create("http://localhost:8182/gremlin"));
}
- public WebSocketClient(final URI uri) {
- super("ws-client-%d");
+ public SimpleHttpClient(final URI uri) {
+ super("simple-http-client-%d");
final Bootstrap b = new Bootstrap().group(group);
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
final String protocol = uri.getScheme();
- if (!"ws".equalsIgnoreCase(protocol) && !"wss".equalsIgnoreCase(protocol))
+ if (!"http".equalsIgnoreCase(protocol) && !"https".equalsIgnoreCase(protocol))
throw new IllegalArgumentException("Unsupported protocol: " + protocol);
final String host = uri.getHost();
final int port;
if (uri.getPort() == -1) {
- if ("ws".equalsIgnoreCase(protocol)) {
+ if ("http".equalsIgnoreCase(protocol)) {
port = 80;
- } else if ("wss".equalsIgnoreCase(protocol)) {
+ } else if ("https".equalsIgnoreCase(protocol)) {
port = 443;
} else {
port = -1;
@@ -84,7 +87,7 @@ public class WebSocketClient extends AbstractClient {
}
try {
- final boolean ssl = "wss".equalsIgnoreCase(protocol);
+ final boolean ssl = "https".equalsIgnoreCase(protocol);
final SslContext sslCtx;
if (ssl) {
sslCtx = SslContextBuilder.forClient()
@@ -92,9 +95,10 @@ public class WebSocketClient extends AbstractClient {
} else {
sslCtx = null;
}
- final WebSocketClientHandler wsHandler = new WebSocketClientHandler(WebSocketClientHandshakerFactory.newHandshaker(
- uri, WebSocketVersion.V13, null, true, EmptyHttpHeaders.INSTANCE, 65536), 10000, false);
- final MessageSerializer<GraphBinaryMapper> serializer = new GraphBinaryMessageSerializerV1();
+
+// final WebSocketClientHandler wsHandler = new WebSocketClientHandler(WebSocketClientHandshakerFactory.newHandshaker(
+// uri, WebSocketVersion.V13, null, true, EmptyHttpHeaders.INSTANCE, 65536), 10000, false);
+ final MessageSerializer<GraphBinaryMapper> serializer = new GraphBinaryMessageSerializerV4();
b.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
@@ -106,22 +110,24 @@ public class WebSocketClient extends AbstractClient {
p.addLast(
new HttpClientCodec(),
new HttpObjectAggregator(65536),
- wsHandler,
- new WebSocketGremlinRequestEncoder(true, serializer),
- new WebSocketGremlinResponseDecoder(serializer),
+// wsHandler,
+// new WebSocketGremlinRequestEncoder(true, serializer),
+// new WebSocketGremlinResponseDecoder(serializer),
+ new HttpGremlinRequestEncoder(serializer, HandshakeInterceptor.NO_OP, false),
+ new HttpGremlinResponseDecoder(serializer),
callbackResponseHandler);
}
});
channel = b.connect(uri.getHost(), uri.getPort()).sync().channel();
- wsHandler.handshakeFuture().get(30, TimeUnit.SECONDS);
+// wsHandler.handshakeFuture().get(30, TimeUnit.SECONDS);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
@Override
- public void writeAndFlush(final RequestMessage requestMessage) throws Exception {
+ public void writeAndFlush(final RequestMessageV4 requestMessage) throws Exception {
channel.writeAndFlush(requestMessage);
}
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java
index a5de7fb82e..76cc274efb 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java
@@ -53,88 +53,88 @@ import java.util.concurrent.TimeUnit;
*
* @author Stephen Mallette (http://stephen.genoprime.com)
*/
-public class WebSocketClient extends AbstractClient {
- private static final Logger logger = LoggerFactory.getLogger(WebSocketClient.class);
- private final Channel channel;
-
- public WebSocketClient() {
- this(URI.create("ws://localhost:8182/gremlin"));
- }
-
- public WebSocketClient(final URI uri) {
- super("ws-client-%d");
- final Bootstrap b = new Bootstrap().group(group);
- b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
-
- final String protocol = uri.getScheme();
- if (!"ws".equalsIgnoreCase(protocol) && !"wss".equalsIgnoreCase(protocol))
- throw new IllegalArgumentException("Unsupported protocol: " + protocol);
- final String host = uri.getHost();
- final int port;
- if (uri.getPort() == -1) {
- if ("ws".equalsIgnoreCase(protocol)) {
- port = 80;
- } else if ("wss".equalsIgnoreCase(protocol)) {
- port = 443;
- } else {
- port = -1;
- }
- } else {
- port = uri.getPort();
- }
-
- try {
- final boolean ssl = "wss".equalsIgnoreCase(protocol);
- final SslContext sslCtx;
- if (ssl) {
- sslCtx = SslContextBuilder.forClient()
- .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
- } else {
- sslCtx = null;
- }
- final WebSocketClientHandler wsHandler = new WebSocketClientHandler(WebSocketClientHandshakerFactory.newHandshaker(
- uri, WebSocketVersion.V13, null, true, EmptyHttpHeaders.INSTANCE, 65536), 10000, false);
- final MessageSerializer<GraphBinaryMapper> serializer = new GraphBinaryMessageSerializerV1();
- b.channel(NioSocketChannel.class)
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(final SocketChannel ch) {
- final ChannelPipeline p = ch.pipeline();
- if (sslCtx != null) {
- p.addLast(sslCtx.newHandler(ch.alloc(), host, port));
- }
- p.addLast(
- new HttpClientCodec(),
- new HttpObjectAggregator(65536),
- wsHandler,
- new WebSocketGremlinRequestEncoder(true, serializer),
- new WebSocketGremlinResponseDecoder(serializer),
- callbackResponseHandler);
- }
- });
-
- channel = b.connect(uri.getHost(), uri.getPort()).sync().channel();
- wsHandler.handshakeFuture().get(30, TimeUnit.SECONDS);
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- }
-
- @Override
- public void writeAndFlush(final RequestMessage requestMessage) throws Exception {
- channel.writeAndFlush(requestMessage);
- }
-
- @Override
- public void close() throws IOException {
- try {
- channel.close().get(30, TimeUnit.SECONDS);
- } catch (Exception ex) {
- logger.error("Failure closing simple WebSocketClient", ex);
- } finally {
- if (!group.shutdownGracefully().awaitUninterruptibly(30, TimeUnit.SECONDS)) {
- logger.error("Could not cleanly shutdown thread pool on WebSocketClient");
- }
- }
- }
-}
+//public class WebSocketClient extends AbstractClient {
+// private static final Logger logger = LoggerFactory.getLogger(WebSocketClient.class);
+// private final Channel channel;
+//
+// public WebSocketClient() {
+// this(URI.create("ws://localhost:8182/gremlin"));
+// }
+//
+// public WebSocketClient(final URI uri) {
+// super("ws-client-%d");
+// final Bootstrap b = new Bootstrap().group(group);
+// b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+//
+// final String protocol = uri.getScheme();
+// if (!"ws".equalsIgnoreCase(protocol) && !"wss".equalsIgnoreCase(protocol))
+// throw new IllegalArgumentException("Unsupported protocol: " + protocol);
+// final String host = uri.getHost();
+// final int port;
+// if (uri.getPort() == -1) {
+// if ("ws".equalsIgnoreCase(protocol)) {
+// port = 80;
+// } else if ("wss".equalsIgnoreCase(protocol)) {
+// port = 443;
+// } else {
+// port = -1;
+// }
+// } else {
+// port = uri.getPort();
+// }
+//
+// try {
+// final boolean ssl = "wss".equalsIgnoreCase(protocol);
+// final SslContext sslCtx;
+// if (ssl) {
+// sslCtx = SslContextBuilder.forClient()
+// .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
+// } else {
+// sslCtx = null;
+// }
+// final WebSocketClientHandler wsHandler = new WebSocketClientHandler(WebSocketClientHandshakerFactory.newHandshaker(
+// uri, WebSocketVersion.V13, null, true, EmptyHttpHeaders.INSTANCE, 65536), 10000, false);
+// final MessageSerializer<GraphBinaryMapper> serializer = new GraphBinaryMessageSerializerV1();
+// b.channel(NioSocketChannel.class)
+// .handler(new ChannelInitializer<SocketChannel>() {
+// @Override
+// protected void initChannel(final SocketChannel ch) {
+// final ChannelPipeline p = ch.pipeline();
+// if (sslCtx != null) {
+// p.addLast(sslCtx.newHandler(ch.alloc(), host, port));
+// }
+// p.addLast(
+// new HttpClientCodec(),
+// new HttpObjectAggregator(65536),
+// wsHandler,
+// new WebSocketGremlinRequestEncoder(true, serializer),
+// new WebSocketGremlinResponseDecoder(serializer),
+// callbackResponseHandler);
+// }
+// });
+//
+// channel = b.connect(uri.getHost(), uri.getPort()).sync().channel();
+// wsHandler.handshakeFuture().get(30, TimeUnit.SECONDS);
+// } catch (Exception ex) {
+// throw new RuntimeException(ex);
+// }
+// }
+//
+// @Override
+// public void writeAndFlush(final RequestMessage requestMessage) throws Exception {
+// channel.writeAndFlush(requestMessage);
+// }
+//
+// @Override
+// public void close() throws IOException {
+// try {
+// channel.close().get(30, TimeUnit.SECONDS);
+// } catch (Exception ex) {
+// logger.error("Failure closing simple WebSocketClient", ex);
+// } finally {
+// if (!group.shutdownGracefully().awaitUninterruptibly(30, TimeUnit.SECONDS)) {
+// logger.error("Could not cleanly shutdown thread pool on WebSocketClient");
+// }
+// }
+// }
+//}
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java
index 50e8769a2b..1a301dfa33 100644
--- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java
+++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java
@@ -18,7 +18,7 @@
*/
package org.apache.tinkerpop.gremlin.driver;
-import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4;
import org.junit.Before;
import org.junit.Test;
@@ -51,7 +51,7 @@ public class ResultSetTest extends AbstractResultQueueTest {
@Before
public void setupThis() {
- resultSet = new ResultSet(resultQueue, pool, readCompleted, RequestMessage.build("traversal").create(), null);
+ resultSet = new ResultSet(resultQueue, pool, readCompleted, RequestMessageV4.build("traversal").create(), null);
}
@Test