You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by mi...@apache.org on 2018/08/24 20:37:19 UTC

[2/2] activemq-artemis git commit: ARTEMIS-2044 Add onSendException, onMessageRouteException to ActiveMQServerMessagePlugin

ARTEMIS-2044 Add onSendException, onMessageRouteException to ActiveMQServerMessagePlugin


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/95ec8ea4
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/95ec8ea4
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/95ec8ea4

Branch: refs/heads/master
Commit: 95ec8ea433d1bd5cf8294b0018c1edb7d45c07f0
Parents: 46bc10e
Author: Carsten Lohmann <ca...@bosch-si.com>
Authored: Wed Aug 22 10:06:30 2018 +0200
Committer: Michael Andre Pearce <mi...@me.com>
Committed: Fri Aug 24 21:35:34 2018 +0100

----------------------------------------------------------------------
 .../core/postoffice/impl/PostOfficeImpl.java    | 89 +++++++++++---------
 .../core/server/impl/ServerSessionImpl.java     | 78 +++++++++--------
 .../plugin/ActiveMQServerMessagePlugin.java     | 30 +++++++
 .../impl/LoggingActiveMQServerPlugin.java       | 46 ++++++++++
 .../impl/LoggingActiveMQServerPluginLogger.java | 28 ++++++
 .../plugin/MethodCalledVerifier.java            | 19 +++++
 6 files changed, 213 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95ec8ea4/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 9a3e844..598c32b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -877,62 +877,69 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
          logger.trace("Message after routed=" + message);
       }
 
-      if (context.getQueueCount() == 0) {
-         // Send to DLA if appropriate
+      try {
+         if (context.getQueueCount() == 0) {
+            // Send to DLA if appropriate
 
-         AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
+            AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
 
-         boolean sendToDLA = addressSettings.isSendToDLAOnNoRoute();
+            boolean sendToDLA = addressSettings.isSendToDLAOnNoRoute();
 
-         if (sendToDLA) {
-            // Send to the DLA for the address
+            if (sendToDLA) {
+               // Send to the DLA for the address
 
-            SimpleString dlaAddress = addressSettings.getDeadLetterAddress();
+               SimpleString dlaAddress = addressSettings.getDeadLetterAddress();
 
-            if (logger.isDebugEnabled()) {
-               logger.debug("sending message to dla address = " + dlaAddress + ", message=" + message);
-            }
+               if (logger.isDebugEnabled()) {
+                  logger.debug("sending message to dla address = " + dlaAddress + ", message=" + message);
+               }
 
-            if (dlaAddress == null) {
-               result = RoutingStatus.NO_BINDINGS;
-               ActiveMQServerLogger.LOGGER.noDLA(address);
-            } else {
-               message.referenceOriginalMessage(message, null);
+               if (dlaAddress == null) {
+                  result = RoutingStatus.NO_BINDINGS;
+                  ActiveMQServerLogger.LOGGER.noDLA(address);
+               } else {
+                  message.referenceOriginalMessage(message, null);
 
-               message.setAddress(dlaAddress);
+                  message.setAddress(dlaAddress);
 
-               message.reencode();
+                  message.reencode();
 
-               route(message, context.getTransaction(), false);
-               result = RoutingStatus.NO_BINDINGS_DLA;
-            }
-         } else {
-            result = RoutingStatus.NO_BINDINGS;
+                  route(message, context.getTransaction(), false);
+                  result = RoutingStatus.NO_BINDINGS_DLA;
+               }
+            } else {
+               result = RoutingStatus.NO_BINDINGS;
 
-            if (logger.isDebugEnabled()) {
-               logger.debug("Message " + message + " is not going anywhere as it didn't have a binding on address:" + address);
-            }
+               if (logger.isDebugEnabled()) {
+                  logger.debug("Message " + message + " is not going anywhere as it didn't have a binding on address:" + address);
+               }
 
-            if (message.isLargeMessage()) {
-               ((LargeServerMessage) message).deleteFile();
+               if (message.isLargeMessage()) {
+                  ((LargeServerMessage) message).deleteFile();
+               }
             }
-         }
-      } else {
-         result = RoutingStatus.OK;
-         try {
-            processRoute(message, context, direct);
-         } catch (ActiveMQAddressFullException e) {
-            if (startedTX.get()) {
-               context.getTransaction().rollback();
-            } else if (context.getTransaction() != null) {
-               context.getTransaction().markAsRollbackOnly(e);
+         } else {
+            result = RoutingStatus.OK;
+            try {
+               processRoute(message, context, direct);
+            } catch (ActiveMQAddressFullException e) {
+               if (startedTX.get()) {
+                  context.getTransaction().rollback();
+               } else if (context.getTransaction() != null) {
+                  context.getTransaction().markAsRollbackOnly(e);
+               }
+               throw e;
             }
-            throw e;
          }
-      }
 
-      if (startedTX.get()) {
-         context.getTransaction().commit();
+         if (startedTX.get()) {
+            context.getTransaction().commit();
+         }
+      } catch (Exception e) {
+         if (server.hasBrokerMessagePlugins()) {
+            server.callBrokerMessagePlugins(plugin -> plugin.onMessageRouteException(message, context, direct, rejectDuplicates, e));
+         }
+         throw e;
       }
 
       if (server.hasBrokerMessagePlugins()) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95ec8ea4/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 4910e66..d868a2f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -1426,54 +1426,60 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
          server.callBrokerMessagePlugins(plugin -> plugin.beforeSend(this, tx, message, direct, noAutoCreateQueue));
       }
 
-      // If the protocol doesn't support flow control, we have no choice other than fail the communication
-      if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) {
-         ActiveMQIOErrorException exception = ActiveMQMessageBundle.BUNDLE.diskBeyondLimit();
-         this.getRemotingConnection().fail(exception);
-         throw exception;
-      }
-
       final RoutingStatus result;
-      //large message may come from StompSession directly, in which
-      //case the id header already generated.
-      if (!message.isLargeMessage()) {
-         long id = storageManager.generateID();
-         // This will re-encode the message
-         message.setMessageID(id);
-      }
+      try {
+         // If the protocol doesn't support flow control, we have no choice other than fail the communication
+         if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) {
+            ActiveMQIOErrorException exception = ActiveMQMessageBundle.BUNDLE.diskBeyondLimit();
+            this.getRemotingConnection().fail(exception);
+            throw exception;
+         }
 
-      SimpleString address = message.getAddressSimpleString();
+         //large message may come from StompSession directly, in which
+         //case the id header already generated.
+         if (!message.isLargeMessage()) {
+            long id = storageManager.generateID();
+            // This will re-encode the message
+            message.setMessageID(id);
+         }
 
-      if (defaultAddress == null && address != null) {
-         defaultAddress = address;
-      }
+         SimpleString address = message.getAddressSimpleString();
 
-      if (address == null) {
-         // We don't want to force a re-encode when the message gets sent to the consumer
-         message.setAddress(defaultAddress);
-      }
+         if (defaultAddress == null && address != null) {
+            defaultAddress = address;
+         }
 
-      if (logger.isTraceEnabled()) {
-         logger.trace("send(message=" + message + ", direct=" + direct + ") being called");
-      }
+         if (address == null) {
+            // We don't want to force a re-encode when the message gets sent to the consumer
+            message.setAddress(defaultAddress);
+         }
 
-      if (message.getAddress() == null) {
-         // This could happen with some tests that are ignoring messages
-         throw ActiveMQMessageBundle.BUNDLE.noAddress();
-      }
+         if (logger.isTraceEnabled()) {
+            logger.trace("send(message=" + message + ", direct=" + direct + ") being called");
+         }
 
-      if (message.getAddressSimpleString().equals(managementAddress)) {
-         // It's a management message
+         if (message.getAddress() == null) {
+            // This could happen with some tests that are ignoring messages
+            throw ActiveMQMessageBundle.BUNDLE.noAddress();
+         }
 
-         result = handleManagementMessage(tx, message, direct);
-      } else {
-         result = doSend(tx, message, address, direct, noAutoCreateQueue);
-      }
+         if (message.getAddressSimpleString().equals(managementAddress)) {
+            // It's a management message
 
+            result = handleManagementMessage(tx, message, direct);
+         } else {
+            result = doSend(tx, message, address, direct, noAutoCreateQueue);
+         }
+
+      } catch (Exception e) {
+         if (server.hasBrokerMessagePlugins()) {
+            server.callBrokerMessagePlugins(plugin -> plugin.onSendException(this, tx, message, direct, noAutoCreateQueue, e));
+         }
+         throw e;
+      }
       if (server.hasBrokerMessagePlugins()) {
          server.callBrokerMessagePlugins(plugin -> plugin.afterSend(this, tx, message, direct, noAutoCreateQueue, result));
       }
-
       return result;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95ec8ea4/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerMessagePlugin.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerMessagePlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerMessagePlugin.java
index aef0970..404e8a4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerMessagePlugin.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerMessagePlugin.java
@@ -65,6 +65,21 @@ public interface ActiveMQServerMessagePlugin extends ActiveMQServerBasePlugin {
       this.afterSend(tx, message, direct, noAutoCreateQueue, result);
    }
 
+   /**
+    * When there was an exception sending the message
+    *
+    * @param session
+    * @param tx
+    * @param message
+    * @param direct
+    * @param noAutoCreateQueue
+    * @param e the exception that occurred when sending the message
+    * @throws ActiveMQException
+    */
+   default void onSendException(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue,
+                                Exception e) throws ActiveMQException {
+
+   }
 
    /**
     * Before a message is sent
@@ -129,6 +144,21 @@ public interface ActiveMQServerMessagePlugin extends ActiveMQServerBasePlugin {
    }
 
    /**
+    * When there was an error routing the message
+    *
+    * @param message
+    * @param context
+    * @param direct
+    * @param rejectDuplicates
+    * @param e the exception that occurred during message routing
+    * @throws ActiveMQException
+    */
+   default void onMessageRouteException(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates,
+                                        Exception e) throws ActiveMQException {
+
+   }
+
+   /**
     * Before a message is delivered to a client consumer
     *
     * @param consumer the consumer the message will be delivered to

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95ec8ea4/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java
index ff23b59..3483472 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java
@@ -491,6 +491,32 @@ public class LoggingActiveMQServerPlugin implements ActiveMQServerPlugin, Serial
       }
    }
 
+   @Override
+   public void onSendException(ServerSession session,
+                               Transaction tx,
+                               Message message,
+                               boolean direct,
+                               boolean noAutoCreateQueue,
+                               Exception e) throws ActiveMQException {
+      if (logAll || logSendingEvents) {
+
+         if (LoggingActiveMQServerPluginLogger.LOGGER.isDebugEnabled()) {
+            //details - debug level
+            LoggingActiveMQServerPluginLogger.LOGGER.onSendErrorDetails((message == null ? UNAVAILABLE : Long.toString(message.getMessageID())),
+                                                                        message, (session == null ? UNAVAILABLE : session.getName()),
+                                                                        tx, session, direct, noAutoCreateQueue);
+         }
+
+         if (LoggingActiveMQServerPluginLogger.LOGGER.isInfoEnabled()) {
+            //info level log
+            LoggingActiveMQServerPluginLogger.LOGGER.onSendError((message == null ? UNAVAILABLE : Long.toString(message.getMessageID())),
+                                                                 (session == null ? UNAVAILABLE : session.getName()),
+                                                                 (session == null ? UNAVAILABLE : session.getConnectionID().toString()),
+                                                                 e);
+         }
+      }
+   }
+
    /**
     * Before a message is routed
     *
@@ -540,6 +566,26 @@ public class LoggingActiveMQServerPlugin implements ActiveMQServerPlugin, Serial
       }
    }
 
+   @Override
+   public void onMessageRouteException(Message message,
+                                       RoutingContext context,
+                                       boolean direct,
+                                       boolean rejectDuplicates,
+                                       Exception e) throws ActiveMQException {
+      if (logAll || logSendingEvents) {
+
+         //details - debug level logging
+         LoggingActiveMQServerPluginLogger.LOGGER.onMessageRouteErrorDetails(message, context, direct, rejectDuplicates);
+
+         if (LoggingActiveMQServerPluginLogger.LOGGER.isInfoEnabled()) {
+            //info level log
+            LoggingActiveMQServerPluginLogger.LOGGER.onMessageRouteError((message == null ? UNAVAILABLE : Long.toString(message.getMessageID())),
+                                                                         e);
+         }
+
+      }
+   }
+
    /**
     * Before a message is delivered to a client consumer
     *

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95ec8ea4/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java
index f519dd0..fa697ba 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java
@@ -141,6 +141,15 @@ public interface LoggingActiveMQServerPluginLogger extends BasicLogger {
    @Message(id = 841016, value = "criticalFailure called with criticalComponent: {0}", format = Message.Format.MESSAGE_FORMAT)
    void criticalFailure(CriticalComponent components);
 
+   @LogMessage(level = Logger.Level.INFO)
+   @Message(id = 841017, value = "error sending message with ID: {0}, session name: {1}, session connectionID: {2}," +
+      " exception: {3}", format = Message.Format.MESSAGE_FORMAT)
+   void onSendError(String messageID, String sessionName, String sessionConnectionID, Exception e);
+
+   @LogMessage(level = Logger.Level.INFO)
+   @Message(id = 841018, value = "error routing message with ID: {0}, exception: {1}", format = Message.Format.MESSAGE_FORMAT)
+   void onMessageRouteError(String messageID, Exception e);
+
    //DEBUG messages
 
    @LogMessage(level = Logger.Level.DEBUG)
@@ -258,4 +267,23 @@ public interface LoggingActiveMQServerPluginLogger extends BasicLogger {
    @Message(id = 843015, value = "beforeDeployBridge called with bridgeConfiguration: {0}", format = Message.Format.MESSAGE_FORMAT)
    void beforeDeployBridge(BridgeConfiguration config);
 
+   @LogMessage(level = Logger.Level.DEBUG)
+   @Message(id = 843016, value = "onSendError message ID: {0}, message {1}, session name: {2} with tx: {3}, session: {4}, direct: {5}," +
+      " noAutoCreateQueue: {6}", format = Message.Format.MESSAGE_FORMAT)
+   void onSendErrorDetails(String messageID,
+                           org.apache.activemq.artemis.api.core.Message message,
+                           String sessionName,
+                           Transaction tx,
+                           ServerSession session,
+                           boolean direct,
+                           boolean noAutoCreateQueue);
+
+   @LogMessage(level = Logger.Level.DEBUG)
+   @Message(id = 843017, value = "onMessageRouteError message: {0}, with context: {1}, direct: {2}, rejectDuplicates: {3}",
+      format = Message.Format.MESSAGE_FORMAT)
+   void onMessageRouteErrorDetails(org.apache.activemq.artemis.api.core.Message message,
+                                   RoutingContext context,
+                                   boolean direct,
+                                   boolean rejectDuplicates);
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95ec8ea4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
index 9c24505..0d802cf 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
@@ -90,8 +90,10 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin {
    public static final String MESSAGE_ACKED = "messageAcknowledged";
    public static final String BEFORE_SEND = "beforeSend";
    public static final String AFTER_SEND = "afterSend";
+   public static final String ON_SEND_EXCEPTION = "onSendException";
    public static final String BEFORE_MESSAGE_ROUTE = "beforeMessageRoute";
    public static final String AFTER_MESSAGE_ROUTE = "afterMessageRoute";
+   public static final String ON_MESSAGE_ROUTE_EXCEPTION = "onMessageRouteException";
    public static final String BEFORE_DELIVER = "beforeDeliver";
    public static final String AFTER_DELIVER = "afterDeliver";
    public static final String BEFORE_DEPLOY_BRIDGE = "beforeDeployBridge";
@@ -305,6 +307,14 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin {
    }
 
    @Override
+   public void onSendException(ServerSession session, Transaction tx, Message message, boolean direct,
+                               boolean noAutoCreateQueue, Exception e) {
+      Preconditions.checkNotNull(message);
+      Preconditions.checkNotNull(e);
+      methodCalled(ON_SEND_EXCEPTION);
+   }
+
+   @Override
    public void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) {
       Preconditions.checkNotNull(message);
       Preconditions.checkNotNull(context);
@@ -321,6 +331,15 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin {
    }
 
    @Override
+   public void onMessageRouteException(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates,
+                                       Exception e) {
+      Preconditions.checkNotNull(message);
+      Preconditions.checkNotNull(context);
+      Preconditions.checkNotNull(e);
+      methodCalled(ON_MESSAGE_ROUTE_EXCEPTION);
+   }
+
+   @Override
    public void beforeDeliver(ServerConsumer consumer, MessageReference reference) {
       Preconditions.checkNotNull(reference);
       methodCalled(BEFORE_DELIVER);