You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2019/07/25 11:13:34 UTC

[tinkerpop] 02/02: TINKERPOP-2246 Added missing files

This is an automated email from the ASF dual-hosted git repository.

spmallette pushed a commit to branch TINKERPOP-2246
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit b035a72c0e4c97abff16ecee61c2e94096a1a612
Author: Stephen Mallette <sp...@genoprime.com>
AuthorDate: Thu Jul 25 07:12:54 2019 -0400

    TINKERPOP-2246 Added missing files
---
 .../gremlin/server/ResponseHandlerContext.java     |  92 ++++++++++++
 .../gremlin/server/ResponseHandlerContextTest.java | 161 +++++++++++++++++++++
 2 files changed, 253 insertions(+)

diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContext.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContext.java
new file mode 100644
index 0000000..d927fd8
--- /dev/null
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContext.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server;
+
+import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
+import org.apache.tinkerpop.gremlin.server.handler.Frame;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A context for asynchronously writing response messages related to a particular request.
+ * <p>The "write" methods of this class ensure that at most one {@link ResponseStatusCode#isFinalResponse() final}
+ * response message is written to the underlying channel. Once a final response was written, every further message
+ * (final or not) is ignored with a warning log message.</p>
+ * <p>Note: an object of this class should be used instead of writing to the channel directly when multiple threads
+ * are expected to produce final response messages concurrently. Callers must ensure that the same
+ * {@link ResponseHandlerContext} is used by all threads writing response messages for the same request.</p>
+ *
+ * @author Dmitri Bourlatchkov
+ * @deprecated As of release 3.3.8, replaced by {@link Context}.
+ */
+@Deprecated
+public class ResponseHandlerContext {
+    private static final Logger logger = LoggerFactory.getLogger(ResponseHandlerContext.class);
+
+    private final Context context;
+    private final AtomicBoolean finalResponseWritten = new AtomicBoolean(); // true once a final response was written
+
+    public ResponseHandlerContext(final Context context) {
+        this.context = context;
+    }
+
+    public Context getContext() {
+        return context;
+    }
+
+    /**
+     * Writes a response message to the underlying channel while ensuring that at most one
+     * {@link ResponseStatusCode#isFinalResponse() final} response is written.
+     * <p>The status code is taken from the message itself. Note: this method should be used instead of writing to
+     * the channel directly when multiple threads are expected to produce response messages concurrently.</p>
+     * <p>Any message offered after a final response was written is ignored and a warning is logged.</p>
+     * @see #writeAndFlush(ResponseStatusCode, Object)
+     */
+    public void writeAndFlush(final ResponseMessage message) {
+        writeAndFlush(message.getStatus().getCode(), message);
+    }
+
+    /**
+     * Writes a response message to the underlying channel while ensuring that at most one
+     * {@link ResponseStatusCode#isFinalResponse() final} response is written.
+     * <p>The caller must make sure that the provided response status code matches the content of the message.</p>
+     * <p>Note: this method should be used instead of writing to the channel directly when multiple threads
+     * are expected to produce response messages concurrently.</p>
+     * <p>Messages offered after a final response are ignored with a warning; an ignored {@link Frame} is released.</p>
+     * @see #writeAndFlush(ResponseMessage)
+     */
+    public void writeAndFlush(final ResponseStatusCode code, final Object responseMessage) {
+        // CAS from false: succeeds only while no final response was written and atomically records this one if final
+        final boolean messageIsFinal = code.isFinalResponse();
+        if (finalResponseWritten.compareAndSet(false, messageIsFinal)) {
+            context.getChannelHandlerContext().writeAndFlush(responseMessage);
+        } else {
+            // the message is dropped and never reaches the channel, so a Frame is asked to release itself here
+            if (responseMessage instanceof Frame) {
+                ((Frame) responseMessage).tryRelease();
+            }
+
+            logger.warn("Another final response message was already written for request {}, ignoring response code: {}",
+                    context.getRequestMessage().getRequestId(), code);
+        }
+    }
+}
\ No newline at end of file
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContextTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContextTest.java
new file mode 100644
index 0000000..a64971a
--- /dev/null
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContextTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server;
+
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
+import org.apache.tinkerpop.gremlin.server.handler.Frame;
+import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.Mockito;
+
+import java.util.Arrays;
+import java.util.UUID;
+import java.util.function.BiFunction;
+
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class ResponseHandlerContextTest {
+    // every test runs once per writeAndFlush overload of ResponseHandlerContext supplied by data()
+    @Parameterized.Parameter(value = 0)
+    public BiFunction<ResponseHandlerContext, ResponseStatusCode, Void> writeInvoker;
+
+    private final ChannelHandlerContext ctx = Mockito.mock(ChannelHandlerContext.class);
+    private final RequestMessage request = RequestMessage.build("test").create();
+    private final Context context = new Context(request, ctx, null, null, null, null);
+    private final ResponseHandlerContext rhc = new ResponseHandlerContext(context);
+    private final Log4jRecordingAppender recordingAppender = new Log4jRecordingAppender();
+    private Level originalLogLevel;
+
+    @Parameterized.Parameters(name = "{0}")
+    public static Iterable<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                {
+                        new BiFunction<ResponseHandlerContext, ResponseStatusCode, Void>() {
+                            @Override
+                            public Void apply(final ResponseHandlerContext rhc, final ResponseStatusCode code) {
+                                rhc.writeAndFlush(code, "testMessage");
+                                return null;
+                            }
+
+                            @Override
+                            public String toString() {
+                                return "writeAndFlush(ResponseStatusCode, Object)";
+                            }
+                        }
+                }, {
+                        new BiFunction<ResponseHandlerContext, ResponseStatusCode, Void>() {
+                            @Override
+                            public Void apply(final ResponseHandlerContext rhc, final ResponseStatusCode code) {
+                                rhc.writeAndFlush(ResponseMessage.build(UUID.randomUUID()).code(code).create());
+                                return null;
+                            }
+
+                            @Override
+                            public String toString() {
+                                return "writeAndFlush(ResponseMessage)";
+                            }
+                        }
+                },
+        });
+    }
+
+    @Before
+    public void addRecordingAppender() {
+        final Logger rootLogger = Logger.getRootLogger();
+        rootLogger.addAppender(recordingAppender);
+        originalLogLevel = rootLogger.getLevel();
+        rootLogger.setLevel(Level.ALL); // record every level so the warning for ignored messages is captured
+    }
+
+    @After
+    public void removeRecordingAppender() {
+        final Logger rootLogger = Logger.getRootLogger();
+        rootLogger.setLevel(originalLogLevel);
+        rootLogger.removeAppender(recordingAppender);
+    }
+
+    @Test
+    public void shouldAllowMultipleNonFinalResponses() {
+        writeInvoker.apply(rhc, ResponseStatusCode.AUTHENTICATE);
+        Mockito.verify(ctx, Mockito.times(1)).writeAndFlush(Mockito.any());
+
+        writeInvoker.apply(rhc, ResponseStatusCode.PARTIAL_CONTENT);
+        Mockito.verify(ctx, Mockito.times(2)).writeAndFlush(Mockito.any());
+
+        writeInvoker.apply(rhc, ResponseStatusCode.PARTIAL_CONTENT);
+        Mockito.verify(ctx, Mockito.times(3)).writeAndFlush(Mockito.any());
+    }
+
+    @Test
+    public void shouldAllowAtMostOneFinalResponse() {
+        writeInvoker.apply(rhc, ResponseStatusCode.AUTHENTICATE);
+        Mockito.verify(ctx, Mockito.times(1)).writeAndFlush(Mockito.any());
+
+        writeInvoker.apply(rhc, ResponseStatusCode.SUCCESS);
+        Mockito.verify(ctx, Mockito.times(2)).writeAndFlush(Mockito.any());
+        // a second final response must be ignored and reported in the log
+        writeInvoker.apply(rhc, ResponseStatusCode.SERVER_ERROR_TIMEOUT);
+        assertTrue(recordingAppender.logContainsAny(".*" + request.getRequestId() + ".*"));
+        assertTrue(recordingAppender.logContainsAny(".*" + ResponseStatusCode.SERVER_ERROR_TIMEOUT + "$"));
+
+        // ensure there were no other writes to the channel
+        Mockito.verify(ctx, Mockito.times(2)).writeAndFlush(Mockito.any());
+    }
+
+    @Test
+    public void shouldNotAllowNonFinalMessagesAfterFinalResponse() {
+        writeInvoker.apply(rhc, ResponseStatusCode.SERVER_ERROR_TIMEOUT);
+        Mockito.verify(ctx, Mockito.times(1)).writeAndFlush(Mockito.any());
+        // a non-final message after the final response must be ignored and reported in the log
+        writeInvoker.apply(rhc, ResponseStatusCode.PARTIAL_CONTENT);
+        assertTrue(recordingAppender.logContainsAny(".*" + request.getRequestId() + ".*"));
+        assertTrue(recordingAppender.logContainsAny(".*" + ResponseStatusCode.PARTIAL_CONTENT + "$"));
+
+        // ensure there were no other writes to the channel
+        Mockito.verify(ctx, Mockito.times(1)).writeAndFlush(Mockito.any());
+    }
+
+    @Test
+    public void shouldReleaseIgnoredFrames() {
+        writeInvoker.apply(rhc, ResponseStatusCode.SERVER_ERROR_TIMEOUT);
+        Mockito.verify(ctx, Mockito.times(1)).writeAndFlush(Mockito.any());
+        // offer a Frame after the final response so that it takes the "ignored" path
+        final Frame frame = Mockito.mock(Frame.class);
+        rhc.writeAndFlush(ResponseStatusCode.SUCCESS, frame);
+
+        assertTrue(recordingAppender.logContainsAny(".*" + request.getRequestId() + ".*"));
+        assertTrue(recordingAppender.logContainsAny(".*" + ResponseStatusCode.SUCCESS + "$"));
+
+        // ensure there were no other writes to the channel
+        Mockito.verify(ctx, Mockito.times(1)).writeAndFlush(Mockito.any());
+
+        // ensure the frame was released
+        Mockito.verify(frame, Mockito.times(1)).tryRelease();
+    }
+}
\ No newline at end of file