You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/07/11 18:45:11 UTC
[2/3] activemq-artemis git commit: ARTEMIS-1269 Improving Session
Handling after replication changes
ARTEMIS-1269 Improving Session Handling after replication changes
The changes made for ARTEMIS-1269 have some implications on performance;
this commit improves the packet handling.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/4b943a74
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/4b943a74
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/4b943a74
Branch: refs/heads/master
Commit: 4b943a745b1e91f41dc89bb924ac33378a21aa79
Parents: 7fd17f4
Author: Francesco Nigro <ni...@gmail.com>
Authored: Tue Jul 11 12:04:38 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Jul 11 14:23:03 2017 -0400
----------------------------------------------------------------------
.../core/protocol/ServerPacketDecoder.java | 51 ++-
.../core/ServerSessionPacketHandler.java | 324 +++++++++++++------
2 files changed, 279 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b943a74/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
index 45082b9..bcbe633 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
@@ -46,6 +46,9 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Replicatio
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
@@ -70,6 +73,9 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REP
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REPLICATION_RESPONSE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REPLICATION_RESPONSE_V2;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SCALEDOWN_ANNOUNCEMENT;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_ACKNOWLEDGE;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_FLOWTOKEN;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_PRODUCER_REQUEST_CREDITS;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND_LARGE;
@@ -78,18 +84,55 @@ public class ServerPacketDecoder extends ClientPacketDecoder {
private static final long serialVersionUID = 3348673114388400766L;
public static final ServerPacketDecoder INSTANCE = new ServerPacketDecoder();
+ private static SessionSendMessage decodeSessionSendMessage(final ActiveMQBuffer in) {
+ final SessionSendMessage sendMessage = new SessionSendMessage(new CoreMessage());
+ sendMessage.decode(in);
+ return sendMessage;
+ }
+
+ private static SessionAcknowledgeMessage decodeSessionAcknowledgeMessage(final ActiveMQBuffer in) {
+ final SessionAcknowledgeMessage acknowledgeMessage = new SessionAcknowledgeMessage();
+ acknowledgeMessage.decode(in);
+ return acknowledgeMessage;
+ }
+
+ private static SessionRequestProducerCreditsMessage decodeRequestProducerCreditsMessage(final ActiveMQBuffer in) {
+ final SessionRequestProducerCreditsMessage requestProducerCreditsMessage = new SessionRequestProducerCreditsMessage();
+ requestProducerCreditsMessage.decode(in);
+ return requestProducerCreditsMessage;
+ }
+
+ private static SessionConsumerFlowCreditMessage decodeSessionConsumerFlowCreditMessage(final ActiveMQBuffer in) {
+ final SessionConsumerFlowCreditMessage sessionConsumerFlowCreditMessage = new SessionConsumerFlowCreditMessage();
+ sessionConsumerFlowCreditMessage.decode(in);
+ return sessionConsumerFlowCreditMessage;
+ }
+
@Override
public Packet decode(final ActiveMQBuffer in) {
final byte packetType = in.readByte();
+ //optimized for the most common cases: hottest and commons methods will be inlined and this::decode too due to the byte code size
+ switch (packetType) {
+ case SESS_SEND:
+ return decodeSessionSendMessage(in);
+ case SESS_ACKNOWLEDGE:
+ return decodeSessionAcknowledgeMessage(in);
+ case SESS_PRODUCER_REQUEST_CREDITS:
+ return decodeRequestProducerCreditsMessage(in);
+ case SESS_FLOWTOKEN:
+ return decodeSessionConsumerFlowCreditMessage(in);
+ default:
+ return slowPathDecode(in, packetType);
+ }
+ }
+
+ // separating for performance reasons
+ private Packet slowPathDecode(ActiveMQBuffer in, byte packetType) {
Packet packet;
switch (packetType) {
- case SESS_SEND: {
- packet = new SessionSendMessage(new CoreMessage());
- break;
- }
case SESS_SEND_LARGE: {
packet = new SessionSendLargeMessage(new CoreMessage());
break;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b943a74/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index 0c95bed..bd97939 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -114,6 +114,7 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_FLOWTOKEN;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_FORCE_CONSUMER_DELIVERY;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_INDIVIDUAL_ACKNOWLEDGE;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_PRODUCER_REQUEST_CREDITS;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_ROLLBACK;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND;
@@ -249,11 +250,39 @@ public class ServerSessionPacketHandler implements ChannelHandler {
packetActor.act(packet);
}
-
- // this method is used as a listener on the packetActor
private void onMessagePacket(final Packet packet) {
- byte type = packet.getType();
+ if (logger.isTraceEnabled()) {
+ logger.trace("ServerSessionPacketHandler::handlePacket," + packet);
+ }
+ final byte type = packet.getType();
+ switch (type) {
+ case SESS_SEND: {
+ onSessionSend(packet);
+ break;
+ }
+ case SESS_ACKNOWLEDGE: {
+ onSessionAcknowledge(packet);
+ break;
+ }
+ case SESS_PRODUCER_REQUEST_CREDITS: {
+ onSessionRequestProducerCredits(packet);
+ break;
+ }
+ case SESS_FLOWTOKEN: {
+ onSessionConsumerFlowCredit(packet);
+ break;
+ }
+ default:
+ // separating a method for everything else as JIT was faster this way
+ slowPacketHandler(packet);
+ break;
+ }
+ }
+ // This is being separated from onMessagePacket as JIT was more efficient with a small method for the
+ // hot executions.
+ private void slowPacketHandler(final Packet packet) {
+ final byte type = packet.getType();
storageManager.setContext(session.getSessionContext());
Packet response = null;
@@ -261,13 +290,23 @@ public class ServerSessionPacketHandler implements ChannelHandler {
boolean closeChannel = false;
boolean requiresResponse = false;
- if (logger.isTraceEnabled()) {
- logger.trace("ServerSessionPacketHandler::handlePacket," + packet);
- }
-
try {
try {
switch (type) {
+ case SESS_SEND_LARGE: {
+ SessionSendLargeMessage message = (SessionSendLargeMessage) packet;
+ sendLarge(message.getLargeMessage());
+ break;
+ }
+ case SESS_SEND_CONTINUATION: {
+ SessionSendContinuationMessage message = (SessionSendContinuationMessage) packet;
+ requiresResponse = message.isRequiresResponse();
+ sendContinuations(message.getPacketSize(), message.getMessageBodySize(), message.getBody(), message.isContinues());
+ if (requiresResponse) {
+ response = new NullResponseMessage();
+ }
+ break;
+ }
case SESS_CREATECONSUMER: {
SessionCreateConsumerMessage request = (SessionCreateConsumerMessage) packet;
requiresResponse = request.isRequiresResponse();
@@ -385,15 +424,6 @@ public class ServerSessionPacketHandler implements ChannelHandler {
}
break;
}
- case SESS_ACKNOWLEDGE: {
- SessionAcknowledgeMessage message = (SessionAcknowledgeMessage) packet;
- requiresResponse = message.isRequiresResponse();
- session.acknowledge(message.getConsumerID(), message.getMessageID());
- if (requiresResponse) {
- response = new NullResponseMessage();
- }
- break;
- }
case SESS_EXPIRED: {
SessionExpireMessage message = (SessionExpireMessage) packet;
session.expire(message.getConsumerID(), message.getMessageID());
@@ -534,44 +564,11 @@ public class ServerSessionPacketHandler implements ChannelHandler {
response = new NullResponseMessage();
break;
}
- case SESS_FLOWTOKEN: {
- SessionConsumerFlowCreditMessage message = (SessionConsumerFlowCreditMessage) packet;
- session.receiveConsumerCredits(message.getConsumerID(), message.getCredits());
- break;
- }
- case SESS_SEND: {
- SessionSendMessage message = (SessionSendMessage) packet;
- requiresResponse = message.isRequiresResponse();
- session.send(message.getMessage(), direct);
- if (requiresResponse) {
- response = new NullResponseMessage();
- }
- break;
- }
- case SESS_SEND_LARGE: {
- SessionSendLargeMessage message = (SessionSendLargeMessage) packet;
- sendLarge(message.getLargeMessage());
- break;
- }
- case SESS_SEND_CONTINUATION: {
- SessionSendContinuationMessage message = (SessionSendContinuationMessage) packet;
- requiresResponse = message.isRequiresResponse();
- sendContinuations(message.getPacketSize(), message.getMessageBodySize(), message.getBody(), message.isContinues());
- if (requiresResponse) {
- response = new NullResponseMessage();
- }
- break;
- }
case SESS_FORCE_CONSUMER_DELIVERY: {
SessionForceConsumerDelivery message = (SessionForceConsumerDelivery) packet;
session.forceConsumerDelivery(message.getConsumerID(), message.getSequence());
break;
}
- case PacketImpl.SESS_PRODUCER_REQUEST_CREDITS: {
- SessionRequestProducerCreditsMessage message = (SessionRequestProducerCreditsMessage) packet;
- session.requestProducerCredits(message.getAddress(), message.getCredits());
- break;
- }
case PacketImpl.SESS_ADD_METADATA: {
response = new NullResponseMessage();
SessionAddMetaDataMessage message = (SessionAddMetaDataMessage) packet;
@@ -597,56 +594,203 @@ public class ServerSessionPacketHandler implements ChannelHandler {
}
}
} catch (ActiveMQIOErrorException e) {
- getSession().markTXFailed(e);
- if (requiresResponse) {
- logger.debug("Sending exception to client", e);
- response = new ActiveMQExceptionMessage(e);
- } else {
- ActiveMQServerLogger.LOGGER.caughtException(e);
- }
+ response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session);
} catch (ActiveMQXAException e) {
- if (requiresResponse) {
- logger.debug("Sending exception to client", e);
- response = new SessionXAResponseMessage(true, e.errorCode, e.getMessage());
- } else {
- ActiveMQServerLogger.LOGGER.caughtXaException(e);
- }
+ response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response);
} catch (ActiveMQQueueMaxConsumerLimitReached e) {
- if (requiresResponse) {
- logger.debug("Sending exception to client", e);
- response = new ActiveMQExceptionMessage(e);
- } else {
- ActiveMQServerLogger.LOGGER.caughtException(e);
- }
+ response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response);
} catch (ActiveMQException e) {
+ response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response);
+ } catch (Throwable t) {
+ response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session);
+ }
+ sendResponse(packet, response, flush, closeChannel);
+ } finally {
+ storageManager.clearContext();
+ }
+ }
+
+ private void onSessionAcknowledge(Packet packet) {
+ this.storageManager.setContext(session.getSessionContext());
+ try {
+ Packet response = null;
+ boolean requiresResponse = false;
+ try {
+ final SessionAcknowledgeMessage message = (SessionAcknowledgeMessage) packet;
+ requiresResponse = message.isRequiresResponse();
+ this.session.acknowledge(message.getConsumerID(), message.getMessageID());
if (requiresResponse) {
- logger.debug("Sending exception to client", e);
- response = new ActiveMQExceptionMessage(e);
- } else {
- if (e.getType() == ActiveMQExceptionType.QUEUE_EXISTS) {
- logger.debug("Caught exception", e);
- } else {
- ActiveMQServerLogger.LOGGER.caughtException(e);
- }
+ response = new NullResponseMessage();
}
+ } catch (ActiveMQIOErrorException e) {
+ response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session);
+ } catch (ActiveMQXAException e) {
+ response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response);
+ } catch (ActiveMQQueueMaxConsumerLimitReached e) {
+ response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response);
+ } catch (ActiveMQException e) {
+ response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response);
} catch (Throwable t) {
- getSession().markTXFailed(t);
+ response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session);
+ }
+ sendResponse(packet, response, false, false);
+ } finally {
+ this.storageManager.clearContext();
+ }
+ }
+
+ private void onSessionSend(Packet packet) {
+ this.storageManager.setContext(session.getSessionContext());
+ try {
+ Packet response = null;
+ boolean requiresResponse = false;
+ try {
+ final SessionSendMessage message = (SessionSendMessage) packet;
+ requiresResponse = message.isRequiresResponse();
+ this.session.send(message.getMessage(), this.direct);
if (requiresResponse) {
- ActiveMQServerLogger.LOGGER.warn("Sending unexpected exception to the client", t);
- ActiveMQException activeMQInternalErrorException = new ActiveMQInternalErrorException();
- activeMQInternalErrorException.initCause(t);
- response = new ActiveMQExceptionMessage(activeMQInternalErrorException);
- } else {
- ActiveMQServerLogger.LOGGER.caughtException(t);
+ response = new NullResponseMessage();
}
+ } catch (ActiveMQIOErrorException e) {
+ response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session);
+ } catch (ActiveMQXAException e) {
+ response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response);
+ } catch (ActiveMQQueueMaxConsumerLimitReached e) {
+ response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response);
+ } catch (ActiveMQException e) {
+ response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response);
+ } catch (Throwable t) {
+ response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session);
}
+ sendResponse(packet, response, false, false);
+ } finally {
+ this.storageManager.clearContext();
+ }
+ }
- sendResponse(packet, response, flush, closeChannel);
+ private void onSessionRequestProducerCredits(Packet packet) {
+ this.storageManager.setContext(session.getSessionContext());
+ try {
+ Packet response = null;
+ boolean requiresResponse = false;
+ try {
+ SessionRequestProducerCreditsMessage message = (SessionRequestProducerCreditsMessage) packet;
+ session.requestProducerCredits(message.getAddress(), message.getCredits());
+ } catch (ActiveMQIOErrorException e) {
+ response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session);
+ } catch (ActiveMQXAException e) {
+ response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response);
+ } catch (ActiveMQQueueMaxConsumerLimitReached e) {
+ response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response);
+ } catch (ActiveMQException e) {
+ response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response);
+ } catch (Throwable t) {
+ response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session);
+ }
+ sendResponse(packet, response, false, false);
} finally {
- storageManager.clearContext();
+ this.storageManager.clearContext();
+ }
+ }
+
+ private void onSessionConsumerFlowCredit(Packet packet) {
+ this.storageManager.setContext(session.getSessionContext());
+ try {
+ Packet response = null;
+ boolean requiresResponse = false;
+ try {
+ SessionConsumerFlowCreditMessage message = (SessionConsumerFlowCreditMessage) packet;
+ session.receiveConsumerCredits(message.getConsumerID(), message.getCredits());
+ } catch (ActiveMQIOErrorException e) {
+ response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response, this.session);
+ } catch (ActiveMQXAException e) {
+ response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response);
+ } catch (ActiveMQQueueMaxConsumerLimitReached e) {
+ response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse, response);
+ } catch (ActiveMQException e) {
+ response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response);
+ } catch (Throwable t) {
+ response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session);
+ }
+ sendResponse(packet, response, false, false);
+ } finally {
+ this.storageManager.clearContext();
+ }
+ }
+
+
+ private static Packet onActiveMQIOErrorExceptionWhileHandlePacket(ActiveMQIOErrorException e,
+ boolean requiresResponse,
+ Packet response,
+ ServerSession session) {
+ session.markTXFailed(e);
+ if (requiresResponse) {
+ logger.debug("Sending exception to client", e);
+ response = new ActiveMQExceptionMessage(e);
+ } else {
+ ActiveMQServerLogger.LOGGER.caughtException(e);
+ }
+ return response;
+ }
+
+ private static Packet onActiveMQXAExceptionWhileHandlePacket(ActiveMQXAException e,
+ boolean requiresResponse,
+ Packet response) {
+ if (requiresResponse) {
+ logger.debug("Sending exception to client", e);
+ response = new SessionXAResponseMessage(true, e.errorCode, e.getMessage());
+ } else {
+ ActiveMQServerLogger.LOGGER.caughtXaException(e);
+ }
+ return response;
+ }
+
+ private static Packet onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(ActiveMQQueueMaxConsumerLimitReached e,
+ boolean requiresResponse,
+ Packet response) {
+ if (requiresResponse) {
+ logger.debug("Sending exception to client", e);
+ response = new ActiveMQExceptionMessage(e);
+ } else {
+ ActiveMQServerLogger.LOGGER.caughtException(e);
+ }
+ return response;
+ }
+
+ private static Packet onActiveMQExceptionWhileHandlePacket(ActiveMQException e,
+ boolean requiresResponse,
+ Packet response) {
+ if (requiresResponse) {
+ logger.debug("Sending exception to client", e);
+ response = new ActiveMQExceptionMessage(e);
+ } else {
+ if (e.getType() == ActiveMQExceptionType.QUEUE_EXISTS) {
+ logger.debug("Caught exception", e);
+ } else {
+ ActiveMQServerLogger.LOGGER.caughtException(e);
+ }
+ }
+ return response;
+ }
+
+ private static Packet onCatchThrowableWhileHandlePacket(Throwable t,
+ boolean requiresResponse,
+ Packet response,
+ ServerSession session) {
+ session.markTXFailed(t);
+ if (requiresResponse) {
+ ActiveMQServerLogger.LOGGER.warn("Sending unexpected exception to the client", t);
+ ActiveMQException activeMQInternalErrorException = new ActiveMQInternalErrorException();
+ activeMQInternalErrorException.initCause(t);
+ response = new ActiveMQExceptionMessage(activeMQInternalErrorException);
+ } else {
+ ActiveMQServerLogger.LOGGER.caughtException(t);
}
+ return response;
}
+
+
private void sendResponse(final Packet confirmPacket,
final Packet response,
final boolean flush,
@@ -792,12 +936,10 @@ public class ServerSessionPacketHandler implements ChannelHandler {
currentLargeMessage = largeMsg;
}
-
-
private void sendContinuations(final int packetSize,
- final long messageBodySize,
- final byte[] body,
- final boolean continues) throws Exception {
+ final long messageBodySize,
+ final byte[] body,
+ final boolean continues) throws Exception {
if (currentLargeMessage == null) {
throw ActiveMQMessageBundle.BUNDLE.largeMessageNotInitialised();
}
@@ -814,12 +956,10 @@ public class ServerSessionPacketHandler implements ChannelHandler {
currentLargeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize);
}
-
session.doSend(session.getCurrentTransaction(), currentLargeMessage, null, false, false);
currentLargeMessage = null;
}
}
-
}