You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2021/05/14 17:02:57 UTC
[tinkerpop] branch travis-fix updated: Hold the messages until
close() for a session completes and then flush CTR
This is an automated email from the ASF dual-hosted git repository.
spmallette pushed a commit to branch travis-fix
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/travis-fix by this push:
new 301967f Hold the messages until close() for a session completes and then flush CTR
301967f is described below
commit 301967fbdadd62d02913551932d399df6d127f49
Author: Stephen Mallette <st...@amazon.com>
AuthorDate: Fri May 14 13:02:18 2021 -0400
Hold the messages until close() for a session completes and then flush CTR
---
.../apache/tinkerpop/gremlin/server/Context.java | 27 ++++++++++++++++---
.../gremlin/server/handler/MultiTaskSession.java | 13 +++++++--
.../tinkerpop/gremlin/server/ContextTest.java | 31 +++++++++++++++-------
3 files changed, 55 insertions(+), 16 deletions(-)
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
index 0cdb104..8c2ee6a 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
@@ -174,19 +174,38 @@ public class Context {
* @see #writeAndFlush(ResponseMessage)
*/
public void writeAndFlush(final ResponseStatusCode code, final Object responseMessage) {
+ writeAndMaybeFlush(code, responseMessage, true);
+ }
+
+ public void write(final ResponseMessage message) {
+ this.write(message.getStatus().getCode(), message);
+ }
+
+ public void write(final ResponseStatusCode code, final Object responseMessage) {
+ writeAndMaybeFlush(code, responseMessage, false);
+ }
+
+ /**
+ * Flushes messages to the underlying transport.
+ */
+ public void flush() {
+ this.getChannelHandlerContext().flush();
+ }
+
+ private void writeAndMaybeFlush(final ResponseStatusCode code, final Object responseMessage, final boolean flush) {
final boolean messageIsFinal = code.isFinalResponse();
- if(finalResponseWritten.compareAndSet(false, messageIsFinal)) {
- this.getChannelHandlerContext().writeAndFlush(responseMessage);
+ if (finalResponseWritten.compareAndSet(false, messageIsFinal)) {
+ this.getChannelHandlerContext().write(responseMessage);
+ if (flush) this.getChannelHandlerContext().flush();
} else {
if (responseMessage instanceof Frame) {
((Frame) responseMessage).tryRelease();
}
final String logMessage = String.format("Another final response message was already written for request %s, ignoring response code: %s",
- this.getRequestMessage().getRequestId(), code);
+ this.getRequestMessage().getRequestId(), code);
logger.warn(logMessage);
}
-
}
private RequestContentType determineRequestContents() {
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/MultiTaskSession.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/MultiTaskSession.java
index 8765d98..afdae15 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/MultiTaskSession.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/MultiTaskSession.java
@@ -187,7 +187,7 @@ public class MultiTaskSession extends AbstractSession {
// the ResponseMessage as we don't want the message to be "final" for the Context. that
// status must be reserved for the message that caused the error
for (SessionTask st : queue) {
- st.writeAndFlush(ResponseStatusCode.PARTIAL_CONTENT, ResponseMessage.build(st.getRequestMessage())
+ st.write(ResponseStatusCode.PARTIAL_CONTENT, ResponseMessage.build(st.getRequestMessage())
.code(ResponseStatusCode.SERVER_ERROR)
.statusMessage(String.format(
"An earlier request [%s] failed prior to this one having a chance to execute",
@@ -205,7 +205,7 @@ public class MultiTaskSession extends AbstractSession {
// the best answer
if (!sessionTask.isFinalResponseWritten()) {
logger.warn(rexex.getMessage(), rexex);
- sessionTask.writeAndFlush(rexex.getResponseMessage());
+ sessionTask.write(rexex.getResponseMessage());
}
} finally {
// if this is a normal end to the session or if there is some general processing exception which tanks
@@ -215,6 +215,15 @@ public class MultiTaskSession extends AbstractSession {
closeReason.get() == CloseReason.PROCESSING_EXCEPTION ||
closeReason.get() == CloseReason.SESSION_TIMEOUT) {
close();
+
+ // the session is now in a state where it is no longer in the set of current sessions so flush
+ // remaining messages to the transport, if any. the remaining message should be failures from
+ // the SessionException in the catch. this prevents an unlikely case where a fast client can
+ // get ahead of the server and start to send messages to a technically errored out session. that
+ // in itself is not necessarily bad but it makes tests fail sometimes because the tests are
+ // designed to assert in the same fashion for OpProcessor and UnifiedChannelizer infrastructure
+ // and they are slightly at odds with each other in certain situations.
+ sessionTask.flush();
}
}
}
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ContextTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ContextTest.java
index e685932..c7ff19f 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ContextTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ContextTest.java
@@ -104,48 +104,57 @@ public class ContextTest {
@Test
public void shouldAllowMultipleNonFinalResponses() {
writeInvoker.apply(context, ResponseStatusCode.AUTHENTICATE);
- Mockito.verify(ctx, Mockito.times(1)).writeAndFlush(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(1)).write(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(1)).flush();
writeInvoker.apply(context, ResponseStatusCode.PARTIAL_CONTENT);
- Mockito.verify(ctx, Mockito.times(2)).writeAndFlush(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(2)).write(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(2)).flush();
writeInvoker.apply(context, ResponseStatusCode.PARTIAL_CONTENT);
- Mockito.verify(ctx, Mockito.times(3)).writeAndFlush(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(3)).write(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(3)).flush();
}
@Test
public void shouldAllowAtMostOneFinalResponse() {
writeInvoker.apply(context, ResponseStatusCode.AUTHENTICATE);
- Mockito.verify(ctx, Mockito.times(1)).writeAndFlush(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(1)).write(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(1)).flush();
writeInvoker.apply(context, ResponseStatusCode.SUCCESS);
- Mockito.verify(ctx, Mockito.times(2)).writeAndFlush(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(2)).write(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(2)).flush();
writeInvoker.apply(context, ResponseStatusCode.SERVER_ERROR_TIMEOUT);
assertTrue(recordingAppender.logContainsAny(".*" + request.getRequestId() + ".*"));
assertTrue(recordingAppender.logContainsAny(".*" + ResponseStatusCode.SERVER_ERROR_TIMEOUT + "$"));
// ensure there were no other writes to the channel
- Mockito.verify(ctx, Mockito.times(2)).writeAndFlush(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(2)).write(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(2)).flush();
}
@Test
public void shouldNotAllowNonFinalMessagesAfterFinalResponse() {
writeInvoker.apply(context, ResponseStatusCode.SERVER_ERROR_TIMEOUT);
- Mockito.verify(ctx, Mockito.times(1)).writeAndFlush(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(1)).write(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(1)).flush();
writeInvoker.apply(context, ResponseStatusCode.PARTIAL_CONTENT);
assertTrue(recordingAppender.logContainsAny(".*" + request.getRequestId() + ".*"));
assertTrue(recordingAppender.logContainsAny(".*" + ResponseStatusCode.PARTIAL_CONTENT + "$"));
// ensure there were no other writes to the channel
- Mockito.verify(ctx, Mockito.times(1)).writeAndFlush(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(1)).write(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(1)).flush();
}
@Test
public void shouldReleaseIgnoredFrames() {
writeInvoker.apply(context, ResponseStatusCode.SERVER_ERROR_TIMEOUT);
- Mockito.verify(ctx, Mockito.times(1)).writeAndFlush(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(1)).write(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(1)).flush();
Frame frame = Mockito.mock(Frame.class);
context.writeAndFlush(ResponseStatusCode.SUCCESS, frame);
@@ -154,9 +163,11 @@ public class ContextTest {
assertTrue(recordingAppender.logContainsAny(".*" + ResponseStatusCode.SUCCESS + "$"));
// ensure there were no other writes to the channel
- Mockito.verify(ctx, Mockito.times(1)).writeAndFlush(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(1)).write(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(1)).flush();
// ensure the frame was released
Mockito.verify(frame, Mockito.times(1)).tryRelease();
+ Mockito.verify(ctx, Mockito.times(1)).flush();
}
}
\ No newline at end of file