You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/06/27 22:23:52 UTC

[2/2] activemq-artemis git commit: ARTEMIS-569 add missing method

ARTEMIS-569 add missing method


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e9733a62
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e9733a62
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e9733a62

Branch: refs/heads/master
Commit: e9733a6223601db306d719a8e2b456e6c0494feb
Parents: d73d6d2
Author: jbertram <jb...@apache.org>
Authored: Mon Jun 27 16:41:59 2016 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Jun 27 18:23:45 2016 -0400

----------------------------------------------------------------------
 .../core/client/impl/ClientProducerImpl.java        |  6 +++---
 .../protocol/core/impl/ActiveMQSessionContext.java  | 16 ++++++++++++++++
 .../artemis/spi/core/remoting/SessionContext.java   |  7 +++++++
 3 files changed, 26 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9733a62/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
index 99e593f..b0998b8 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
@@ -371,10 +371,10 @@ public class ClientProducerImpl implements ClientProducerInternal {
       context.open();
       try {
 
-         for (int pos = 0; pos < bodySize; ) {
+         for (long pos = 0; pos < bodySize; ) {
             final boolean lastChunk;
 
-            final int chunkLength = Math.min((int) (bodySize - pos), minLargeMessageSize);
+            final int chunkLength = (int) Math.min((bodySize - pos), (long) minLargeMessageSize);
 
             final ActiveMQBuffer bodyBuffer = ActiveMQBuffers.fixedBuffer(chunkLength);
 
@@ -385,7 +385,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
             lastChunk = pos >= bodySize;
             SendAcknowledgementHandler messageHandler = lastChunk ? handler : null;
 
-            int creditsUsed = sessionContext.sendLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.toByteBuffer().array(), reconnectID, messageHandler);
+            int creditsUsed = sessionContext.sendServerLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.toByteBuffer().array(), messageHandler);
 
             credits.acquireCredits(creditsUsed);
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9733a62/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index 9f0edce..f49a22a 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -438,6 +438,22 @@ public class ActiveMQSessionContext extends SessionContext {
    }
 
    @Override
+   public int sendServerLargeMessageChunk(MessageInternal msgI, long messageBodySize, boolean sendBlocking, boolean lastChunk, byte[] chunk, SendAcknowledgementHandler messageHandler) throws ActiveMQException {
+      final boolean requiresResponse = lastChunk && sendBlocking;
+      final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
+
+      if (requiresResponse) {
+         // When sending it blocking, only the last chunk will be blocking.
+         sessionChannel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
+      }
+      else {
+         sessionChannel.send(chunkPacket);
+      }
+
+      return chunkPacket.getPacketSize();
+   }
+
+   @Override
    public void sendACK(boolean individual,
                        boolean block,
                        final ClientConsumer consumer,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9733a62/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
index 774dbfe..175360c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
@@ -150,6 +150,13 @@ public abstract class SessionContext {
                                              int reconnectID,
                                              SendAcknowledgementHandler messageHandler) throws ActiveMQException;
 
+   public abstract int sendServerLargeMessageChunk(MessageInternal msgI,
+                                                   long messageBodySize,
+                                                   boolean sendBlocking,
+                                                   boolean lastChunk,
+                                                   byte[] chunk,
+                                                   SendAcknowledgementHandler messageHandler) throws ActiveMQException;
+
    public abstract void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler);
 
    public abstract void createSharedQueue(SimpleString address,