You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/09/06 14:08:04 UTC
[2/7] activemq-artemis git commit: ARTEMIS-581 Implement max disk
usage, and global-max-size
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/AddressFullLoggingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/AddressFullLoggingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/AddressFullLoggingTest.java
deleted file mode 100644
index f315b89..0000000
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/AddressFullLoggingTest.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.integration.server;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.client.ClientConsumer;
-import org.apache.activemq.artemis.api.core.client.ClientMessage;
-import org.apache.activemq.artemis.api.core.client.ClientProducer;
-import org.apache.activemq.artemis.api.core.client.ClientSession;
-import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
-import org.apache.activemq.artemis.api.core.client.ServerLocator;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
-import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
-import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
-import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
-import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class AddressFullLoggingTest extends ActiveMQTestBase {
-
- @BeforeClass
- public static void prepareLogger() {
- AssertionLoggerHandler.startCapture();
- }
-
- @Test
- /**
- * When running this test from an IDE add this to the test command line so that the AssertionLoggerHandler works properly:
- *
- * -Djava.util.logging.manager=org.jboss.logmanager.LogManager -Dlogging.configuration=file:<path_to_source>/tests/config/logging.properties
- */ public void testBlockLogging() throws Exception {
- final int MAX_MESSAGES = 200;
- final String MY_ADDRESS = "myAddress";
- final String MY_QUEUE = "myQueue";
-
- ActiveMQServer server = createServer(false);
-
- AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(10 * 1024).setMaxSizeBytes(20 * 1024).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
- server.getAddressSettingsRepository().addMatch("#", defaultSetting);
- server.start();
-
- ServerLocator locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
-
- ClientSessionFactory factory = createSessionFactory(locator);
- ClientSession session = factory.createSession(false, true, true);
-
- session.createQueue(MY_ADDRESS, MY_QUEUE, true);
-
- final ClientProducer producer = session.createProducer(MY_ADDRESS);
-
- final ClientMessage message = session.createMessage(false);
- message.getBodyBuffer().writeBytes(new byte[1024]);
-
- ExecutorService executor = Executors.newFixedThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory());
- Callable<Object> sendMessageTask = new Callable<Object>() {
- @Override
- public Object call() throws ActiveMQException {
- producer.send(message);
- return null;
- }
- };
-
- int sendCount = 0;
-
- for (int i = 0; i < MAX_MESSAGES; i++) {
- Future<Object> future = executor.submit(sendMessageTask);
- try {
- future.get(3, TimeUnit.SECONDS);
- sendCount++;
- }
- catch (TimeoutException ex) {
- // message sending has been blocked
- break;
- }
- finally {
- future.cancel(true); // may or may not desire this
- }
- }
-
- executor.shutdown();
- session.close();
-
- session = factory.createSession(false, true, true);
- session.start();
- ClientConsumer consumer = session.createConsumer(MY_QUEUE);
- for (int i = 0; i < sendCount; i++) {
- ClientMessage msg = consumer.receive(250);
- if (msg == null)
- break;
- msg.acknowledge();
- }
-
- session.close();
- locator.close();
- server.stop();
-
- // Using the code only so the test doesn't fail just because someone edits the log text
- Assert.assertTrue("Expected to find AMQ222183", AssertionLoggerHandler.findText("AMQ222183", "myAddress"));
- Assert.assertTrue("Expected to find AMQ221046", AssertionLoggerHandler.findText("AMQ221046", "myAddress"));
- }
-
- @AfterClass
- public static void clearLogger() {
- AssertionLoggerHandler.stopCapture();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
index 1c4adc5..9e25c18 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
@@ -40,6 +40,8 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.integration.mqtt.imported.FuseMQTTClientProvider;
import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTClientProvider;
@@ -101,6 +103,46 @@ public class StompTest extends StompTestBase {
}
@Test
+ public void testSendOverDiskFull() throws Exception {
+ AssertionLoggerHandler.startCapture();
+ try {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
+ frame = receiveFrame(10000);
+
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+ int count = 1000;
+ final CountDownLatch latch = new CountDownLatch(count);
+ consumer.setMessageListener(new MessageListener() {
+
+ @Override
+ public void onMessage(Message arg0) {
+ latch.countDown();
+ }
+ });
+
+ ((ActiveMQServerImpl) server.getActiveMQServer()).getMonitor().setMaxUsage(0).tick();
+
+ frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
+ for (int i = 1; i <= count; i++) {
+ // Thread.sleep(1);
+ // System.out.println(">>> " + i);
+ sendFrame(frame);
+ }
+
+ // It should encounter the exception on logs
+ AssertionLoggerHandler.findText("AMQ119119");
+ }
+ finally {
+ AssertionLoggerHandler.clear();
+ AssertionLoggerHandler.stopCapture();
+ }
+
+ }
+
+ @Test
public void testConnect() throws Exception {
String connect_frame = "CONNECT\n" + "login: brianm\n" +
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
index 7f73e48..2e93b83 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
@@ -205,7 +205,7 @@ public abstract class StompTestBase extends ActiveMQTestBase {
TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
TransportConfiguration allTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName());
- Configuration config = createBasicConfig().setSecurityEnabled(isSecurityEnabled()).setPersistenceEnabled(false).addAcceptorConfiguration(stompTransport).addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
+ Configuration config = createBasicConfig().setSecurityEnabled(isSecurityEnabled()).setPersistenceEnabled(true).addAcceptorConfiguration(stompTransport).addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
config.addAcceptorConfiguration(allTransport);
ActiveMQServer activeMQServer = addServer(ActiveMQServers.newActiveMQServer(config, defUser, defPass));
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
----------------------------------------------------------------------
diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
index 6c42413..b70d432 100644
--- a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
+++ b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
@@ -448,5 +448,10 @@ public class PersistMultiThreadTest extends ActiveMQTestBase {
public boolean isStarted() {
return false;
}
+
+ @Override
+ public boolean checkReleasedMemory() {
+ return true;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
index 97520f3..9731277 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
@@ -50,6 +50,7 @@ import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
@@ -826,6 +827,11 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
public void stop() throws InterruptedException {
}
+ @Override
+ public void injectMonitor(FileStoreMonitor monitor) throws Exception {
+
+ }
+
public void beforePageRead() throws Exception {
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java
index 9faf307..bbaa3a3 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java
@@ -23,39 +23,25 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
-import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
public final class FakePagingManager implements PagingManager {
- public void activate() {
- }
+ @Override
+ public void addBlockedStore(PagingStore store) {
- public long addSize(final long size) {
- return 0;
}
@Override
public void addTransaction(final PageTransactionInfo pageTransaction) {
}
- public PagingStore createPageStore(final SimpleString destination) throws Exception {
- return null;
- }
-
- public long getTotalMemory() {
- return 0;
- }
-
@Override
public SimpleString[] getStoreNames() {
return null;
}
- public long getMaxMemory() {
- return 0;
- }
-
@Override
public PagingStore getPageStore(final SimpleString address) throws Exception {
return null;
@@ -74,10 +60,6 @@ public final class FakePagingManager implements PagingManager {
return false;
}
- public boolean isGlobalPageMode() {
- return false;
- }
-
public boolean isPaging(final SimpleString destination) throws Exception {
return false;
}
@@ -93,21 +75,22 @@ public final class FakePagingManager implements PagingManager {
}
@Override
- public void reloadStores() throws Exception {
+ public FakePagingManager addSize(int size) {
+ return this;
}
@Override
- public void removeTransaction(final long transactionID) {
-
+ public void reloadStores() throws Exception {
}
- public void setGlobalPageMode(final boolean globalMode) {
- }
+ @Override
+ public void removeTransaction(final long transactionID) {
- public void setPostOffice(final PostOffice postOffice) {
}
- public void resumeDepages() {
+ @Override
+ public boolean isUsingGlobalSize() {
+ return false;
}
public void sync(final Collection<SimpleString> destinationsToSync) throws Exception {
@@ -126,10 +109,15 @@ public final class FakePagingManager implements PagingManager {
public void stop() throws Exception {
}
+ @Override
+ public boolean isDiskFull() {
+ return false;
+ }
+
/*
- * (non-Javadoc)
- * @see org.apache.activemq.artemis.core.paging.PagingManager#isGlobalFull()
- */
+ * (non-Javadoc)
+ * @see org.apache.activemq.artemis.core.paging.PagingManager#isGlobalFull()
+ */
public boolean isGlobalFull() {
return false;
}
@@ -177,4 +165,8 @@ public final class FakePagingManager implements PagingManager {
// no-op
}
+ @Override
+ public void injectMonitor(FileStoreMonitor monitor) throws Exception {
+
+ }
}