You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/09/29 15:35:57 UTC

[3/3] activemq-artemis git commit: NO-JIRA Adding CriticalAnalyzer test on HALT

NO-JIRA Adding CriticalAnalyzer test on HALT


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/178d4031
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/178d4031
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/178d4031

Branch: refs/heads/master
Commit: 178d403117e97d4459d4888f2537fc713f12fe0a
Parents: a304151
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Sep 28 17:07:33 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Sep 29 11:35:47 2017 -0400

----------------------------------------------------------------------
 .../utils/critical/CriticalAnalyzerPolicy.java  |  17 +++
 .../core/server/impl/ActiveMQServerImpl.java    |   8 +-
 .../integration/critical/CriticalCrashTest.java | 148 +++++++++++++++++++
 3 files changed, 169 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/178d4031/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerPolicy.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerPolicy.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerPolicy.java
index 8a343e5..c4c32e9 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerPolicy.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerPolicy.java
@@ -17,6 +17,23 @@
 
 package org.apache.activemq.artemis.utils.critical;
 
+import org.apache.activemq.artemis.utils.uri.BeanSupport;
+import org.apache.commons.beanutils.Converter;
+
 public enum CriticalAnalyzerPolicy {
     HALT, SHUTDOWN, LOG;
+
+   static {
+      // Register a String-to-enum converter with BeanSupport (presumably for bean/URI-style property binding, e.g. "HALT")
+      BeanSupport.registerConverter(new CriticalAnalyzerPolicyConverter(), CriticalAnalyzerPolicy.class);
+   }
+   // Converter registered in the static block above; maps value.toString() to the constant with exactly that name.
+   static class CriticalAnalyzerPolicyConverter implements Converter {
+      // Enum.valueOf is case-sensitive and throws IllegalArgumentException for an unknown name; a null value is not handled.
+      @Override
+      public <T> T convert(Class<T> type, Object value) {
+         return type.cast(CriticalAnalyzerPolicy.valueOf(value.toString()));
+      }
+   }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/178d4031/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 51607e9..eebf5a4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -248,9 +248,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
    private volatile ExecutorService threadPool;
 
-   private volatile ScheduledExecutorService scheduledPool;
+   protected volatile ScheduledExecutorService scheduledPool;
 
-   private volatile ExecutorFactory executorFactory;
+   protected volatile ExecutorFactory executorFactory;
 
    private volatile ExecutorService ioExecutorPool;
 
@@ -258,7 +258,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
     * This is a thread pool for io tasks only.
     * We can't use the same global executor to avoid starvations.
     */
-   private volatile ExecutorFactory ioExecutorFactory;
+   protected volatile ExecutorFactory ioExecutorFactory;
 
    private final NetworkHealthCheck networkHealthCheck = new NetworkHealthCheck(ActiveMQDefaultConfiguration.getDefaultNetworkCheckNic(), ActiveMQDefaultConfiguration.getDefaultNetworkCheckPeriod(), ActiveMQDefaultConfiguration.getDefaultNetworkCheckTimeout());
 
@@ -318,7 +318,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
    private final Map<String, Object> activationParams = new HashMap<>();
 
-   private final ShutdownOnCriticalErrorListener shutdownOnCriticalIO = new ShutdownOnCriticalErrorListener();
+   protected final ShutdownOnCriticalErrorListener shutdownOnCriticalIO = new ShutdownOnCriticalErrorListener();
 
    private final ActiveMQServer parentServer;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/178d4031/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/CriticalCrashTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/CriticalCrashTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/CriticalCrashTest.java
new file mode 100644
index 0000000..1a441f2
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/CriticalCrashTest.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.critical;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.io.File;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.SpawnedVMSupport;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CriticalCrashTest extends ActiveMQTestBase {
+
+   @Test
+   public void testCrash() throws Exception {
+
+      // Passing these arguments should change the criticalAnalyzer parameters
+      Process process = SpawnedVMSupport.spawnVM(CriticalCrashTest.class.getName(), new String[]{"-Dbrokerconfig.criticalAnalyzer=true", "-Dbrokerconfig.criticalAnalyzerCheckPeriod=100", "-Dbrokerconfig.criticalAnalyzerTimeout=500", "-Dbrokerconfig.criticalAnalyzerPolicy=HALT"}, new String[]{});
+
+      Assert.assertEquals(70, process.waitFor());
+      deleteDirectory(new File("./target/server"));
+   }
+
+   public static void main(String[] arg) {
+      try {
+         CriticalCrashTest test = new CriticalCrashTest();
+         test.runSimple();
+      } catch (Exception e) {
+         e.printStackTrace();
+         System.exit(-1);
+      }
+   }
+
+   public void runSimple() throws Exception {
+      deleteDirectory(new File("./target/server"));
+      ActiveMQServer server = createServer("./target/server");
+
+      try {
+         server.start();
+
+         ConnectionFactory factory = new ActiveMQConnectionFactory();
+         Connection connection = factory.createConnection();
+
+         Session session = connection.createSession();
+
+         MessageProducer producer = session.createProducer(session.createQueue("queue"));
+
+         for (int i = 0; i < 500; i++) {
+            producer.send(session.createTextMessage("text"));
+         }
+
+         System.out.println("Sent messages");
+
+      } finally {
+         server.stop();
+
+      }
+
+   }
+
+   ActiveMQServer createServer(String folder) throws Exception {
+      final AtomicBoolean blocked = new AtomicBoolean(false);
+      Configuration conf = createConfig(folder);
+      ActiveMQSecurityManager securityManager = new ActiveMQJAASSecurityManager(InVMLoginModule.class.getName(), new SecurityConfiguration());
+
+      conf.setPersistenceEnabled(true);
+
+      ActiveMQServer server = new ActiveMQServerImpl(conf, securityManager) {
+
+         @Override
+         protected StorageManager createStorageManager() {
+
+            JournalStorageManager storageManager = new JournalStorageManager(conf, getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, shutdownOnCriticalIO) {
+               @Override
+               public void readLock() {
+                  super.readLock();
+                  if (blocked.get()) {
+                     while (true) {
+                        try {
+                           System.out.println("Blocking forever");
+                           Thread.sleep(1000);
+                        } catch (Throwable ignored) {
+
+                        }
+                     }
+                  }
+               }
+
+               @Override
+               public void storeMessage(Message message) throws Exception {
+                  super.storeMessage(message);
+                  blocked.set(true);
+               }
+            };
+
+            this.getCriticalAnalyzer().add(storageManager);
+
+            return storageManager;
+         }
+
+      };
+
+      return server;
+   }
+
+   Configuration createConfig(String folder) throws Exception {
+
+      Configuration configuration = createDefaultConfig(true);
+      configuration.setSecurityEnabled(false).setJournalMinFiles(2).setJournalFileSize(100 * 1024).setJournalType(getDefaultJournalType()).setJournalDirectory(folder + "/journal").setBindingsDirectory(folder + "/bindings").setPagingDirectory(folder + "/paging").
+         setLargeMessagesDirectory(folder + "/largemessage").setJournalCompactMinFiles(0).setJournalCompactPercentage(0).setClusterPassword(CLUSTER_PASSWORD).setJournalDatasync(false);
+      configuration.setSecurityEnabled(false);
+      configuration.setPersistenceEnabled(true);
+
+      return configuration;
+   }
+
+}