You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by mi...@apache.org on 2018/08/24 20:37:19 UTC
[2/2] activemq-artemis git commit: ARTEMIS-2044 Add onSendException,
onMessageRouteException to ActiveMQServerMessagePlugin
ARTEMIS-2044 Add onSendException, onMessageRouteException to ActiveMQServerMessagePlugin
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/95ec8ea4
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/95ec8ea4
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/95ec8ea4
Branch: refs/heads/master
Commit: 95ec8ea433d1bd5cf8294b0018c1edb7d45c07f0
Parents: 46bc10e
Author: Carsten Lohmann <ca...@bosch-si.com>
Authored: Wed Aug 22 10:06:30 2018 +0200
Committer: Michael Andre Pearce <mi...@me.com>
Committed: Fri Aug 24 21:35:34 2018 +0100
----------------------------------------------------------------------
.../core/postoffice/impl/PostOfficeImpl.java | 89 +++++++++++---------
.../core/server/impl/ServerSessionImpl.java | 78 +++++++++--------
.../plugin/ActiveMQServerMessagePlugin.java | 30 +++++++
.../impl/LoggingActiveMQServerPlugin.java | 46 ++++++++++
.../impl/LoggingActiveMQServerPluginLogger.java | 28 ++++++
.../plugin/MethodCalledVerifier.java | 19 +++++
6 files changed, 213 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95ec8ea4/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 9a3e844..598c32b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -877,62 +877,69 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
logger.trace("Message after routed=" + message);
}
- if (context.getQueueCount() == 0) {
- // Send to DLA if appropriate
+ try {
+ if (context.getQueueCount() == 0) {
+ // Send to DLA if appropriate
- AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
+ AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
- boolean sendToDLA = addressSettings.isSendToDLAOnNoRoute();
+ boolean sendToDLA = addressSettings.isSendToDLAOnNoRoute();
- if (sendToDLA) {
- // Send to the DLA for the address
+ if (sendToDLA) {
+ // Send to the DLA for the address
- SimpleString dlaAddress = addressSettings.getDeadLetterAddress();
+ SimpleString dlaAddress = addressSettings.getDeadLetterAddress();
- if (logger.isDebugEnabled()) {
- logger.debug("sending message to dla address = " + dlaAddress + ", message=" + message);
- }
+ if (logger.isDebugEnabled()) {
+ logger.debug("sending message to dla address = " + dlaAddress + ", message=" + message);
+ }
- if (dlaAddress == null) {
- result = RoutingStatus.NO_BINDINGS;
- ActiveMQServerLogger.LOGGER.noDLA(address);
- } else {
- message.referenceOriginalMessage(message, null);
+ if (dlaAddress == null) {
+ result = RoutingStatus.NO_BINDINGS;
+ ActiveMQServerLogger.LOGGER.noDLA(address);
+ } else {
+ message.referenceOriginalMessage(message, null);
- message.setAddress(dlaAddress);
+ message.setAddress(dlaAddress);
- message.reencode();
+ message.reencode();
- route(message, context.getTransaction(), false);
- result = RoutingStatus.NO_BINDINGS_DLA;
- }
- } else {
- result = RoutingStatus.NO_BINDINGS;
+ route(message, context.getTransaction(), false);
+ result = RoutingStatus.NO_BINDINGS_DLA;
+ }
+ } else {
+ result = RoutingStatus.NO_BINDINGS;
- if (logger.isDebugEnabled()) {
- logger.debug("Message " + message + " is not going anywhere as it didn't have a binding on address:" + address);
- }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Message " + message + " is not going anywhere as it didn't have a binding on address:" + address);
+ }
- if (message.isLargeMessage()) {
- ((LargeServerMessage) message).deleteFile();
+ if (message.isLargeMessage()) {
+ ((LargeServerMessage) message).deleteFile();
+ }
}
- }
- } else {
- result = RoutingStatus.OK;
- try {
- processRoute(message, context, direct);
- } catch (ActiveMQAddressFullException e) {
- if (startedTX.get()) {
- context.getTransaction().rollback();
- } else if (context.getTransaction() != null) {
- context.getTransaction().markAsRollbackOnly(e);
+ } else {
+ result = RoutingStatus.OK;
+ try {
+ processRoute(message, context, direct);
+ } catch (ActiveMQAddressFullException e) {
+ if (startedTX.get()) {
+ context.getTransaction().rollback();
+ } else if (context.getTransaction() != null) {
+ context.getTransaction().markAsRollbackOnly(e);
+ }
+ throw e;
}
- throw e;
}
- }
- if (startedTX.get()) {
- context.getTransaction().commit();
+ if (startedTX.get()) {
+ context.getTransaction().commit();
+ }
+ } catch (Exception e) {
+ if (server.hasBrokerMessagePlugins()) {
+ server.callBrokerMessagePlugins(plugin -> plugin.onMessageRouteException(message, context, direct, rejectDuplicates, e));
+ }
+ throw e;
}
if (server.hasBrokerMessagePlugins()) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95ec8ea4/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 4910e66..d868a2f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -1426,54 +1426,60 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
server.callBrokerMessagePlugins(plugin -> plugin.beforeSend(this, tx, message, direct, noAutoCreateQueue));
}
- // If the protocol doesn't support flow control, we have no choice other than fail the communication
- if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) {
- ActiveMQIOErrorException exception = ActiveMQMessageBundle.BUNDLE.diskBeyondLimit();
- this.getRemotingConnection().fail(exception);
- throw exception;
- }
-
final RoutingStatus result;
- //large message may come from StompSession directly, in which
- //case the id header already generated.
- if (!message.isLargeMessage()) {
- long id = storageManager.generateID();
- // This will re-encode the message
- message.setMessageID(id);
- }
+ try {
+ // If the protocol doesn't support flow control, we have no choice other than fail the communication
+ if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) {
+ ActiveMQIOErrorException exception = ActiveMQMessageBundle.BUNDLE.diskBeyondLimit();
+ this.getRemotingConnection().fail(exception);
+ throw exception;
+ }
- SimpleString address = message.getAddressSimpleString();
+ //large message may come from StompSession directly, in which
+ //case the id header already generated.
+ if (!message.isLargeMessage()) {
+ long id = storageManager.generateID();
+ // This will re-encode the message
+ message.setMessageID(id);
+ }
- if (defaultAddress == null && address != null) {
- defaultAddress = address;
- }
+ SimpleString address = message.getAddressSimpleString();
- if (address == null) {
- // We don't want to force a re-encode when the message gets sent to the consumer
- message.setAddress(defaultAddress);
- }
+ if (defaultAddress == null && address != null) {
+ defaultAddress = address;
+ }
- if (logger.isTraceEnabled()) {
- logger.trace("send(message=" + message + ", direct=" + direct + ") being called");
- }
+ if (address == null) {
+ // We don't want to force a re-encode when the message gets sent to the consumer
+ message.setAddress(defaultAddress);
+ }
- if (message.getAddress() == null) {
- // This could happen with some tests that are ignoring messages
- throw ActiveMQMessageBundle.BUNDLE.noAddress();
- }
+ if (logger.isTraceEnabled()) {
+ logger.trace("send(message=" + message + ", direct=" + direct + ") being called");
+ }
- if (message.getAddressSimpleString().equals(managementAddress)) {
- // It's a management message
+ if (message.getAddress() == null) {
+ // This could happen with some tests that are ignoring messages
+ throw ActiveMQMessageBundle.BUNDLE.noAddress();
+ }
- result = handleManagementMessage(tx, message, direct);
- } else {
- result = doSend(tx, message, address, direct, noAutoCreateQueue);
- }
+ if (message.getAddressSimpleString().equals(managementAddress)) {
+ // It's a management message
+ result = handleManagementMessage(tx, message, direct);
+ } else {
+ result = doSend(tx, message, address, direct, noAutoCreateQueue);
+ }
+
+ } catch (Exception e) {
+ if (server.hasBrokerMessagePlugins()) {
+ server.callBrokerMessagePlugins(plugin -> plugin.onSendException(this, tx, message, direct, noAutoCreateQueue, e));
+ }
+ throw e;
+ }
if (server.hasBrokerMessagePlugins()) {
server.callBrokerMessagePlugins(plugin -> plugin.afterSend(this, tx, message, direct, noAutoCreateQueue, result));
}
-
return result;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95ec8ea4/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerMessagePlugin.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerMessagePlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerMessagePlugin.java
index aef0970..404e8a4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerMessagePlugin.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerMessagePlugin.java
@@ -65,6 +65,21 @@ public interface ActiveMQServerMessagePlugin extends ActiveMQServerBasePlugin {
this.afterSend(tx, message, direct, noAutoCreateQueue, result);
}
+ /**
+ * When there was an exception sending the message
+ *
+ * @param session
+ * @param tx
+ * @param message
+ * @param direct
+ * @param noAutoCreateQueue
+ * @param e the exception that occurred when sending the message
+ * @throws ActiveMQException
+ */
+ default void onSendException(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue,
+ Exception e) throws ActiveMQException {
+
+ }
/**
* Before a message is sent
@@ -129,6 +144,21 @@ public interface ActiveMQServerMessagePlugin extends ActiveMQServerBasePlugin {
}
/**
+ * When there was an error routing the message
+ *
+ * @param message
+ * @param context
+ * @param direct
+ * @param rejectDuplicates
+ * @param e the exception that occurred during message routing
+ * @throws ActiveMQException
+ */
+ default void onMessageRouteException(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates,
+ Exception e) throws ActiveMQException {
+
+ }
+
+ /**
* Before a message is delivered to a client consumer
*
* @param consumer the consumer the message will be delivered to
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95ec8ea4/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java
index ff23b59..3483472 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java
@@ -491,6 +491,32 @@ public class LoggingActiveMQServerPlugin implements ActiveMQServerPlugin, Serial
}
}
+ @Override
+ public void onSendException(ServerSession session,
+ Transaction tx,
+ Message message,
+ boolean direct,
+ boolean noAutoCreateQueue,
+ Exception e) throws ActiveMQException {
+ if (logAll || logSendingEvents) {
+
+ if (LoggingActiveMQServerPluginLogger.LOGGER.isDebugEnabled()) {
+ //details - debug level
+ LoggingActiveMQServerPluginLogger.LOGGER.onSendErrorDetails((message == null ? UNAVAILABLE : Long.toString(message.getMessageID())),
+ message, (session == null ? UNAVAILABLE : session.getName()),
+ tx, session, direct, noAutoCreateQueue);
+ }
+
+ if (LoggingActiveMQServerPluginLogger.LOGGER.isInfoEnabled()) {
+ //info level log
+ LoggingActiveMQServerPluginLogger.LOGGER.onSendError((message == null ? UNAVAILABLE : Long.toString(message.getMessageID())),
+ (session == null ? UNAVAILABLE : session.getName()),
+ (session == null ? UNAVAILABLE : session.getConnectionID().toString()),
+ e);
+ }
+ }
+ }
+
/**
* Before a message is routed
*
@@ -540,6 +566,26 @@ public class LoggingActiveMQServerPlugin implements ActiveMQServerPlugin, Serial
}
}
+ @Override
+ public void onMessageRouteException(Message message,
+ RoutingContext context,
+ boolean direct,
+ boolean rejectDuplicates,
+ Exception e) throws ActiveMQException {
+ if (logAll || logSendingEvents) {
+
+ //details - debug level logging
+ LoggingActiveMQServerPluginLogger.LOGGER.onMessageRouteErrorDetails(message, context, direct, rejectDuplicates);
+
+ if (LoggingActiveMQServerPluginLogger.LOGGER.isInfoEnabled()) {
+ //info level log
+ LoggingActiveMQServerPluginLogger.LOGGER.onMessageRouteError((message == null ? UNAVAILABLE : Long.toString(message.getMessageID())),
+ e);
+ }
+
+ }
+ }
+
/**
* Before a message is delivered to a client consumer
*
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95ec8ea4/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java
index f519dd0..fa697ba 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java
@@ -141,6 +141,15 @@ public interface LoggingActiveMQServerPluginLogger extends BasicLogger {
@Message(id = 841016, value = "criticalFailure called with criticalComponent: {0}", format = Message.Format.MESSAGE_FORMAT)
void criticalFailure(CriticalComponent components);
+ @LogMessage(level = Logger.Level.INFO)
+ @Message(id = 841017, value = "error sending message with ID: {0}, session name: {1}, session connectionID: {2}," +
+ " exception: {3}", format = Message.Format.MESSAGE_FORMAT)
+ void onSendError(String messageID, String sessionName, String sessionConnectionID, Exception e);
+
+ @LogMessage(level = Logger.Level.INFO)
+ @Message(id = 841018, value = "error routing message with ID: {0}, exception: {1}", format = Message.Format.MESSAGE_FORMAT)
+ void onMessageRouteError(String messageID, Exception e);
+
//DEBUG messages
@LogMessage(level = Logger.Level.DEBUG)
@@ -258,4 +267,23 @@ public interface LoggingActiveMQServerPluginLogger extends BasicLogger {
@Message(id = 843015, value = "beforeDeployBridge called with bridgeConfiguration: {0}", format = Message.Format.MESSAGE_FORMAT)
void beforeDeployBridge(BridgeConfiguration config);
+ @LogMessage(level = Logger.Level.DEBUG)
+ @Message(id = 843016, value = "onSendError message ID: {0}, message {1}, session name: {2} with tx: {3}, session: {4}, direct: {5}," +
+ " noAutoCreateQueue: {6}", format = Message.Format.MESSAGE_FORMAT)
+ void onSendErrorDetails(String messageID,
+ org.apache.activemq.artemis.api.core.Message message,
+ String sessionName,
+ Transaction tx,
+ ServerSession session,
+ boolean direct,
+ boolean noAutoCreateQueue);
+
+ @LogMessage(level = Logger.Level.DEBUG)
+ @Message(id = 843017, value = "onMessageRouteError message: {0}, with context: {1}, direct: {2}, rejectDuplicates: {3}",
+ format = Message.Format.MESSAGE_FORMAT)
+ void onMessageRouteErrorDetails(org.apache.activemq.artemis.api.core.Message message,
+ RoutingContext context,
+ boolean direct,
+ boolean rejectDuplicates);
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95ec8ea4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
index 9c24505..0d802cf 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
@@ -90,8 +90,10 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin {
public static final String MESSAGE_ACKED = "messageAcknowledged";
public static final String BEFORE_SEND = "beforeSend";
public static final String AFTER_SEND = "afterSend";
+ public static final String ON_SEND_EXCEPTION = "onSendException";
public static final String BEFORE_MESSAGE_ROUTE = "beforeMessageRoute";
public static final String AFTER_MESSAGE_ROUTE = "afterMessageRoute";
+ public static final String ON_MESSAGE_ROUTE_EXCEPTION = "onMessageRouteException";
public static final String BEFORE_DELIVER = "beforeDeliver";
public static final String AFTER_DELIVER = "afterDeliver";
public static final String BEFORE_DEPLOY_BRIDGE = "beforeDeployBridge";
@@ -305,6 +307,14 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin {
}
@Override
+ public void onSendException(ServerSession session, Transaction tx, Message message, boolean direct,
+ boolean noAutoCreateQueue, Exception e) {
+ Preconditions.checkNotNull(message);
+ Preconditions.checkNotNull(e);
+ methodCalled(ON_SEND_EXCEPTION);
+ }
+
+ @Override
public void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) {
Preconditions.checkNotNull(message);
Preconditions.checkNotNull(context);
@@ -321,6 +331,15 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin {
}
@Override
+ public void onMessageRouteException(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates,
+ Exception e) {
+ Preconditions.checkNotNull(message);
+ Preconditions.checkNotNull(context);
+ Preconditions.checkNotNull(e);
+ methodCalled(ON_MESSAGE_ROUTE_EXCEPTION);
+ }
+
+ @Override
public void beforeDeliver(ServerConsumer consumer, MessageReference reference) {
Preconditions.checkNotNull(reference);
methodCalled(BEFORE_DELIVER);