You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/08/26 17:50:09 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6067

Repository: activemq
Updated Branches:
  refs/heads/master 2b1cda196 -> b9fad53fc


https://issues.apache.org/jira/browse/AMQ-6067

The JDBCMessageStore now checks hasSpace() when running a message
recovery listener to prevent going past the max configured page size


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b9fad53f
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b9fad53f
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b9fad53f

Branch: refs/heads/master
Commit: b9fad53fc650a9026391bef0fd74aea1bea7ec1b
Parents: 2b1cda1
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Fri Aug 26 13:46:42 2016 -0400
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Fri Aug 26 13:46:42 2016 -0400

----------------------------------------------------------------------
 .../activemq/store/jdbc/JDBCMessageStore.java   |  37 ++++-
 ...DBCPersistenceAdapterExpiredMessageTest.java | 157 +++++++++++++++++++
 2 files changed, 189 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/b9fad53f/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
index 175002a..744220d 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
@@ -39,7 +39,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * 
+ *
  */
 public class JDBCMessageStore extends AbstractMessageStore {
 
@@ -98,6 +98,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
         }
     }
 
+    @Override
     public void addMessage(final ConnectionContext context, final Message message) throws IOException {
         MessageId messageId = message.getMessageId();
         if (audit != null && audit.isDuplicate(message)) {
@@ -133,6 +134,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
                 pendingAdditions.add(sequence);
 
                 c.onCompletion(new Runnable() {
+                    @Override
                     public void run() {
                         // jdbc close or jms commit - while futureOrSequenceLong==null ordered
                         // work will remain pending on the Queue
@@ -207,6 +209,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
         }
     }
 
+    @Override
     public Message getMessage(MessageId messageId) throws IOException {
         // Get a connection and pull the message out of the DB
         TransactionContext c = persistenceAdapter.getTransactionContext();
@@ -245,6 +248,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
         }
     }
 
+    @Override
     public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
 
     	long seq = ack.getLastMessageId().getFutureOrSequenceLong() != null ?
@@ -263,6 +267,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
         }
     }
 
+    @Override
     public void recover(final MessageRecoveryListener listener) throws Exception {
 
         // Get all the Message ids out of the database.
@@ -270,14 +275,30 @@ public class JDBCMessageStore extends AbstractMessageStore {
         try {
             c = persistenceAdapter.getTransactionContext();
             adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() {
+                @Override
                 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
-                    Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
-                    msg.getMessageId().setBrokerSequenceId(sequenceId);
-                    return listener.recoverMessage(msg);
+                    if (listener.hasSpace()) {
+                        Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
+                        msg.getMessageId().setBrokerSequenceId(sequenceId);
+                        return listener.recoverMessage(msg);
+                    } else {
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace("Message recovery limit reached for MessageRecoveryListener");
+                        }
+                        return false;
+                    }
                 }
 
+                @Override
                 public boolean recoverMessageReference(String reference) throws Exception {
-                    return listener.recoverMessageReference(new MessageId(reference));
+                    if (listener.hasSpace()) {
+                        return listener.recoverMessageReference(new MessageId(reference));
+                    } else {
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace("Message recovery limit reached for MessageRecoveryListener");
+                        }
+                        return false;
+                    }
                 }
             });
         } catch (SQLException e) {
@@ -291,6 +312,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
     /**
      * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
      */
+    @Override
     public void removeAllMessages(ConnectionContext context) throws IOException {
         // Get a connection and remove the message from the DB
         TransactionContext c = persistenceAdapter.getTransactionContext(context);
@@ -328,6 +350,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
      * @see org.apache.activemq.store.MessageStore#recoverNextMessages(int,
      *      org.apache.activemq.store.MessageRecoveryListener)
      */
+    @Override
     public void recoverNextMessages(int maxReturned, final MessageRecoveryListener listener) throws Exception {
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
@@ -337,6 +360,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
             adapter.doRecoverNextMessages(c, destination, perPriorityLastRecovered, minPendingSequeunceId(),
                     maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener() {
 
+                @Override
                 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
                         Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
                         msg.getMessageId().setBrokerSequenceId(sequenceId);
@@ -346,6 +370,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
                         return true;
                 }
 
+                @Override
                 public boolean recoverMessageReference(String reference) throws Exception {
                     if (listener.hasSpace()) {
                         listener.recoverMessageReference(new MessageId(reference));
@@ -370,6 +395,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
     /**
      * @see org.apache.activemq.store.MessageStore#resetBatching()
      */
+    @Override
     public void resetBatching() {
         if (LOG.isTraceEnabled()) {
             LOG.trace(this + " resetBatching. last recovered: " + Arrays.toString(perPriorityLastRecovered));
@@ -401,6 +427,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
     }
 
 
+    @Override
     public void setPrioritizedMessages(boolean prioritizedMessages) {
         super.setPrioritizedMessages(prioritizedMessages);
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/b9fad53f/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterExpiredMessageTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterExpiredMessageTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterExpiredMessageTest.java
new file mode 100644
index 0000000..e8e819c
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterExpiredMessageTest.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.jdbc;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TopicSubscriber;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.ProxyTopicMessageStore;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.util.Wait;
+import org.apache.activemq.util.Wait.Condition;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class JDBCPersistenceAdapterExpiredMessageTest {
+
+    @Rule
+    public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
+
+    protected BrokerService brokerService;
+    private AtomicBoolean hasSpaceCalled = new AtomicBoolean();
+    private int expireSize = 5;
+
+    @Before
+    public void setUp() throws Exception {
+        hasSpaceCalled.set(false);
+        brokerService = new BrokerService();
+
+        //Wrap the adapter and listener to set a flag to make sure we are calling hasSpace()
+        //during expiration
+        JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter() {
+
+            @Override
+            public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
+                ProxyTopicMessageStore proxy = new ProxyTopicMessageStore(super.createTopicMessageStore(destination)) {
+
+                    @Override
+                    public void recover(final MessageRecoveryListener listener) throws Exception {
+                        MessageRecoveryListener delegate = new MessageRecoveryListener() {
+
+                            @Override
+                            public boolean recoverMessageReference(MessageId ref) throws Exception {
+                                return listener.recoverMessageReference(ref);
+                            }
+
+                            @Override
+                            public boolean recoverMessage(Message message) throws Exception {
+                                return listener.recoverMessage(message);
+                            }
+
+                            @Override
+                            public boolean isDuplicate(MessageId ref) {
+                                return listener.isDuplicate(ref);
+                            }
+
+                            @Override
+                            public boolean hasSpace() {
+                                hasSpaceCalled.set(true);
+                                return listener.hasSpace();
+                            }
+                        };
+                        super.recover(delegate);
+                    }
+
+                };
+                return proxy;
+            }
+
+        };
+
+        brokerService.setSchedulerSupport(false);
+        brokerService.setDataDirectoryFile(dataFileDir.getRoot());
+        brokerService.setPersistenceAdapter(jdbc);
+        brokerService.setDeleteAllMessagesOnStartup(true);
+
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry defaultEntry = new PolicyEntry();
+        defaultEntry.setExpireMessagesPeriod(5000);
+        defaultEntry.setMaxExpirePageSize(expireSize);
+        policyMap.setDefaultEntry(defaultEntry);
+        brokerService.setDestinationPolicy(policyMap);
+
+        brokerService.start();
+    }
+
+    @After
+    public void stop() throws Exception {
+        if (brokerService != null) {
+            brokerService.stop();
+            brokerService.waitUntilStopped();
+        }
+    }
+
+    @Test
+    public void testMaxExpirePageSize() throws Exception {
+        final ActiveMQTopic topic = new ActiveMQTopic("test.topic");
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+        factory.setClientID("clientId");;
+        Connection conn = factory.createConnection();
+        conn.start();
+        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        TopicSubscriber sub = sess.createDurableSubscriber(topic, "sub1");
+        sub.close();
+
+        MessageProducer producer = sess.createProducer(topic);
+        producer.setTimeToLive(1000);
+
+        for (int i = 0; i < 50; i++) {
+            producer.send(sess.createTextMessage("test message: " + i));
+        }
+
+        //There should be exactly 5 messages expired because the limit was hit and it stopped
+        //The expire messages period is 5 seconds which should give enough time for this assertion
+        //to pass before expiring more messages
+        assertTrue(Wait.waitFor(new Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                long expired = brokerService.getDestination(topic).getDestinationStatistics().getExpired().getCount();
+                return expired == expireSize && hasSpaceCalled.get();
+            }
+        }, 15000, 1000));
+    }
+}