You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2021/06/04 07:27:41 UTC
[ignite-3] branch main updated: IGNITE-14824 Support for partial
message headers in DirectMessageWriter. Fixes #164
This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new dacf26e IGNITE-14824 Support for partial message headers in DirectMessageWriter. Fixes #164
dacf26e is described below
commit dacf26e9433cafcb95dda9b48ec0dcd04ec4237a
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Fri Jun 4 10:25:31 2021 +0300
IGNITE-14824 Support for partial message headers in DirectMessageWriter. Fixes #164
Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
.../internal/direct/DirectMessageWriter.java | 23 ++++++++++++++--
.../stream/DirectByteBufferStreamImplV1.java | 32 ++++++++++------------
2 files changed, 34 insertions(+), 21 deletions(-)
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/direct/DirectMessageWriter.java b/modules/network/src/main/java/org/apache/ignite/network/internal/direct/DirectMessageWriter.java
index 0a5d33d..7c500f8 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/internal/direct/DirectMessageWriter.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/direct/DirectMessageWriter.java
@@ -56,11 +56,20 @@ public class DirectMessageWriter implements MessageWriter {
}
/** {@inheritDoc} */
+ // TODO: compress the header https://issues.apache.org/jira/browse/IGNITE-14818
@Override public boolean writeHeader(short groupType, short messageType, byte fieldCnt) {
DirectByteBufferStream stream = state.item().stream;
- // TODO: compress these values https://issues.apache.org/jira/browse/IGNITE-14818
- stream.writeShort(groupType);
+ // first part of the header might have already been sent in a previous write attempt
+ if (!state.item().partialHdrWritten) {
+ stream.writeShort(groupType);
+
+ if (stream.lastFinished())
+ state.item().partialHdrWritten = true;
+ else
+ return false;
+ }
+
stream.writeShort(messageType);
return stream.lastFinished();
@@ -345,7 +354,14 @@ public class DirectMessageWriter implements MessageWriter {
/** */
private int state;
- /** */
+ /**
+ * Flag indicating that the first part of the message header has been written.
+ */
+ private boolean partialHdrWritten;
+
+ /**
+ * Flag indicating that the whole message header has been written.
+ */
private boolean hdrWritten;
/**
@@ -367,6 +383,7 @@ public class DirectMessageWriter implements MessageWriter {
/** {@inheritDoc} */
@Override public void reset() {
state = 0;
+ partialHdrWritten = false;
hdrWritten = false;
}
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/direct/stream/DirectByteBufferStreamImplV1.java b/modules/network/src/main/java/org/apache/ignite/network/internal/direct/stream/DirectByteBufferStreamImplV1.java
index 3908d30..4f30923 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/internal/direct/stream/DirectByteBufferStreamImplV1.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/direct/stream/DirectByteBufferStreamImplV1.java
@@ -1054,18 +1054,15 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
if (!lastFinished)
return null;
+ // message group type will be equal to Short.MIN_VALUE if a nested message is null
+ if (msgGroupType == Short.MIN_VALUE)
+ // lastFinished is "true" here, so no further parsing will be required
+ return null;
+
// save current progress, because we can read the header in two chunks
msgGroupTypeRead = true;
}
- // message group type will be equal to Short.MIN_VALUE if a nested message is null
- if (msgGroupType == Short.MIN_VALUE) {
- lastFinished = true;
- msgGroupTypeRead = false;
-
- return null;
- }
-
// read the message type
short msgType = readShort();
@@ -1077,18 +1074,17 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
// if the deserializer is not null then we have definitely finished parsing the header and can read the message
// body
- if (msgDeserializer != null) {
- try {
- reader.beforeInnerMessageRead();
+ reader.beforeInnerMessageRead();
- reader.setCurrentReadClass(msgDeserializer.klass());
+ try {
+ reader.setCurrentReadClass(msgDeserializer.klass());
- reader.setBuffer(buf);
- lastFinished = msgDeserializer.readMessage(reader);
- }
- finally {
- reader.afterInnerMessageRead(lastFinished);
- }
+ reader.setBuffer(buf);
+
+ lastFinished = msgDeserializer.readMessage(reader);
+ }
+ finally {
+ reader.afterInnerMessageRead(lastFinished);
}
if (lastFinished) {