You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/08/26 17:50:09 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-6067
Repository: activemq
Updated Branches:
refs/heads/master 2b1cda196 -> b9fad53fc
https://issues.apache.org/jira/browse/AMQ-6067
The JDBCMessageStore now checks hasSpace() when running a message
recovery listener to prevent going past the max configured page size
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b9fad53f
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b9fad53f
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b9fad53f
Branch: refs/heads/master
Commit: b9fad53fc650a9026391bef0fd74aea1bea7ec1b
Parents: 2b1cda1
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Fri Aug 26 13:46:42 2016 -0400
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Fri Aug 26 13:46:42 2016 -0400
----------------------------------------------------------------------
.../activemq/store/jdbc/JDBCMessageStore.java | 37 ++++-
...DBCPersistenceAdapterExpiredMessageTest.java | 157 +++++++++++++++++++
2 files changed, 189 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/b9fad53f/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
index 175002a..744220d 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
@@ -39,7 +39,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- *
+ *
*/
public class JDBCMessageStore extends AbstractMessageStore {
@@ -98,6 +98,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
}
}
+ @Override
public void addMessage(final ConnectionContext context, final Message message) throws IOException {
MessageId messageId = message.getMessageId();
if (audit != null && audit.isDuplicate(message)) {
@@ -133,6 +134,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
pendingAdditions.add(sequence);
c.onCompletion(new Runnable() {
+ @Override
public void run() {
// jdbc close or jms commit - while futureOrSequenceLong==null ordered
// work will remain pending on the Queue
@@ -207,6 +209,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
}
}
+ @Override
public Message getMessage(MessageId messageId) throws IOException {
// Get a connection and pull the message out of the DB
TransactionContext c = persistenceAdapter.getTransactionContext();
@@ -245,6 +248,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
}
}
+ @Override
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
long seq = ack.getLastMessageId().getFutureOrSequenceLong() != null ?
@@ -263,6 +267,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
}
}
+ @Override
public void recover(final MessageRecoveryListener listener) throws Exception {
// Get all the Message ids out of the database.
@@ -270,14 +275,30 @@ public class JDBCMessageStore extends AbstractMessageStore {
try {
c = persistenceAdapter.getTransactionContext();
adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() {
+ @Override
public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
- Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
- msg.getMessageId().setBrokerSequenceId(sequenceId);
- return listener.recoverMessage(msg);
+ if (listener.hasSpace()) {
+ Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
+ msg.getMessageId().setBrokerSequenceId(sequenceId);
+ return listener.recoverMessage(msg);
+ } else {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Message recovery limit reached for MessageRecoveryListener");
+ }
+ return false;
+ }
}
+ @Override
public boolean recoverMessageReference(String reference) throws Exception {
- return listener.recoverMessageReference(new MessageId(reference));
+ if (listener.hasSpace()) {
+ return listener.recoverMessageReference(new MessageId(reference));
+ } else {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Message recovery limit reached for MessageRecoveryListener");
+ }
+ return false;
+ }
}
});
} catch (SQLException e) {
@@ -291,6 +312,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
/**
* @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
*/
+ @Override
public void removeAllMessages(ConnectionContext context) throws IOException {
// Get a connection and remove the message from the DB
TransactionContext c = persistenceAdapter.getTransactionContext(context);
@@ -328,6 +350,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
* @see org.apache.activemq.store.MessageStore#recoverNextMessages(int,
* org.apache.activemq.store.MessageRecoveryListener)
*/
+ @Override
public void recoverNextMessages(int maxReturned, final MessageRecoveryListener listener) throws Exception {
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
@@ -337,6 +360,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
adapter.doRecoverNextMessages(c, destination, perPriorityLastRecovered, minPendingSequeunceId(),
maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener() {
+ @Override
public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
msg.getMessageId().setBrokerSequenceId(sequenceId);
@@ -346,6 +370,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
return true;
}
+ @Override
public boolean recoverMessageReference(String reference) throws Exception {
if (listener.hasSpace()) {
listener.recoverMessageReference(new MessageId(reference));
@@ -370,6 +395,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
/**
* @see org.apache.activemq.store.MessageStore#resetBatching()
*/
+ @Override
public void resetBatching() {
if (LOG.isTraceEnabled()) {
LOG.trace(this + " resetBatching. last recovered: " + Arrays.toString(perPriorityLastRecovered));
@@ -401,6 +427,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
}
+ @Override
public void setPrioritizedMessages(boolean prioritizedMessages) {
super.setPrioritizedMessages(prioritizedMessages);
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/b9fad53f/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterExpiredMessageTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterExpiredMessageTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterExpiredMessageTest.java
new file mode 100644
index 0000000..e8e819c
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterExpiredMessageTest.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.jdbc;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TopicSubscriber;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.ProxyTopicMessageStore;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.util.Wait;
+import org.apache.activemq.util.Wait.Condition;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class JDBCPersistenceAdapterExpiredMessageTest {
+
+ @Rule
+ public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
+
+ protected BrokerService brokerService;
+ private AtomicBoolean hasSpaceCalled = new AtomicBoolean();
+ private int expireSize = 5;
+
+ @Before
+ public void setUp() throws Exception {
+ hasSpaceCalled.set(false);
+ brokerService = new BrokerService();
+
+ //Wrap the adapter and listener to set a flag to make sure we are calling hasSpace()
+ //during expiration
+ JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter() {
+
+ @Override
+ public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
+ ProxyTopicMessageStore proxy = new ProxyTopicMessageStore(super.createTopicMessageStore(destination)) {
+
+ @Override
+ public void recover(final MessageRecoveryListener listener) throws Exception {
+ MessageRecoveryListener delegate = new MessageRecoveryListener() {
+
+ @Override
+ public boolean recoverMessageReference(MessageId ref) throws Exception {
+ return listener.recoverMessageReference(ref);
+ }
+
+ @Override
+ public boolean recoverMessage(Message message) throws Exception {
+ return listener.recoverMessage(message);
+ }
+
+ @Override
+ public boolean isDuplicate(MessageId ref) {
+ return listener.isDuplicate(ref);
+ }
+
+ @Override
+ public boolean hasSpace() {
+ hasSpaceCalled.set(true);
+ return listener.hasSpace();
+ }
+ };
+ super.recover(delegate);
+ }
+
+ };
+ return proxy;
+ }
+
+ };
+
+ brokerService.setSchedulerSupport(false);
+ brokerService.setDataDirectoryFile(dataFileDir.getRoot());
+ brokerService.setPersistenceAdapter(jdbc);
+ brokerService.setDeleteAllMessagesOnStartup(true);
+
+ PolicyMap policyMap = new PolicyMap();
+ PolicyEntry defaultEntry = new PolicyEntry();
+ defaultEntry.setExpireMessagesPeriod(5000);
+ defaultEntry.setMaxExpirePageSize(expireSize);
+ policyMap.setDefaultEntry(defaultEntry);
+ brokerService.setDestinationPolicy(policyMap);
+
+ brokerService.start();
+ }
+
+ @After
+ public void stop() throws Exception {
+ if (brokerService != null) {
+ brokerService.stop();
+ brokerService.waitUntilStopped();
+ }
+ }
+
+ @Test
+ public void testMaxExpirePageSize() throws Exception {
+ final ActiveMQTopic topic = new ActiveMQTopic("test.topic");
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+ factory.setClientID("clientId");;
+ Connection conn = factory.createConnection();
+ conn.start();
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicSubscriber sub = sess.createDurableSubscriber(topic, "sub1");
+ sub.close();
+
+ MessageProducer producer = sess.createProducer(topic);
+ producer.setTimeToLive(1000);
+
+ for (int i = 0; i < 50; i++) {
+ producer.send(sess.createTextMessage("test message: " + i));
+ }
+
+ //There should be exactly 5 messages expired because the limit was hit and it stopped
+ //The expire messages period is 5 seconds which should give enough time for this assertion
+ //to pass before expiring more messages
+ assertTrue(Wait.waitFor(new Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ long expired = brokerService.getDestination(topic).getDestinationStatistics().getExpired().getCount();
+ return expired == expireSize && hasSpaceCalled.get();
+ }
+ }, 15000, 1000));
+ }
+}