You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/11/17 11:34:00 UTC
svn commit: r476101 [1/2] - in
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq:
broker/region/cursors/ store/ store/jdbc/ store/jdbc/adapter/
store/journal/ store/kahadaptor/ store/memory/ store/rapid/
Author: rajdavies
Date: Fri Nov 17 02:33:57 2006
New Revision: 476101
URL: http://svn.apache.org/viewvc?view=rev&rev=476101
Log:
Adding persitent cursor support for Queues
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java (with props)
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java (with props)
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?view=auto&rev=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java Fri Nov 17 02:33:57 2006
@@ -0,0 +1,142 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.activemq.broker.region.cursors;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import javax.jms.JMSException;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.Topic;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Pending message cursor (messages awaiting dispatch to a consumer) that
+ * reads a Queue's persisted messages from its message store in batches
+ *
+ * @version $Revision: 474985 $
+ */
+class QueueStorePrefetch extends AbstractPendingMessageCursor implements
+ MessageRecoveryListener {
+
+ static private final Log log=LogFactory.getLog(QueueStorePrefetch.class);
+
+ private MessageStore store;
+ private final LinkedList batchList=new LinkedList();
+ private Destination regionDestination;
+
+ /**
+ * Construct a prefetch cursor backed by the message store of the
+ * given queue.
+ *
+ * @param queue the queue whose message store is read in batches
+ */
+ public QueueStorePrefetch(Queue queue){
+ this.regionDestination = queue;
+ this.store=(MessageStore)queue.getMessageStore();
+
+ }
+
+ public void start() throws Exception{
+ }
+
+ public void stop() throws Exception{
+ store.resetBatching();
+ }
+
+ /**
+ * @return true if there are no pending messages
+ */
+ public boolean isEmpty(){
+ return batchList.isEmpty();
+ }
+
+ public synchronized int size(){
+ try {
+ return store.getMessageCount();
+ }catch(IOException e) {
+ log.error("Failed to get message count",e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ public synchronized void addMessageLast(MessageReference node) throws Exception{
+ if(node!=null){
+ node.decrementReferenceCount();
+ }
+ }
+
+ public synchronized boolean hasNext(){
+ if(isEmpty()){
+ try{
+ fillBatch();
+ }catch(Exception e){
+ log.error("Failed to fill batch",e);
+ throw new RuntimeException(e);
+ }
+ }
+ return !isEmpty();
+ }
+
+ public synchronized MessageReference next(){
+ Message result = (Message)batchList.removeFirst();
+ result.setRegionDestination(regionDestination);
+ return result;
+ }
+
+ public void reset(){
+ }
+
+ // MessageRecoveryListener implementation
+ public void finished(){
+ }
+
+ public void recoverMessage(Message message) throws Exception{
+ message.setRegionDestination(regionDestination);
+ message.incrementReferenceCount();
+ batchList.addLast(message);
+ }
+
+ public void recoverMessageReference(String messageReference)
+ throws Exception{
+ // shouldn't get called
+ throw new RuntimeException("Not supported");
+ }
+
+ // implementation
+ protected void fillBatch() throws Exception{
+ store.recoverNextMessages(maxBatchSize,this);
+ // this will add more messages to the batch list
+ if(!batchList.isEmpty()){
+ Message message=(Message)batchList.getLast();
+ }
+ }
+
+ public String toString() {
+ return "QueueStorePrefetch" + System.identityHashCode(this) ;
+ }
+
+}
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
------------------------------------------------------------------------------
svn:executable = *
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Fri Nov 17 02:33:57 2006
@@ -27,7 +27,6 @@
import org.apache.activemq.kaha.Store;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import java.util.concurrent.atomic.AtomicBoolean;
/**
* perist pending messages pending message (messages awaiting disptach to a consumer) cursor
@@ -142,12 +141,6 @@
TopicStorePrefetch tsp=(TopicStorePrefetch)topics.get(dest);
if(tsp!=null){
tsp.addMessageLast(node);
- if(started){
- // if the store has been empty - then this message is next to dispatch
- if((pendingCount-nonPersistent.size())<=0){
- tsp.nextToDispatch(node.getMessageId());
- }
- }
}
}
}
@@ -190,6 +183,7 @@
}
public synchronized void reset(){
+ nonPersistent.reset();
for(Iterator i=storePrefetches.iterator();i.hasNext();){
AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next();
tsp.reset();
@@ -199,21 +193,27 @@
public int size(){
return pendingCount;
}
+
+ public synchronized void setMaxBatchSize(int maxBatchSize){
+ for(Iterator i=storePrefetches.iterator();i.hasNext();){
+ AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next();
+ tsp.setMaxBatchSize(maxBatchSize);
+ }
+ super.setMaxBatchSize(maxBatchSize);
+ }
protected synchronized PendingMessageCursor getNextCursor() throws Exception{
if(currentCursor==null||currentCursor.isEmpty()){
currentCursor=null;
for(Iterator i=storePrefetches.iterator();i.hasNext();){
- AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next();
- tsp.setMaxBatchSize(getMaxBatchSize());
+ AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next();
if(tsp.hasNext()){
currentCursor=tsp;
break;
}
}
// round-robin
- Object obj=storePrefetches.removeFirst();
- storePrefetches.addLast(obj);
+ storePrefetches.addLast(storePrefetches.removeFirst());
}
return currentCursor;
}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?view=auto&rev=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java Fri Nov 17 02:33:57 2006
@@ -0,0 +1,170 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.broker.region.cursors;
+
+import java.util.Iterator;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.kaha.Store;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Store based Cursor for Queues
+ *
+ * @version $Revision: 474985 $
+ */
+public class StoreQueueCursor extends AbstractPendingMessageCursor{
+
+ static private final Log log=LogFactory.getLog(StoreQueueCursor.class);
+ private int pendingCount=0;
+ private Queue queue;
+ private Store tmpStore;
+ private PendingMessageCursor nonPersistent;
+ private QueueStorePrefetch persistent;
+ private boolean started;
+ private PendingMessageCursor currentCursor;
+
+ /**
+ * Construct
+ *
+ * @param queue
+ * @param tmpStore
+ */
+ public StoreQueueCursor(Queue queue,Store tmpStore){
+ this.queue=queue;
+ this.tmpStore=tmpStore;
+ this.persistent=new QueueStorePrefetch(queue);
+ }
+
+ public synchronized void start() throws Exception{
+ started=true;
+ if(nonPersistent==null){
+ nonPersistent=new FilePendingMessageCursor(queue.getDestination(),tmpStore);
+ nonPersistent.setMaxBatchSize(getMaxBatchSize());
+ }
+ nonPersistent.start();
+ pendingCount=persistent.size();
+ }
+
+ public synchronized void stop() throws Exception{
+ started=false;
+ if(nonPersistent!=null){
+ nonPersistent.stop();
+ }
+ pendingCount=0;
+ }
+
+ public synchronized void addMessageLast(MessageReference node) throws Exception{
+ if(node!=null){
+ Message msg=node.getMessage();
+ if(started){
+ pendingCount++;
+ if(!msg.isPersistent()){
+ nonPersistent.addMessageLast(node);
+ }
+ }
+ if(msg.isPersistent()){
+ persistent.addMessageLast(node);
+ }
+ }
+ }
+
+ public void clear(){
+ pendingCount=0;
+ }
+
+ public synchronized boolean hasNext(){
+ boolean result=pendingCount>0;
+ if(result){
+ try{
+ currentCursor=getNextCursor();
+ }catch(Exception e){
+ log.error("Failed to get current cursor ",e);
+ throw new RuntimeException(e);
+ }
+ result=currentCursor!=null?currentCursor.hasNext():false;
+ }
+ return result;
+ }
+
+ public synchronized MessageReference next(){
+ return currentCursor!=null?currentCursor.next():null;
+ }
+
+ public synchronized void remove(){
+ if(currentCursor!=null){
+ currentCursor.remove();
+ }
+ pendingCount--;
+ }
+
+ public void remove(MessageReference node){
+ pendingCount--;
+ }
+
+ public synchronized void reset(){
+ nonPersistent.reset();
+ }
+
+ public int size(){
+ return pendingCount;
+ }
+
+ public synchronized boolean isEmpty(){
+ return pendingCount<=0;
+ }
+
+ /**
+ * Informs the Broker if the subscription needs intervention to recover its state, e.g. a DurableTopicSubscriber
+ * may do
+ *
+ * @see org.apache.activemq.broker.region.cursors.PendingMessageCursor
+ * @return true if recovery required
+ */
+ public boolean isRecoveryRequired(){
+ return false;
+ }
+
+ /**
+ * @return the nonPersistent Cursor
+ */
+ public PendingMessageCursor getNonPersistent(){
+ return this.nonPersistent;
+ }
+
+ /**
+ * @param nonPersistent cursor to set
+ */
+ public void setNonPersistent(PendingMessageCursor nonPersistent){
+ this.nonPersistent=nonPersistent;
+ }
+
+ public void setMaxBatchSize(int maxBatchSize){
+ persistent.setMaxBatchSize(maxBatchSize);
+ if(nonPersistent!=null){
+ nonPersistent.setMaxBatchSize(maxBatchSize);
+ }
+ super.setMaxBatchSize(maxBatchSize);
+ }
+
+ protected synchronized PendingMessageCursor getNextCursor() throws Exception{
+ if(currentCursor==null||currentCursor.isEmpty()){
+ currentCursor = currentCursor == persistent ? nonPersistent : persistent;
+ }
+ return currentCursor;
+ }
+}
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
------------------------------------------------------------------------------
svn:executable = *
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Fri Nov 17 02:33:57 2006
@@ -46,7 +46,6 @@
private final LinkedList batchList=new LinkedList();
private String clientId;
private String subscriberName;
- private MessageId lastMessageId;
private Destination regionDestination;
/**
@@ -66,7 +65,7 @@
}
public void stop() throws Exception{
- store.resetBatching(clientId,clientId,null);
+ store.resetBatching(clientId,subscriberName);
}
/**
@@ -130,12 +129,12 @@
// implementation
protected void fillBatch() throws Exception{
- store.recoverNextMessages(clientId,subscriberName,lastMessageId,
+ store.recoverNextMessages(clientId,subscriberName,
maxBatchSize,this);
// this will add more messages to the batch list
if(!batchList.isEmpty()){
Message message=(Message)batchList.getLast();
- lastMessageId=message.getMessageId();
+
}
}
@@ -143,8 +142,4 @@
return "TopicStorePrefetch" + System.identityHashCode(this) + "("+clientId+","+subscriberName+")";
}
- synchronized void nextToDispatch(MessageId id) throws Exception {
- lastMessageId = store.getPreviousMessageIdToDeliver(clientId,clientId,id);
- store.resetBatching(clientId,clientId,id);
- }
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java Fri Nov 17 02:33:57 2006
@@ -1,24 +1,20 @@
/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
+
package org.apache.activemq.store;
import java.io.IOException;
-
import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
@@ -28,70 +24,84 @@
import org.apache.activemq.memory.UsageManager;
/**
- * Represents a message store which is used by the persistent {@link org.apache.activemq.service.MessageContainer}
+ * Represents a message store which is used by the persistent
* implementations
- *
+ *
* @version $Revision: 1.5 $
*/
-public interface MessageStore extends Service {
-
+public interface MessageStore extends Service{
+
/**
* Adds a message to the message store
- * @param context TODO
+ *
+ * @param context context
+ * @param message
+ * @throws IOException
*/
- public void addMessage(ConnectionContext context, Message message) throws IOException;
+ public void addMessage(ConnectionContext context,Message message) throws IOException;
/**
* Adds a message reference to the message store
- * @param context TODO
- * @param messageId TODO
- * @param expirationTime TODO
- */
- public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException;
+ *
+ * @param context
+ * @param messageId
+ * @param expirationTime
+ * @param messageRef
+ * @throws IOException
+ */
+ public void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef)
+ throws IOException;
/**
- * Looks up a message using either the String messageID or
- * the messageNumber. Implementations are encouraged to fill in the missing
- * key if its easy to do so.
+ * Looks up a message using either the String messageID or the messageNumber. Implementations are encouraged to fill
+ * in the missing key if it is easy to do so.
+ *
* @param identity which contains either the messageID or the messageNumber
* @return the message or null if it does not exist
+ * @throws IOException
*/
public Message getMessage(MessageId identity) throws IOException;
/**
- * Looks up a message using either the String messageID or
- * the messageNumber. Implementations are encouraged to fill in the missing
- * key if its easy to do so.
+ * Looks up a message reference using either the String messageID or the messageNumber. Implementations are encouraged
+ * to fill in the missing key if it is easy to do so.
+ *
* @param identity which contains either the messageID or the messageNumber
* @return the message or null if it does not exist
+ * @throws IOException
*/
public String getMessageReference(MessageId identity) throws IOException;
/**
* Removes a message from the message store.
- * @param context TODO
- * @param ack the ack request that cause the message to be removed. It conatins
- * the identity which contains the messageID of the message that needs to be removed.
+ *
+ * @param context
+ * @param ack the ack request that causes the message to be removed. It contains the identity which contains the
+ * messageID of the message that needs to be removed.
+ * @throws IOException
*/
- public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException;
+ public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException;
/**
* Removes all the messages from the message store.
- * @param context TODO
+ *
+ * @param context
+ * @throws IOException
*/
public void removeAllMessages(ConnectionContext context) throws IOException;
-
+
/**
* Recover any messages to be delivered.
- *
+ *
* @param container
- * @throws Exception
+ * @throws Exception
*/
public void recover(MessageRecoveryListener container) throws Exception;
/**
* The destination that the message store is holding messages for.
- * @return
+ *
+ * @return the destination
*/
public ActiveMQDestination getDestination();
@@ -100,4 +110,23 @@
*/
public void setUsageManager(UsageManager usageManager);
+ /**
+ * @return the number of messages ready to deliver
+ * @throws IOException
+ *
+ */
+ public int getMessageCount() throws IOException;
+
+ /**
+ * A hint to the Store to reset any batching state for the Destination
+ *
+ *
+ *
+ */
+ public void resetBatching();
+
+
+ public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener)
+ throws Exception;
+
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java Fri Nov 17 02:33:57 2006
@@ -104,6 +104,4 @@
* @param usageManager The UsageManager that is controlling the broker's memory usage.
*/
public void setUsageManager(UsageManager usageManager);
-
-
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java Fri Nov 17 02:33:57 2006
@@ -77,4 +77,20 @@
public void setUsageManager(UsageManager usageManager) {
delegate.setUsageManager(usageManager);
}
+
+
+ public int getMessageCount() throws IOException{
+ return delegate.getMessageCount();
+ }
+
+
+ public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
+ delegate.recoverNextMessages(maxReturned,listener);
+
+ }
+
+ public void resetBatching(){
+ delegate.resetBatching();
+
+ }
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java Fri Nov 17 02:33:57 2006
@@ -82,22 +82,15 @@
delegate.recoverSubscription(clientId, subscriptionName, listener);
}
- public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,MessageRecoveryListener listener) throws Exception{
- delegate.recoverNextMessages(clientId, subscriptionName, lastMessageId,maxReturned,listener);
+ public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,MessageRecoveryListener listener) throws Exception{
+ delegate.recoverNextMessages(clientId, subscriptionName, maxReturned,listener);
}
- public void resetBatching(String clientId,String subscriptionName,MessageId id) {
- delegate.resetBatching(clientId,subscriptionName,id);
- }
-
- public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
- return delegate.getNextMessageIdToDeliver(clientId,subscriptionName,id);
- }
-
- public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
- return delegate.getPreviousMessageIdToDeliver(clientId,subscriptionName,id);
+ public void resetBatching(String clientId,String subscriptionName) {
+ delegate.resetBatching(clientId,subscriptionName);
}
+
public ActiveMQDestination getDestination() {
return delegate.getDestination();
}
@@ -120,4 +113,19 @@
public int getMessageCount(String clientId,String subscriberName) throws IOException{
return delegate.getMessageCount(clientId,subscriberName);
}
+
+
+ public int getMessageCount() throws IOException{
+ return delegate.getMessageCount();
+ }
+
+ public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
+ delegate.recoverNextMessages(maxReturned,listener);
+
+ }
+
+ public void resetBatching(){
+ delegate.resetBatching();
+
+ }
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java Fri Nov 17 02:33:57 2006
@@ -69,46 +69,21 @@
*
* @param clientId
* @param subscriptionName
- * @param lastMessageId
* @param maxReturned
* @param listener
*
* @throws Exception
*/
- public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,
+ public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
MessageRecoveryListener listener) throws Exception;
/**
* A hint to the Store to reset any batching state for a durable subsriber
* @param clientId
* @param subscriptionName
- * @param nextToDispatch
*
*/
- public void resetBatching(String clientId,String subscriptionName,MessageId nextToDispatch);
-
- /**
- * Get the next messageId to deliver to a subscriber after the MessageId provided
- * @param clientId
- * @param subscriptionName
- * @param id
- * @return the next messageId or null
- * @throws IOException
- * @throws Exception
- */
- public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception;
-
-
- /**
- * Get the previous messageId to deliver to a subscriber before the MessageId provided
- * @param clientId
- * @param subscriptionName
- * @param id
- * @return the next messageId or null
- * @throws IOException
- * @throws Exception
- */
- public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception;
+ public void resetBatching(String clientId,String subscriptionName);
/**
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java Fri Nov 17 02:33:57 2006
@@ -89,4 +89,9 @@
public void doGetNextDurableSubscriberMessageIdStatement(TransactionContext c,ActiveMQDestination destination,
String clientId,String subscriberName,long id,JDBCMessageRecoveryListener listener) throws Exception;
-}
+
+ /**
+  * Returns the number of messages stored for the given destination.
+  *
+  * @param c the transaction context supplying the JDBC connection
+  * @param destination the destination whose messages are counted
+  * @return the message count
+  * @throws SQLException if the query fails
+  * @throws IOException declared for implementations that need it
+  */
+ public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException;
+
+ /**
+  * Recovers a batch of messages for a destination and hands them to the listener.
+  *
+  * @param c the transaction context supplying the JDBC connection
+  * @param destination the destination to read from
+  * @param nextSeq the sequence id to resume from; presumably only rows with a
+  *        greater id are returned - confirm against the implementation
+  * @param maxReturned the maximum number of rows to hand to the listener
+  * @param listener the callback receiving the recovered rows
+  * @throws Exception if the query or the listener fails
+  */
+ public void doRecoverNextMessages(TransactionContext c,ActiveMQDestination destination,long nextSeq,int maxReturned,
+ JDBCMessageRecoveryListener listener) throws Exception;
+}
\ No newline at end of file
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Fri Nov 17 02:33:57 2006
@@ -19,7 +19,7 @@
import java.io.IOException;
import java.sql.SQLException;
-
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
@@ -33,6 +33,7 @@
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
+
/**
* @version $Revision: 1.10 $
*/
@@ -42,6 +43,7 @@
protected final ActiveMQDestination destination;
protected final JDBCAdapter adapter;
protected final JDBCPersistenceAdapter persistenceAdapter;
+ protected AtomicLong lastMessageId = new AtomicLong(-1);
public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat,
ActiveMQDestination destination) {
@@ -201,4 +203,67 @@
public void setUsageManager(UsageManager usageManager) {
// we can ignore since we don't buffer up messages.
}
+
+
+ /**
+  * Asks the JDBC adapter how many messages are stored for this destination.
+  *
+  * @return the count reported by the adapter
+  * @throws IOException if the adapter fails; a SQLException is logged and wrapped
+  */
+ public int getMessageCount() throws IOException{
+     final TransactionContext context = persistenceAdapter.getTransactionContext();
+     try {
+         return adapter.doGetMessageCount(context, destination);
+     } catch (SQLException sqlFailure) {
+         JDBCPersistenceAdapter.log("JDBC Failure: ",sqlFailure);
+         throw IOExceptionSupport.create("Failed to get Message Count: " + destination + ". Reason: " + sqlFailure, sqlFailure);
+     } finally {
+         // the transaction context is always released, on success and on failure
+         context.close();
+     }
+ }
+
+ /**
+  * Recovers the next batch of messages for this destination, resuming after the
+  * sequence id remembered in lastMessageId (-1 initially and after resetBatching()).
+  * Every recovered message gets its broker sequence id set before it is handed to
+  * the listener; the remembered position moves forward only once the listener has
+  * returned normally.
+  *
+  * @param maxReturned the batch size passed to the adapter
+  * @param listener receives recovered messages, message references and the finished callback
+  * @throws Exception if the adapter or listener fails with anything but a SQLException
+  * @see org.apache.activemq.store.MessageStore#recoverNextMessages(int, org.apache.activemq.store.MessageRecoveryListener)
+  */
+ public void recoverNextMessages(int maxReturned,final MessageRecoveryListener listener) throws Exception{
+     final TransactionContext context=persistenceAdapter.getTransactionContext();
+     // Bridges raw JDBC rows to the caller's listener.
+     final JDBCMessageRecoveryListener bridge=new JDBCMessageRecoveryListener(){
+
+         public void recoverMessage(long sequenceId,byte[] data) throws Exception{
+             final Message recovered=(Message)wireFormat.unmarshal(new ByteSequence(data));
+             recovered.getMessageId().setBrokerSequenceId(sequenceId);
+             listener.recoverMessage(recovered);
+             lastMessageId.set(sequenceId);
+         }
+
+         public void recoverMessageReference(String reference) throws Exception{
+             // NOTE(review): the remembered position is not advanced for references - confirm intended
+             listener.recoverMessageReference(reference);
+         }
+
+         public void finished(){
+             listener.finished();
+         }
+     };
+     try{
+         adapter.doRecoverNextMessages(context,destination,lastMessageId.get(),maxReturned,bridge);
+     }catch(SQLException sqlFailure){
+         // SQL failures are logged only; they are not propagated to the caller
+         JDBCPersistenceAdapter.log("JDBC Failure: ",sqlFailure);
+     }finally{
+         context.close();
+     }
+ }
+
+ /**
+  * Forgets the batching position so the next recoverNextMessages call starts
+  * again from sequence id -1.
+  *
+  * @see org.apache.activemq.store.MessageStore#resetBatching()
+  */
+ public void resetBatching(){
+     lastMessageId.set(-1L);
+ }
+
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Fri Nov 17 02:33:57 2006
@@ -19,7 +19,9 @@
import java.io.IOException;
import java.sql.SQLException;
-
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
@@ -30,13 +32,14 @@
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
-import java.util.concurrent.atomic.AtomicBoolean;
+
/**
* @version $Revision: 1.6 $
*/
public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore {
+ private Map subscriberLastMessageMap=new ConcurrentHashMap();
public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat,
ActiveMQTopic topic) {
super(persistenceAdapter, adapter, wireFormat, topic);
@@ -90,35 +93,46 @@
}
}
- public void recoverNextMessages(final String clientId,final String subscriptionName, final MessageId lastMessageId,final int maxReturned,final MessageRecoveryListener listener) throws Exception{
- TransactionContext c = persistenceAdapter.getTransactionContext();
- try {
- long lastSequence = lastMessageId != null ? lastMessageId.getBrokerSequenceId() : -1;
- adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName,lastSequence,maxReturned,
- new JDBCMessageRecoveryListener() {
- public void recoverMessage(long sequenceId, byte[] data) throws Exception {
- Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
+ public synchronized void recoverNextMessages(final String clientId,final String subscriptionName,
+ final int maxReturned,final MessageRecoveryListener listener) throws Exception{
+ TransactionContext c=persistenceAdapter.getTransactionContext();
+ String subcriberId=getSubscriptionKey(clientId,subscriptionName);
+ AtomicLong last=(AtomicLong)subscriberLastMessageMap.get(subcriberId);
+ if(last==null){
+ last=new AtomicLong(-1);
+ subscriberLastMessageMap.put(subcriberId,last);
+ }
+ final AtomicLong finalLast=last;
+ try{
+ adapter.doRecoverNextMessages(c,destination,clientId,subscriptionName,last.get(),maxReturned,
+ new JDBCMessageRecoveryListener(){
+
+ public void recoverMessage(long sequenceId,byte[] data) throws Exception{
+ Message msg=(Message)wireFormat.unmarshal(new ByteSequence(data));
msg.getMessageId().setBrokerSequenceId(sequenceId);
listener.recoverMessage(msg);
+ finalLast.set(sequenceId);
}
- public void recoverMessageReference(String reference) throws Exception {
+
+ public void recoverMessageReference(String reference) throws Exception{
listener.recoverMessageReference(reference);
}
-
+
public void finished(){
listener.finished();
}
});
- } catch (SQLException e) {
+ }catch(SQLException e){
JDBCPersistenceAdapter.log("JDBC Failure: ",e);
-
- } finally {
+ }finally{
c.close();
+ last.set(finalLast.get());
}
-
}
- public void resetBatching(String clientId,String subscriptionName,MessageId id) {
+ public void resetBatching(String clientId,String subscriptionName) {
+ String subcriberId=getSubscriptionKey(clientId,subscriptionName);
+ subscriberLastMessageMap.remove(subcriberId);
}
/**
@@ -165,6 +179,7 @@
throw IOExceptionSupport.create("Failed to remove subscription for: " + clientId + ". Reason: " + e, e);
} finally {
c.close();
+ resetBatching(clientId,subscriptionName);
}
}
@@ -180,76 +195,9 @@
}
}
- public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
-
- final MessageId result = new MessageId();
- final AtomicBoolean initalized = new AtomicBoolean();
- TransactionContext c = persistenceAdapter.getTransactionContext();
- try {
- long sequence = id != null ? id.getBrokerSequenceId() : -1;
- adapter.doGetNextDurableSubscriberMessageIdStatement(c, destination, clientId, subscriptionName,sequence,new JDBCMessageRecoveryListener() {
- public void recoverMessage(long sequenceId, byte[] data) throws Exception {
- Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
- msg.getMessageId().setBrokerSequenceId(sequenceId);
- result.setBrokerSequenceId(msg.getMessageId().getBrokerSequenceId());
- initalized.set(true);
-
- }
- public void recoverMessageReference(String reference) throws Exception {
- result.setValue(reference);
- initalized.set(true);
-
- }
-
- public void finished(){
- }
- });
-
-
- } catch (SQLException e) {
- JDBCPersistenceAdapter.log("JDBC Failure: ",e);
- throw IOExceptionSupport.create("Failed to get next MessageId to deliver: " + clientId + ". Reason: " + e, e);
- } finally {
- c.close();
- }
- return initalized.get () ? result : null;
- }
- public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
- final MessageId result = new MessageId();
- final AtomicBoolean initalized = new AtomicBoolean();
- TransactionContext c = persistenceAdapter.getTransactionContext();
- try {
- long sequence = id != null ? id.getBrokerSequenceId() : -1;
- adapter.doGetPrevDurableSubscriberMessageIdStatement(c, destination, clientId, subscriptionName,sequence,new JDBCMessageRecoveryListener() {
- public void recoverMessage(long sequenceId, byte[] data) throws Exception {
- Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
- msg.getMessageId().setBrokerSequenceId(sequenceId);
- result.setProducerId(msg.getMessageId().getProducerId());
- result.setProducerSequenceId(msg.getMessageId().getProducerSequenceId());
- result.setBrokerSequenceId(msg.getMessageId().getBrokerSequenceId());
- initalized.set(true);
-
- }
- public void recoverMessageReference(String reference) throws Exception {
- result.setValue(reference);
- initalized.set(true);
-
- }
-
- public void finished(){
- }
- });
-
-
- } catch (SQLException e) {
- JDBCPersistenceAdapter.log("JDBC Failure: ",e);
- throw IOExceptionSupport.create("Failed to get next MessageId to deliver: " + clientId + ". Reason: " + e, e);
- } finally {
- c.close();
- }
- return initalized.get () ? result : null;
- }
+
+
public int getMessageCount(String clientId,String subscriberName) throws IOException{
int result = 0;
@@ -263,6 +211,12 @@
} finally {
c.close();
}
+ return result;
+ }
+
+ /**
+  * Builds the map key for a subscription in the form clientId + ":" + subscriberName,
+  * using the literal NOT_SET in place of a null subscriber name.
+  *
+  * @param clientId the client id part of the key
+  * @param subscriberName the subscriber name part of the key, may be null
+  * @return the combined key
+  */
+ protected String getSubscriptionKey(String clientId,String subscriberName){
+ String result=clientId+":";
+ result+=subscriberName!=null?subscriberName:"NOT_SET";
return result;
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java Fri Nov 17 02:33:57 2006
@@ -66,6 +66,8 @@
private String durableSubscriberMessageCountStatement;
private String nextDurableSubscriberMessageIdStatement;
private String prevDurableSubscriberMessageIdStatement;
+ private String destinationMessageCountStatement;
+ private String findNextMessagesStatement;
private boolean useLockCreateWhereClause;
public String[] getCreateSchemaStatements() {
@@ -338,6 +340,29 @@
}
return lockUpdateStatement;
}
+
+ /**
+  * Returns the SQL used to count the messages of one destination. Unless a
+  * statement was set explicitly, a default counting the rows of the message
+  * table for a given CONTAINER is built on first use and kept.
+  *
+  * @return the destinationMessageCountStatement
+  */
+ public String getDestinationMessageCountStatement(){
+     if (destinationMessageCountStatement!=null) {
+         return destinationMessageCountStatement;
+     }
+     final String messageTable = getFullMessageTableName();
+     destinationMessageCountStatement = "SELECT COUNT(*) FROM " + messageTable + " WHERE CONTAINER=?";
+     return destinationMessageCountStatement;
+ }
+
+ /**
+  * Returns the SQL used to page through the messages of one destination. Unless
+  * a statement was set explicitly, a default is built on first use and kept; it
+  * takes two parameters (container, then the id to resume after) and orders by ID.
+  *
+  * @return the findNextMessagesStatement
+  */
+ public String getFindNextMessagesStatement(){
+     if(findNextMessagesStatement != null) {
+         return findNextMessagesStatement;
+     }
+     final String messageTable = getFullMessageTableName();
+     findNextMessagesStatement = "SELECT ID, MSG FROM " + messageTable + " WHERE CONTAINER=? AND ID > ? ORDER BY ID";
+     return findNextMessagesStatement;
+ }
+
public String getFullMessageTableName() {
return getTablePrefix() + getMessageTableName();
@@ -627,4 +652,22 @@
public void setPrevDurableSubscriberMessageIdStatement(String prevDurableSubscriberMessageIdStatement){
this.prevDurableSubscriberMessageIdStatement=prevDurableSubscriberMessageIdStatement;
}
-}
+
+ /**
+  * Overrides the SQL returned by getFindNextMessagesStatement(); once a non-null
+  * value is set, the built-in default is no longer generated.
+  *
+  * @param findNextMessagesStatement the findNextMessagesStatement to set
+  */
+ public void setFindNextMessagesStatement(String findNextMessagesStatement){
+ this.findNextMessagesStatement=findNextMessagesStatement;
+ }
+
+ /**
+  * Overrides the SQL returned by getDestinationMessageCountStatement(); once a
+  * non-null value is set, the built-in default is no longer generated.
+  *
+  * @param destinationMessageCountStatement the destinationMessageCountStatement to set
+  */
+ public void setDestinationMessageCountStatement(String destinationMessageCountStatement){
+ this.destinationMessageCountStatement=destinationMessageCountStatement;
+ }
+
+
+
+
+}
\ No newline at end of file
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Fri Nov 17 02:33:57 2006
@@ -677,6 +677,54 @@
close(s);
}
}
+
+
+ /**
+  * Runs the destination message count statement for the destination's qualified
+  * name and returns the first column of the first row, or 0 when no row comes back.
+  *
+  * @param c the transaction context supplying the connection
+  * @param destination the destination whose messages are counted
+  * @return the message count
+  * @throws SQLException if preparing or executing the statement fails
+  * @throws IOException declared by the adapter contract
+  */
+ public int doGetMessageCount(TransactionContext c,ActiveMQDestination destination) throws SQLException, IOException{
+     PreparedStatement countStatement=null;
+     ResultSet rows=null;
+     try{
+         countStatement=c.getConnection().prepareStatement(statements.getDestinationMessageCountStatement());
+         countStatement.setString(1,destination.getQualifiedName());
+         rows=countStatement.executeQuery();
+         return rows.next()?rows.getInt(1):0;
+     }finally{
+         // release in reverse order of acquisition
+         close(rows);
+         close(countStatement);
+     }
+ }
+
+
+ /**
+  * Executes the find-next-messages statement for the destination and passes at
+  * most maxReturned rows to the listener. With the default statement this
+  * selects the rows whose ID is greater than nextSeq, ordered by ID.
+  * listener.finished() is always invoked, even when the query or listener fails.
+  *
+  * @param c the transaction context supplying the connection
+  * @param destination the destination to read from
+  * @param nextSeq the sequence id bound to the statement's second parameter
+  * @param maxReturned the maximum number of rows handed to the listener
+  * @param listener receives each row and the final finished() callback
+  * @throws Exception if the statement or the listener fails
+  */
+ public void doRecoverNextMessages(TransactionContext c,ActiveMQDestination destination,long nextSeq,int maxReturned,JDBCMessageRecoveryListener listener) throws Exception{
+     PreparedStatement s=null;
+     ResultSet rs=null;
+     try{
+         s=c.getConnection().prepareStatement(statements.getFindNextMessagesStatement());
+         s.setString(1,destination.getQualifiedName());
+         // The statement has two placeholders (CONTAINER=? AND ID > ?), so the
+         // sequence id is parameter 2; index 4 is out of range for it.
+         s.setLong(2,nextSeq);
+         rs=s.executeQuery();
+         int count=0;
+         if(statements.isUseExternalMessageReferences()){
+             // NOTE(review): column 1 of the default statement is ID - confirm this is the reference
+             while(rs.next()&&count<maxReturned){
+                 listener.recoverMessageReference(rs.getString(1));
+                 count++;
+             }
+         }else{
+             while(rs.next()&&count<maxReturned){
+                 listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2));
+                 count++;
+             }
+         }
+     }finally{
+         close(rs);
+         close(s);
+         listener.finished();
+     }
+ }
/*
* Useful for debugging. public void dumpTables(Connection c, String destinationName, String clientId, String
* subscriptionName) throws SQLException { printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); printQuery(c,
@@ -700,4 +748,6 @@
* out.print(set.getString(i)+"|"); } out.println(); } } finally { try { set.close(); } catch (Throwable ignore) {}
* try { s.close(); } catch (Throwable ignore) {} } }
*/
+
+
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java Fri Nov 17 02:33:57 2006
@@ -381,4 +381,27 @@
throw new IOException("The journal does not support message references.");
}
-}
+ /**
+  * Runs a checkpoint on the persistence adapter and then returns the message
+  * count of the long term store.
+  *
+  * @return the count reported by the long term store
+  * @throws IOException if the checkpoint or the count lookup fails
+  * @see org.apache.activemq.store.MessageStore#getMessageCount()
+  */
+ public int getMessageCount() throws IOException{
+     // presumably flushes journalled work so the long term store is current - confirm
+     peristenceAdapter.checkpoint(true, true);
+     final int storedCount=longTermStore.getMessageCount();
+     return storedCount;
+ }
+
+
+ /**
+  * Runs a checkpoint on the persistence adapter, then delegates the batch
+  * recovery to the long term store.
+  *
+  * @param maxReturned the batch size passed through to the long term store
+  * @param listener the listener passed through to the long term store
+  * @throws Exception if the checkpoint or the delegated recovery fails
+  */
+ public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
+ peristenceAdapter.checkpoint(true, true);
+ longTermStore.recoverNextMessages(maxReturned,listener);
+
+ }
+
+
+ /**
+  * Delegates the batching reset to the long term store; no checkpoint is run.
+  */
+ public void resetBatching(){
+ longTermStore.resetBatching();
+
+ }
+
+}
\ No newline at end of file
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java Fri Nov 17 02:33:57 2006
@@ -58,9 +58,9 @@
longTermStore.recoverSubscription(clientId, subscriptionName, listener);
}
- public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,MessageRecoveryListener listener) throws Exception{
+ public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,MessageRecoveryListener listener) throws Exception{
this.peristenceAdapter.checkpoint(true, true);
- longTermStore.recoverNextMessages(clientId, subscriptionName, lastMessageId,maxReturned,listener);
+ longTermStore.recoverNextMessages(clientId, subscriptionName, maxReturned,listener);
}
@@ -190,25 +190,16 @@
return longTermStore.getAllSubscriptions();
}
- public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
- this.peristenceAdapter.checkpoint(true, true);
- return longTermStore.getNextMessageIdToDeliver(clientId,subscriptionName,id);
- }
- public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
- this.peristenceAdapter.checkpoint(true, true);
- return longTermStore.getPreviousMessageIdToDeliver(clientId,subscriptionName,id);
- }
-
public int getMessageCount(String clientId,String subscriberName) throws IOException{
this.peristenceAdapter.checkpoint(true, true);
return longTermStore.getMessageCount(clientId,subscriberName);
}
- public void resetBatching(String clientId,String subscriptionName,MessageId nextToDispatch) {
- longTermStore.resetBatching(clientId,subscriptionName,nextToDispatch);
+ public void resetBatching(String clientId,String subscriptionName) {
+ longTermStore.resetBatching(clientId,subscriptionName);
}
-}
+}
\ No newline at end of file
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java Fri Nov 17 02:33:57 2006
@@ -415,4 +415,23 @@
throw new IOException("The journal does not support message references.");
}
-}
+
+ /**
+  * Runs a checkpoint on the persistence adapter and then returns the message
+  * count of the long term store.
+  *
+  * @return the count reported by the long term store
+  * @throws IOException if the checkpoint or the count lookup fails
+  */
+ public int getMessageCount() throws IOException{
+     // presumably flushes journalled work so the long term store is current - confirm
+     peristenceAdapter.checkpoint(true, true);
+     final int storedCount=longTermStore.getMessageCount();
+     return storedCount;
+ }
+
+
+ /**
+  * Runs a checkpoint on the persistence adapter, then delegates the batch
+  * recovery to the long term store.
+  *
+  * @param maxReturned the batch size passed through to the long term store
+  * @param listener the listener passed through to the long term store
+  * @throws Exception if the checkpoint or the delegated recovery fails
+  */
+ public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
+ peristenceAdapter.checkpoint(true, true);
+ longTermStore.recoverNextMessages(maxReturned,listener);
+
+ }
+
+
+ /**
+  * Delegates the batching reset to the long term store; no checkpoint is run.
+  */
+ public void resetBatching(){
+ longTermStore.resetBatching();
+
+ }
+
+}
\ No newline at end of file
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java Fri Nov 17 02:33:57 2006
@@ -72,9 +72,9 @@
}
- public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId, int maxReturned,final MessageRecoveryListener listener) throws Exception{
+ public void recoverNextMessages(String clientId,String subscriptionName, int maxReturned,final MessageRecoveryListener listener) throws Exception{
this.peristenceAdapter.checkpoint(true, true);
- longTermStore.recoverNextMessages(clientId, subscriptionName, lastMessageId,maxReturned,new MessageRecoveryListener() {
+ longTermStore.recoverNextMessages(clientId, subscriptionName,maxReturned,new MessageRecoveryListener() {
public void recoverMessage(Message message) throws Exception {
throw new IOException("Should not get called.");
}
@@ -217,26 +217,16 @@
return longTermStore.getAllSubscriptions();
}
- public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
- this.peristenceAdapter.checkpoint(true, true);
- return longTermStore.getNextMessageIdToDeliver(clientId,subscriptionName,id);
- }
-
- public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
- this.peristenceAdapter.checkpoint(true, true);
- return longTermStore.getPreviousMessageIdToDeliver(clientId,subscriptionName,id);
- }
-
-
+
public int getMessageCount(String clientId,String subscriberName) throws IOException{
this.peristenceAdapter.checkpoint(true, true);
return longTermStore.getMessageCount(clientId,subscriberName);
}
- public void resetBatching(String clientId,String subscriptionName,MessageId nextId) {
- longTermStore.resetBatching(clientId,subscriptionName,nextId);
+ public void resetBatching(String clientId,String subscriptionName) {
+ longTermStore.resetBatching(clientId,subscriptionName);
}
-}
+}
\ No newline at end of file
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java Fri Nov 17 02:33:57 2006
@@ -26,6 +26,7 @@
import org.apache.activemq.command.MessageId;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.StoreEntry;
+import org.apache.activemq.memory.UsageListener;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
@@ -35,10 +36,12 @@
*
* @version $Revision: 1.7 $
*/
-public class KahaMessageStore implements MessageStore{
+public class KahaMessageStore implements MessageStore, UsageListener{
protected final ActiveMQDestination destination;
protected final ListContainer messageContainer;
+ protected StoreEntry batchEntry = null;
protected final LRUCache cache;
+ protected UsageManager usageManager;
public KahaMessageStore(ListContainer container,ActiveMQDestination destination, int maximumCacheSize) throws IOException{
this.messageContainer=container;
@@ -73,19 +76,19 @@
public synchronized Message getMessage(MessageId identity) throws IOException{
Message result=null;
- StoreEntry entry=(StoreEntry)cache.remove(identity);
+ StoreEntry entry=(StoreEntry)cache.get(identity);
if(entry!=null){
result = (Message)messageContainer.get(entry);
- }else{
-
- for(Iterator i=messageContainer.iterator();i.hasNext();){
- Message msg=(Message)i.next();
- if(msg.getMessageId().equals(identity)){
- result=msg;
- break;
+ }else{
+ // Cache miss: walk the container entry by entry until the id matches.
+ for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) {
+ Message msg=(Message)messageContainer.get(entry);
+ if(msg.getMessageId().equals(identity)){
+ result=msg;
+ // The cache maps ids to StoreEntry (see the cast on lookup above), so the
+ // entry is cached, not the message; caching the message would make the
+ // next lookup fail with a ClassCastException.
+ cache.put(identity,entry);
+ break;
+ }
}
}
- }
return result;
}
@@ -102,10 +105,10 @@
if(entry!=null){
messageContainer.remove(entry);
}else{
- for(Iterator i=messageContainer.iterator();i.hasNext();){
- Message msg=(Message)i.next();
+ for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) {
+ Message msg=(Message)messageContainer.get(entry);
if(msg.getMessageId().equals(msgId)){
- i.remove();
+ messageContainer.remove(entry);
break;
}
}
@@ -119,9 +122,15 @@
listener.finished();
}
- public void start() {}
+ public void start() {
+ if( this.usageManager != null )
+ this.usageManager.addUsageListener(this);
+ }
- public void stop() {}
+ public void stop() {
+ if( this.usageManager != null )
+ this.usageManager.removeUsageListener(this);
+ }
public synchronized void removeAllMessages(ConnectionContext context) throws IOException{
messageContainer.clear();
@@ -141,6 +150,91 @@
* @param usageManager The UsageManager that is controlling the destination's memory usage.
*/
public void setUsageManager(UsageManager usageManager) {
+ this.usageManager = usageManager;
+ }
+
+ /**
+  * Reports the current size of the message container.
+  *
+  * @return the number of messages held by this destination
+  * @see org.apache.activemq.store.MessageStore#getMessageCount()
+  */
+ public int getMessageCount(){
+     final int held=messageContainer.size();
+     return held;
+ }
+
+ /**
+  * Not implemented for this store: always returns null, whatever id is passed.
+  * NOTE(review): the referenced MessageStore method may no longer exist - confirm
+  * whether this method is still needed.
+  *
+  * @param id ignored
+  * @return null
+  * @throws Exception never thrown by this implementation
+  * @see org.apache.activemq.store.MessageStore#getPreviousMessageIdToDeliver(org.apache.activemq.command.MessageId)
+  */
+ public MessageId getPreviousMessageIdToDeliver(MessageId id) throws Exception{
+ return null;
+ }
+
+ /**
+  * Hands the next batch of container entries to the listener. The walk starts
+  * at the first entry when no batch position is remembered, otherwise at the
+  * entry following the (refreshed) remembered one. String payloads are reported
+  * as message references, everything else as messages; null payloads are skipped
+  * without counting. The remembered position is updated after every visited
+  * entry, and listener.finished() is called at the end.
+  *
+  * @param maxReturned stop after this many payloads were delivered (at least one
+  *        entry is always visited when one is available)
+  * @param listener receives the recovered payloads and the finished callback
+  * @throws Exception if the listener fails
+  * @see org.apache.activemq.store.MessageStore#recoverNextMessages(int, org.apache.activemq.store.MessageRecoveryListener)
+  */
+ public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
+     StoreEntry cursor = batchEntry;
+     if (cursor != null) {
+         // resume just past the entry delivered last
+         cursor=messageContainer.refresh(cursor);
+         cursor=messageContainer.getNext(cursor);
+     }else {
+         cursor=messageContainer.getFirst();
+     }
+     int delivered = 0;
+     while(cursor!=null){
+         final Object payload=messageContainer.get(cursor);
+         if(payload!=null){
+             if(payload.getClass()==String.class){
+                 listener.recoverMessageReference(payload.toString());
+             }else{
+                 listener.recoverMessage((Message)payload);
+             }
+             delivered++;
+         }
+         batchEntry = cursor;
+         cursor=messageContainer.getNext(cursor);
+         if(delivered>=maxReturned){
+             break;
+         }
+     }
+     listener.finished();
+ }
+
+ /**
+  * Clears the remembered batch position so the next recoverNextMessages call
+  * starts from the first entry of the container.
+  *
+  * @see org.apache.activemq.store.MessageStore#resetBatching()
+  */
+ public void resetBatching(){
+ batchEntry = null;
+
+ }
+
+ /**
+  * Indicates cursor support; this store always answers true.
+  *
+  * @return true if the store supports cursors
+  */
+ public boolean isSupportForCursors() {
+ return true;
+ }
+
+ /**
+  * Usage callback: empties the entry cache when the new usage is exactly 100 percent.
+  * NOTE(review): uses == 100; if the reported percentage can exceed 100 the cache
+  * would not be cleared in that case - confirm against UsageManager.
+  *
+  * @param memoryManager the usage manager reporting the change (not used here)
+  * @param oldPercentUsage the previous usage percentage (not used here)
+  * @param newPercentUsage the new usage percentage
+  * @see org.apache.activemq.memory.UsageListener#onMemoryUseChanged(org.apache.activemq.memory.UsageManager, int, int)
+  */
+ public synchronized void onMemoryUseChanged(UsageManager memoryManager,int oldPercentUsage,int newPercentUsage){
+ if (newPercentUsage == 100) {
+ cache.clear();
+ }
+
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java Fri Nov 17 02:33:57 2006
@@ -19,6 +19,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
@@ -37,7 +38,7 @@
import org.apache.activemq.store.TransactionStore;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import java.util.concurrent.ConcurrentHashMap;
+
/**
* @org.apache.xbean.XBean
@@ -106,7 +107,7 @@
MapContainer subsContainer=getMapContainer(destination.toString()+"-Subscriptions","topic-subs");
ListContainer ackContainer=store.getListContainer(destination.toString(),"topic-acks");
ackContainer.setMarshaller(new TopicSubAckMarshaller());
- rc=new KahaTopicMessageStore(store,messageContainer,ackContainer,subsContainer,destination);
+ rc=new KahaTopicMessageStore(store,messageContainer,ackContainer,subsContainer,destination,maximumDestinationCacheSize);
messageStores.put(destination,rc);
if(transactionStore!=null){
rc=transactionStore.proxy(rc);
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java Fri Nov 17 02:33:57 2006
@@ -17,6 +17,7 @@
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
@@ -31,24 +32,20 @@
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
-import java.util.concurrent.ConcurrentHashMap;
/**
* @version $Revision: 1.5 $
*/
-public class KahaTopicMessageStore implements TopicMessageStore{
+public class KahaTopicMessageStore extends KahaMessageStore implements TopicMessageStore{
- private ActiveMQDestination destination;
private ListContainer ackContainer;
- private ListContainer messageContainer;
private Map subscriberContainer;
private Store store;
private Map subscriberMessages=new ConcurrentHashMap();
public KahaTopicMessageStore(Store store,ListContainer messageContainer,ListContainer ackContainer,
- MapContainer subsContainer,ActiveMQDestination destination) throws IOException{
- this.messageContainer=messageContainer;
- this.destination=destination;
+ MapContainer subsContainer,ActiveMQDestination destination,int maximumCacheSize) throws IOException{
+ super(messageContainer,destination,maximumCacheSize);
this.store=store;
this.ackContainer=ackContainer;
subscriberContainer=subsContainer;
@@ -159,7 +156,7 @@
}
}
- public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,
+ public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
MessageRecoveryListener listener) throws Exception{
String key=getSubscriptionKey(clientId,subscriptionName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
@@ -195,7 +192,7 @@
}
public void delete(){
- messageContainer.clear();
+ super.delete();
ackContainer.clear();
subscriberContainer.clear();
}
@@ -322,82 +319,16 @@
}
}
- /**
- * @param usageManager
- * @see org.apache.activemq.store.MessageStore#setUsageManager(org.apache.activemq.memory.UsageManager)
- */
- public void setUsageManager(UsageManager usageManager){
- // TODO Auto-generated method stub
- }
-
- /**
- * @throws Exception
- * @see org.apache.activemq.Service#start()
- */
- public void start() throws Exception{
- // TODO Auto-generated method stub
- }
-
- /**
- * @throws Exception
- * @see org.apache.activemq.Service#stop()
- */
- public void stop() throws Exception{
- // TODO Auto-generated method stub
- }
-
- /**
- * @param clientId
- * @param subscriptionName
- * @see org.apache.activemq.store.TopicMessageStore#resetBatching(java.lang.String, java.lang.String)
- */
- public synchronized void resetBatching(String clientId,String subscriptionName,MessageId nextToDispatch){
+ public synchronized void resetBatching(String clientId,String subscriptionName){
String key=getSubscriptionKey(clientId,subscriptionName);
TopicSubContainer topicSubContainer=(TopicSubContainer)subscriberMessages.get(key);
if(topicSubContainer!=null){
topicSubContainer.reset();
- if(nextToDispatch!=null){
- StoreEntry entry=topicSubContainer.getListContainer().getFirst();
- do{
- ConsumerMessageRef consumerRef=(ConsumerMessageRef)topicSubContainer.getListContainer().get(entry);
- Object msg=messageContainer.get(consumerRef.getMessageEntry());
- if(msg!=null){
- if(msg.getClass()==String.class){
- String ref=msg.toString();
- if(msg.toString().equals(nextToDispatch.toString())){
- // need to set the entry to the previous one
- // to ensure we start in the right place
- topicSubContainer
- .setBatchEntry(topicSubContainer.getListContainer().getPrevious(entry));
- break;
- }
- }else{
- Message message=(Message)msg;
- if(message!=null&&message.getMessageId().equals(nextToDispatch)){
- // need to set the entry to the previous one
- // to ensure we start in the right place
- topicSubContainer
- .setBatchEntry(topicSubContainer.getListContainer().getPrevious(entry));
- break;
- }
- }
- }
- entry=topicSubContainer.getListContainer().getNext(entry);
- }while(entry!=null);
- }
}
}
- /**
- * @param clientId
- * @param subscriptionName
- * @param id
- * @return next messageId
- * @throws IOException
- * @see org.apache.activemq.store.TopicMessageStore#getNextMessageIdToDeliver(java.lang.String, java.lang.String,
- * org.apache.activemq.command.MessageId)
- */
- public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws IOException{
+
+ public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName) throws IOException{
// TODO Auto-generated method stub
return null;
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java Fri Nov 17 02:33:57 2006
@@ -1,20 +1,17 @@
/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
+
package org.apache.activemq.store.memory;
import java.io.IOException;
@@ -22,7 +19,6 @@
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
-
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
@@ -34,82 +30,97 @@
/**
* An implementation of {@link org.apache.activemq.store.MessageStore} which uses a synchronized in-memory {@link java.util.Map} keyed by message id.
- *
+ *
* @version $Revision: 1.7 $
*/
-public class MemoryMessageStore implements MessageStore {
+public class MemoryMessageStore implements MessageStore{
protected final ActiveMQDestination destination;
protected final Map messageTable;
- public MemoryMessageStore(ActiveMQDestination destination) {
- this(destination, new LinkedHashMap());
+ public MemoryMessageStore(ActiveMQDestination destination){
+ this(destination,new LinkedHashMap());
}
- public MemoryMessageStore(ActiveMQDestination destination, Map messageTable) {
- this.destination = destination;
- this.messageTable = Collections.synchronizedMap(messageTable);
+ public MemoryMessageStore(ActiveMQDestination destination,Map messageTable){
+ this.destination=destination;
+ this.messageTable=Collections.synchronizedMap(messageTable);
}
- public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
- messageTable.put(message.getMessageId(), message);
+ public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
+ messageTable.put(message.getMessageId(),message);
}
- public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
- messageTable.put(messageId, messageRef);
+
+ public void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef)
+ throws IOException{
+ messageTable.put(messageId,messageRef);
}
- public Message getMessage(MessageId identity) throws IOException {
- return (Message) messageTable.get(identity);
+ public Message getMessage(MessageId identity) throws IOException{
+ return (Message)messageTable.get(identity);
}
- public String getMessageReference(MessageId identity) throws IOException {
- return (String) messageTable.get(identity);
+
+ public String getMessageReference(MessageId identity) throws IOException{
+ return (String)messageTable.get(identity);
}
- public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
+ public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException{
messageTable.remove(ack.getLastMessageId());
}
-
- public void removeMessage(MessageId msgId) throws IOException {
+
+ public void removeMessage(MessageId msgId) throws IOException{
messageTable.remove(msgId);
}
- public void recover(MessageRecoveryListener listener) throws Exception {
+ public void recover(MessageRecoveryListener listener) throws Exception{
// the message table is a synchronizedMap - so just have to synchronize here
synchronized(messageTable){
for(Iterator iter=messageTable.values().iterator();iter.hasNext();){
- Object msg=(Object) iter.next();
+ Object msg=(Object)iter.next();
if(msg.getClass()==String.class){
- listener.recoverMessageReference((String) msg);
+ listener.recoverMessageReference((String)msg);
}else{
- listener.recoverMessage((Message) msg);
+ listener.recoverMessage((Message)msg);
}
}
listener.finished();
}
}
- public void start() {
+ public void start(){
}
- public void stop() {
+ public void stop(){
}
- public void removeAllMessages(ConnectionContext context) throws IOException {
+ public void removeAllMessages(ConnectionContext context) throws IOException{
messageTable.clear();
}
- public ActiveMQDestination getDestination() {
+ public ActiveMQDestination getDestination(){
return destination;
}
- public void delete() {
+ public void delete(){
messageTable.clear();
}
-
+
/**
* @param usageManager The UsageManager that is controlling the destination's memory usage.
*/
- public void setUsageManager(UsageManager usageManager) {
+ public void setUsageManager(UsageManager usageManager){
}
+ public int getMessageCount(){
+ return messageTable.size();
+ }
+
+ public void resetBatching(MessageId nextToDispatch){
+ }
+
+ public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
+ }
+
+ public void resetBatching(){
+ }
}