You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/08/07 22:47:13 UTC

[1/3] activemq-artemis git commit: ARTEMIS-1324 Deadlock detection and health check of critical components

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 8f33d276d -> 01b37de76


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 9269eb3..ecfbf09 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -69,6 +69,7 @@ import org.apache.activemq.artemis.core.server.Consumer;
 import org.apache.activemq.artemis.core.server.HandleStatus;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.QueueFactory;
 import org.apache.activemq.artemis.core.server.RoutingContext;
 import org.apache.activemq.artemis.core.server.ScheduledDeliveryHandler;
 import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
@@ -92,6 +93,8 @@ import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
 import org.apache.activemq.artemis.utils.collections.PriorityLinkedList;
 import org.apache.activemq.artemis.utils.collections.PriorityLinkedListImpl;
 import org.apache.activemq.artemis.utils.collections.TypedProperties;
+import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl;
+import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
 import org.jboss.logging.Logger;
 
 /**
@@ -99,7 +102,13 @@ import org.jboss.logging.Logger;
  * <p>
  * Completely non blocking between adding to queue and delivering to consumers.
  */
-public class QueueImpl implements Queue {
+public class QueueImpl extends CriticalComponentImpl implements Queue {
+
+   protected static final int CRITICAL_PATHS = 4;
+   protected static final int CRITICAL_PATH_ADD_TAIL = 0;
+   protected static final int CRITICAL_PATH_ADD_HEAD = 1;
+   protected static final int CRITICAL_DELIVER = 2;
+   protected static final int CRITICAL_CONSUMER = 3;
 
    private static final Logger logger = Logger.getLogger(QueueImpl.class);
 
@@ -253,6 +262,8 @@ public class QueueImpl implements Queue {
 
    private volatile RoutingType routingType;
 
+   private final QueueFactory factory;
+
    /**
     * This is to avoid multi-thread races on calculating direct delivery,
     * to guarantee ordering will be always be correct
@@ -332,8 +343,9 @@ public class QueueImpl implements Queue {
                     final StorageManager storageManager,
                     final HierarchicalRepository<AddressSettings> addressSettingsRepository,
                     final Executor executor,
-                    final ActiveMQServer server) {
-      this(id, address, name, filter, null, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server);
+                    final ActiveMQServer server,
+                    final QueueFactory factory) {
+      this(id, address, name, filter, null, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);
    }
 
    public QueueImpl(final long id,
@@ -350,8 +362,9 @@ public class QueueImpl implements Queue {
                     final StorageManager storageManager,
                     final HierarchicalRepository<AddressSettings> addressSettingsRepository,
                     final Executor executor,
-                    final ActiveMQServer server) {
-      this(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, RoutingType.MULTICAST, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server);
+                    final ActiveMQServer server,
+                    final QueueFactory factory) {
+      this(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, RoutingType.MULTICAST, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);
    }
 
    public QueueImpl(final long id,
@@ -371,7 +384,9 @@ public class QueueImpl implements Queue {
                     final StorageManager storageManager,
                     final HierarchicalRepository<AddressSettings> addressSettingsRepository,
                     final Executor executor,
-                    final ActiveMQServer server) {
+                    final ActiveMQServer server,
+                    final QueueFactory factory) {
+      super(server == null ? EmptyCriticalAnalyzer.getInstance() : server.getCriticalAnalyzer(), CRITICAL_PATHS);
 
       this.id = id;
 
@@ -426,6 +441,8 @@ public class QueueImpl implements Queue {
       this.executor = executor;
 
       this.user = user;
+
+      this.factory = factory;
    }
 
    // Bindable implementation -------------------------------------------------------------------------------------
@@ -572,28 +589,42 @@ public class QueueImpl implements Queue {
 
    /* Called when a message is cancelled back into the queue */
    @Override
-   public synchronized void addHead(final MessageReference ref, boolean scheduling) {
-      flushDeliveriesInTransit();
-      if (!scheduling && scheduledDeliveryHandler.checkAndSchedule(ref, false)) {
-         return;
-      }
+   public void addHead(final MessageReference ref, boolean scheduling) {
+      enterCritical(CRITICAL_PATH_ADD_HEAD);
+      synchronized (this) {
+         try {
+            flushDeliveriesInTransit();
+            if (!scheduling && scheduledDeliveryHandler.checkAndSchedule(ref, false)) {
+               return;
+            }
 
-      internalAddHead(ref);
+            internalAddHead(ref);
 
-      directDeliver = false;
+            directDeliver = false;
+         } finally {
+            leaveCritical(CRITICAL_PATH_ADD_HEAD);
+         }
+      }
    }
 
    /* Called when a message is cancelled back into the queue */
    @Override
-   public synchronized void addHead(final List<MessageReference> refs, boolean scheduling) {
-      flushDeliveriesInTransit();
-      for (MessageReference ref : refs) {
-         addHead(ref, scheduling);
-      }
+   public void addHead(final List<MessageReference> refs, boolean scheduling) {
+      enterCritical(CRITICAL_PATH_ADD_HEAD);
+      synchronized (this) {
+         try {
+            flushDeliveriesInTransit();
+            for (MessageReference ref : refs) {
+               addHead(ref, scheduling);
+            }
 
-      resetAllIterators();
+            resetAllIterators();
 
-      deliverAsync();
+            deliverAsync();
+         } finally {
+            leaveCritical(CRITICAL_PATH_ADD_HEAD);
+         }
+      }
    }
 
    @Override
@@ -617,46 +648,46 @@ public class QueueImpl implements Queue {
 
    @Override
    public void addTail(final MessageReference ref, final boolean direct) {
-      if (scheduleIfPossible(ref)) {
-         return;
-      }
+      enterCritical(CRITICAL_PATH_ADD_TAIL);
+      try {
+         if (scheduleIfPossible(ref)) {
+            return;
+         }
 
-      synchronized (directDeliveryGuard) {
-         // The checkDirect flag is periodically set to true, if the delivery is specified as direct then this causes the
-         // directDeliver flag to be re-computed resulting in direct delivery if the queue is empty
-         // We don't recompute it on every delivery since executing isEmpty is expensive for a ConcurrentQueue
-         if (supportsDirectDeliver && !directDeliver &&
-            direct &&
-            System.currentTimeMillis() - lastDirectDeliveryCheck > CHECK_QUEUE_SIZE_PERIOD) {
-            lastDirectDeliveryCheck = System.currentTimeMillis();
-
-            if (intermediateMessageReferences.isEmpty() &&
-               messageReferences.isEmpty() &&
-               !pageIterator.hasNext() &&
-               !pageSubscription.isPaging()) {
-               // We must block on the executor to ensure any async deliveries have completed or we might get out of order
-               // deliveries
-               if (flushExecutor() && flushDeliveriesInTransit()) {
-                  // Go into direct delivery mode
-                  directDeliver = supportsDirectDeliver;
+         synchronized (directDeliveryGuard) {
+            // The checkDirect flag is periodically set to true, if the delivery is specified as direct then this causes the
+            // directDeliver flag to be re-computed resulting in direct delivery if the queue is empty
+            // We don't recompute it on every delivery since executing isEmpty is expensive for a ConcurrentQueue
+            if (supportsDirectDeliver && !directDeliver && direct && System.currentTimeMillis() - lastDirectDeliveryCheck > CHECK_QUEUE_SIZE_PERIOD) {
+               lastDirectDeliveryCheck = System.currentTimeMillis();
+
+               if (intermediateMessageReferences.isEmpty() && messageReferences.isEmpty() && !pageIterator.hasNext() && !pageSubscription.isPaging()) {
+                  // We must block on the executor to ensure any async deliveries have completed or we might get out of order
+                  // deliveries
+                  if (flushExecutor() && flushDeliveriesInTransit()) {
+                     // Go into direct delivery mode
+                     directDeliver = supportsDirectDeliver;
+                  }
                }
             }
          }
-      }
 
-      if (direct && supportsDirectDeliver && directDeliver && deliveriesInTransit.getCount() == 0 && deliverDirect(ref)) {
-         return;
-      }
+         if (direct && supportsDirectDeliver && directDeliver && deliveriesInTransit.getCount() == 0 && deliverDirect(ref)) {
+            return;
+         }
 
-      // We only add queueMemorySize if not being delivered directly
-      queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
+         // We only add queueMemorySize if not being delivered directly
+         queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
 
-      intermediateMessageReferences.add(ref);
+         intermediateMessageReferences.add(ref);
 
-      directDeliver = false;
+         directDeliver = false;
 
-      // Delivery async will both poll for intermediate reference and deliver to clients
-      deliverAsync();
+         // Delivery async will both poll for intermediate reference and deliver to clients
+         deliverAsync();
+      } finally {
+         leaveCritical(CRITICAL_PATH_ADD_TAIL);
+      }
    }
 
    protected boolean scheduleIfPossible(MessageReference ref) {
@@ -788,93 +819,105 @@ public class QueueImpl implements Queue {
          logger.debug(this + " adding consumer " + consumer);
       }
 
-      synchronized (this) {
+      enterCritical(CRITICAL_CONSUMER);
+      try {
+         synchronized (this) {
 
-         if (maxConsumers != MAX_CONSUMERS_UNLIMITED && noConsumers.get() >= maxConsumers) {
-            throw ActiveMQMessageBundle.BUNDLE.maxConsumerLimitReachedForQueue(address, name);
-         }
+            if (maxConsumers != MAX_CONSUMERS_UNLIMITED && noConsumers.get() >= maxConsumers) {
+               throw ActiveMQMessageBundle.BUNDLE.maxConsumerLimitReachedForQueue(address, name);
+            }
 
-         flushDeliveriesInTransit();
+            flushDeliveriesInTransit();
 
-         consumersChanged = true;
+            consumersChanged = true;
 
-         if (!consumer.supportsDirectDelivery()) {
-            this.supportsDirectDeliver = false;
-         }
+            if (!consumer.supportsDirectDelivery()) {
+               this.supportsDirectDeliver = false;
+            }
 
-         cancelRedistributor();
+            cancelRedistributor();
 
-         consumerList.add(new ConsumerHolder(consumer));
+            consumerList.add(new ConsumerHolder(consumer));
 
-         if (consumerSet.add(consumer)) {
-            consumersCount.incrementAndGet();
-         }
+            if (consumerSet.add(consumer)) {
+               consumersCount.incrementAndGet();
+            }
 
-         if (refCountForConsumers != null) {
-            refCountForConsumers.increment();
-         }
+            if (refCountForConsumers != null) {
+               refCountForConsumers.increment();
+            }
 
-         noConsumers.incrementAndGet();
+            noConsumers.incrementAndGet();
+         }
+      } finally {
+         leaveCritical(CRITICAL_CONSUMER);
       }
 
+
    }
 
    @Override
    public void removeConsumer(final Consumer consumer) {
-      synchronized (this) {
-         consumersChanged = true;
 
-         for (ConsumerHolder holder : consumerList) {
-            if (holder.consumer == consumer) {
-               if (holder.iter != null) {
-                  holder.iter.close();
+      enterCritical(CRITICAL_CONSUMER);
+      try {
+         synchronized (this) {
+            consumersChanged = true;
+
+            for (ConsumerHolder holder : consumerList) {
+               if (holder.consumer == consumer) {
+                  if (holder.iter != null) {
+                     holder.iter.close();
+                  }
+                  consumerList.remove(holder);
+                  break;
                }
-               consumerList.remove(holder);
-               break;
             }
-         }
 
-         this.supportsDirectDeliver = checkConsumerDirectDeliver();
+            this.supportsDirectDeliver = checkConsumerDirectDeliver();
 
-         if (pos > 0 && pos >= consumerList.size()) {
-            pos = consumerList.size() - 1;
-         }
+            if (pos > 0 && pos >= consumerList.size()) {
+               pos = consumerList.size() - 1;
+            }
 
-         if (consumerSet.remove(consumer)) {
-            consumersCount.decrementAndGet();
-         }
+            if (consumerSet.remove(consumer)) {
+               consumersCount.decrementAndGet();
+            }
 
-         LinkedList<SimpleString> groupsToRemove = null;
+            LinkedList<SimpleString> groupsToRemove = null;
 
-         for (SimpleString groupID : groups.keySet()) {
-            if (consumer == groups.get(groupID)) {
-               if (groupsToRemove == null) {
-                  groupsToRemove = new LinkedList<>();
+            for (SimpleString groupID : groups.keySet()) {
+               if (consumer == groups.get(groupID)) {
+                  if (groupsToRemove == null) {
+                     groupsToRemove = new LinkedList<>();
+                  }
+                  groupsToRemove.add(groupID);
                }
-               groupsToRemove.add(groupID);
             }
-         }
 
-         // We use an auxiliary List here to avoid concurrent modification exceptions on the keySet
-         // while the iteration is being done.
-         // Since that's a simple HashMap there's no Iterator's support with a remove operation
-         if (groupsToRemove != null) {
-            for (SimpleString groupID : groupsToRemove) {
-               groups.remove(groupID);
+            // We use an auxiliary List here to avoid concurrent modification exceptions on the keySet
+            // while the iteration is being done.
+            // Since that's a simple HashMap there's no Iterator's support with a remove operation
+            if (groupsToRemove != null) {
+               for (SimpleString groupID : groupsToRemove) {
+                  groups.remove(groupID);
+               }
             }
-         }
 
-         if (refCountForConsumers != null) {
-            refCountForConsumers.decrement();
-         }
+            if (refCountForConsumers != null) {
+               refCountForConsumers.decrement();
+            }
 
-         noConsumers.decrementAndGet();
+            noConsumers.decrementAndGet();
+         }
+      } finally {
+         leaveCritical(CRITICAL_CONSUMER);
       }
    }
 
    private boolean checkConsumerDirectDeliver() {
       boolean supports = true;
-      for (ConsumerHolder consumerCheck: consumerList) {
+      for (ConsumerHolder consumerCheck : consumerList) {
          if (!consumerCheck.consumer.supportsDirectDelivery()) {
             supports = false;
          }
@@ -1021,9 +1064,7 @@ public class QueueImpl implements Queue {
       if (pageSubscription != null) {
          // messageReferences will have depaged messages which we need to discount from the counter as they are
          // counted on the pageSubscription as well
-         return messageReferences.size() + getScheduledCount() +
-            deliveringCount.get() +
-            pageSubscription.getMessageCount();
+         return messageReferences.size() + getScheduledCount() + deliveringCount.get() + pageSubscription.getMessageCount();
       } else {
          return messageReferences.size() + getScheduledCount() + deliveringCount.get();
       }
@@ -1505,6 +1546,10 @@ public class QueueImpl implements Queue {
       } catch (Exception e) {
          tx.rollback();
          throw e;
+      } finally {
+         if (factory != null) {
+            factory.queueRemoved(this);
+         }
       }
    }
 
@@ -2286,15 +2331,7 @@ public class QueueImpl implements Queue {
          }
 
          if (logger.isDebugEnabled()) {
-            logger.debug("Queue Memory Size after depage on queue=" + this.getName() +
-                            " is " +
-                            queueMemorySize.get() +
-                            " with maxSize = " +
-                            maxSize +
-                            ". Depaged " +
-                            depaged +
-                            " messages, pendingDelivery=" + messageReferences.size() + ", intermediateMessageReferences= " + intermediateMessageReferences.size() +
-                            ", queueDelivering=" + deliveringCount.get());
+            logger.debug("Queue Memory Size after depage on queue=" + this.getName() + " is " + queueMemorySize.get() + " with maxSize = " + maxSize + ". Depaged " + depaged + " messages, pendingDelivery=" + messageReferences.size() + ", intermediateMessageReferences= " + intermediateMessageReferences.size() + ", queueDelivering=" + deliveringCount.get());
 
          }
       }
@@ -2474,9 +2511,7 @@ public class QueueImpl implements Queue {
       });
    }
 
-   private Pair<String, Binding> locateTargetBinding(SimpleString queueSuffix,
-                                                     Message copyMessage,
-                                                     long oldQueueID) {
+   private Pair<String, Binding> locateTargetBinding(SimpleString queueSuffix, Message copyMessage, long oldQueueID) {
       String targetNodeID = null;
       Binding targetBinding = null;
 
@@ -2528,8 +2563,8 @@ public class QueueImpl implements Queue {
    }
 
    private Message makeCopy(final MessageReference ref,
-                                  final boolean expiry,
-                                  final boolean copyOriginalHeaders) throws Exception {
+                            final boolean expiry,
+                            final boolean copyOriginalHeaders) throws Exception {
       Message message = ref.getMessage();
       /*
        We copy the message and send that to the dla/expiry queue - this is
@@ -2945,8 +2980,13 @@ public class QueueImpl implements Queue {
             // this will avoid that possibility
             // We will be using the deliverRunner instance as the guard object to avoid multiple threads executing
             // an asynchronous delivery
-            synchronized (QueueImpl.this.deliverRunner) {
-               deliver();
+            enterCritical(CRITICAL_DELIVER);
+            try {
+               synchronized (QueueImpl.this.deliverRunner) {
+                  deliver();
+               }
+            } finally {
+               leaveCritical(CRITICAL_DELIVER);
             }
          } catch (Exception e) {
             ActiveMQServerLogger.LOGGER.errorDelivering(e);
@@ -3154,9 +3194,7 @@ public class QueueImpl implements Queue {
       } else {
          if (slowConsumerReaperRunnable == null) {
             scheduleSlowConsumerReaper(settings);
-         } else if (slowConsumerReaperRunnable.checkPeriod != settings.getSlowConsumerCheckPeriod() ||
-            slowConsumerReaperRunnable.threshold != settings.getSlowConsumerThreshold() ||
-            !slowConsumerReaperRunnable.policy.equals(settings.getSlowConsumerPolicy())) {
+         } else if (slowConsumerReaperRunnable.checkPeriod != settings.getSlowConsumerCheckPeriod() || slowConsumerReaperRunnable.threshold != settings.getSlowConsumerThreshold() || !slowConsumerReaperRunnable.policy.equals(settings.getSlowConsumerPolicy())) {
             slowConsumerReaperFuture.cancel(false);
             scheduleSlowConsumerReaper(settings);
          }
@@ -3169,10 +3207,7 @@ public class QueueImpl implements Queue {
       slowConsumerReaperFuture = scheduledExecutor.scheduleWithFixedDelay(slowConsumerReaperRunnable, settings.getSlowConsumerCheckPeriod(), settings.getSlowConsumerCheckPeriod(), TimeUnit.SECONDS);
 
       if (logger.isDebugEnabled()) {
-         logger.debug("Scheduled slow-consumer-reaper thread for queue \"" + getName() +
-                         "\"; slow-consumer-check-period=" + settings.getSlowConsumerCheckPeriod() +
-                         ", slow-consumer-threshold=" + settings.getSlowConsumerThreshold() +
-                         ", slow-consumer-policy=" + settings.getSlowConsumerPolicy());
+         logger.debug("Scheduled slow-consumer-reaper thread for queue \"" + getName() + "\"; slow-consumer-check-period=" + settings.getSlowConsumerCheckPeriod() + ", slow-consumer-threshold=" + settings.getSlowConsumerThreshold() + ", slow-consumer-policy=" + settings.getSlowConsumerPolicy());
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
index b1eab66..b7dcdc3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
@@ -38,7 +38,7 @@ import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
-
+import org.apache.activemq.artemis.utils.critical.CriticalComponent;
 
 public interface ActiveMQServerPlugin {
 
@@ -399,4 +399,13 @@ public interface ActiveMQServerPlugin {
 
    }
 
+   /**
+    * A Critical failure has been detected.
+    * This will be called before the broker is stopped
+    * @param components
+    * @throws ActiveMQException
+    */
+   default void criticalFailure(CriticalComponent components) throws ActiveMQException {
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/artemis-server/src/main/resources/schema/artemis-configuration.xsd
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index fb1d147..03d0896 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -768,6 +768,38 @@
             </xsd:annotation>
          </xsd:element>
 
+         <xsd:element name="critical-analyzer" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0">
+            <xsd:annotation>
+               <xsd:documentation>
+                  Whether the broker should analyze response times on critical paths and decide on a broker shutdown or halt.
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
+
+         <xsd:element name="critical-analyzer-timeout" type="xsd:long" default="120000" maxOccurs="1" minOccurs="0">
+            <xsd:annotation>
+               <xsd:documentation>
+                  The timeout used when analyzing response times on the critical paths.
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
+
+         <xsd:element name="critical-analyzer-check-period" type="xsd:long" default="0" maxOccurs="1" minOccurs="0">
+            <xsd:annotation>
+               <xsd:documentation>
+                  The period used to check the response times. It defaults to half of critical-analyzer-timeout, calculated at runtime.
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
+
+         <xsd:element name="critical-analyzer-halt" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
+            <xsd:annotation>
+               <xsd:documentation>
+                  Whether the VM should be halted (true) or the server shut down (false) upon critical analysis failure.
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
+
          <xsd:element name="security-settings" maxOccurs="1" minOccurs="0">
             <xsd:annotation>
                <xsd:documentation>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index ddff9af..bdfa291 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -382,6 +382,11 @@ public class FileConfigurationTest extends ConfigurationImplTest {
       assertEquals(37, conf.getMaxDiskUsage());
       assertEquals(123, conf.getDiskScanPeriod());
 
+      assertEquals(333, conf.getCriticalAnalyzerCheckPeriod());
+      assertEquals(777, conf.getCriticalAnalyzerTimeout());
+      assertEquals(false, conf.isCriticalAnalyzer());
+      assertEquals(true, conf.isCriticalAnalyzerHalt());
+
       assertEquals(false, conf.isJournalDatasync());
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 6b5eb9a..ddf702e 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -51,6 +51,8 @@ import org.apache.activemq.artemis.utils.RandomUtil;
 import org.apache.activemq.artemis.utils.ReferenceCounter;
 import org.apache.activemq.artemis.utils.UUID;
 import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl;
+import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -763,7 +765,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
    }
 
-   public class FakeQueueForScheduleUnitTest implements Queue {
+   public class FakeQueueForScheduleUnitTest extends CriticalComponentImpl implements Queue {
 
       @Override
       public void setPurgeOnNoConsumers(boolean value) {
@@ -781,6 +783,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       public FakeQueueForScheduleUnitTest(final int expectedElements) {
+         super(EmptyCriticalAnalyzer.getInstance(), 1);
          this.expectedElements = new CountDownLatch(expectedElements);
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index 6526454..89b7c74 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -57,6 +57,10 @@
       <global-max-size>1234567</global-max-size>
       <max-disk-usage>37</max-disk-usage>
       <disk-scan-period>123</disk-scan-period>
+      <critical-analyzer-halt>true</critical-analyzer-halt>
+      <critical-analyzer-check-period>333</critical-analyzer-check-period>
+      <critical-analyzer-timeout>777</critical-analyzer-timeout>
+      <critical-analyzer>false</critical-analyzer>
       <remoting-incoming-interceptors>
          <class-name>org.apache.activemq.artemis.tests.unit.core.config.impl.TestInterceptor1</class-name>
          <class-name>org.apache.activemq.artemis.tests.unit.core.config.impl.TestInterceptor2</class-name>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/docs/user-manual/en/SUMMARY.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/SUMMARY.md b/docs/user-manual/en/SUMMARY.md
index f87793d..bc9ac31 100644
--- a/docs/user-manual/en/SUMMARY.md
+++ b/docs/user-manual/en/SUMMARY.md
@@ -23,6 +23,7 @@
 * [Detecting Dead Connections](connection-ttl.md)
 * [Detecting Slow Consumers](slow-consumers.md)
 * [Avoiding Network Isolation](network-isolation.md)
+* [Detecting Broker Issues (Critical Analysis)](critical-analysis.md)
 * [Resource Manager Configuration](transaction-config.md)
 * [Flow Control](flow-control.md)
 * [Guarantees of sends and commits](send-guarantees.md)

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/docs/user-manual/en/configuration-index.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/configuration-index.md b/docs/user-manual/en/configuration-index.md
index 4c16fa9..c643199 100644
--- a/docs/user-manual/en/configuration-index.md
+++ b/docs/user-manual/en/configuration-index.md
@@ -118,6 +118,11 @@ system-property-prefix | Prefix for replacing configuration settings using Bean
 [network-check-list](network-isolation.md) | The list of pings to be used on ping or InetAddress.isReacheable
 [network-check-ping-command](network-isolation.md) | The command used to oping IPV4 addresses
 [network-check-ping6-command](network-isolation.md) | The command used to oping IPV6 addresses
+[critical-analyzer](critical-analysis.md) | Enable or disable the critical analysis (default true)
+[critical-analyzer-timeout](critical-analysis.md) | Timeout used to do the critical analysis (default 120000 milliseconds)
+[critical-analyzer-check-period](critical-analysis.md) | Time used to check the response times (default half of critical-analyzer-timeout)
+[critical-analyzer-halt](critical-analysis.md) | Should the VM be halted upon failures (default false)
+
 
 #address-setting type
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/docs/user-manual/en/critical-analysis.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/critical-analysis.md b/docs/user-manual/en/critical-analysis.md
new file mode 100644
index 0000000..e867283
--- /dev/null
+++ b/docs/user-manual/en/critical-analysis.md
@@ -0,0 +1,85 @@
+# Critical Analysis of the broker
+
+There are a few things that can go wrong on a production environment:
+
+- Bugs: as much as we try to avoid them, they still happen! We always try to correct them, but bugs are the one constant in software development.
+- IO Errors, disks and hardware can go bad
+- Memory issues, or the CPU being monopolized by another process
+
+For cases like this, we added a protection to the broker to shut itself down when bad things happen.
+
+This is a feature we hope you won't need; think of it as a safeguard:
+
+We measure response times in places like:
+
+- Queue delivery (add to the queue)
+- Journal storage
+- Paging operations
+
+If the response time goes beyond a configured timeout, the broker is considered unstable and an action will be taken to either shut down the broker or halt the VM.
+
+You can use the following configuration options in broker.xml to configure how the critical analysis is performed.
+
+
+Name | Description
+:--- | :---
+critical-analyzer | Enable or disable the critical analysis (default true)
+critical-analyzer-timeout | Timeout used to do the critical analysis (default 120000 milliseconds)
+critical-analyzer-check-period | Time used to check the response times (default half of critical-analyzer-timeout)
+critical-analyzer-halt | Should the VM be halted upon failures (default false)
+
+The default for critical-analyzer-halt is false; however, the generated broker.xml will have it set to true. That is because we cannot halt the VM if you are embedding ActiveMQ Artemis into an application server or in a multi-tenant environment.
+
+The broker on the distribution will then have it set to true, but if you use it in any other way the default will be false.
+
+## What to expect
+
+- You will see some logs
+
+If you have critical-analyzer-halt=true
+
+```
+[Artemis Critical Analyzer] 18:10:00,831 ERROR [org.apache.activemq.artemis.core.server] AMQ224079: The process for the virtual machine will be killed, as component org.apache.activemq.artemis.tests.integration.critical.CriticalSimpleTest$2@5af97850 is not responsive
+```
+
+Or if you have critical-analyzer-halt=false
+
+```
+[Artemis Critical Analyzer] 18:07:53,475 ERROR [org.apache.activemq.artemis.core.server] AMQ224080: The server process will now be stopped, as component org.apache.activemq.artemis.tests.integration.critical.CriticalSimpleTest$2@5af97850 is not responsive
+```
+
+You will see a simple thread dump of the server
+
+```
+[Artemis Critical Analyzer] 18:10:00,836 WARN  [org.apache.activemq.artemis.core.server] AMQ222199: Thread dump: AMQ119001: Generating thread dump
+*******************************************************************************
+===============================================================================
+AMQ119002: Thread Thread[Thread-1 (ActiveMQ-scheduled-threads),5,main] name = Thread-1 (ActiveMQ-scheduled-threads) id = 19 group = java.lang.ThreadGroup[name=main,maxpri=10]
+
+sun.misc.Unsafe.park(Native Method)
+java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
+java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
+java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088)
+java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
+java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
+java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
+java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
+java.lang.Thread.run(Thread.java:745)
+===============================================================================
+
+
+..... blablablablaba ..........
+
+
+===============================================================================
+AMQ119003: End Thread dump
+*******************************************************************************
+
+```
+
+- The Server will be halted if configured to halt
+
+- The system will be stopped if no halt is used:
+* Notice that if the system is not behaving well, there is no guarantee the stop will work.
+
+

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index 0de19a1..afe1403 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -239,7 +239,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
                              final Executor executor, final ActiveMQServer server) {
             super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, deliveryMode,
                   maxConsumers, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager,
-                  addressSettingsRepository, executor, server);
+                  addressSettingsRepository, executor, server, null);
          }
 
          @Override
@@ -375,7 +375,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
       // Forcing a situation where the server would unexpectedly create a duplicated queue. The server should still start normally
       LocalQueueBinding newBinding = new LocalQueueBinding(QUEUE,
                                                            new QueueImpl(queueID, QUEUE, QUEUE, null, null, true, false,
-                                                                         false, null, null, null, null, null, null),
+                                                                         false, null, null, null, null, null, null, null),
                                                            server.getNodeID());
       server.getStorageManager().addQueueBinding(txID, newBinding);
       server.getStorageManager().commitBindings(txID);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java
index f926af1..66381fa 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java
@@ -520,7 +520,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase {
                         HierarchicalRepository<AddressSettings> addressSettingsRepository,
                         Executor executor) {
             super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor,
-                  postOffice, storageManager, addressSettingsRepository, executor, null);
+                  postOffice, storageManager, addressSettingsRepository, executor, null, null);
          }
 
          @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/CriticalSimpleTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/CriticalSimpleTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/CriticalSimpleTest.java
new file mode 100644
index 0000000..da25243
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/CriticalSimpleTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.critical;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.utils.critical.CriticalComponent;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CriticalSimpleTest extends ActiveMQTestBase {
+   @Test
+   public void testSimpleShutdown() throws Exception {
+
+      Configuration configuration = createDefaultConfig(false);
+      configuration.setCriticalAnalyzerCheckPeriod(10).setCriticalAnalyzerHalt(true);
+      ActiveMQServer server = createServer(false, configuration, AddressSettings.DEFAULT_PAGE_SIZE, AddressSettings.DEFAULT_MAX_SIZE_BYTES);
+      server.start();
+
+      try {
+
+         CountDownLatch latch = new CountDownLatch(1);
+
+         server.getConfiguration().registerBrokerPlugin(new ActiveMQServerPlugin() {
+            @Override
+            public void criticalFailure(CriticalComponent components) throws ActiveMQException {
+               latch.countDown();
+            }
+         });
+
+         server.getCriticalAnalyzer().add(new CriticalComponent() {
+            @Override
+            public boolean isExpired(long timeout) {
+               return true;
+            }
+         });
+
+         Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+         Wait.waitFor(() -> !server.isStarted());
+
+
+         Assert.assertFalse(server.isStarted());
+
+      } finally {
+         server.stop();
+      }
+
+
+   }
+
+   @Test
+   public void testCriticalOff() throws Exception {
+
+      Configuration configuration = createDefaultConfig(false);
+      configuration.setCriticalAnalyzerCheckPeriod(10).setCriticalAnalyzer(false);
+      ActiveMQServer server = createServer(false, configuration, AddressSettings.DEFAULT_PAGE_SIZE, AddressSettings.DEFAULT_MAX_SIZE_BYTES);
+      server.start();
+
+      try {
+         server.getCriticalAnalyzer().add(new CriticalComponent() {
+            @Override
+            public boolean isExpired(long timeout) {
+               return true;
+            }
+         });
+
+         Wait.waitFor(() -> !server.isStarted(), 500, 10);
+
+
+         Assert.assertTrue(server.isStarted());
+
+      } finally {
+         server.stop();
+      }
+
+
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java
index 532207e..bae9115 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java
@@ -87,7 +87,7 @@ public class TopicCleanupTest extends JMSTestBase {
                                               FilterImpl.createFilter(Filter.GENERIC_IGNORED_FILTER), null,
                                               true, false, false, server.getScheduledPool(), server.getPostOffice(),
                                               storage, server.getAddressSettingsRepository(),
-                                              server.getExecutorFactory().getExecutor(), server);
+                                              server.getExecutorFactory().getExecutor(), server, null);
 
             LocalQueueBinding binding = new LocalQueueBinding(queue.getAddress(), queue, server.getNodeID());
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
index c830459..b97a861 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
@@ -54,6 +54,7 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.apache.activemq.artemis.utils.IDGenerator;
 import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
 import org.apache.activemq.artemis.utils.SimpleIDGenerator;
+import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
 import org.jboss.logging.Logger;
 import org.junit.After;
 import org.junit.Assert;
@@ -1724,7 +1725,7 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
 
       final ExecutorService deleteExecutor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
 
-      final JournalStorageManager storage = new JournalStorageManager(config, factory, iofactory);
+      final JournalStorageManager storage = new JournalStorageManager(config, EmptyCriticalAnalyzer.getInstance(), factory, iofactory);
 
       storage.start();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java
index 615a924..4819c5d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java
@@ -34,6 +34,7 @@ import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.impl.PostOfficeJournalLoader;
 import org.apache.activemq.artemis.tests.unit.core.postoffice.impl.FakeQueue;
 import org.apache.activemq.artemis.tests.unit.core.server.impl.fakes.FakePostOffice;
+import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runners.Parameterized;
@@ -90,7 +91,7 @@ public class DeleteMessagesOnStartupTest extends StorageManagerTestBase {
 
    @Override
    protected JournalStorageManager createJournalStorageManager(Configuration configuration) {
-      return new JournalStorageManager(configuration, execFactory, execFactory) {
+      return new JournalStorageManager(configuration, EmptyCriticalAnalyzer.getInstance(), execFactory, execFactory) {
          @Override
          public void deleteMessage(final long messageID) throws Exception {
             deletedMessage.add(messageID);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/JournalFileSizeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/JournalFileSizeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/JournalFileSizeTest.java
index 749715b..f25e950 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/JournalFileSizeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/JournalFileSizeTest.java
@@ -1,4 +1,5 @@
-/** * Licensed to the Apache Software Foundation (ASF) under one or more
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
@@ -15,13 +16,13 @@
  */
 package org.apache.activemq.artemis.tests.integration.persistence;
 
-
 import java.io.File;
 
 import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
 import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
 import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
 import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
+import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -43,9 +44,7 @@ public class JournalFileSizeTest {
       ConfigurationImpl config = new ConfigurationImpl();
       int origFileSize = config.getJournalFileSize();
       config.setJournalFileSize(origFileSize + (align / 2 - 1));
-      JournalStorageManager manager = new JournalStorageManager(config,
-            new OrderedExecutorFactory(null),
-            new OrderedExecutorFactory(null));
+      JournalStorageManager manager = new JournalStorageManager(config, EmptyCriticalAnalyzer.getInstance(), new OrderedExecutorFactory(null), new OrderedExecutorFactory(null));
       int fileSize = manager.getMessageJournal().getFileSize();
       Assert.assertEquals(origFileSize, fileSize);
    }
@@ -55,9 +54,7 @@ public class JournalFileSizeTest {
       ConfigurationImpl config = new ConfigurationImpl();
       int origFileSize = config.getJournalFileSize();
       config.setJournalFileSize(origFileSize + (align / 2 + 1));
-      JournalStorageManager manager = new JournalStorageManager(config,
-            new OrderedExecutorFactory(null),
-            new OrderedExecutorFactory(null));
+      JournalStorageManager manager = new JournalStorageManager(config, EmptyCriticalAnalyzer.getInstance(), new OrderedExecutorFactory(null), new OrderedExecutorFactory(null));
       int fileSize = manager.getMessageJournal().getFileSize();
       Assert.assertEquals(origFileSize + align, fileSize);
    }
@@ -67,9 +64,7 @@ public class JournalFileSizeTest {
       ConfigurationImpl config = new ConfigurationImpl();
       int origFileSize = config.getJournalFileSize();
       config.setJournalFileSize(origFileSize + (align / 2));
-      JournalStorageManager manager = new JournalStorageManager(config,
-            new OrderedExecutorFactory(null),
-            new OrderedExecutorFactory(null));
+      JournalStorageManager manager = new JournalStorageManager(config,EmptyCriticalAnalyzer.getInstance(), new OrderedExecutorFactory(null), new OrderedExecutorFactory(null));
       int fileSize = manager.getMessageJournal().getFileSize();
       Assert.assertEquals(origFileSize + align, fileSize);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java
index 2ee879f..48bdc9a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java
@@ -31,6 +31,7 @@ import org.apache.activemq.artemis.tests.unit.core.server.impl.fakes.FakeJournal
 import org.apache.activemq.artemis.tests.unit.core.server.impl.fakes.FakePostOffice;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -66,7 +67,7 @@ public class RestartSMTest extends ActiveMQTestBase {
 
       PostOffice postOffice = new FakePostOffice();
 
-      final JournalStorageManager journal = new JournalStorageManager(createDefaultInVMConfig(), execFactory, execFactory);
+      final JournalStorageManager journal = new JournalStorageManager(createDefaultInVMConfig(), EmptyCriticalAnalyzer.getInstance(), execFactory, execFactory);
 
       try {
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java
index 508f23b..6479d1e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java
@@ -38,6 +38,7 @@ import org.apache.activemq.artemis.tests.unit.core.server.impl.fakes.FakePostOff
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
 import org.apache.activemq.artemis.utils.TimeAndCounterIDGenerator;
+import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.runner.RunWith;
@@ -138,7 +139,7 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
     * @param configuration
     */
    protected JournalStorageManager createJournalStorageManager(Configuration configuration) {
-      JournalStorageManager jsm = new JournalStorageManager(configuration, execFactory, execFactory);
+      JournalStorageManager jsm = new JournalStorageManager(configuration, EmptyCriticalAnalyzer.getInstance(), execFactory, execFactory);
       addActiveMQComponent(jsm);
       return jsm;
    }
@@ -147,7 +148,7 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
     * @param configuration
     */
    protected JDBCJournalStorageManager createJDBCJournalStorageManager(Configuration configuration) {
-      JDBCJournalStorageManager jsm = new JDBCJournalStorageManager(configuration, execFactory, execFactory, scheduledExecutorService);
+      JDBCJournalStorageManager jsm = new JDBCJournalStorageManager(configuration, EmptyCriticalAnalyzer.getInstance(), execFactory, execFactory, scheduledExecutorService);
       addActiveMQComponent(jsm);
       return jsm;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
index 3095ab9..8236702 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
@@ -87,6 +87,7 @@ import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
 import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
 import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
+import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -435,7 +436,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
     * @throws Exception
     */
    private JournalStorageManager getStorage() throws Exception {
-      return new JournalStorageManager(createDefaultInVMConfig(), factory, factory);
+      return new JournalStorageManager(createDefaultInVMConfig(), EmptyCriticalAnalyzer.getInstance(), factory, factory);
    }
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java
----------------------------------------------------------------------
diff --git a/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java b/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java
index ed1a4e5..b5dc463 100644
--- a/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java
+++ b/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java
@@ -70,7 +70,7 @@ public class QueueImplTest extends ActiveMQTestBase {
       QueueImpl queue =
                new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true,
                              false, scheduledExecutor, null, null, null,
-                             Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null);
+                             Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null, null);
 
       // Send one scheduled
 
@@ -135,7 +135,7 @@ public class QueueImplTest extends ActiveMQTestBase {
 
    @Test
    public void testScheduled() throws Exception {
-      QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true, false, scheduledExecutor, null, null, null, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null);
+      QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true, false, scheduledExecutor, null, null, null, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null, null);
 
       FakeConsumer consumer = null;
 
@@ -233,7 +233,7 @@ public class QueueImplTest extends ActiveMQTestBase {
          public void disconnect() {
          }
       };
-      QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), QueueImplTest.queue1, null, null, false, true, false, scheduledExecutor, null, null, null, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null);
+      QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), QueueImplTest.queue1, null, null, false, true, false, scheduledExecutor, null, null, null, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null, null);
       MessageReference messageReference = generateReference(queue, 1);
       queue.addConsumer(consumer);
       messageReference.setScheduledDeliveryTime(System.currentTimeMillis() + 2000);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
index 21bc48d..985e83f 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
@@ -42,6 +42,7 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
 import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
 import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -93,7 +94,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
 
          ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize(), ActiveMQThreadFactory.defaultThreadFactory());
 
-         journal = new JournalStorageManager(configuration, factory, factory);
+         journal = new JournalStorageManager(configuration, EmptyCriticalAnalyzer.getInstance(), factory, factory);
 
          journal.start();
          journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>(), new ArrayList<AddressBindingInfo>());
@@ -113,7 +114,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
 
          journal.stop();
 
-         journal = new JournalStorageManager(configuration, factory, factory);
+         journal = new JournalStorageManager(configuration, EmptyCriticalAnalyzer.getInstance(), factory, factory);
          journal.start();
          journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>(), new ArrayList<AddressBindingInfo>());
 
@@ -136,7 +137,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
 
          mapDups.clear();
 
-         journal = new JournalStorageManager(configuration, factory, factory);
+         journal = new JournalStorageManager(configuration, EmptyCriticalAnalyzer.getInstance(), factory, factory);
          journal.start();
          journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>(), new ArrayList<AddressBindingInfo>());
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index efb9e77..54cae7b 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -35,8 +35,10 @@ import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.utils.ReferenceCounter;
 import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl;
+import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
 
-public class FakeQueue implements Queue {
+public class FakeQueue extends CriticalComponentImpl implements Queue {
 
    @Override
    public void setPurgeOnNoConsumers(boolean value) {
@@ -175,6 +177,7 @@ public class FakeQueue implements Queue {
    }
 
    public FakeQueue(final SimpleString name, final long id) {
+      super(EmptyCriticalAnalyzer.getInstance(), 1);
       this.name = name;
       this.id = id;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
index b64ff03..40f9214 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
@@ -1310,6 +1310,6 @@ public class QueueImplTest extends ActiveMQTestBase {
 
    private QueueImpl getQueue(SimpleString name, boolean durable, boolean temporary, Filter filter) {
       return new QueueImpl(1, QueueImplTest.address1, name, filter, null, durable, temporary, false, scheduledExecutor,
-                           new FakePostOffice(), null, null, executor, null);
+                           new FakePostOffice(), null, null, executor, null, null);
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
index 40c117a..4721579 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
@@ -42,7 +42,7 @@ public final class FakeQueueFactory implements QueueFactory {
    public Queue createQueueWith(final QueueConfig config) {
       return new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(),
                            config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(),
-                           scheduledExecutor, postOffice, null, null, executor, null);
+                           scheduledExecutor, postOffice, null, null, executor, null, this);
    }
 
    @Deprecated
@@ -57,7 +57,7 @@ public final class FakeQueueFactory implements QueueFactory {
                             final boolean temporary,
                             final boolean autoCreated) {
       return new QueueImpl(persistenceID, address, name, filter, subscription, user, durable, temporary, autoCreated,
-                           scheduledExecutor, postOffice, null, null, executor, null);
+                           scheduledExecutor, postOffice, null, null, executor, null, this);
    }
 
    @Override


[3/3] activemq-artemis git commit: This closes #1443

Posted by cl...@apache.org.
This closes #1443


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/01b37de7
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/01b37de7
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/01b37de7

Branch: refs/heads/master
Commit: 01b37de76187e68c80a6510678b0a9affb3b46a8
Parents: 8f33d27 f16af75
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Aug 7 18:46:43 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Aug 7 18:46:43 2017 -0400

----------------------------------------------------------------------
 .../cli/commands/tools/xml/XmlDataExporter.java |   3 +-
 .../artemis/cli/commands/etc/broker.xml         |   9 +
 .../apache/activemq/cli/test/ArtemisTest.java   |   5 +-
 .../artemis/utils/critical/CriticalAction.java  |  22 ++
 .../utils/critical/CriticalAnalyzer.java        |  43 +++
 .../utils/critical/CriticalAnalyzerImpl.java    | 185 ++++++++++++
 .../utils/critical/CriticalComponent.java       |  54 ++++
 .../utils/critical/CriticalComponentImpl.java   |  68 +++++
 .../artemis/utils/critical/CriticalMeasure.java |  52 ++++
 .../utils/critical/EmptyCriticalAnalyzer.java   |  90 ++++++
 .../utils/critical/CriticalAnalyzerTest.java    | 114 +++++++
 .../config/ActiveMQDefaultConfiguration.java    |  26 ++
 .../artemis/core/config/Configuration.java      |  17 ++
 .../core/config/impl/ConfigurationImpl.java     |  55 ++++
 .../deployers/impl/FileConfigurationParser.java |   8 +
 .../journal/AbstractJournalStorageManager.java  |  15 +-
 .../impl/journal/JDBCJournalStorageManager.java |   7 +-
 .../impl/journal/JournalStorageManager.java     |  14 +-
 .../artemis/core/server/ActiveMQServer.java     |   3 +
 .../core/server/ActiveMQServerLogger.java       |   8 +
 .../activemq/artemis/core/server/Queue.java     |   3 +-
 .../artemis/core/server/QueueFactory.java       |   4 +
 .../core/server/impl/ActiveMQServerImpl.java    |  94 +++++-
 .../core/server/impl/LastValueQueue.java        |   6 +-
 .../core/server/impl/QueueFactoryImpl.java      |  17 +-
 .../artemis/core/server/impl/QueueImpl.java     | 295 +++++++++++--------
 .../server/plugin/ActiveMQServerPlugin.java     |  11 +-
 .../resources/schema/artemis-configuration.xsd  |  32 ++
 .../core/config/impl/FileConfigurationTest.java |   5 +
 .../impl/ScheduledDeliveryHandlerTest.java      |   5 +-
 .../resources/ConfigurationTest-full-config.xml |   4 +
 docs/user-manual/en/SUMMARY.md                  |   1 +
 docs/user-manual/en/configuration-index.md      |   5 +
 docs/user-manual/en/critical-analysis.md        |  85 ++++++
 .../integration/client/HangConsumerTest.java    |   4 +-
 .../client/InterruptedLargeMessageTest.java     |   2 +-
 .../critical/CriticalSimpleTest.java            | 102 +++++++
 .../jms/client/TopicCleanupTest.java            |   2 +-
 .../journal/NIOJournalCompactTest.java          |   3 +-
 .../DeleteMessagesOnStartupTest.java            |   3 +-
 .../persistence/JournalFileSizeTest.java        |  17 +-
 .../integration/persistence/RestartSMTest.java  |   3 +-
 .../persistence/StorageManagerTestBase.java     |   5 +-
 .../replication/ReplicationTest.java            |   3 +-
 .../timing/core/server/impl/QueueImplTest.java  |   6 +-
 .../impl/DuplicateDetectionUnitTest.java        |   7 +-
 .../unit/core/postoffice/impl/FakeQueue.java    |   5 +-
 .../unit/core/server/impl/QueueImplTest.java    |   2 +-
 .../server/impl/fakes/FakeQueueFactory.java     |   4 +-
 49 files changed, 1349 insertions(+), 184 deletions(-)
----------------------------------------------------------------------



[2/3] activemq-artemis git commit: ARTEMIS-1324 Deadlock detection and health check of critical components

Posted by cl...@apache.org.
ARTEMIS-1324 Deadlock detection and health check of critical components


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f16af753
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f16af753
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f16af753

Branch: refs/heads/master
Commit: f16af7535417e3ba3ad625d22430edda7b76b5d2
Parents: 8f33d27
Author: Clebert Suconic <cl...@apache.org>
Authored: Sat Aug 5 00:52:42 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Aug 7 18:40:03 2017 -0400

----------------------------------------------------------------------
 .../cli/commands/tools/xml/XmlDataExporter.java |   3 +-
 .../artemis/cli/commands/etc/broker.xml         |   9 +
 .../apache/activemq/cli/test/ArtemisTest.java   |   5 +-
 .../artemis/utils/critical/CriticalAction.java  |  22 ++
 .../utils/critical/CriticalAnalyzer.java        |  43 +++
 .../utils/critical/CriticalAnalyzerImpl.java    | 185 ++++++++++++
 .../utils/critical/CriticalComponent.java       |  54 ++++
 .../utils/critical/CriticalComponentImpl.java   |  68 +++++
 .../artemis/utils/critical/CriticalMeasure.java |  52 ++++
 .../utils/critical/EmptyCriticalAnalyzer.java   |  90 ++++++
 .../utils/critical/CriticalAnalyzerTest.java    | 114 +++++++
 .../config/ActiveMQDefaultConfiguration.java    |  26 ++
 .../artemis/core/config/Configuration.java      |  17 ++
 .../core/config/impl/ConfigurationImpl.java     |  55 ++++
 .../deployers/impl/FileConfigurationParser.java |   8 +
 .../journal/AbstractJournalStorageManager.java  |  15 +-
 .../impl/journal/JDBCJournalStorageManager.java |   7 +-
 .../impl/journal/JournalStorageManager.java     |  14 +-
 .../artemis/core/server/ActiveMQServer.java     |   3 +
 .../core/server/ActiveMQServerLogger.java       |   8 +
 .../activemq/artemis/core/server/Queue.java     |   3 +-
 .../artemis/core/server/QueueFactory.java       |   4 +
 .../core/server/impl/ActiveMQServerImpl.java    |  94 +++++-
 .../core/server/impl/LastValueQueue.java        |   6 +-
 .../core/server/impl/QueueFactoryImpl.java      |  17 +-
 .../artemis/core/server/impl/QueueImpl.java     | 295 +++++++++++--------
 .../server/plugin/ActiveMQServerPlugin.java     |  11 +-
 .../resources/schema/artemis-configuration.xsd  |  32 ++
 .../core/config/impl/FileConfigurationTest.java |   5 +
 .../impl/ScheduledDeliveryHandlerTest.java      |   5 +-
 .../resources/ConfigurationTest-full-config.xml |   4 +
 docs/user-manual/en/SUMMARY.md                  |   1 +
 docs/user-manual/en/configuration-index.md      |   5 +
 docs/user-manual/en/critical-analysis.md        |  85 ++++++
 .../integration/client/HangConsumerTest.java    |   4 +-
 .../client/InterruptedLargeMessageTest.java     |   2 +-
 .../critical/CriticalSimpleTest.java            | 102 +++++++
 .../jms/client/TopicCleanupTest.java            |   2 +-
 .../journal/NIOJournalCompactTest.java          |   3 +-
 .../DeleteMessagesOnStartupTest.java            |   3 +-
 .../persistence/JournalFileSizeTest.java        |  17 +-
 .../integration/persistence/RestartSMTest.java  |   3 +-
 .../persistence/StorageManagerTestBase.java     |   5 +-
 .../replication/ReplicationTest.java            |   3 +-
 .../timing/core/server/impl/QueueImplTest.java  |   6 +-
 .../impl/DuplicateDetectionUnitTest.java        |   7 +-
 .../unit/core/postoffice/impl/FakeQueue.java    |   5 +-
 .../unit/core/server/impl/QueueImplTest.java    |   2 +-
 .../server/impl/fakes/FakeQueueFactory.java     |   4 +-
 49 files changed, 1349 insertions(+), 184 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java
index f54c3d4..d37c965 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java
@@ -83,6 +83,7 @@ import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectReposito
 import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
 import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
+import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
 
 @Command(name = "exp", description = "Export all message-data using an XML that could be interpreted by any system.")
 public final class XmlDataExporter extends OptionalLocking {
@@ -134,7 +135,7 @@ public final class XmlDataExporter extends OptionalLocking {
       final ExecutorService executor = Executors.newFixedThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory());
       ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
 
-      storageManager = new JournalStorageManager(config, executorFactory, executorFactory);
+      storageManager = new JournalStorageManager(config, EmptyCriticalAnalyzer.getInstance(), executorFactory, executorFactory);
 
       XMLOutputFactory factory = XMLOutputFactory.newInstance();
       XMLStreamWriter rawXmlWriter = factory.createXMLStreamWriter(out, "UTF-8");

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
index 603938a..adabf85 100644
--- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
+++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
@@ -59,6 +59,15 @@ ${ping-config.settings}${journal-buffer.settings}${connector-config.settings}
            that won't support flow control. -->
       <max-disk-usage>90</max-disk-usage>
 
+      <!-- should the broker detect deadlocks and other issues -->
+      <critical-analyzer>true</critical-analyzer>
+
+      <critical-analyzer-timeout>120000</critical-analyzer-timeout>
+
+      <critical-analyzer-check-period>60000</critical-analyzer-check-period>
+
+      <critical-analyzer-halt>true</critical-analyzer-halt>
+
 ${global-max-section}
       <acceptors>
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java
index 803020c..8b4098a 100644
--- a/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java
+++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java
@@ -541,10 +541,11 @@ public class ArtemisTest extends CliTestBase {
       Artemis.main("create", instanceFolder.getAbsolutePath(), "--force", "--silent", "--no-web", "--queues", queues, "--addresses", addresses, "--no-autotune", "--require-login");
       System.setProperty("artemis.instance", instanceFolder.getAbsolutePath());
 
-      // Some exceptions may happen on the initialization, but they should be ok on start the basic core protocol
-      Artemis.internalExecute("run");
 
       try {
+         // Some exceptions may happen on the initialization, but they should be ok on start the basic core protocol
+         Artemis.internalExecute("run");
+
          try (ServerLocator locator = ServerLocatorImpl.newLocator("tcp://localhost:61616");
               ClientSessionFactory factory = locator.createSessionFactory();
               ClientSession coreSession = factory.createSession("admin", "admin", false, true, true, false, 0)) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAction.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAction.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAction.java
new file mode 100644
index 0000000..bd16d46
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAction.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.critical;
+
+public interface CriticalAction {
+
+   void run(CriticalComponent failedComponent);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzer.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzer.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzer.java
new file mode 100644
index 0000000..6b5a436
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzer.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.critical;
+
+import org.apache.activemq.artemis.core.server.ActiveMQComponent;
+
+public interface CriticalAnalyzer extends ActiveMQComponent {
+
+   default void clear() {
+   }
+
+   boolean isMeasuring();
+
+   void add(CriticalComponent component);
+
+   void remove(CriticalComponent component);
+
+   CriticalAnalyzer setCheckTime(long timeout);
+
+   long getCheckTime();
+
+   CriticalAnalyzer setTimeout(long timeout);
+
+   long getTimeout();
+
+   CriticalAnalyzer addAction(CriticalAction action);
+
+   void check();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java
new file mode 100644
index 0000000..c583f2a
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils.critical;
+
+import java.util.ConcurrentModificationException;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
+import org.jboss.logging.Logger;
+
+public class CriticalAnalyzerImpl implements CriticalAnalyzer {
+
+   private final Logger logger = Logger.getLogger(CriticalAnalyzer.class);
+
+   private volatile long timeout;
+
+   private volatile long checkTime;
+
+   @Override
+   public void clear() {
+      actions.clear();
+      components.clear();
+   }
+
+   private CopyOnWriteArrayList<CriticalAction> actions = new CopyOnWriteArrayList<>();
+
+   private Thread thread;
+
+   private final Semaphore running = new Semaphore(1);
+
+   private final ConcurrentHashSet<CriticalComponent> components = new ConcurrentHashSet<>();
+
+   @Override
+   public boolean isMeasuring() {
+      return true;
+   }
+
+   @Override
+   public void add(CriticalComponent component) {
+      components.add(component);
+   }
+
+   @Override
+   public void remove(CriticalComponent component) {
+      components.remove(component);
+   }
+
+   @Override
+   public CriticalAnalyzer setCheckTime(long timeout) {
+      this.checkTime = timeout;
+      return this;
+   }
+
+   @Override
+   public long getCheckTime() {
+      if (checkTime == 0) {
+         checkTime = getTimeout() / 2;
+      }
+      return checkTime;
+   }
+
+   @Override
+   public CriticalAnalyzer setTimeout(long timeout) {
+      this.timeout = timeout;
+      return this;
+   }
+
+   @Override
+   public long getTimeout() {
+      if (timeout == 0) {
+         timeout = TimeUnit.MINUTES.toMillis(2);
+      }
+      return timeout;
+   }
+
+   @Override
+   public CriticalAnalyzer addAction(CriticalAction action) {
+      this.actions.add(action);
+      return this;
+   }
+
+   @Override
+   public void check() {
+      boolean retry = true;
+      while (retry) {
+         try {
+            for (CriticalComponent component : components) {
+
+               if (component.isExpired(timeout)) {
+                  fireAction(component);
+                  // no need to keep running if there's already a component failed
+                  return;
+               }
+            }
+            retry = false; // got to the end of the list, no need to retry
+         } catch (ConcurrentModificationException dontCare) {
+            // lets retry on the loop
+         }
+      }
+   }
+
+   private void fireAction(CriticalComponent component) {
+      for (CriticalAction action: actions) {
+         try {
+            action.run(component);
+         } catch (Throwable e) {
+            logger.warn(e.getMessage(), e);
+         }
+      }
+   }
+
+   @Override
+   public void start() {
+
+      if (!running.tryAcquire()) {
+         // already running
+         return;
+      }
+
+      // we are not using any Thread Pool or any Scheduled Executors from the ArtemisServer
+      // as that would defeat the purpose,
+      // as in any deadlocks the schedulers may be starving for something not responding fast enough
+      thread = new Thread("Artemis Critical Analyzer") {
+         @Override
+         public void run() {
+            try {
+               while (true) {
+                  if (running.tryAcquire(getCheckTime(), TimeUnit.MILLISECONDS)) {
+                     running.release();
+                     // this means that the server has been stopped as we could acquire the semaphore... returning now
+                     break;
+                  }
+                  check();
+               }
+            } catch (InterruptedException interrupted) {
+               // I will just leave in that case
+            }
+         }
+      };
+
+      thread.setDaemon(true);
+
+      thread.start();
+   }
+
+   @Override
+   public void stop() {
+      if (!isStarted()) {
+         // already stopped, leaving
+         return;
+      }
+
+      running.release();
+
+      try {
+         if (thread != null) {
+            thread.join();
+         }
+      } catch (Throwable e) {
+         logger.warn(e.getMessage(), e);
+      }
+   }
+
+   @Override
+   public boolean isStarted() {
+      return running.availablePermits() == 0;
+   }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponent.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponent.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponent.java
new file mode 100644
index 0000000..a2459dd
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponent.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.critical;
+
+
+/**
+ * A Critical component enters and leaves a critical state.
+ * You update a long every time you enter a critical path, and
+ * you update a different long with System.currentTimeMillis() every time you leave that path.
+ *
+ * If enterCritical > leaveCritical at any point, then you need to measure the timeout.
+ * If the system stops responding, then something in the system is unresponsive.
+ */
+public interface CriticalComponent {
+
+   /**
+    * please save the time you entered here.
+    * Use volatile variables.
+    * No locks through anywhere.
+    */
+   default void enterCritical(int path) {
+      // I'm providing a default as some components may choose to calculate it themselves
+   }
+
+   /**
+    * please save the time you left here.
+    * Use volatile variables.
+    * No locks through anywhere.
+    */
+   default void leaveCritical(int path) {
+      // I'm providing a default as some components may choose to calculate it themselves
+   }
+
+   /**
+    * Is this Component expired at a given timeout.. on any of its paths.
+    * @param timeout
+    * @return true if this component is expired on any of its paths, false otherwise
+    */
+   boolean isExpired(long timeout);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponentImpl.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponentImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponentImpl.java
new file mode 100644
index 0000000..c1c5602
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponentImpl.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.critical;
+
+/**
+ * This is not abstract as it could be used through aggregations or extensions.
+ * This is only good for cases where you call leave within the same thread as you called enter.
+ */
+public class CriticalComponentImpl implements CriticalComponent {
+
+   private final CriticalMeasure[] measures;
+   private final CriticalAnalyzer analyzer;
+
+   public CriticalComponentImpl(CriticalAnalyzer analyzer, int numberOfPaths) {
+      if (analyzer == null) {
+         analyzer = EmptyCriticalAnalyzer.getInstance();
+      }
+      this.analyzer = analyzer;
+
+      if (analyzer.isMeasuring()) {
+         measures = new CriticalMeasure[numberOfPaths];
+         for (int i = 0; i < numberOfPaths; i++) {
+            measures[i] = new CriticalMeasure();
+         }
+      } else {
+         measures = null;
+      }
+   }
+
+   @Override
+   public void enterCritical(int path) {
+      if (analyzer.isMeasuring()) {
+         measures[path].enterCritical();
+      }
+
+   }
+
+   @Override
+   public void leaveCritical(int path) {
+      if (analyzer.isMeasuring()) {
+         measures[path].leaveCritical();
+      }
+   }
+
+   @Override
+   public boolean isExpired(long timeout) {
+      for (int i = 0; i < measures.length; i++) {
+         if (measures[i].isExpired(timeout)) {
+            return true;
+         }
+      }
+      return false;
+   }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalMeasure.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalMeasure.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalMeasure.java
new file mode 100644
index 0000000..b853dc5
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalMeasure.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils.critical;
+
+import org.jboss.logging.Logger;
+
+public class CriticalMeasure {
+
+   private static final Logger logger = Logger.getLogger(CriticalMeasure.class);
+
+   private volatile long timeEnter;
+   private volatile long timeLeft;
+
+   public void enterCritical() {
+      timeEnter = System.currentTimeMillis();
+   }
+
+   public void leaveCritical() {
+      timeLeft = System.currentTimeMillis();
+   }
+
+   public boolean isExpired(long timeout) {
+      if (timeEnter > timeLeft) {
+         return System.currentTimeMillis() - timeEnter > timeout;
+      }
+
+      return false;
+   }
+
+   public long enterTime() {
+      return timeEnter;
+   }
+
+   public long leaveTime() {
+      return timeLeft;
+   }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/EmptyCriticalAnalyzer.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/EmptyCriticalAnalyzer.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/EmptyCriticalAnalyzer.java
new file mode 100644
index 0000000..4cf23a9
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/EmptyCriticalAnalyzer.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils.critical;
+
+public class EmptyCriticalAnalyzer implements CriticalAnalyzer {
+
+   private static final EmptyCriticalAnalyzer instance = new EmptyCriticalAnalyzer();
+
+   public static EmptyCriticalAnalyzer getInstance() {
+      return instance;
+   }
+
+   private EmptyCriticalAnalyzer() {
+   }
+
+   @Override
+   public void add(CriticalComponent component) {
+
+   }
+
+   @Override
+   public void remove(CriticalComponent component) {
+
+   }
+
+   @Override
+   public boolean isMeasuring() {
+      return false;
+   }
+
+   @Override
+   public void start() throws Exception {
+
+   }
+
+   @Override
+   public void stop() throws Exception {
+
+   }
+
+   @Override
+   public boolean isStarted() {
+      return false;
+   }
+
+   @Override
+   public CriticalAnalyzer setCheckTime(long timeout) {
+      return this;
+   }
+
+   @Override
+   public long getCheckTime() {
+      return 0;
+   }
+
+   @Override
+   public CriticalAnalyzer setTimeout(long timeout) {
+      return this;
+   }
+
+   @Override
+   public long getTimeout() {
+      return 0;
+   }
+
+   @Override
+   public CriticalAnalyzer addAction(CriticalAction action) {
+      return this;
+   }
+
+   @Override
+   public void check() {
+
+   }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerTest.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerTest.java
new file mode 100644
index 0000000..638eb61
--- /dev/null
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerTest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils.critical;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class CriticalAnalyzerTest {
+
+   @Rule
+   public ThreadLeakCheckRule rule = new ThreadLeakCheckRule();
+
+   private CriticalAnalyzer analyzer;
+
+   @After
+   public void tearDown() throws Exception {
+      if (analyzer != null) {
+         analyzer.stop();
+      }
+   }
+
+   @Test
+   public void testAction() throws Exception {
+      analyzer = new CriticalAnalyzerImpl().setTimeout(100).setCheckTime(50);
+      analyzer.add(new CriticalComponent() {
+         @Override
+         public boolean isExpired(long timeout) {
+            return true;
+         }
+      });
+
+      CountDownLatch latch = new CountDownLatch(1);
+
+      analyzer.start();
+
+      analyzer.addAction((CriticalComponent comp) -> {
+         System.out.println("component " + comp + " received");
+         latch.countDown();
+      });
+
+      Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+      analyzer.stop();
+   }
+
+   @Test
+   public void testActionOnImpl() throws Exception {
+      analyzer = new CriticalAnalyzerImpl().setTimeout(10).setCheckTime(5);
+      CriticalComponent component = new CriticalComponentImpl(analyzer, 2);
+      analyzer.add(component);
+
+      component.enterCritical(0);
+      component.leaveCritical(0);
+      component.enterCritical(1);
+
+      CountDownLatch latch = new CountDownLatch(1);
+
+      analyzer.start();
+
+      analyzer.addAction((CriticalComponent comp) -> {
+         System.out.println("component " + comp + " received");
+         latch.countDown();
+      });
+
+      Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+      analyzer.stop();
+   }
+
+   @Test
+   public void testNegative() throws Exception {
+      analyzer = new CriticalAnalyzerImpl().setTimeout(10).setCheckTime(5);
+      CriticalComponent component = new CriticalComponentImpl(analyzer, 1);
+      analyzer.add(component);
+
+      component.enterCritical(0);
+      component.leaveCritical(0);
+
+      CountDownLatch latch = new CountDownLatch(1);
+
+      analyzer.start();
+
+      analyzer.addAction((CriticalComponent comp) -> {
+         System.out.println("component " + comp + " received");
+         latch.countDown();
+      });
+
+      Assert.assertFalse(latch.await(100, TimeUnit.MILLISECONDS));
+
+      analyzer.stop();
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 899dc2c..7e6684b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -477,6 +477,12 @@ public final class ActiveMQDefaultConfiguration {
 
    public static int DEFAULT_QUORUM_SIZE = -1;
 
+   public static final boolean DEFAULT_ANALYZE_CRITICAL = true;
+
+   public static final long DEFAULT_ANALYZE_CRITICAL_TIMEOUT = 120000;
+
+   public static final boolean DEFAULT_ANALYZE_CRITICAL_HALT = false;
+
    /**
     * If true then the ActiveMQ Artemis Server will make use of any Protocol Managers that are in available on the classpath. If false then only the core protocol will be available, unless in Embedded mode where users can inject their own Protocol Managers.
     */
@@ -1282,4 +1288,24 @@ public final class ActiveMQDefaultConfiguration {
    public static int getDefaultQuorumSize() {
       return DEFAULT_QUORUM_SIZE;
    }
+
+
+   public static boolean getCriticalAnalyzer() {
+      return DEFAULT_ANALYZE_CRITICAL;
+   }
+
+   public static long getCriticalAnalyzerTimeout() {
+      return DEFAULT_ANALYZE_CRITICAL_TIMEOUT;
+   }
+
+   public static long getCriticalAnalyzerCheckPeriod(long timeout) {
+      // there is no fixed default for the check period: it is 1/2 of the given critical timeout
+      return timeout / 2;
+   }
+
+   public static boolean getCriticalAnalyzerHalt() {
+      return DEFAULT_ANALYZE_CRITICAL_HALT;
+   }
+
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index 6997443..e4c6c01 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -75,6 +75,23 @@ public interface Configuration {
 
    Configuration parseSystemProperties(Properties properties) throws Exception;
 
+   boolean isCriticalAnalyzer();
+
+   Configuration setCriticalAnalyzer(boolean CriticalAnalyzer);
+
+   long getCriticalAnalyzerTimeout();
+
+   Configuration setCriticalAnalyzerTimeout(long timeout);
+
+   long getCriticalAnalyzerCheckPeriod();
+
+   Configuration setCriticalAnalyzerCheckPeriod(long checkPeriod);
+
+   boolean isCriticalAnalyzerHalt();
+
+   Configuration setCriticalAnalyzerHalt(boolean halt);
+
+
    /**
     * Returns whether this server is clustered. <br>
     * {@code true} if {@link #getClusterConfigurations()} is not empty.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index 9195cff..e675606 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -294,6 +294,14 @@ public class ConfigurationImpl implements Configuration, Serializable {
 
    private String internalNamingPrefix = ActiveMQDefaultConfiguration.getInternalNamingPrefix();
 
+   private boolean criticalAnalyzer = ActiveMQDefaultConfiguration.getCriticalAnalyzer();
+
+   private boolean criticalAnalyzerHalt = ActiveMQDefaultConfiguration.getCriticalAnalyzerHalt();
+
+   private long criticalAnalyzerTimeout = ActiveMQDefaultConfiguration.getCriticalAnalyzerTimeout();
+
+   private long criticalAnalyzerCheckPeriod = 0; // not set
+
    /**
     * Parent folder for all data folders.
     */
@@ -2064,6 +2072,53 @@ public class ConfigurationImpl implements Configuration, Serializable {
       return this;
    }
 
+   @Override
+   public boolean isCriticalAnalyzer() {
+      return criticalAnalyzer;
+   }
+
+   @Override
+   public Configuration setCriticalAnalyzer(boolean CriticalAnalyzer) {
+      this.criticalAnalyzer = CriticalAnalyzer;
+      return this;
+   }
+
+   @Override
+   public long getCriticalAnalyzerTimeout() {
+      return criticalAnalyzerTimeout;
+   }
+
+   @Override
+   public Configuration setCriticalAnalyzerTimeout(long timeout) {
+      this.criticalAnalyzerTimeout = timeout;
+      return this;
+   }
+
+   @Override
+   public long getCriticalAnalyzerCheckPeriod() {
+      if (criticalAnalyzerCheckPeriod <= 0) {
+         this.criticalAnalyzerCheckPeriod = ActiveMQDefaultConfiguration.getCriticalAnalyzerCheckPeriod(criticalAnalyzerTimeout);
+      }
+      return criticalAnalyzerCheckPeriod;
+   }
+
+   @Override
+   public Configuration setCriticalAnalyzerCheckPeriod(long checkPeriod) {
+      this.criticalAnalyzerCheckPeriod = checkPeriod;
+      return this;
+   }
+
+   @Override
+   public boolean isCriticalAnalyzerHalt() {
+      return criticalAnalyzerHalt;
+   }
+
+   @Override
+   public Configuration setCriticalAnalyzerHalt(boolean halt) {
+      this.criticalAnalyzerHalt = halt;
+      return this;
+   }
+
    public static boolean checkoutDupCacheSize(final int windowSize, final int idCacheSize) {
       final int msgNumInFlight = windowSize / DEFAULT_JMS_MESSAGE_SIZE;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index a4a3487..9e1e807 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -608,6 +608,14 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
 
       config.setNetworkCheckPingCommand(getString(e, "network-check-ping-command", config.getNetworkCheckPingCommand(), Validators.NO_CHECK));
 
+      config.setCriticalAnalyzer(getBoolean(e, "critical-analyzer", config.isCriticalAnalyzer()));
+
+      config.setCriticalAnalyzerTimeout(getLong(e, "critical-analyzer-timeout", config.getCriticalAnalyzerTimeout(), Validators.GE_ZERO));
+
+      config.setCriticalAnalyzerCheckPeriod(getLong(e, "critical-analyzer-check-period", config.getCriticalAnalyzerCheckPeriod(), Validators.GE_ZERO));
+
+      config.setCriticalAnalyzerHalt(getBoolean(e, "critical-analyzer-halt", config.isCriticalAnalyzerHalt()));
+
       parseAddressSettings(e, config);
 
       parseResourceLimits(e, config);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index dc399c1..58c86a0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -105,6 +105,8 @@ import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
 import org.apache.activemq.artemis.utils.Base64;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
 import org.apache.activemq.artemis.utils.IDGenerator;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
+import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl;
 import org.jboss.logging.Logger;
 
 import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_CURSOR;
@@ -121,7 +123,10 @@ import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalR
  * <p>
  * Using this class also ensures that locks are acquired in the right order, avoiding dead-locks.
  */
-public abstract class AbstractJournalStorageManager implements StorageManager {
+public abstract class AbstractJournalStorageManager extends CriticalComponentImpl implements StorageManager {
+
+   private static final int CRITICAL_PATHS = 1;
+   private static final int CRITICAL_STORE = 0;
 
    private static final Logger logger = Logger.getLogger(AbstractJournalStorageManager.class);
 
@@ -188,17 +193,21 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
    protected final Set<Long> largeMessagesToDelete = new HashSet<>();
 
    public AbstractJournalStorageManager(final Configuration config,
+                                        final CriticalAnalyzer analyzer,
                                         final ExecutorFactory executorFactory,
                                         final ScheduledExecutorService scheduledExecutorService,
                                         final ExecutorFactory ioExecutors) {
-      this(config, executorFactory, scheduledExecutorService, ioExecutors, null);
+      this(config, analyzer, executorFactory, scheduledExecutorService, ioExecutors, null);
    }
 
    public AbstractJournalStorageManager(Configuration config,
+                                        CriticalAnalyzer analyzer,
                                         ExecutorFactory executorFactory,
                                         ScheduledExecutorService scheduledExecutorService,
                                         ExecutorFactory ioExecutors,
                                         IOCriticalErrorListener criticalErrorListener) {
+      super(analyzer, CRITICAL_PATHS);
+
       this.executorFactory = executorFactory;
 
       this.ioCriticalErrorListener = criticalErrorListener;
@@ -378,12 +387,14 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
 
    @Override
    public void readLock() {
+      enterCritical(CRITICAL_STORE);
       storageManagerLock.readLock().lock();
    }
 
    @Override
    public void readUnLock() {
       storageManagerLock.readLock().unlock();
+      leaveCritical(CRITICAL_STORE);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
index 9923a3e..c819bb6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
@@ -32,24 +32,27 @@ import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
 import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
 import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
 
 public class JDBCJournalStorageManager extends JournalStorageManager {
 
    private Connection connection;
 
    public JDBCJournalStorageManager(Configuration config,
+                                    CriticalAnalyzer analyzer,
                                     ExecutorFactory executorFactory,
                                     ExecutorFactory ioExecutorFactory,
                                     ScheduledExecutorService scheduledExecutorService) {
-      super(config, executorFactory, scheduledExecutorService, ioExecutorFactory);
+      super(config, analyzer, executorFactory, scheduledExecutorService, ioExecutorFactory);
    }
 
    public JDBCJournalStorageManager(final Configuration config,
+                                    final CriticalAnalyzer analyzer,
                                     final ScheduledExecutorService scheduledExecutorService,
                                     final ExecutorFactory executorFactory,
                                     final ExecutorFactory ioExecutorFactory,
                                     final IOCriticalErrorListener criticalErrorListener) {
-      super(config, executorFactory, scheduledExecutorService, ioExecutorFactory, criticalErrorListener);
+      super(config, analyzer, executorFactory, scheduledExecutorService, ioExecutorFactory, criticalErrorListener);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 148c1f0..2341a66 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -64,6 +64,7 @@ import org.apache.activemq.artemis.core.server.JournalType;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
 import org.jboss.logging.Logger;
 
 public class JournalStorageManager extends AbstractJournalStorageManager {
@@ -85,29 +86,32 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
    private ReplicationManager replicator;
 
    public JournalStorageManager(final Configuration config,
+                                final CriticalAnalyzer analyzer,
                                 final ExecutorFactory executorFactory,
                                 final ScheduledExecutorService scheduledExecutorService,
                                 final ExecutorFactory ioExecutors) {
-      this(config, executorFactory, scheduledExecutorService, ioExecutors, null);
+      this(config, analyzer, executorFactory, scheduledExecutorService, ioExecutors, null);
    }
 
-   public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory, final ExecutorFactory ioExecutors) {
-      this(config, executorFactory, null, ioExecutors, null);
+   public JournalStorageManager(final Configuration config, CriticalAnalyzer analyzer, final ExecutorFactory executorFactory, final ExecutorFactory ioExecutors) {
+      this(config, analyzer, executorFactory, null, ioExecutors, null);
    }
 
    public JournalStorageManager(final Configuration config,
+                                final CriticalAnalyzer analyzer,
                                 final ExecutorFactory executorFactory,
                                 final ScheduledExecutorService scheduledExecutorService,
                                 final ExecutorFactory ioExecutors,
                                 final IOCriticalErrorListener criticalErrorListener) {
-      super(config, executorFactory, scheduledExecutorService, ioExecutors, criticalErrorListener);
+      super(config, analyzer, executorFactory, scheduledExecutorService, ioExecutors, criticalErrorListener);
    }
 
    public JournalStorageManager(final Configuration config,
+                                final CriticalAnalyzer analyzer,
                                 final ExecutorFactory executorFactory,
                                 final ExecutorFactory ioExecutors,
                                 final IOCriticalErrorListener criticalErrorListener) {
-      super(config, executorFactory, null, ioExecutors, criticalErrorListener);
+      super(config, analyzer, executorFactory, null, ioExecutors, criticalErrorListener);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index a3b93ea..643d2be 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -62,6 +62,7 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
 
 /**
  * This interface defines the internal interface of the ActiveMQ Artemis Server exposed to other components
@@ -108,6 +109,8 @@ public interface ActiveMQServer extends ServiceComponent {
 
    NodeManager getNodeManager();
 
+   CriticalAnalyzer getCriticalAnalyzer();
+
    /**
     * @return
     */

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index f930f02..006ad23 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1631,6 +1631,14 @@ public interface ActiveMQServerLogger extends BasicLogger {
    @Message(id = 224075, value = "Cannot find pageTX id = {0}", format = Message.Format.MESSAGE_FORMAT)
    void journalCannotFindPageTX(Long id);
 
+   @LogMessage(level = Logger.Level.ERROR)
+   @Message(id = 224079, value = "The process for the virtual machine will be killed, as component {0} is not responsive", format = Message.Format.MESSAGE_FORMAT)
+   void criticalSystemHalt(Object component);
+
+   @LogMessage(level = Logger.Level.ERROR)
+   @Message(id = 224080, value = "The server process will now be stopped, as component {0} is not responsive", format = Message.Format.MESSAGE_FORMAT)
+   void criticalSystemShutdown(Object component);
+
    @LogMessage(level = Logger.Level.INFO)
    @Message(id = 224076, value = "UnDeploying address {0}", format = Message.Format.MESSAGE_FORMAT)
    void undeployAddress(SimpleString addressName);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 541b4d8..9a34837 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -31,8 +31,9 @@ import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.utils.ReferenceCounter;
 import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.critical.CriticalComponent;
 
-public interface Queue extends Bindable {
+public interface Queue extends Bindable,CriticalComponent {
 
    int MAX_CONSUMERS_UNLIMITED = -1;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java
index c2aeba4..7b377f6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java
@@ -51,4 +51,8 @@ public interface QueueFactory {
     * @param postOffice
     */
    void setPostOffice(PostOffice postOffice);
+
+   default void queueRemoved(Queue queue) {
+
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 12848bb..4856d8b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -172,6 +172,10 @@ import org.apache.activemq.artemis.utils.TimeUtils;
 import org.apache.activemq.artemis.utils.VersionLoader;
 import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
 import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerImpl;
+import org.apache.activemq.artemis.utils.critical.CriticalComponent;
+import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
 import org.jboss.logging.Logger;
 
 /**
@@ -316,6 +320,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
    private final ActiveMQServer parentServer;
 
+
+   private final CriticalAnalyzer analyzer;
+
    //todo think about moving this to the activation
    private final List<SimpleString> scaledDownNodeIDs = new ArrayList<>();
 
@@ -426,6 +433,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       this.parentServer = parentServer;
 
       this.serviceRegistry = serviceRegistry == null ? new ServiceRegistryImpl() : serviceRegistry;
+
+      if (configuration.isCriticalAnalyzer()) {
+         this.analyzer = new CriticalAnalyzerImpl();
+      } else {
+         this.analyzer = EmptyCriticalAnalyzer.getInstance();
+      }
    }
 
    @Override
@@ -479,12 +492,79 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       }
    }
 
+   @Override
+   public CriticalAnalyzer getCriticalAnalyzer() {
+      return this.analyzer;
+   }
+
    private void internalStart() throws Exception {
       if (state != SERVER_STATE.STOPPED) {
          logger.debug("Server already started!");
          return;
       }
 
+      /** Clear the analyzer for cases where the server was stopped and is now being restarted (failback, etc.). */
+      this.analyzer.clear();
+
+      this.getCriticalAnalyzer().setCheckTime(configuration.getCriticalAnalyzerCheckPeriod()).setTimeout(configuration.getCriticalAnalyzerTimeout());
+
+      if (configuration.isCriticalAnalyzer()) {
+         this.getCriticalAnalyzer().start();
+      }
+
+      this.getCriticalAnalyzer().addAction((CriticalComponent c) -> {
+
+         if (configuration.isCriticalAnalyzerHalt()) {
+            ActiveMQServerLogger.LOGGER.criticalSystemHalt(c);
+         } else {
+            ActiveMQServerLogger.LOGGER.criticalSystemShutdown(c);
+         }
+
+         threadDump();
+
+         // In the case of a critical failure, -1 cannot simply mean forever:
+         // if the graceful shutdown timeout is negative, we use 30 seconds instead.
+         long timeout = configuration.getGracefulShutdownTimeout() < 0 ? 30000 : configuration.getGracefulShutdownTimeout();
+
+         Thread notificationSender = new Thread() {
+            @Override
+            public void run() {
+               try {
+                  callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.criticalFailure(c) : null);
+               } catch (Throwable e) {
+                  logger.warn(e.getMessage(), e);
+               }
+            }
+         };
+
+         // I'm using a different thread here as we need to manage timeouts
+         notificationSender.start();
+
+         try {
+            notificationSender.join(timeout);
+         } catch (InterruptedException ignored) {
+         }
+
+         if (configuration.isCriticalAnalyzerHalt()) {
+            Runtime.getRuntime().halt(70); // Linux systems will have /usr/include/sysexits.h showing 70 as internal software error
+         } else {
+            // you can't stop the server from the check thread,
+            // nor can you use an executor
+            Thread stopThread = new Thread() {
+               @Override
+               public void run() {
+                  try {
+                     ActiveMQServerImpl.this.stop();
+                  } catch (Throwable e) {
+                     logger.warn(e.getMessage(), e);
+                  }
+               }
+            };
+            stopThread.start();
+
+         }
+      });
+
       configuration.parseSystemProperties();
 
       startDate = new Date();
@@ -1061,6 +1141,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
          }
       }
 
+      try {
+         this.getCriticalAnalyzer().stop();
+      } catch (Exception e) {
+         logger.warn(e.getMessage(), e);
+      }
+
       if (identity != null) {
          ActiveMQServerLogger.LOGGER.serverStopped("identity=" + identity + ",version=" + getVersion().getFullVersion(), tempNodeID, getUptime());
       } else {
@@ -2015,10 +2101,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
    private StorageManager createStorageManager() {
       if (configuration.isPersistenceEnabled()) {
          if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) {
-            return new JDBCJournalStorageManager(configuration, getScheduledPool(), executorFactory, ioExecutorFactory, shutdownOnCriticalIO);
+            JDBCJournalStorageManager journal = new JDBCJournalStorageManager(configuration, getCriticalAnalyzer(), getScheduledPool(), executorFactory, ioExecutorFactory, shutdownOnCriticalIO);
+            this.getCriticalAnalyzer().add(journal);
+            return journal;
          } else {
             // Default to File Based Storage Manager, (Legacy default configuration).
-            return new JournalStorageManager(configuration, executorFactory, scheduledPool, ioExecutorFactory, shutdownOnCriticalIO);
+            JournalStorageManager journal = new JournalStorageManager(configuration, getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, shutdownOnCriticalIO);
+            this.getCriticalAnalyzer().add(journal);
+            return journal;
          }
       }
       return new NullStorageManager();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index 8370839..192f25c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.QueueFactory;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -65,8 +66,9 @@ public class LastValueQueue extends QueueImpl {
                          final StorageManager storageManager,
                          final HierarchicalRepository<AddressSettings> addressSettingsRepository,
                          final Executor executor,
-                         final ActiveMQServer server) {
-      super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server);
+                         final ActiveMQServer server,
+                         final QueueFactory factory) {
+      super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f16af753/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
index 3d8ceb1..33e66d1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
@@ -75,10 +75,12 @@ public class QueueFactoryImpl implements QueueFactory {
       final AddressSettings addressSettings = addressSettingsRepository.getMatch(config.address().toString());
       final Queue queue;
       if (addressSettings.isLastValueQueue()) {
-         queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server);
+         queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
       } else {
-         queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server);
+         queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
       }
+
+      server.getCriticalAnalyzer().add(queue);
       return queue;
    }
 
@@ -101,11 +103,18 @@ public class QueueFactoryImpl implements QueueFactory {
 
       Queue queue;
       if (addressSettings.isLastValueQueue()) {
-         queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, ActiveMQDefaultConfiguration.getDefaultRoutingType(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(),  scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server);
+         queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, ActiveMQDefaultConfiguration.getDefaultRoutingType(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(),  scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
       } else {
-         queue = new QueueImpl(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server);
+         queue = new QueueImpl(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
       }
 
+      server.getCriticalAnalyzer().add(queue);
+
       return queue;
    }
+
+   @Override
+   public void queueRemoved(Queue queue) {
+      server.getCriticalAnalyzer().remove(queue);
+   }
 }