You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2022/02/11 19:42:17 UTC

[qpid-protonj2] branch main updated: PROTON-2499 Cleanup stream sender dispatch path and seal client message

This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new 51287bf  PROTON-2499 Cleanup stream sender dispatch path and seal client message
51287bf is described below

commit 51287bf7de2f81528104cc19f53c64823918f5db
Author: Timothy Bish <ta...@gmail.com>
AuthorDate: Fri Feb 11 14:41:49 2022 -0500

    PROTON-2499 Cleanup stream sender dispatch path and seal client message
    
    Reduces overhead of each stream sender message and simplifies the
    dispatch code path.  Allows ClientMessage to be final as it was intended
    to be.
---
 .../qpid/protonj2/client/impl/ClientMessage.java       |  2 +-
 .../qpid/protonj2/client/impl/ClientStreamSender.java  |  7 +++----
 .../client/impl/ClientStreamSenderMessage.java         | 18 ++----------------
 3 files changed, 6 insertions(+), 21 deletions(-)

diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientMessage.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientMessage.java
index 35b336e..6f5df4d 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientMessage.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientMessage.java
@@ -48,7 +48,7 @@ import org.apache.qpid.protonj2.types.messaging.Section.SectionType;
  *
  * @param <E> the body type that the {@link Message} carries
  */
-public class ClientMessage<E> implements AdvancedMessage<E> {
+public final class ClientMessage<E> implements AdvancedMessage<E> {
 
     private Header header;
     private MessageAnnotations messageAnnotations;
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
index 100a903..abc2127 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
@@ -19,7 +19,6 @@ package org.apache.qpid.protonj2.client.impl;
 import java.util.Map;
 
 import org.apache.qpid.protonj2.buffer.ProtonBuffer;
-import org.apache.qpid.protonj2.client.AdvancedMessage;
 import org.apache.qpid.protonj2.client.Message;
 import org.apache.qpid.protonj2.client.StreamSender;
 import org.apache.qpid.protonj2.client.StreamSenderOptions;
@@ -117,11 +116,11 @@ public final class ClientStreamSender extends ClientSender implements StreamSend
         return (ClientStreamSender) super.open();
     }
 
-    StreamTracker sendMessage(ClientStreamSenderMessage context, AdvancedMessage<?> message) throws ClientException {
+    StreamTracker sendMessage(ClientStreamSenderMessage context, ProtonBuffer payload, int messageFormat) throws ClientException {
         final ClientFuture<Tracker> operation = session.getFutureFactory().createFuture();
-        final ProtonBuffer buffer = message.encode(null);
+        final ProtonBuffer buffer = payload;
         final ClientOutgoingEnvelope envelope = new ClientOutgoingEnvelope(
-            this, context.getProtonDelivery(), message.messageFormat(), buffer, context.completed(), operation);
+            this, context.getProtonDelivery(), messageFormat, buffer, context.completed(), operation);
 
         executor.execute(() -> {
             if (notClosedOrFailed(operation)) {
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSenderMessage.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSenderMessage.java
index 8f81e3a..0778755 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSenderMessage.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSenderMessage.java
@@ -77,7 +77,6 @@ final class ClientStreamSenderMessage implements StreamSenderMessage {
     private final ClientStreamSender sender;
     private final DeliveryAnnotations deliveryAnnotations;
     private final int writeBufferSize;
-    private final StreamMessagePacket streamMessagePacket = new StreamMessagePacket();
     private final ClientStreamTracker tracker;
 
     private Header header;
@@ -136,7 +135,7 @@ final class ClientStreamSenderMessage implements StreamSenderMessage {
     private void doFlush() throws ClientException {
         if (buffer != null && buffer.isReadable()) {
             try {
-                sender.sendMessage(this, streamMessagePacket);
+                sender.sendMessage(this, buffer, messageFormat);
             } finally {
                 buffer = null;
             }
@@ -939,26 +938,13 @@ final class ClientStreamSenderMessage implements StreamSenderMessage {
 
         if (buffer.getReadableBytes() >= writeBufferSize) {
             try {
-                sender.sendMessage(this, streamMessagePacket);
+                sender.sendMessage(this, buffer, messageFormat);
             } finally {
                 buffer = null;
             }
         }
     }
 
-    private final class StreamMessagePacket extends ClientMessage<byte[]> {
-
-        @Override
-        public int messageFormat() {
-            return messageFormat;
-        }
-
-        @Override
-        public ProtonBuffer encode(Map<String, Object> deliveryAnnotations) {
-            return buffer;
-        }
-    }
-
     private void transitionToWritableState() throws ClientException {
         if (currentState == StreamState.PREAMBLE) {
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org