You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2021/05/14 17:02:57 UTC

[tinkerpop] branch travis-fix updated: Hold the messages until close() for a session completes and then flush CTR

This is an automated email from the ASF dual-hosted git repository.

spmallette pushed a commit to branch travis-fix
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/travis-fix by this push:
     new 301967f  Hold the messages until close() for a session completes and then flush CTR
301967f is described below

commit 301967fbdadd62d02913551932d399df6d127f49
Author: Stephen Mallette <st...@amazon.com>
AuthorDate: Fri May 14 13:02:18 2021 -0400

    Hold the messages until close() for a session completes and then flush CTR
---
 .../apache/tinkerpop/gremlin/server/Context.java   | 27 ++++++++++++++++---
 .../gremlin/server/handler/MultiTaskSession.java   | 13 +++++++--
 .../tinkerpop/gremlin/server/ContextTest.java      | 31 +++++++++++++++-------
 3 files changed, 55 insertions(+), 16 deletions(-)

diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
index 0cdb104..8c2ee6a 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
@@ -174,19 +174,38 @@ public class Context {
      * @see #writeAndFlush(ResponseMessage)
      */
     public void writeAndFlush(final ResponseStatusCode code, final Object responseMessage) {
+        writeAndMaybeFlush(code, responseMessage, true);
+    }
+
+    public void write(final ResponseMessage message) {
+        this.write(message.getStatus().getCode(), message);
+    }
+
+    public void write(final ResponseStatusCode code, final Object responseMessage) {
+        writeAndMaybeFlush(code, responseMessage, false);
+    }
+
+    /**
+     * Flushes messages to the underlying transport.
+     */
+    public void flush() {
+        this.getChannelHandlerContext().flush();
+    }
+
+    private void writeAndMaybeFlush(final ResponseStatusCode code, final Object responseMessage, final boolean flush) {
         final boolean messageIsFinal = code.isFinalResponse();
-        if(finalResponseWritten.compareAndSet(false, messageIsFinal)) {
-            this.getChannelHandlerContext().writeAndFlush(responseMessage);
+        if (finalResponseWritten.compareAndSet(false, messageIsFinal)) {
+            this.getChannelHandlerContext().write(responseMessage);
+            if (flush) this.getChannelHandlerContext().flush();
         } else {
             if (responseMessage instanceof Frame) {
                 ((Frame) responseMessage).tryRelease();
             }
 
             final String logMessage = String.format("Another final response message was already written for request %s, ignoring response code: %s",
-                                                    this.getRequestMessage().getRequestId(), code);
+                    this.getRequestMessage().getRequestId(), code);
             logger.warn(logMessage);
         }
-
     }
 
     private RequestContentType determineRequestContents() {
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/MultiTaskSession.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/MultiTaskSession.java
index 8765d98..afdae15 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/MultiTaskSession.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/MultiTaskSession.java
@@ -187,7 +187,7 @@ public class MultiTaskSession extends AbstractSession {
             // the ResponseMessage as we don't want the message to be "final" for the Context. that
             // status must be reserved for the message that caused the error
             for (SessionTask st : queue) {
-                st.writeAndFlush(ResponseStatusCode.PARTIAL_CONTENT, ResponseMessage.build(st.getRequestMessage())
+                st.write(ResponseStatusCode.PARTIAL_CONTENT, ResponseMessage.build(st.getRequestMessage())
                         .code(ResponseStatusCode.SERVER_ERROR)
                         .statusMessage(String.format(
                                 "An earlier request [%s] failed prior to this one having a chance to execute",
@@ -205,7 +205,7 @@ public class MultiTaskSession extends AbstractSession {
             // the best answer
             if (!sessionTask.isFinalResponseWritten()) {
                 logger.warn(rexex.getMessage(), rexex);
-                sessionTask.writeAndFlush(rexex.getResponseMessage());
+                sessionTask.write(rexex.getResponseMessage());
             }
         } finally {
             // if this is a normal end to the session or if there is some general processing exception which tanks
@@ -215,6 +215,15 @@ public class MultiTaskSession extends AbstractSession {
                     closeReason.get() == CloseReason.PROCESSING_EXCEPTION ||
                     closeReason.get() == CloseReason.SESSION_TIMEOUT) {
                 close();
+
+                // the session is now in a state where it is no longer in the set of current sessions so flush
+                // remaining messages to the transport, if any. the remaining message should be failures from
+                // the SessionException in the catch. this prevents an unlikely case where a fast client can
+                // get ahead of the server and start to send messages to a technically errored out session. that
+                // in itself is not necessarily bad but it makes tests fail sometimes because the tests are
+                // designed to assert in the same fashion for OpProcessor and UnifiedChannelizer infrastructure
+                // and they are slightly at odds with each other in certain situations.
+                sessionTask.flush();
             }
         }
     }
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ContextTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ContextTest.java
index e685932..c7ff19f 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ContextTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ContextTest.java
@@ -104,48 +104,57 @@ public class ContextTest {
     @Test
     public void shouldAllowMultipleNonFinalResponses() {
         writeInvoker.apply(context, ResponseStatusCode.AUTHENTICATE);
-        Mockito.verify(ctx, Mockito.times(1)).writeAndFlush(Mockito.any());
+        Mockito.verify(ctx, Mockito.times(1)).write(Mockito.any());
+        Mockito.verify(ctx, Mockito.times(1)).flush();
 
         writeInvoker.apply(context, ResponseStatusCode.PARTIAL_CONTENT);
-        Mockito.verify(ctx, Mockito.times(2)).writeAndFlush(Mockito.any());
+        Mockito.verify(ctx, Mockito.times(2)).write(Mockito.any());
+        Mockito.verify(ctx, Mockito.times(2)).flush();
 
         writeInvoker.apply(context, ResponseStatusCode.PARTIAL_CONTENT);
-        Mockito.verify(ctx, Mockito.times(3)).writeAndFlush(Mockito.any());
+        Mockito.verify(ctx, Mockito.times(3)).write(Mockito.any());
+        Mockito.verify(ctx, Mockito.times(3)).flush();
     }
 
     @Test
     public void shouldAllowAtMostOneFinalResponse() {
         writeInvoker.apply(context, ResponseStatusCode.AUTHENTICATE);
-        Mockito.verify(ctx, Mockito.times(1)).writeAndFlush(Mockito.any());
+        Mockito.verify(ctx, Mockito.times(1)).write(Mockito.any());
+        Mockito.verify(ctx, Mockito.times(1)).flush();
 
         writeInvoker.apply(context, ResponseStatusCode.SUCCESS);
-        Mockito.verify(ctx, Mockito.times(2)).writeAndFlush(Mockito.any());
+        Mockito.verify(ctx, Mockito.times(2)).write(Mockito.any());
+        Mockito.verify(ctx, Mockito.times(2)).flush();
 
         writeInvoker.apply(context, ResponseStatusCode.SERVER_ERROR_TIMEOUT);
         assertTrue(recordingAppender.logContainsAny(".*" + request.getRequestId() + ".*"));
         assertTrue(recordingAppender.logContainsAny(".*" + ResponseStatusCode.SERVER_ERROR_TIMEOUT + "$"));
 
         // ensure there were no other writes to the channel
-        Mockito.verify(ctx, Mockito.times(2)).writeAndFlush(Mockito.any());
+        Mockito.verify(ctx, Mockito.times(2)).write(Mockito.any());
+        Mockito.verify(ctx, Mockito.times(2)).flush();
     }
 
     @Test
     public void shouldNotAllowNonFinalMessagesAfterFinalResponse() {
         writeInvoker.apply(context, ResponseStatusCode.SERVER_ERROR_TIMEOUT);
-        Mockito.verify(ctx, Mockito.times(1)).writeAndFlush(Mockito.any());
+        Mockito.verify(ctx, Mockito.times(1)).write(Mockito.any());
+        Mockito.verify(ctx, Mockito.times(1)).flush();
 
         writeInvoker.apply(context, ResponseStatusCode.PARTIAL_CONTENT);
         assertTrue(recordingAppender.logContainsAny(".*" + request.getRequestId() + ".*"));
         assertTrue(recordingAppender.logContainsAny(".*" + ResponseStatusCode.PARTIAL_CONTENT + "$"));
 
         // ensure there were no other writes to the channel
-        Mockito.verify(ctx, Mockito.times(1)).writeAndFlush(Mockito.any());
+        Mockito.verify(ctx, Mockito.times(1)).write(Mockito.any());
+        Mockito.verify(ctx, Mockito.times(1)).flush();
     }
 
     @Test
     public void shouldReleaseIgnoredFrames() {
         writeInvoker.apply(context, ResponseStatusCode.SERVER_ERROR_TIMEOUT);
-        Mockito.verify(ctx, Mockito.times(1)).writeAndFlush(Mockito.any());
+        Mockito.verify(ctx, Mockito.times(1)).write(Mockito.any());
+        Mockito.verify(ctx, Mockito.times(1)).flush();
 
         Frame frame = Mockito.mock(Frame.class);
         context.writeAndFlush(ResponseStatusCode.SUCCESS, frame);
@@ -154,9 +163,11 @@ public class ContextTest {
         assertTrue(recordingAppender.logContainsAny(".*" + ResponseStatusCode.SUCCESS + "$"));
 
         // ensure there were no other writes to the channel
-        Mockito.verify(ctx, Mockito.times(1)).writeAndFlush(Mockito.any());
+        Mockito.verify(ctx, Mockito.times(1)).write(Mockito.any());
+        Mockito.verify(ctx, Mockito.times(1)).flush();
 
         // ensure the frame was released
         Mockito.verify(frame, Mockito.times(1)).tryRelease();
+        Mockito.verify(ctx, Mockito.times(1)).flush();
     }
 }
\ No newline at end of file