You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2017/04/18 17:06:35 UTC
[5/6] activemq-artemis git commit: ARTEMIS-1121 Improving expiry scanner
ARTEMIS-1121 Improving expiry scanner
https://issues.apache.org/jira/browse/ARTEMIS-1121
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1a397724
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1a397724
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1a397724
Branch: refs/heads/master
Commit: 1a397724898259d5451cd441f5f6b301d2c8571b
Parents: 31d78ed
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Apr 18 10:29:29 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Apr 18 11:49:25 2017 -0400
----------------------------------------------------------------------
.../protocol/amqp/broker/AMQPMessage.java | 3 +-
.../core/paging/impl/PagingStoreImpl.java | 2 +-
.../core/server/ActiveMQServerLogger.java | 6 +-
.../artemis/core/server/impl/QueueImpl.java | 46 ++++-
tests/smoke-tests/pom.xml | 17 ++
.../main/resources/servers/expire/broker.xml | 184 +++++++++++++++++++
.../tests/smoke/expire/TestSimpleExpire.java | 92 ++++++++++
7 files changed, 338 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a397724/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index d627fd5..d447ecd 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -633,7 +633,8 @@ public class AMQPMessage extends RefCountMessage {
private synchronized void checkBuffer() {
if (!bufferValid) {
- ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1500);
+ int estimated = Math.max(1500, data != null ? data.capacity() + 1000 : 0);
+ ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(estimated);
try {
getProtonMessage().encode(new NettyWritable(buffer));
byte[] bytes = new byte[buffer.writerIndex()];
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a397724/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 03f53c6..6486ec9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -683,7 +683,7 @@ public class PagingStoreImpl implements PagingStore {
if (pagingManager.isDiskFull()) {
ActiveMQServerLogger.LOGGER.blockingDiskFull(address);
} else {
- ActiveMQServerLogger.LOGGER.blockingMessageProduction(address, sizeInBytes.get(), maxSize);
+ ActiveMQServerLogger.LOGGER.blockingMessageProduction(address, sizeInBytes.get(), maxSize, pagingManager.getGlobalSize());
}
blocking.set(true);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a397724/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 9aaba7c..48507a0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -944,7 +944,7 @@ public interface ActiveMQServerLogger extends BasicLogger {
void errorExpiringReferencesNoBindings(SimpleString expiryAddress);
@LogMessage(level = Logger.Level.WARN)
- @Message(id = 222147, value = "Message has expired. No expiry queue configured for queue {0} so dropping it", format = Message.Format.MESSAGE_FORMAT)
+ @Message(id = 222147, value = "Messages are being expired on queue {0}. However there is no expiry queue configured, hence messages will be dropped.", format = Message.Format.MESSAGE_FORMAT)
void errorExpiringReferencesNoQueue(SimpleString name);
@LogMessage(level = Logger.Level.WARN)
@@ -1104,8 +1104,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
void missingClusterConfigForScaleDown(String scaleDownCluster);
@LogMessage(level = Logger.Level.WARN)
- @Message(id = 222183, value = "Blocking message production on address ''{0}''; size is currently: {1} bytes; max-size-bytes: {2}", format = Message.Format.MESSAGE_FORMAT)
- void blockingMessageProduction(SimpleString addressName, long currentSize, long maxSize);
+ @Message(id = 222183, value = "Blocking message production on address ''{0}''; size is currently: {1} bytes; max-size-bytes on address: {2}, global-max-size is {3}", format = Message.Format.MESSAGE_FORMAT)
+ void blockingMessageProduction(SimpleString addressName, long currentSize, long maxSize, long globalMaxSize);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222184,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a397724/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index dc4b090..a62ae79 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -146,6 +146,8 @@ public class QueueImpl implements Queue {
private final LinkedListIterator<PagedReference> pageIterator;
+ private volatile boolean printErrorExpiring = false;
+
// Messages will first enter intermediateMessageReferences
// Before they are added to messageReferences
// This is to avoid locking the queue on the producer
@@ -1567,27 +1569,52 @@ public class QueueImpl implements Queue {
if (queueDestroyed) {
return;
}
+ logger.debug("Scanning for expires on " + QueueImpl.this.getName());
LinkedListIterator<MessageReference> iter = iterator();
+ boolean expired = false;
+ boolean hasElements = false;
+
+ int elementsExpired = 0;
try {
- boolean expired = false;
- boolean hasElements = false;
+ Transaction tx = null;
+
while (postOffice.isStarted() && iter.hasNext()) {
hasElements = true;
MessageReference ref = iter.next();
try {
if (ref.getMessage().isExpired()) {
+ if (tx == null) {
+ tx = new TransactionImpl(storageManager);
+ }
incDelivering();
expired = true;
- expire(ref);
+ expire(tx, ref);
iter.remove();
refRemoved(ref);
+
+ if (++elementsExpired >= MAX_DELIVERIES_IN_LOOP) {
+ logger.debug("Breaking loop of expiring");
+ scannerRunning.incrementAndGet();
+ getExecutor().execute(this);
+ break;
+ }
}
+
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorExpiringReferencesOnQueue(e, ref);
}
+ }
+ logger.debug("Expired " + elementsExpired + " references");
+
+ try {
+ if (tx != null) {
+ tx.commit();
+ }
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
}
// If empty we need to schedule depaging to make sure we would depage expired messages as well
@@ -1600,6 +1627,8 @@ public class QueueImpl implements Queue {
} catch (Throwable ignored) {
}
scannerRunning.decrementAndGet();
+ logger.debug("Scanning for expires on " + QueueImpl.this.getName() + " done");
+
}
}
}
@@ -1912,7 +1941,6 @@ public class QueueImpl implements Queue {
return "QueueImpl[name=" + name.toString() + ", postOffice=" + this.postOffice + ", temp=" + this.temporary + "]@" + Integer.toHexString(System.identityHashCode(this));
}
-
private synchronized void internalAddTail(final MessageReference ref) {
refAdded(ref);
messageReferences.addTail(ref, getPriority(ref));
@@ -2519,7 +2547,11 @@ public class QueueImpl implements Queue {
move(expiryAddress, tx, ref, true, true);
}
} else {
- ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoQueue(name);
+ if (!printErrorExpiring) {
+ printErrorExpiring = true;
+ // print this only once
+ ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoQueue(name);
+ }
acknowledge(tx, ref);
}
@@ -3015,7 +3047,7 @@ public class QueueImpl implements Queue {
if (messagesIterator != null && messagesIterator.hasNext()) {
MessageReference msg = messagesIterator.next();
if (msg.isPaged()) {
- previouslyBrowsed.add(((PagedReference)msg).getPosition());
+ previouslyBrowsed.add(((PagedReference) msg).getPosition());
}
return msg;
} else {
@@ -3156,7 +3188,7 @@ public class QueueImpl implements Queue {
if (consumersSet.size() == 0) {
logger.debug("There are no consumers, no need to check slow consumer's rate");
return;
- } else if (queueRate < (threshold * consumersSet.size())) {
+ } else if (queueRate < (threshold * consumersSet.size())) {
if (logger.isDebugEnabled()) {
logger.debug("Insufficient messages received on queue \"" + getName() + "\" to satisfy slow-consumer-threshold. Skipping inspection of consumer.");
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a397724/tests/smoke-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/smoke-tests/pom.xml b/tests/smoke-tests/pom.xml
index 4b06386..4734706 100644
--- a/tests/smoke-tests/pom.xml
+++ b/tests/smoke-tests/pom.xml
@@ -172,6 +172,23 @@
<configuration>${basedir}/target/classes/servers/replicated-static1</configuration>
</configuration>
</execution>
+ <execution>
+ <phase>test-compile</phase>
+ <id>create-expire</id>
+ <goals>
+ <goal>create</goal>
+ </goals>
+ <configuration>
+ <!-- this makes it easier in certain envs -->
+ <configuration>${basedir}/target/classes/servers/expire</configuration>
+ <javaOptions>-Dartemis.debug.paging.interval=1</javaOptions>
+ <allowAnonymous>true</allowAnonymous>
+ <user>admin</user>
+ <password>admin</password>
+ <instance>${basedir}/target/expire</instance>
+ </configuration>
+ </execution>
+
</executions>
<dependencies>
<dependency>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a397724/tests/smoke-tests/src/main/resources/servers/expire/broker.xml
----------------------------------------------------------------------
diff --git a/tests/smoke-tests/src/main/resources/servers/expire/broker.xml b/tests/smoke-tests/src/main/resources/servers/expire/broker.xml
new file mode 100644
index 0000000..a4176f8
--- /dev/null
+++ b/tests/smoke-tests/src/main/resources/servers/expire/broker.xml
@@ -0,0 +1,184 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
+
+ <core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq:core ">
+
+ <name>0.0.0.0</name>
+
+ <persistence-enabled>true</persistence-enabled>
+
+ <!-- this could be ASYNCIO or NIO
+ -->
+ <journal-type>NIO</journal-type>
+
+ <paging-directory>./data/paging</paging-directory>
+
+ <bindings-directory>./data/bindings</bindings-directory>
+
+ <journal-directory>./data/journal</journal-directory>
+
+ <large-messages-directory>./data/large-messages</large-messages-directory>
+
+ <journal-datasync>true</journal-datasync>
+
+ <journal-min-files>2</journal-min-files>
+
+ <journal-pool-files>-1</journal-pool-files>
+
+ <message-expiry-scan-period>1000</message-expiry-scan-period>
+
+ <!--
+ You can specify the NIC you want to use to verify if the network
+ <network-check-NIC>theNickName</network-check-NIC>
+ -->
+
+ <!--
+ Use this to use an HTTP server to validate the network
+ <network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
+
+ <!-- <network-check-period>10000</network-check-period> -->
+ <!-- <network-check-timeout>1000</network-check-timeout> -->
+
+ <!-- this is a comma separated list, no spaces, just DNS or IPs
+ it should accept IPV6
+
+ Warning: Make sure you understand your network topology as this is meant to validate if your network is valid.
+ Using IPs that could eventually disappear or be partially visible may defeat the purpose.
+ You can use a list of multiple IPs, and if any successful ping will make the server OK to continue running -->
+ <!-- <network-check-list>10.0.0.1</network-check-list> -->
+
+ <!-- use this to customize the ping used for ipv4 addresses -->
+ <!-- <network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command> -->
+
+ <!-- use this to customize the ping used for ipv6 addresses -->
+ <!-- <network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command> -->
+
+
+
+
+ <!-- how often we are looking for how many bytes are being used on the disk in ms -->
+ <disk-scan-period>5000</disk-scan-period>
+
+ <!-- once the disk hits this limit the system will block, or close the connection in certain protocols
+ that won't support flow control. -->
+ <max-disk-usage>90</max-disk-usage>
+
+ <!-- the system will enter into page mode once you hit this limit.
+ This is an estimate in bytes of how much the messages are using in memory
+
+ The system will use half of the available memory (-Xmx) by default for the global-max-size.
+ You may specify a different value here if you need to customize it to your needs.
+
+ <global-max-size>100Mb</global-max-size>
+
+ -->
+
+ <acceptors>
+
+ <!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
+ <!-- amqpCredits: The number of credits sent to AMQP producers -->
+ <!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->
+
+ <!-- Acceptor for every supported protocol -->
+ <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+
+ <!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
+ <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+
+ <!-- STOMP Acceptor. -->
+ <acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
+
+ <!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
+ <acceptor name="hornetq">tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
+
+ <!-- MQTT Acceptor -->
+ <acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
+
+ </acceptors>
+
+
+ <security-settings>
+ <security-setting match="#">
+ <permission type="createNonDurableQueue" roles="guest"/>
+ <permission type="deleteNonDurableQueue" roles="guest"/>
+ <permission type="createDurableQueue" roles="guest"/>
+ <permission type="deleteDurableQueue" roles="guest"/>
+ <permission type="createAddress" roles="guest"/>
+ <permission type="deleteAddress" roles="guest"/>
+ <permission type="consume" roles="guest"/>
+ <permission type="browse" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ <!-- we need this otherwise ./artemis data imp wouldn't work -->
+ <permission type="manage" roles="guest"/>
+ </security-setting>
+ </security-settings>
+
+ <address-settings>
+ <!-- if you define auto-create on certain queues, management has to be auto-create -->
+ <address-setting match="activemq.management#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+ <message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ <auto-create-jms-queues>true</auto-create-jms-queues>
+ <auto-create-jms-topics>true</auto-create-jms-topics>
+ </address-setting>
+ <!--default for catch all-->
+ <address-setting match="#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+ <message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ <auto-create-jms-queues>true</auto-create-jms-queues>
+ <auto-create-jms-topics>true</auto-create-jms-topics>
+ </address-setting>
+ </address-settings>
+
+ <addresses>
+ <address name="DLQ">
+ <anycast>
+ <queue name="DLQ" />
+ </anycast>
+ </address>
+ <address name="ExpiryQueue">
+ <anycast>
+ <queue name="ExpiryQueue" />
+ </anycast>
+ </address>
+
+ </addresses>
+
+ </core>
+</configuration>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a397724/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/expire/TestSimpleExpire.java
----------------------------------------------------------------------
diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/expire/TestSimpleExpire.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/expire/TestSimpleExpire.java
new file mode 100644
index 0000000..40bd6fc
--- /dev/null
+++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/expire/TestSimpleExpire.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.smoke.expire;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Smoke test: messages sent with a short TTL must not be delivered, while messages without TTL sent afterwards must be. */
+public class TestSimpleExpire extends SmokeTestBase {
+
+   public static final String SERVER_NAME_0 = "expire";
+
+   @Before
+   public void before() throws Exception {
+      cleanupData(SERVER_NAME_0);
+      disableCheckThread();
+      startServer(SERVER_NAME_0, 0, 30000);
+   }
+
+   /** Sends 20000 messages with a 1s TTL, waits 5s, then sends 500 messages without TTL and asserts only those are received. */
+   @Test
+   public void testSendExpire() throws Exception {
+      ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:61616");
+      Connection connection = factory.createConnection();
+      try {
+         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         Queue queue = session.createQueue("q0");
+         MessageProducer producer = session.createProducer(queue);
+         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+         // first batch: every message carries a 1 second time-to-live
+         producer.setTimeToLive(1000);
+         for (int i = 0; i < 20000; i++) {
+            producer.send(session.createTextMessage("expired"));
+            if ((i + 1) % 5000 == 0) {
+               session.commit();
+               System.out.println("Sent " + (i + 1) + " messages");
+            }
+         }
+         session.commit();
+
+         // sleep well past the 1s TTL before anything is consumed
+         Thread.sleep(5000);
+
+         // second batch: a time-to-live of 0 means the messages do not expire
+         producer.setTimeToLive(0);
+         for (int i = 0; i < 500; i++) {
+            producer.send(session.createTextMessage("ok"));
+         }
+         session.commit();
+
+         MessageConsumer consumer = session.createConsumer(queue);
+         connection.start();
+         // every message received must come from the second batch
+         for (int i = 0; i < 500; i++) {
+            TextMessage txt = (TextMessage) consumer.receive(10000);
+            Assert.assertNotNull(txt);
+            Assert.assertEquals("ok", txt.getText());
+         }
+         session.commit();
+      } finally {
+         connection.close();
+      }
+   }
+}