You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/04/10 18:00:12 UTC
[1/2] activemq-artemis git commit: ARTEMIS-1098 Improve flow control
while streaming large messages
Repository: activemq-artemis
Updated Branches:
refs/heads/master 73c79de8a -> 359592cf5
ARTEMIS-1098 Improve flow control while streaming large messages
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/da6b851c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/da6b851c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/da6b851c
Branch: refs/heads/master
Commit: da6b851c60329538f5f65ae83c9548c9bd0e40f9
Parents: 73c79de
Author: Francesco Nigro <ni...@gmail.com>
Authored: Mon Apr 10 13:47:54 2017 +0200
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Apr 10 13:58:33 2017 -0400
----------------------------------------------------------------------
.../core/client/ActiveMQClientLogger.java | 5 ++
.../core/impl/ActiveMQSessionContext.java | 57 ++++++++++++--------
2 files changed, 40 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/da6b851c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
index 0fe4a5a..748e508 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
@@ -310,6 +310,11 @@ public interface ActiveMQClientLogger extends BasicLogger {
format = Message.Format.MESSAGE_FORMAT)
void broadcastGroupBindError(String hostAndPort);
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 212057, value = "Large Message Streaming is taking too long to flush on back pressure.",
+ format = Message.Format.MESSAGE_FORMAT)
+ void timeoutStreamingLargeMessage();
+
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 214000, value = "Failed to call onMessage", format = Message.Format.MESSAGE_FORMAT)
void onMessageError(@Cause Throwable e);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/da6b851c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index 6f92330..7799395 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@@ -458,17 +459,7 @@ public class ActiveMQSessionContext extends SessionContext {
byte[] chunk,
int reconnectID,
SendAcknowledgementHandler messageHandler) throws ActiveMQException {
- final boolean requiresResponse = lastChunk && sendBlocking;
- final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
-
- if (requiresResponse) {
- // When sending it blocking, only the last chunk will be blocking.
- sessionChannel.sendBlocking(chunkPacket, reconnectID, PacketImpl.NULL_RESPONSE);
- } else {
- sessionChannel.send(chunkPacket, reconnectID);
- }
-
- return chunkPacket.getPacketSize();
+ return sendSessionSendContinuationMessage(this.sessionChannel, msgI, messageBodySize, sendBlocking, lastChunk, chunk, messageHandler);
}
@Override
@@ -478,17 +469,7 @@ public class ActiveMQSessionContext extends SessionContext {
boolean lastChunk,
byte[] chunk,
SendAcknowledgementHandler messageHandler) throws ActiveMQException {
- final boolean requiresResponse = lastChunk && sendBlocking;
- final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
-
- if (requiresResponse) {
- // When sending it blocking, only the last chunk will be blocking.
- sessionChannel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
- } else {
- sessionChannel.send(chunkPacket);
- }
-
- return chunkPacket.getPacketSize();
+ return sendSessionSendContinuationMessage(this.sessionChannel, msgI, messageBodySize, sendBlocking, lastChunk, chunk, messageHandler);
}
@Override
@@ -813,6 +794,38 @@ public class ActiveMQSessionContext extends SessionContext {
}
}
+ private static int sendSessionSendContinuationMessage(Channel channel,
+ Message msgI,
+ long messageBodySize,
+ boolean sendBlocking,
+ boolean lastChunk,
+ byte[] chunk,
+ SendAcknowledgementHandler messageHandler) throws ActiveMQException {
+ final boolean requiresResponse = lastChunk && sendBlocking;
+ final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
+ final int expectedEncodeSize = chunkPacket.expectedEncodeSize();
+ //perform a weak form of flow control to avoid OOM on tight loops
+ final CoreRemotingConnection connection = channel.getConnection();
+ final long blockingCallTimeoutMillis = Math.max(0, connection.getBlockingCallTimeout());
+ final long startFlowControl = System.nanoTime();
+ final boolean isWritable = connection.blockUntilWritable(expectedEncodeSize, blockingCallTimeoutMillis);
+ if (!isWritable) {
+ final long endFlowControl = System.nanoTime();
+ final long elapsedFlowControl = endFlowControl - startFlowControl;
+ final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedFlowControl);
+ ActiveMQClientLogger.LOGGER.timeoutStreamingLargeMessage();
+ logger.debug("try to write " + expectedEncodeSize + " bytes after blocked " + elapsedMillis + " ms on a not writable connection: [" + connection.getID() + "]");
+ }
+ if (requiresResponse) {
+ // When sending it blocking, only the last chunk will be blocking.
+ channel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
+ } else {
+ channel.send(chunkPacket);
+ }
+ return chunkPacket.getPacketSize();
+ }
+
+
class ClientSessionPacketHandler implements ChannelHandler {
@Override
[2/2] activemq-artemis git commit: This closes #1190
Posted by cl...@apache.org.
This closes #1190
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/359592cf
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/359592cf
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/359592cf
Branch: refs/heads/master
Commit: 359592cf5eb17fd7d2f3fd99c18d0427e240366d
Parents: 73c79de da6b851
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Apr 10 14:00:05 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Apr 10 14:00:05 2017 -0400
----------------------------------------------------------------------
.../core/client/ActiveMQClientLogger.java | 5 ++
.../core/impl/ActiveMQSessionContext.java | 57 ++++++++++++--------
2 files changed, 40 insertions(+), 22 deletions(-)
----------------------------------------------------------------------