You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2021/04/20 18:07:24 UTC

[tinkerpop] branch TINKERPOP-2547 created (now 0de7194)

This is an automated email from the ASF dual-hosted git repository.

spmallette pushed a change to branch TINKERPOP-2547
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git.


      at 0de7194  TINKERPOP-2547 Add a way to supply a callback on requests with java driver

This branch includes the following new commits:

     new 0de7194  TINKERPOP-2547 Add a way to supply a callback on requests with java driver

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[tinkerpop] 01/01: TINKERPOP-2547 Add a way to supply a callback on requests with java driver

Posted by sp...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

spmallette pushed a commit to branch TINKERPOP-2547
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 0de719441eb434698a9188830041eb7229d0566c
Author: Stephen Mallette <st...@amazon.com>
AuthorDate: Tue Apr 20 14:05:55 2021 -0400

    TINKERPOP-2547 Add a way to supply a callback on requests with java driver
---
 CHANGELOG.asciidoc                                 |  1 +
 .../tinkerpop/gremlin/driver/Channelizer.java      |  4 ++
 .../apache/tinkerpop/gremlin/driver/Cluster.java   | 17 ++++++++-
 .../gremlin/driver/RequestInterceptor.java         | 37 ++++++++++++++++++
 .../driver/handler/RequestInterceptorHandler.java  | 44 ++++++++++++++++++++++
 .../gremlin/server/GremlinDriverIntegrateTest.java | 37 ++++++++++++++++++
 6 files changed, 139 insertions(+), 1 deletion(-)

diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index b5c5e21..03fafa0 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -37,6 +37,7 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
 * Ensured that `barrier()` additions by strategies were controlled solely by `LazyBarrierStrategy`.
 * Fixed `NullPointerException` in `ResponseMessage` deserialization for GraphSON.
 * Enabled the Gremlin.Net driver to repair its connection pool after the server was temporarily unavailable.
+* Added the ability to supply a `RequestInterceptor` to a `Cluster` which will provide access to outgoing request objects.
 
 [[release-3-4-10]]
 === TinkerPop 3.4.10 (Release Date: January 18, 2021)
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index f4f1eed..5ed896b 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -25,6 +25,7 @@ import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
 import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
 import org.apache.tinkerpop.gremlin.driver.handler.NioGremlinRequestEncoder;
 import org.apache.tinkerpop.gremlin.driver.handler.NioGremlinResponseDecoder;
+import org.apache.tinkerpop.gremlin.driver.handler.RequestInterceptorHandler;
 import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler;
 import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinRequestEncoder;
 import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinResponseDecoder;
@@ -168,12 +169,14 @@ public interface Channelizer extends ChannelHandler {
 
         private WebSocketGremlinRequestEncoder webSocketGremlinRequestEncoder;
         private WebSocketGremlinResponseDecoder webSocketGremlinResponseDecoder;
+        private RequestInterceptorHandler requestInterceptorHandler;
 
         @Override
         public void init(final Connection connection) {
             super.init(connection);
             webSocketGremlinRequestEncoder = new WebSocketGremlinRequestEncoder(true, cluster.getSerializer());
             webSocketGremlinResponseDecoder = new WebSocketGremlinResponseDecoder(cluster.getSerializer());
+            requestInterceptorHandler = new RequestInterceptorHandler(cluster.getRequestInterceptor());
         }
 
         /**
@@ -237,6 +240,7 @@ public interface Channelizer extends ChannelHandler {
             final int keepAliveInterval = toIntExact(TimeUnit.SECONDS.convert(cluster.connectionPoolSettings().keepAliveInterval, TimeUnit.MILLISECONDS));
 
             pipeline.addLast("http-codec", new HttpClientCodec());
+            pipeline.addLast("request-interceptor", requestInterceptorHandler);
             pipeline.addLast("aggregator", new HttpObjectAggregator(maxContentLength));
             // Add compression extension for WebSocket defined in https://tools.ietf.org/html/rfc7692
             pipeline.addLast(WebSocketClientCompressionHandler.INSTANCE);
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
index 9e862d6..e1188df 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
@@ -28,7 +28,6 @@ import org.apache.commons.configuration.Configuration;
 import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
 import org.apache.tinkerpop.gremlin.driver.ser.Serializers;
 import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.slf4j.Logger;
@@ -483,6 +482,10 @@ public final class Cluster {
         return manager.serializer;
     }
 
+    RequestInterceptor getRequestInterceptor() {
+        return manager.interceptor;
+    }
+
     ScheduledExecutorService executor() {
         return manager.executor;
     }
@@ -621,6 +624,7 @@ public final class Cluster {
         private boolean sslSkipCertValidation = false;
         private SslContext sslContext = null;
         private LoadBalancingStrategy loadBalancingStrategy = new LoadBalancingStrategy.RoundRobin();
+        private RequestInterceptor interceptor = RequestInterceptor.NO_OP;
         private AuthProperties authProps = new AuthProperties();
         private long connectionSetupTimeoutMillis = Connection.CONNECTION_SETUP_TIMEOUT_MILLIS;
 
@@ -994,6 +998,15 @@ public final class Cluster {
         }
 
         /**
+         * Specifies an {@link RequestInterceptor} that will allow manipulation of the
+         * {@code FullHttpRequest} prior to its being sent over the websocket.
+         */
+        public Builder requestInterceptor(final RequestInterceptor interceptor) {
+            this.interceptor = interceptor;
+            return this;
+        }
+
+        /**
          * Specifies parameters for authentication to Gremlin Server.
          */
         public Builder authProperties(final AuthProperties authProps) {
@@ -1114,6 +1127,7 @@ public final class Cluster {
         private final AuthProperties authProps;
         private final Optional<SslContext> sslContextOptional;
         private final Supplier<RequestMessage.Builder> validationRequest;
+        private final RequestInterceptor interceptor;
 
         private final ScheduledThreadPoolExecutor executor;
 
@@ -1132,6 +1146,7 @@ public final class Cluster {
             this.loadBalancingStrategy = builder.loadBalancingStrategy;
             this.authProps = builder.authProps;
             this.contactPoints = builder.getContactPoints();
+            this.interceptor = builder.interceptor;
 
             connectionPoolSettings = new Settings.ConnectionPoolSettings();
             connectionPoolSettings.maxInProcessPerConnection = builder.maxInProcessPerConnection;
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestInterceptor.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestInterceptor.java
new file mode 100644
index 0000000..5924f75
--- /dev/null
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestInterceptor.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver;
+
+import org.apache.tinkerpop.gremlin.driver.handler.RequestInterceptorHandler;
+
+import java.util.function.UnaryOperator;
+
+/**
+ * This function is called when the {@link RequestInterceptorHandler} encounters an object passing through it.
+ * Implementers will need to detect the type of object to determine if they will interact with it or not. Typically,
+ * objects will be a {@code FullHttpRequest} in the case of a websocket handshake or some form of
+ * {@code WebSocketFrame}. Implementations are supplied to {@link Cluster.Builder#requestInterceptor(RequestInterceptor)}.
+ */
+public interface RequestInterceptor extends UnaryOperator<Object> {
+
+    /**
+     * The default implementation of a {@link RequestInterceptor} and behaves as a no-op.
+     */
+    public static final RequestInterceptor NO_OP = o -> o;
+}
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/RequestInterceptorHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/RequestInterceptorHandler.java
new file mode 100644
index 0000000..3c38927
--- /dev/null
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/RequestInterceptorHandler.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import org.apache.tinkerpop.gremlin.driver.Channelizer;
+import org.apache.tinkerpop.gremlin.driver.RequestInterceptor;
+
+/**
+ * A handler in the {@link Channelizer} that will intercept outgoing requests so that the user might mutate them
+ * in some way before they are sent.
+ */
+public class RequestInterceptorHandler extends ChannelOutboundHandlerAdapter {
+
+    private final RequestInterceptor interceptor;
+
+    public RequestInterceptorHandler(final RequestInterceptor interceptor) {
+        this.interceptor = interceptor;
+    }
+
+    @Override
+    public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception {
+        super.write(ctx, interceptor.apply(msg), promise);
+    }
+
+}
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index f256e51..19c6e41 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -18,6 +18,9 @@
  */
 package org.apache.tinkerpop.gremlin.server;
 
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.WebSocketFrame;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.log4j.Level;
 import org.apache.tinkerpop.gremlin.TestHelper;
@@ -212,6 +215,40 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
     }
 
     @Test
+    public void shouldInterceptRequests() throws Exception {
+        final int requestsToMake = 32;
+        final AtomicInteger submitRequests = new AtomicInteger(0);
+        final AtomicInteger websocketHandshakeRequests = new AtomicInteger(0);
+        final AtomicInteger websocketCloseRequests = new AtomicInteger(0);
+
+        final Cluster cluster = TestClientFactory.build().
+                minConnectionPoolSize(1).maxConnectionPoolSize(1).requestInterceptor(r -> {
+            if (r instanceof FullHttpRequest)
+                websocketHandshakeRequests.incrementAndGet();
+            else if (r instanceof CloseWebSocketFrame)
+                websocketCloseRequests.incrementAndGet();
+            else if (r instanceof WebSocketFrame)
+                submitRequests.incrementAndGet();
+            else
+                throw new RuntimeException(String.format("Received an unexpected message type: %s", r.getClass().getSimpleName()));
+            return r;
+        }).create();
+
+        try {
+            final Client client = cluster.connect();
+            for (int ix = 0; ix < requestsToMake; ix++) {
+                assertEquals(ix + 1, client.submit(ix + "+1").all().get().get(0).getInt());
+            }
+        } finally {
+            cluster.close();
+        }
+
+        assertEquals(requestsToMake, submitRequests.get());
+        assertEquals(1, websocketHandshakeRequests.get());
+        assertEquals(1, websocketCloseRequests.get());
+    }
+
+    @Test
     public void shouldReportErrorWhenRequestCantBeSerialized() throws Exception {
         final Cluster cluster = TestClientFactory.build().serializer(Serializers.GRAPHSON_V3D0).create();
         try {