You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/09/06 14:08:04 UTC

[2/7] activemq-artemis git commit: ARTEMIS-581 Implement max disk usage, and global-max-size

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/AddressFullLoggingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/AddressFullLoggingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/AddressFullLoggingTest.java
deleted file mode 100644
index f315b89..0000000
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/AddressFullLoggingTest.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.integration.server;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.client.ClientConsumer;
-import org.apache.activemq.artemis.api.core.client.ClientMessage;
-import org.apache.activemq.artemis.api.core.client.ClientProducer;
-import org.apache.activemq.artemis.api.core.client.ClientSession;
-import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
-import org.apache.activemq.artemis.api.core.client.ServerLocator;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
-import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
-import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
-import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
-import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class AddressFullLoggingTest extends ActiveMQTestBase {
-
-   @BeforeClass
-   public static void prepareLogger() {
-      AssertionLoggerHandler.startCapture();
-   }
-
-   @Test
-   /**
-    * When running this test from an IDE add this to the test command line so that the AssertionLoggerHandler works properly:
-    *
-    *   -Djava.util.logging.manager=org.jboss.logmanager.LogManager  -Dlogging.configuration=file:<path_to_source>/tests/config/logging.properties
-    */ public void testBlockLogging() throws Exception {
-      final int MAX_MESSAGES = 200;
-      final String MY_ADDRESS = "myAddress";
-      final String MY_QUEUE = "myQueue";
-
-      ActiveMQServer server = createServer(false);
-
-      AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(10 * 1024).setMaxSizeBytes(20 * 1024).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
-      server.getAddressSettingsRepository().addMatch("#", defaultSetting);
-      server.start();
-
-      ServerLocator locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
-
-      ClientSessionFactory factory = createSessionFactory(locator);
-      ClientSession session = factory.createSession(false, true, true);
-
-      session.createQueue(MY_ADDRESS, MY_QUEUE, true);
-
-      final ClientProducer producer = session.createProducer(MY_ADDRESS);
-
-      final ClientMessage message = session.createMessage(false);
-      message.getBodyBuffer().writeBytes(new byte[1024]);
-
-      ExecutorService executor = Executors.newFixedThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory());
-      Callable<Object> sendMessageTask = new Callable<Object>() {
-         @Override
-         public Object call() throws ActiveMQException {
-            producer.send(message);
-            return null;
-         }
-      };
-
-      int sendCount = 0;
-
-      for (int i = 0; i < MAX_MESSAGES; i++) {
-         Future<Object> future = executor.submit(sendMessageTask);
-         try {
-            future.get(3, TimeUnit.SECONDS);
-            sendCount++;
-         }
-         catch (TimeoutException ex) {
-            // message sending has been blocked
-            break;
-         }
-         finally {
-            future.cancel(true); // may or may not desire this
-         }
-      }
-
-      executor.shutdown();
-      session.close();
-
-      session = factory.createSession(false, true, true);
-      session.start();
-      ClientConsumer consumer = session.createConsumer(MY_QUEUE);
-      for (int i = 0; i < sendCount; i++) {
-         ClientMessage msg = consumer.receive(250);
-         if (msg == null)
-            break;
-         msg.acknowledge();
-      }
-
-      session.close();
-      locator.close();
-      server.stop();
-
-      // Using the code only so the test doesn't fail just because someone edits the log text
-      Assert.assertTrue("Expected to find AMQ222183", AssertionLoggerHandler.findText("AMQ222183", "myAddress"));
-      Assert.assertTrue("Expected to find AMQ221046", AssertionLoggerHandler.findText("AMQ221046", "myAddress"));
-   }
-
-   @AfterClass
-   public static void clearLogger() {
-      AssertionLoggerHandler.stopCapture();
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
index 1c4adc5..9e25c18 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
@@ -40,6 +40,8 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.api.core.management.ResourceNames;
 import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
 import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
 import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
 import org.apache.activemq.artemis.tests.integration.mqtt.imported.FuseMQTTClientProvider;
 import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTClientProvider;
@@ -101,6 +103,46 @@ public class StompTest extends StompTestBase {
    }
 
    @Test
+   public void testSendOverDiskFull() throws Exception {
+      AssertionLoggerHandler.startCapture();
+      try {
+         MessageConsumer consumer = session.createConsumer(queue);
+
+         String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+         sendFrame(frame);
+         frame = receiveFrame(10000);
+
+         Assert.assertTrue(frame.startsWith("CONNECTED"));
+         int count = 1000;
+         final CountDownLatch latch = new CountDownLatch(count);
+         consumer.setMessageListener(new MessageListener() {
+
+            @Override
+            public void onMessage(Message arg0) {
+               latch.countDown();
+            }
+         });
+
+         ((ActiveMQServerImpl) server.getActiveMQServer()).getMonitor().setMaxUsage(0).tick();
+
+         frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
+         for (int i = 1; i <= count; i++) {
+            // Thread.sleep(1);
+            // System.out.println(">>> " + i);
+            sendFrame(frame);
+         }
+
+         // It should encounter the exception on logs
+         AssertionLoggerHandler.findText("AMQ119119");
+      }
+      finally {
+         AssertionLoggerHandler.clear();
+         AssertionLoggerHandler.stopCapture();
+      }
+
+   }
+
+   @Test
    public void testConnect() throws Exception {
 
       String connect_frame = "CONNECT\n" + "login: brianm\n" +

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
index 7f73e48..2e93b83 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
@@ -205,7 +205,7 @@ public abstract class StompTestBase extends ActiveMQTestBase {
       TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
       TransportConfiguration allTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName());
 
-      Configuration config = createBasicConfig().setSecurityEnabled(isSecurityEnabled()).setPersistenceEnabled(false).addAcceptorConfiguration(stompTransport).addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
+      Configuration config = createBasicConfig().setSecurityEnabled(isSecurityEnabled()).setPersistenceEnabled(true).addAcceptorConfiguration(stompTransport).addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
       config.addAcceptorConfiguration(allTransport);
 
       ActiveMQServer activeMQServer = addServer(ActiveMQServers.newActiveMQServer(config, defUser, defPass));

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
----------------------------------------------------------------------
diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
index 6c42413..b70d432 100644
--- a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
+++ b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
@@ -448,5 +448,10 @@ public class PersistMultiThreadTest extends ActiveMQTestBase {
       public boolean isStarted() {
          return false;
       }
+
+      @Override
+      public boolean checkReleasedMemory() {
+         return true;
+      }
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
index 97520f3..9731277 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
@@ -50,6 +50,7 @@ import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
 import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
 import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
 import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
@@ -826,6 +827,11 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
       public void stop() throws InterruptedException {
       }
 
+      @Override
+      public void injectMonitor(FileStoreMonitor monitor) throws Exception {
+
+      }
+
       public void beforePageRead() throws Exception {
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java
index 9faf307..bbaa3a3 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java
@@ -23,39 +23,25 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
 import org.apache.activemq.artemis.core.paging.PagingManager;
 import org.apache.activemq.artemis.core.paging.PagingStore;
-import org.apache.activemq.artemis.core.postoffice.PostOffice;
 import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
 
 public final class FakePagingManager implements PagingManager {
 
-   public void activate() {
-   }
+   @Override
+   public void addBlockedStore(PagingStore store) {
 
-   public long addSize(final long size) {
-      return 0;
    }
 
    @Override
    public void addTransaction(final PageTransactionInfo pageTransaction) {
    }
 
-   public PagingStore createPageStore(final SimpleString destination) throws Exception {
-      return null;
-   }
-
-   public long getTotalMemory() {
-      return 0;
-   }
-
    @Override
    public SimpleString[] getStoreNames() {
       return null;
    }
 
-   public long getMaxMemory() {
-      return 0;
-   }
-
    @Override
    public PagingStore getPageStore(final SimpleString address) throws Exception {
       return null;
@@ -74,10 +60,6 @@ public final class FakePagingManager implements PagingManager {
       return false;
    }
 
-   public boolean isGlobalPageMode() {
-      return false;
-   }
-
    public boolean isPaging(final SimpleString destination) throws Exception {
       return false;
    }
@@ -93,21 +75,22 @@ public final class FakePagingManager implements PagingManager {
    }
 
    @Override
-   public void reloadStores() throws Exception {
+   public FakePagingManager addSize(int size) {
+      return this;
    }
 
    @Override
-   public void removeTransaction(final long transactionID) {
-
+   public void reloadStores() throws Exception {
    }
 
-   public void setGlobalPageMode(final boolean globalMode) {
-   }
+   @Override
+   public void removeTransaction(final long transactionID) {
 
-   public void setPostOffice(final PostOffice postOffice) {
    }
 
-   public void resumeDepages() {
+   @Override
+   public boolean isUsingGlobalSize() {
+      return false;
    }
 
    public void sync(final Collection<SimpleString> destinationsToSync) throws Exception {
@@ -126,10 +109,15 @@ public final class FakePagingManager implements PagingManager {
    public void stop() throws Exception {
    }
 
+   @Override
+   public boolean isDiskFull() {
+      return false;
+   }
+
    /*
-    * (non-Javadoc)
-    * @see org.apache.activemq.artemis.core.paging.PagingManager#isGlobalFull()
-    */
+       * (non-Javadoc)
+       * @see org.apache.activemq.artemis.core.paging.PagingManager#isGlobalFull()
+       */
    public boolean isGlobalFull() {
       return false;
    }
@@ -177,4 +165,8 @@ public final class FakePagingManager implements PagingManager {
       // no-op
    }
 
+   @Override
+   public void injectMonitor(FileStoreMonitor monitor) throws Exception {
+
+   }
 }