You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2010/07/21 10:59:43 UTC

svn commit: r966145 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/store/ test/java/org/apache/activemq/store/ test/java/org/apache/activemq/store/jdbc/

Author: dejanb
Date: Wed Jul 21 08:59:42 2010
New Revision: 966145

URL: http://svn.apache.org/viewvc?rev=966145&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2843 - add mechanism for stores to know whether to use prioritized recovery or not

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=966145&r1=966144&r2=966145&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Wed Jul 21 08:59:42 2010
@@ -609,6 +609,9 @@ public abstract class BaseDestination im
 
     public void setPrioritizedMessages(boolean prioritizedMessages) {
         this.prioritizedMessages = prioritizedMessages;
+        if (store != null) { // forward the flag only when a message store is attached
+            store.setPrioritizedMessages(prioritizedMessages);
+        }
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java?rev=966145&r1=966144&r2=966145&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java Wed Jul 21 08:59:42 2010
@@ -30,6 +30,7 @@ import org.apache.activemq.usage.MemoryU
 abstract public class AbstractMessageStore implements MessageStore {
     public static final FutureTask<Object> FUTURE;
     protected final ActiveMQDestination destination;
+    protected boolean prioritizedMessages;
 
     public AbstractMessageStore(ActiveMQDestination destination) {
         this.destination = destination;
@@ -63,6 +64,14 @@ abstract public class AbstractMessageSto
     public boolean isEmpty() throws Exception {
         return getMessageCount() == 0;
     }
+    
+    public void setPrioritizedMessages(boolean prioritizedMessages) {
+        this.prioritizedMessages = prioritizedMessages; // stored as-is; handed back by isPrioritizedMessages()
+    }    
+
+    public boolean isPrioritizedMessages() {
+        return this.prioritizedMessages; // plain read of the field; no store-specific logic here
+    }
 
     public Future<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message) throws IOException {
         addMessage(context, message);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java?rev=966145&r1=966144&r2=966145&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java Wed Jul 21 08:59:42 2010
@@ -148,4 +148,16 @@ public interface MessageStore extends Se
      */
     boolean isEmpty() throws Exception;
     
+    /**
+     * A hint to the store to try to recover messages according to priority.
+     * @param prioritizedMessages true to ask for priority-based recovery, false otherwise
+     */
+    public void setPrioritizedMessages(boolean prioritizedMessages);
+    
+    /**
+     * Tells whether the store is trying to recover messages according to priority.
+     * @return true if store is trying to recover messages according to priority
+     */
+    public boolean isPrioritizedMessages();
+    
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java?rev=966145&r1=966144&r2=966145&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java Wed Jul 21 08:59:42 2010
@@ -113,4 +113,12 @@ public class ProxyMessageStore implement
     public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
         delegate.removeAsyncMessage(context, ack);       
     }
+
+    public void setPrioritizedMessages(boolean prioritizedMessages) {
+        delegate.setPrioritizedMessages(prioritizedMessages); // pass-through to the wrapped store
+    }
+
+    public boolean isPrioritizedMessages() {
+        return delegate.isPrioritizedMessages(); // answer comes from the wrapped store
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java?rev=966145&r1=966144&r2=966145&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java Wed Jul 21 08:59:42 2010
@@ -153,4 +153,12 @@ public class ProxyTopicMessageStore impl
     public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
         delegate.removeAsyncMessage(context, ack);
     }
+
+    public void setPrioritizedMessages(boolean prioritizedMessages) {
+        delegate.setPrioritizedMessages(prioritizedMessages); // pass-through to the wrapped store
+    }
+    
+    public boolean isPrioritizedMessages() {
+        return delegate.isPrioritizedMessages(); // answer comes from the wrapped store
+    }
 }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java?rev=966145&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java Wed Jul 21 08:59:42 2010
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.store;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+
+/**
+ * Base test verifying that the prioritizedMessages destination policy is
+ * propagated to the message stores created by a persistence adapter.
+ * Subclasses supply the adapter under test.
+ */
+abstract public class MessagePriorityTest extends TestCase {
+
+    BrokerService broker;
+    PersistenceAdapter adapter;
+
+    ActiveMQConnectionFactory factory;
+    Connection conn;
+    Session sess;
+
+    /**
+     * Creates the persistence adapter the broker under test will use.
+     *
+     * @param delete passed as true by setUp(); presumably asks for a clean store
+     * @return the adapter to install on the broker
+     */
+    abstract protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception;
+
+    /**
+     * Starts an embedded broker named "priorityTest" whose default destination
+     * policy enables prioritized messages, then opens a vm:// connection and an
+     * auto-acknowledge session to it.
+     */
+    protected void setUp() throws Exception {
+        broker = new BrokerService();
+        broker.setBrokerName("priorityTest");
+        adapter = createPersistenceAdapter(true);
+        broker.setPersistenceAdapter(adapter);
+        PolicyEntry policy = new PolicyEntry();
+        policy.setPrioritizedMessages(true);
+        PolicyMap policyMap = new PolicyMap();
+        policyMap.setDefaultEntry(policy);
+        broker.setDestinationPolicy(policyMap);
+        broker.start();
+        broker.waitUntilStarted();
+
+        factory = new ActiveMQConnectionFactory("vm://priorityTest");
+        conn = factory.createConnection();
+        sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    /**
+     * Closes the session and connection, then stops the broker. The broker is
+     * stopped in a finally block so a failure while closing the client side
+     * cannot leave it running for later tests.
+     */
+    protected void tearDown() throws Exception {
+        try {
+            sess.close();
+            conn.close();
+        } finally {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    /**
+     * Creates a producer on a queue and on a topic, then checks that the message
+     * store behind each destination reports prioritized messages as enabled.
+     */
+    public void testStoreConfigured() throws Exception {
+        Queue queue = sess.createQueue("TEST");
+        Topic topic = sess.createTopic("TEST");
+
+        MessageProducer queueProducer = sess.createProducer(queue);
+        MessageProducer topicProducer = sess.createProducer(topic);
+
+        Thread.sleep(100); // get it all propagated
+
+        assertTrue(broker.getRegionBroker().getDestinationMap().get(queue).getMessageStore().isPrioritizedMessages());
+        assertTrue(broker.getRegionBroker().getDestinationMap().get(topic).getMessageStore().isPrioritizedMessages());
+
+        queueProducer.close();
+        topicProducer.close();
+    }
+}

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java?rev=966145&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java Wed Jul 21 08:59:42 2010
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.store.jdbc;
+
+import org.apache.activemq.store.MessagePriorityTest;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.derby.jdbc.EmbeddedDataSource;
+
+/**
+ * Runs {@link MessagePriorityTest} against a JDBC persistence adapter backed
+ * by an embedded Derby data source.
+ */
+public class JDBCMessagePriorityTest extends MessagePriorityTest {
+
+    /**
+     * Builds a JDBC persistence adapter on an embedded Derby data source named
+     * "derbyDb" with its createDatabase attribute set to "create".
+     *
+     * @param delete when true, purge the store so state left in the database
+     *        by an earlier run cannot leak into this test
+     * @return the configured adapter
+     */
+    @Override
+    protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception {
+        JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
+        EmbeddedDataSource dataSource = new EmbeddedDataSource();
+        dataSource.setDatabaseName("derbyDb");
+        dataSource.setCreateDatabase("create");
+        jdbc.setDataSource(dataSource);
+        if (delete) {
+            // honour the flag instead of silently ignoring it
+            jdbc.deleteAllMessages();
+        }
+        return jdbc;
+    }
+
+}