You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2021/06/04 07:27:41 UTC

[ignite-3] branch main updated: IGNITE-14824 Support for partial message headers in DirectMessageWriter. Fixes #164

This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new dacf26e  IGNITE-14824 Support for partial message headers in DirectMessageWriter. Fixes #164
dacf26e is described below

commit dacf26e9433cafcb95dda9b48ec0dcd04ec4237a
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Fri Jun 4 10:25:31 2021 +0300

    IGNITE-14824 Support for partial message headers in DirectMessageWriter. Fixes #164
    
    Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
 .../internal/direct/DirectMessageWriter.java       | 23 ++++++++++++++--
 .../stream/DirectByteBufferStreamImplV1.java       | 32 ++++++++++------------
 2 files changed, 34 insertions(+), 21 deletions(-)

diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/direct/DirectMessageWriter.java b/modules/network/src/main/java/org/apache/ignite/network/internal/direct/DirectMessageWriter.java
index 0a5d33d..7c500f8 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/internal/direct/DirectMessageWriter.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/direct/DirectMessageWriter.java
@@ -56,11 +56,20 @@ public class DirectMessageWriter implements MessageWriter {
     }
 
     /** {@inheritDoc} */
+    // TODO: compress the header https://issues.apache.org/jira/browse/IGNITE-14818
     @Override public boolean writeHeader(short groupType, short messageType, byte fieldCnt) {
         DirectByteBufferStream stream = state.item().stream;
 
-        // TODO: compress these values https://issues.apache.org/jira/browse/IGNITE-14818
-        stream.writeShort(groupType);
+        // first part of the header might have already been sent in a previous write attempt
+        if (!state.item().partialHdrWritten) {
+            stream.writeShort(groupType);
+
+            if (stream.lastFinished())
+                state.item().partialHdrWritten = true;
+            else
+                return false;
+        }
+
         stream.writeShort(messageType);
 
         return stream.lastFinished();
@@ -345,7 +354,14 @@ public class DirectMessageWriter implements MessageWriter {
         /** */
         private int state;
 
-        /** */
+        /**
+         * Flag indicating that the first part of the message header has been written.
+         */
+        private boolean partialHdrWritten;
+
+        /**
+         * Flag indicating that the whole message header has been written.
+         */
         private boolean hdrWritten;
 
         /**
@@ -367,6 +383,7 @@ public class DirectMessageWriter implements MessageWriter {
         /** {@inheritDoc} */
         @Override public void reset() {
             state = 0;
+            partialHdrWritten = false;
             hdrWritten = false;
         }
     }
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/direct/stream/DirectByteBufferStreamImplV1.java b/modules/network/src/main/java/org/apache/ignite/network/internal/direct/stream/DirectByteBufferStreamImplV1.java
index 3908d30..4f30923 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/internal/direct/stream/DirectByteBufferStreamImplV1.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/direct/stream/DirectByteBufferStreamImplV1.java
@@ -1054,18 +1054,15 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
                 if (!lastFinished)
                     return null;
 
+                // message group type will be equal to Short.MIN_VALUE if a nested message is null
+                if (msgGroupType == Short.MIN_VALUE)
+                    // lastFinished is "true" here, so no further parsing will be required
+                    return null;
+
                 // save current progress, because we can read the header in two chunks
                 msgGroupTypeRead = true;
             }
 
-            // message group type will be equal to Short.MIN_VALUE if a nested message is null
-            if (msgGroupType == Short.MIN_VALUE) {
-                lastFinished = true;
-                msgGroupTypeRead = false;
-
-                return null;
-            }
-
             // read the message type
             short msgType = readShort();
 
@@ -1077,18 +1074,17 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
 
         // if the deserializer is not null then we have definitely finished parsing the header and can read the message
         // body
-        if (msgDeserializer != null) {
-            try {
-                reader.beforeInnerMessageRead();
+        reader.beforeInnerMessageRead();
 
-                reader.setCurrentReadClass(msgDeserializer.klass());
+        try {
+            reader.setCurrentReadClass(msgDeserializer.klass());
 
-                reader.setBuffer(buf);
-                lastFinished = msgDeserializer.readMessage(reader);
-            }
-            finally {
-                reader.afterInnerMessageRead(lastFinished);
-            }
+            reader.setBuffer(buf);
+
+            lastFinished = msgDeserializer.readMessage(reader);
+        }
+        finally {
+            reader.afterInnerMessageRead(lastFinished);
         }
 
         if (lastFinished) {