You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2010/07/21 10:59:43 UTC
svn commit: r966145 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/store/ test/java/org/apache/activemq/store/
test/java/org/apache/activemq/store/jdbc/
Author: dejanb
Date: Wed Jul 21 08:59:42 2010
New Revision: 966145
URL: http://svn.apache.org/viewvc?rev=966145&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2843 - add mechanism for stores to know whether to use prioritized recovery or not
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=966145&r1=966144&r2=966145&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Wed Jul 21 08:59:42 2010
@@ -609,6 +609,9 @@ public abstract class BaseDestination im
public void setPrioritizedMessages(boolean prioritizedMessages) {
this.prioritizedMessages = prioritizedMessages;
+ // Push the same flag to the store, when one is set, so the store sees the destination's setting.
+ if (store != null) {
+ store.setPrioritizedMessages(prioritizedMessages);
+ }
}
/**
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java?rev=966145&r1=966144&r2=966145&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java Wed Jul 21 08:59:42 2010
@@ -30,6 +30,7 @@ import org.apache.activemq.usage.MemoryU
abstract public class AbstractMessageStore implements MessageStore {
public static final FutureTask<Object> FUTURE;
protected final ActiveMQDestination destination;
+ protected boolean prioritizedMessages;
public AbstractMessageStore(ActiveMQDestination destination) {
this.destination = destination;
@@ -63,6 +64,14 @@ abstract public class AbstractMessageSto
public boolean isEmpty() throws Exception {
return getMessageCount() == 0;
}
+
+ /**
+ * Remembers the prioritized-messages hint in this store's
+ * {@code prioritizedMessages} field; nothing else happens here.
+ *
+ * @param prioritizedMessages the value to remember
+ */
+ public void setPrioritizedMessages(boolean prioritizedMessages) {
+ this.prioritizedMessages = prioritizedMessages;
+ }
+
+ /**
+ * @return the current value of the {@code prioritizedMessages} field
+ */
+ public boolean isPrioritizedMessages() {
+ return this.prioritizedMessages;
+ }
public Future<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message) throws IOException {
addMessage(context, message);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java?rev=966145&r1=966144&r2=966145&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java Wed Jul 21 08:59:42 2010
@@ -148,4 +148,16 @@ public interface MessageStore extends Se
*/
boolean isEmpty() throws Exception;
+ /**
+ * A hint to the store to try to recover messages according to priority.
+ *
+ * @param prioritizedMessages {@code true} to ask the store to recover by priority
+ */
+ public void setPrioritizedMessages(boolean prioritizedMessages);
+
+ /**
+ * Tells whether the store is trying to recover messages according to priority.
+ *
+ * @return true if store is trying to recover messages according to priority
+ */
+ public boolean isPrioritizedMessages();
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java?rev=966145&r1=966144&r2=966145&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java Wed Jul 21 08:59:42 2010
@@ -113,4 +113,12 @@ public class ProxyMessageStore implement
public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
delegate.removeAsyncMessage(context, ack);
}
+
+ /**
+ * Passes the prioritized-messages hint straight through to the delegate store.
+ *
+ * @param prioritizedMessages the value handed to the delegate
+ */
+ public void setPrioritizedMessages(boolean prioritizedMessages) {
+ delegate.setPrioritizedMessages(prioritizedMessages);
+ }
+
+ /**
+ * @return whatever the delegate store reports from its own {@code isPrioritizedMessages()}
+ */
+ public boolean isPrioritizedMessages() {
+ return delegate.isPrioritizedMessages();
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java?rev=966145&r1=966144&r2=966145&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java Wed Jul 21 08:59:42 2010
@@ -153,4 +153,12 @@ public class ProxyTopicMessageStore impl
public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
delegate.removeAsyncMessage(context, ack);
}
+
+ /**
+ * Passes the prioritized-messages hint straight through to the delegate store.
+ *
+ * @param prioritizedMessages the value handed to the delegate
+ */
+ public void setPrioritizedMessages(boolean prioritizedMessages) {
+ delegate.setPrioritizedMessages(prioritizedMessages);
+ }
+
+ /**
+ * @return whatever the delegate store reports from its own {@code isPrioritizedMessages()}
+ */
+ public boolean isPrioritizedMessages() {
+ return delegate.isPrioritizedMessages();
+ }
}
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java?rev=966145&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java Wed Jul 21 08:59:42 2010
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.store;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+
+/**
+ * Base test checking that a destination policy with prioritized messages
+ * enabled ends up reflected on the message stores of the destinations the
+ * broker creates. Subclasses provide the persistence adapter under test.
+ */
+public abstract class MessagePriorityTest extends TestCase {
+
+    BrokerService broker;
+    PersistenceAdapter adapter;
+
+    ActiveMQConnectionFactory factory;
+    Connection conn;
+    Session sess;
+
+    /**
+     * Creates the persistence adapter the broker will run on.
+     *
+     * @param delete {@link #setUp()} always passes {@code true}; presumably a
+     *               request for an empty store, to be honored by the subclass
+     * @return the adapter to install on the broker
+     * @throws Exception if the adapter cannot be created
+     */
+    protected abstract PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception;
+
+    /**
+     * Starts a broker named "priorityTest" whose default destination policy
+     * has prioritized messages enabled, then opens a VM-transport connection
+     * and an auto-acknowledge session against it.
+     */
+    protected void setUp() throws Exception {
+        broker = new BrokerService();
+        broker.setBrokerName("priorityTest");
+        adapter = createPersistenceAdapter(true);
+        broker.setPersistenceAdapter(adapter);
+        PolicyEntry policy = new PolicyEntry();
+        policy.setPrioritizedMessages(true);
+        PolicyMap policyMap = new PolicyMap();
+        policyMap.setDefaultEntry(policy);
+        broker.setDestinationPolicy(policyMap);
+        broker.start();
+        broker.waitUntilStarted();
+
+        factory = new ActiveMQConnectionFactory("vm://priorityTest");
+        conn = factory.createConnection();
+        sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    /**
+     * Closes the session and connection and stops the broker. Each step runs
+     * even when an earlier one throws, so a failing close cannot leave the
+     * broker running for the next test.
+     */
+    protected void tearDown() throws Exception {
+        try {
+            try {
+                if (sess != null) {
+                    sess.close();
+                }
+            } finally {
+                if (conn != null) {
+                    conn.close();
+                }
+            }
+        } finally {
+            if (broker != null) {
+                broker.stop();
+                broker.waitUntilStopped();
+            }
+        }
+    }
+
+    /**
+     * Creates a producer on a queue and on a topic (both named "TEST") and
+     * asserts that the message store behind each destination reports
+     * prioritized messages as enabled.
+     */
+    public void testStoreConfigured() throws Exception {
+        Queue queue = sess.createQueue("TEST");
+        Topic topic = sess.createTopic("TEST");
+
+        MessageProducer queueProducer = sess.createProducer(queue);
+        MessageProducer topicProducer = sess.createProducer(topic);
+
+        try {
+            Thread.sleep(100); // get it all propagated
+
+            // Check the destinations exist first so a missing one fails with a message, not an NPE.
+            assertNotNull("queue destination should exist in the broker",
+                    broker.getRegionBroker().getDestinationMap().get(queue));
+            assertNotNull("topic destination should exist in the broker",
+                    broker.getRegionBroker().getDestinationMap().get(topic));
+
+            assertTrue("queue message store should be prioritized",
+                    broker.getRegionBroker().getDestinationMap().get(queue).getMessageStore().isPrioritizedMessages());
+            assertTrue("topic message store should be prioritized",
+                    broker.getRegionBroker().getDestinationMap().get(topic).getMessageStore().isPrioritizedMessages());
+        } finally {
+            try {
+                queueProducer.close();
+            } finally {
+                topicProducer.close();
+            }
+        }
+    }
+
+}
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java?rev=966145&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java Wed Jul 21 08:59:42 2010
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.store.jdbc;
+
+import org.apache.activemq.store.MessagePriorityTest;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.derby.jdbc.EmbeddedDataSource;
+
+/**
+ * Runs {@link MessagePriorityTest} against a {@link JDBCPersistenceAdapter}
+ * that uses an embedded Derby database named "derbyDb".
+ */
+public class JDBCMessagePriorityTest extends MessagePriorityTest {
+
+    /**
+     * Builds a JDBC persistence adapter on top of an embedded Derby data
+     * source that creates the database on first use.
+     *
+     * @param delete when {@code true}, the adapter is asked to delete all
+     *               stored messages before it is returned, so data left in
+     *               "derbyDb" by an earlier run cannot affect this one
+     * @return the configured adapter
+     * @throws Exception if purging the store fails
+     */
+    @Override
+    protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception {
+        EmbeddedDataSource dataSource = new EmbeddedDataSource();
+        dataSource.setDatabaseName("derbyDb");
+        dataSource.setCreateDatabase("create");
+
+        JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
+        jdbc.setDataSource(dataSource);
+        if (delete) {
+            // Previously the flag was ignored and the on-disk database was reused as-is.
+            jdbc.deleteAllMessages();
+        }
+        return jdbc;
+    }
+
+}