You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2019/07/25 11:13:34 UTC
[tinkerpop] 02/02: TINKERPOP-2246 Added missing files
This is an automated email from the ASF dual-hosted git repository.
spmallette pushed a commit to branch TINKERPOP-2246
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit b035a72c0e4c97abff16ecee61c2e94096a1a612
Author: Stephen Mallette <sp...@genoprime.com>
AuthorDate: Thu Jul 25 07:12:54 2019 -0400
TINKERPOP-2246 Added missing files
---
.../gremlin/server/ResponseHandlerContext.java | 92 ++++++++++++
.../gremlin/server/ResponseHandlerContextTest.java | 161 +++++++++++++++++++++
2 files changed, 253 insertions(+)
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContext.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContext.java
new file mode 100644
index 0000000..d927fd8
--- /dev/null
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContext.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server;
+
+import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
+import org.apache.tinkerpop.gremlin.server.handler.Frame;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A context for asynchronously writing response messages related to a particular request.
+ * <p>The "write" methods of this class ensure that at most one {@link ResponseStatusCode#isFinalResponse() final}
+ * response message is written to the underlying channel. Attempts to write more than one final response message will
+ * be ignored with a warning log message.</p>
+ * <p>Note: an object of this class should be used instead of writing to the channel directly when multiple threads
+ * are expected to produce final response messages concurrently. Callers must ensure that the same
+ * {@link ResponseHandlerContext} is used by all threads writing response messages for the same request.</p>
+ *
+ * @author Dmitri Bourlatchkov
+ * @deprecated As of release 3.3.8, replaced by {@link Context}.
+ */
+@Deprecated
+public class ResponseHandlerContext {
+ private static final Logger logger = LoggerFactory.getLogger(ResponseHandlerContext.class);
+
+ private final Context context;
+ private final AtomicBoolean finalResponseWritten = new AtomicBoolean();
+
+ public ResponseHandlerContext(final Context context) {
+ this.context = context;
+ }
+
+ public Context getContext() {
+ return context;
+ }
+
+ /**
+ * Writes a response message to the underlying channel while ensuring that at most one
+ * {@link ResponseStatusCode#isFinalResponse() final} response is written.
+ * <p>Note: this method should be used instead of writing to the channel directly when multiple threads
+ * are expected to produce response messages concurrently.</p>
+ * <p>Attempts to write more than one final response message will be ignored.</p>
+ * @see #writeAndFlush(ResponseStatusCode, Object)
+ */
+ public void writeAndFlush(final ResponseMessage message) {
+ writeAndFlush(message.getStatus().getCode(), message);
+ }
+
+ /**
+ * Writes a response message to the underlying channel while ensuring that at most one
+ * {@link ResponseStatusCode#isFinalResponse() final} response is written.
+ * <p>The caller must make sure that the provided response status code matches the content of the message.</p>
+ * <p>Note: this method should be used instead of writing to the channel directly when multiple threads
+ * are expected to produce response messages concurrently.</p>
+ * <p>Attempts to write more than one final response message will be ignored.</p>
+ * @see #writeAndFlush(ResponseMessage)
+ */
+ public void writeAndFlush(final ResponseStatusCode code, final Object responseMessage) {
+ final boolean messageIsFinal = code.isFinalResponse();
+ if(finalResponseWritten.compareAndSet(false, messageIsFinal)) {
+ context.getChannelHandlerContext().writeAndFlush(responseMessage);
+ } else {
+ if (responseMessage instanceof Frame) {
+ ((Frame) responseMessage).tryRelease();
+ }
+
+ final String logMessage = String.format("Another final response message was already written for request %s, ignoring response code: %s",
+ context.getRequestMessage().getRequestId(), code);
+ logger.warn(logMessage);
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContextTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContextTest.java
new file mode 100644
index 0000000..a64971a
--- /dev/null
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ResponseHandlerContextTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server;
+
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
+import org.apache.tinkerpop.gremlin.server.handler.Frame;
+import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.Mockito;
+
+import java.util.Arrays;
+import java.util.UUID;
+import java.util.function.BiFunction;
+
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class ResponseHandlerContextTest {
+
+ @Parameterized.Parameter(value = 0)
+ public BiFunction<Context, ResponseStatusCode, Void> writeInvoker;
+
+ private final ChannelHandlerContext ctx = Mockito.mock(ChannelHandlerContext.class);
+ private final RequestMessage request = RequestMessage.build("test").create();
+ private final Context context = new Context(request, ctx, null, null, null, null);
+ private final Log4jRecordingAppender recordingAppender = new Log4jRecordingAppender();
+
+ private Level originalLogLevel;
+
+ @Parameterized.Parameters(name = "{0}")
+ public static Iterable<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ {
+ new BiFunction<Context, ResponseStatusCode, Void>() {
+ @Override
+ public Void apply(final Context context, final ResponseStatusCode code) {
+ context.writeAndFlush(code, "testMessage");
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return "writeAndFlush(ResponseStatusCode, Object)";
+ }
+ }
+ }, {
+ new BiFunction<Context, ResponseStatusCode, Void>() {
+ @Override
+ public Void apply(final Context context, final ResponseStatusCode code) {
+ context.writeAndFlush(ResponseMessage.build(UUID.randomUUID()).code(code).create());
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return "writeAndFlush(ResponseMessage)";
+ }
+ }
+ },
+ });
+ }
+
+ @Before
+ public void addRecordingAppender() {
+ final Logger rootLogger = Logger.getRootLogger();
+ rootLogger.addAppender(recordingAppender);
+ originalLogLevel = rootLogger.getLevel();
+ rootLogger.setLevel(Level.ALL);
+ }
+
+ @After
+ public void removeRecordingAppender() {
+ final Logger rootLogger = Logger.getRootLogger();
+ rootLogger.setLevel(originalLogLevel);
+ rootLogger.removeAppender(recordingAppender);
+ }
+
+ @Test
+ public void shouldAllowMultipleNonFinalResponses() {
+ writeInvoker.apply(context, ResponseStatusCode.AUTHENTICATE);
+ Mockito.verify(ctx, Mockito.times(1)).writeAndFlush(Mockito.any());
+
+ writeInvoker.apply(context, ResponseStatusCode.PARTIAL_CONTENT);
+ Mockito.verify(ctx, Mockito.times(2)).writeAndFlush(Mockito.any());
+
+ writeInvoker.apply(context, ResponseStatusCode.PARTIAL_CONTENT);
+ Mockito.verify(ctx, Mockito.times(3)).writeAndFlush(Mockito.any());
+ }
+
+ @Test
+ public void shouldAllowAtMostOneFinalResponse() {
+ writeInvoker.apply(context, ResponseStatusCode.AUTHENTICATE);
+ Mockito.verify(ctx, Mockito.times(1)).writeAndFlush(Mockito.any());
+
+ writeInvoker.apply(context, ResponseStatusCode.SUCCESS);
+ Mockito.verify(ctx, Mockito.times(2)).writeAndFlush(Mockito.any());
+
+ writeInvoker.apply(context, ResponseStatusCode.SERVER_ERROR_TIMEOUT);
+ assertTrue(recordingAppender.logContainsAny(".*" + request.getRequestId() + ".*"));
+ assertTrue(recordingAppender.logContainsAny(".*" + ResponseStatusCode.SERVER_ERROR_TIMEOUT + "$"));
+
+ // ensure there were no other writes to the channel
+ Mockito.verify(ctx, Mockito.times(2)).writeAndFlush(Mockito.any());
+ }
+
+ @Test
+ public void shouldNotAllowNonFinalMessagesAfterFinalResponse() {
+ writeInvoker.apply(context, ResponseStatusCode.SERVER_ERROR_TIMEOUT);
+ Mockito.verify(ctx, Mockito.times(1)).writeAndFlush(Mockito.any());
+
+ writeInvoker.apply(context, ResponseStatusCode.PARTIAL_CONTENT);
+ assertTrue(recordingAppender.logContainsAny(".*" + request.getRequestId() + ".*"));
+ assertTrue(recordingAppender.logContainsAny(".*" + ResponseStatusCode.PARTIAL_CONTENT + "$"));
+
+ // ensure there were no other writes to the channel
+ Mockito.verify(ctx, Mockito.times(1)).writeAndFlush(Mockito.any());
+ }
+
+ @Test
+ public void shouldReleaseIgnoredFrames() {
+ writeInvoker.apply(context, ResponseStatusCode.SERVER_ERROR_TIMEOUT);
+ Mockito.verify(ctx, Mockito.times(1)).writeAndFlush(Mockito.any());
+
+ Frame frame = Mockito.mock(Frame.class);
+ context.writeAndFlush(ResponseStatusCode.SUCCESS, frame);
+
+ assertTrue(recordingAppender.logContainsAny(".*" + request.getRequestId() + ".*"));
+ assertTrue(recordingAppender.logContainsAny(".*" + ResponseStatusCode.SUCCESS + "$"));
+
+ // ensure there were no other writes to the channel
+ Mockito.verify(ctx, Mockito.times(1)).writeAndFlush(Mockito.any());
+
+ // ensure the frame was released
+ Mockito.verify(frame, Mockito.times(1)).tryRelease();
+ }
+}
\ No newline at end of file