You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/10/09 15:05:22 UTC
svn commit: r454368 [1/3] - in /incubator/activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/broker/region/cursors/
main/java/org/apache/activemq/kaha/ main...
Author: rajdavies
Date: Mon Oct 9 06:05:20 2006
New Revision: 454368
URL: http://svn.apache.org/viewvc?view=rev&rev=454368
Log:
changes for https://issues.apache.org/activemq/browse/AMQ-845 -
provide support for durable topic cursors
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRef.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRefMarshaller.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaCursorDurableTest.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ListContainer.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/CachedContainerListIterator.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubAck.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubAckMarshaller.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Mon Oct 9 06:05:20 2006
@@ -454,6 +454,7 @@
if (broker != null) {
stopper.stop(broker);
}
+ tempDataStore.close();
if (isUseJmx()) {
MBeanServer mbeanServer = getManagementContext().getMBeanServer();
@@ -957,7 +958,7 @@
/**
* @return the tempDataStore
*/
- public Store getTempDataStore() {
+ public synchronized Store getTempDataStore() {
if (tempDataStore == null){
String name = getTmpDataDirectory().getPath();
try {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Mon Oct 9 06:05:20 2006
@@ -41,9 +41,9 @@
private boolean active=false;
public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) throws InvalidSelectorException {
- //super(broker,context, info, new StoreDurableSubscriberCursor(context.getClientId(),info.getSubcriptionName()));
- // super(broker,context, info, new FilePendingMessageCursor(context.getClientId() + info.getConsumerId().toString(),broker.getTempDataStore()));
- super(broker,context,info);
+ //super(broker,context, info, new StoreDurableSubscriberCursor(context.getClientId(),info.getSubcriptionName(),broker.getTempDataStore(),info.getPrefetchSize()));
+ //super(broker,context, info, new FilePendingMessageCursor(context.getClientId() + info.getConsumerId().toString(),broker.getTempDataStore()));
+ super(broker,context,info);
this.keepDurableSubsActive = keepDurableSubsActive;
subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubcriptionName());
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Mon Oct 9 06:05:20 2006
@@ -124,7 +124,8 @@
synchronized public void add(MessageReference node) throws Exception{
enqueueCounter++;
- if(!isFull()){
+ //if(!isFull()){
+ if(!isFull() && pending.isEmpty() && canDispatch(node)){
dispatch(node);
}else{
optimizePrefetch();
@@ -196,8 +197,6 @@
}
dispatchMatched();
return;
- }else{
- // System.out.println("no match: "+ack.getLastMessageId()+","+messageId);
}
}
}
@@ -435,8 +434,7 @@
/**
* @param node
* @param message
- * TODO
- * @return
+ * @return MessageDispatch
*/
protected MessageDispatch createMessageDispatch(MessageReference node,Message message){
if( node == QueueMessageReference.NULL_MESSAGE ) {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Mon Oct 9 06:05:20 2006
@@ -1,43 +1,96 @@
/**
*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
*/
+
package org.apache.activemq.broker.region.cursors;
+import java.io.IOException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
-
+import org.apache.activemq.broker.region.MessageReference;
/**
- * Default method holder for pending message (messages awaiting disptach to a consumer) cursor
+ * Abstract method holder for pending message (messages awaiting disptach to a
+ * consumer) cursor
*
* @version $Revision$
*/
-public abstract class AbstractPendingMessageCursor implements PendingMessageCursor{
-
+public class AbstractPendingMessageCursor implements PendingMessageCursor {
+ protected int maxBatchSize = 100;
+
public void start() throws Exception{
}
-
+
public void stop() throws Exception{
}
-
- public void add(ConnectionContext context, Destination destination) throws Exception{
+
+ public void add(ConnectionContext context,Destination destination)
+ throws Exception{
}
- public void remove(ConnectionContext context, Destination destination) throws Exception{
+ public void remove(ConnectionContext context,Destination destination)
+ throws Exception{
}
-
-
+
public boolean isRecoveryRequired(){
return true;
}
+
+ public void addMessageFirst(MessageReference node) throws Exception{
+ }
+
+ public void addMessageLast(MessageReference node) throws Exception{
+ }
+
+ public void clear(){
+ }
+
+ public boolean hasNext(){
+ return false;
+ }
+
+ public boolean isEmpty(){
+ return false;
+ }
+
+ public MessageReference next(){
+ return null;
+ }
+
+ public void remove(){
+ }
+
+ public void reset(){
+ }
+
+ public int size(){
+ return 0;
+ }
+
+ public int getMaxBatchSize(){
+ return maxBatchSize;
+ }
+
+ public void setMaxBatchSize(int maxBatchSize){
+ this.maxBatchSize=maxBatchSize;
+ }
+
+ protected void fillBatch() throws Exception{
+ }
+
+
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java Mon Oct 9 06:05:20 2006
@@ -13,6 +13,8 @@
*/
package org.apache.activemq.broker.region.cursors;
+import java.io.IOException;
+
import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
@@ -55,14 +57,17 @@
/**
* add message to await dispatch
* @param node
+ * @throws IOException
+ * @throws Exception
*/
- public void addMessageLast(MessageReference node);
+ public void addMessageLast(MessageReference node) throws Exception;
/**
* add message to await dispatch
* @param node
+ * @throws Exception
*/
- public void addMessageFirst(MessageReference node);
+ public void addMessageFirst(MessageReference node) throws Exception;
/**
* @return true if there pending messages to dispatch
@@ -94,8 +99,18 @@
/**
* Informs the Broker if the subscription needs to intervention to recover it's state
* e.g. DurableTopicSubscriber may do
- * @see org.apache.activemq.region.cursors.PendingMessageCursor
* @return true if recovery required
*/
public boolean isRecoveryRequired();
+
+ /**
+ * @return the maximum batch size
+ */
+ public int getMaxBatchSize();
+
+ /**
+ * Set the max batch size
+ * @param maxBatchSize
+ */
+ public void setMaxBatchSize(int maxBatchSize);
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Mon Oct 9 06:05:20 2006
@@ -11,6 +11,7 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
+
package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
@@ -22,24 +23,28 @@
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Topic;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.kaha.Store;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
/**
* perist pending messages pending message (messages awaiting disptach to a consumer) cursor
*
* @version $Revision$
*/
public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{
+
static private final Log log=LogFactory.getLog(StoreDurableSubscriberCursor.class);
private int pendingCount=0;
private String clientId;
private String subscriberName;
- private int maxBatchSize=10;
- private LinkedList batchList=new LinkedList();
private Map topics=new HashMap();
private LinkedList storePrefetches=new LinkedList();
- private AtomicBoolean started=new AtomicBoolean();
+ private boolean started;
+ private PendingMessageCursor nonPersistent;
+ private PendingMessageCursor currentCursor;
/**
* @param topic
@@ -47,24 +52,26 @@
* @param subscriberName
* @throws IOException
*/
- public StoreDurableSubscriberCursor(String clientId,String subscriberName){
+ public StoreDurableSubscriberCursor(String clientId,String subscriberName,Store store,int maxBatchSize){
this.clientId=clientId;
this.subscriberName=subscriberName;
+ this.nonPersistent=new FilePendingMessageCursor(clientId+subscriberName,store);
+ storePrefetches.add(nonPersistent);
}
public synchronized void start() throws Exception{
- started.set(true);
+ started=true;
for(Iterator i=storePrefetches.iterator();i.hasNext();){
- TopicStorePrefetch tsp=(TopicStorePrefetch) i.next();
+ PendingMessageCursor tsp=(PendingMessageCursor)i.next();
tsp.start();
pendingCount+=tsp.size();
}
}
public synchronized void stop() throws Exception{
- started.set(false);
+ started=false;
for(Iterator i=storePrefetches.iterator();i.hasNext();){
- TopicStorePrefetch tsp=(TopicStorePrefetch) i.next();
+ PendingMessageCursor tsp=(PendingMessageCursor)i.next();
tsp.stop();
}
pendingCount=0;
@@ -78,10 +85,11 @@
* @throws Exception
*/
public synchronized void add(ConnectionContext context,Destination destination) throws Exception{
- TopicStorePrefetch tsp=new TopicStorePrefetch((Topic) destination,batchList,clientId,subscriberName);
+ TopicStorePrefetch tsp=new TopicStorePrefetch((Topic)destination,clientId,subscriberName);
+ tsp.setMaxBatchSize(getMaxBatchSize());
topics.put(destination,tsp);
storePrefetches.add(tsp);
- if(started.get()){
+ if(started){
tsp.start();
pendingCount+=tsp.size();
}
@@ -95,7 +103,7 @@
* @throws Exception
*/
public synchronized void remove(ConnectionContext context,Destination destination) throws Exception{
- TopicStorePrefetch tsp=(TopicStorePrefetch) topics.remove(destination);
+ Object tsp=topics.remove(destination);
if(tsp!=null){
storePrefetches.remove(tsp);
}
@@ -119,12 +127,32 @@
return false;
}
- public synchronized void addMessageFirst(MessageReference node){
- pendingCount++;
+ public synchronized void addMessageFirst(MessageReference node) throws IOException{
+ if(started){
+ throw new RuntimeException("This shouldn't be called!");
+ }
}
- public synchronized void addMessageLast(MessageReference node){
- pendingCount++;
+ public synchronized void addMessageLast(MessageReference node) throws Exception{
+ if(started){
+ if(node!=null){
+ Message msg=node.getMessage();
+ if(!msg.isPersistent()){
+ nonPersistent.addMessageLast(node);
+ }else{
+ Destination dest=msg.getRegionDestination();
+ TopicStorePrefetch tsp=(TopicStorePrefetch)topics.get(dest);
+ if(tsp!=null){
+ tsp.addMessageLast(node);
+ // if the store has been empty - then this message is next to dispatch
+ if((pendingCount-nonPersistent.size())<=0){
+ tsp.nextToDispatch(node.getMessageId());
+ }
+ }
+ }
+ pendingCount++;
+ }
+ }
}
public void clear(){
@@ -132,49 +160,56 @@
}
public synchronized boolean hasNext(){
- return !isEmpty();
- }
-
- public synchronized MessageReference next(){
- MessageReference result=null;
- if(!isEmpty()){
- if(batchList.isEmpty()){
- try{
- fillBatch();
- }catch(Exception e){
- log.error("Couldn't fill batch from store ",e);
- throw new RuntimeException(e);
- }
- }
- if(!batchList.isEmpty()){
- result=(MessageReference) batchList.removeFirst();
+ boolean result=pendingCount>0;
+ if(result){
+ try{
+ currentCursor=getNextCursor();
+ }catch(Exception e){
+ log.error("Failed to get current cursor ",e);
+ throw new RuntimeException(e);
}
+ result=currentCursor!=null?currentCursor.hasNext():false;
}
return result;
}
+ public synchronized MessageReference next(){
+ return currentCursor!=null?currentCursor.next():null;
+ }
+
public synchronized void remove(){
+ if(currentCursor!=null){
+ currentCursor.remove();
+ }
pendingCount--;
}
- public void reset(){
- batchList.clear();
+ public synchronized void reset(){
+ for(Iterator i=storePrefetches.iterator();i.hasNext();){
+ AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next();
+ tsp.reset();
+ }
}
public int size(){
return pendingCount;
}
- private synchronized void fillBatch() throws Exception{
- for(Iterator i=storePrefetches.iterator();i.hasNext();){
- TopicStorePrefetch tsp=(TopicStorePrefetch) i.next();
- tsp.fillBatch();
- if(batchList.size()>=maxBatchSize){
- break;
+ protected synchronized PendingMessageCursor getNextCursor() throws Exception{
+ if(currentCursor==null||currentCursor.isEmpty()){
+ currentCursor=null;
+ for(Iterator i=storePrefetches.iterator();i.hasNext();){
+ AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next();
+ tsp.setMaxBatchSize(getMaxBatchSize());
+ if(tsp.hasNext()){
+ currentCursor=tsp;
+ break;
+ }
}
+ // round-robin
+ Object obj=storePrefetches.removeFirst();
+ storePrefetches.addLast(obj);
}
- // round-robin
- Object obj=storePrefetches.removeFirst();
- storePrefetches.addLast(obj);
+ return currentCursor;
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Mon Oct 9 06:05:20 2006
@@ -1,20 +1,27 @@
/**
*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
*/
+
package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
import java.util.LinkedList;
+import javax.jms.JMSException;
+import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.Message;
@@ -23,134 +30,114 @@
import org.apache.activemq.store.TopicMessageStore;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+
/**
- * perist pending messages pending message (messages awaiting disptach to a consumer) cursor
+ * perist pending messages pending message (messages awaiting disptach to a
+ * consumer) cursor
*
* @version $Revision$
*/
-class TopicStorePrefetch extends AbstractPendingMessageCursor implements MessageRecoveryListener{
+class TopicStorePrefetch extends AbstractPendingMessageCursor implements
+ MessageRecoveryListener {
+
static private final Log log=LogFactory.getLog(TopicStorePrefetch.class);
- private Topic topic;
+
private TopicMessageStore store;
- private LinkedList batchList;
+ private final LinkedList batchList=new LinkedList();
private String clientId;
private String subscriberName;
- private int pendingCount=0;
private MessageId lastMessageId;
- private int maxBatchSize=10;
+ private Destination regionDestination;
/**
* @param topic
- * @param batchList
* @param clientId
* @param subscriberName
* @throws IOException
*/
- public TopicStorePrefetch(Topic topic,LinkedList batchList,String clientId,String subscriberName){
- this.topic=topic;
- this.store=(TopicMessageStore) topic.getMessageStore();
- this.batchList=batchList;
+ public TopicStorePrefetch(Topic topic,String clientId,String subscriberName){
+ this.regionDestination = topic;
+ this.store=(TopicMessageStore)topic.getMessageStore();
this.clientId=clientId;
this.subscriberName=subscriberName;
}
public void start() throws Exception{
- pendingCount=store.getMessageCount(clientId,subscriberName);
- System.err.println("Pending count = "+pendingCount);
}
public void stop() throws Exception{
- pendingCount=0;
- lastMessageId=null;
+ store.resetBatching(clientId,clientId,null);
}
/**
* @return true if there are no pending messages
*/
public boolean isEmpty(){
- return pendingCount==0;
- }
-
- /**
- * Informs the Broker if the subscription needs to intervention to recover it's state e.g. DurableTopicSubscriber
- * may do
- *
- * @see org.apache.activemq.region.cursors.PendingMessageCursor
- * @return true if recovery required
- */
- public boolean isRecoveryRequired(){
- return false;
+ return batchList.isEmpty();
}
-
- public synchronized void addMessageFirst(MessageReference node){
- pendingCount++;
- }
-
- public synchronized void addMessageLast(MessageReference node){
- pendingCount++;
- }
-
- public void clear(){
- pendingCount=0;
- lastMessageId=null;
+
+ public synchronized int size(){
+ try{
+ return store.getMessageCount(clientId,subscriberName);
+ }catch(IOException e){
+ log.error(this + " Failed to get the outstanding message count from the store",e);
+ throw new RuntimeException(e);
+ }
}
public synchronized boolean hasNext(){
+ if(isEmpty()){
+ try{
+ fillBatch();
+ }catch(Exception e){
+ log.error("Failed to fill batch",e);
+ throw new RuntimeException(e);
+ }
+ }
return !isEmpty();
}
public synchronized MessageReference next(){
- MessageReference result=null;
- if(!isEmpty()){
- if(batchList.isEmpty()){
- try{
- fillBatch();
- }catch(Exception e){
- log.error(topic.getDestination()+" Couldn't fill batch from store ",e);
- throw new RuntimeException(e);
- }
- }
- result=(MessageReference) batchList.removeFirst();
- }
+ Message result = (Message)batchList.removeFirst();
+ result.setRegionDestination(regionDestination);
return result;
}
- public synchronized void remove(){
- pendingCount--;
- }
-
public void reset(){
- batchList.clear();
- }
-
- public int size(){
- return pendingCount;
}
// MessageRecoveryListener implementation
- public void finished(){}
+ public void finished(){
+ }
public void recoverMessage(Message message) throws Exception{
+ message.setRegionDestination(regionDestination);
batchList.addLast(message);
}
- public void recoverMessageReference(String messageReference) throws Exception{
+ public void recoverMessageReference(String messageReference)
+ throws Exception{
// shouldn't get called
throw new RuntimeException("Not supported");
}
// implementation
protected void fillBatch() throws Exception{
- if(pendingCount<=0){
- pendingCount=store.getMessageCount(clientId,subscriberName);
- }
- if(pendingCount>0){
- store.recoverNextMessages(clientId,subscriberName,lastMessageId,maxBatchSize,this);
- // this will add more messages to the batch list
- if(!batchList.isEmpty()){
- Message message=(Message) batchList.getLast();
- lastMessageId=message.getMessageId();
- }
+ store.recoverNextMessages(clientId,subscriberName,lastMessageId,
+ maxBatchSize,this);
+ // this will add more messages to the batch list
+ if(!batchList.isEmpty()){
+ Message message=(Message)batchList.getLast();
+ lastMessageId=message.getMessageId();
}
+ }
+
+ public String toString() {
+ return "TopicStorePrefetch" + System.identityHashCode(this) + "("+clientId+","+subscriberName+")";
+ }
+
+ synchronized void nextToDispatch(MessageId id) throws Exception {
+ lastMessageId = store.getPreviousMessageIdToDeliver(clientId,clientId,id);
+ store.resetBatching(clientId,clientId,id);
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ListContainer.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ListContainer.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ListContainer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ListContainer.java Mon Oct 9 06:05:20 2006
@@ -1,58 +1,55 @@
/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
+
package org.apache.activemq.kaha;
import java.io.IOException;
import java.util.List;
import java.util.NoSuchElementException;
+
/**
- *Represents a container of persistent objects in the store
- *Acts as a map, but values can be retrieved in insertion order
+ * Represents a container of persistent objects in the store Acts as a map, but values can be retrieved in insertion
+ * order
*
* @version $Revision: 1.2 $
*/
public interface ListContainer extends List{
+
/**
- * The container is created or retrieved in
- * an unloaded state.
- * load populates the container will all the indexes used etc
- * and should be called before any operations on the container
+ * The container is created or retrieved in an unloaded state. load populates the container will all the indexes
+ * used etc and should be called before any operations on the container
*/
public void load();
-
+
/**
* unload indexes from the container
- *
+ *
*/
public void unload();
-
+
/**
* @return true if the indexes are loaded
*/
public boolean isLoaded();
-
-
+
/**
- * For homogenous containers can set a custom marshaller for loading values
- * The default uses Object serialization
- * @param marshaller
+ * For homogenous containers can set a custom marshaller for loading values The default uses Object serialization
+ *
+ * @param marshaller
*/
public void setMarshaller(Marshaller marshaller);
+
/**
* @return the id the MapContainer was create with
*/
@@ -62,46 +59,46 @@
* @return the number of values in the container
*/
public int size();
-
+
/**
* Inserts the given element at the beginning of this list.
- *
+ *
* @param o the element to be inserted at the beginning of this list.
*/
public void addFirst(Object o);
/**
- * Appends the given element to the end of this list. (Identical in
- * function to the <tt>add</tt> method; included only for consistency.)
- *
+ * Appends the given element to the end of this list. (Identical in function to the <tt>add</tt> method; included
+ * only for consistency.)
+ *
* @param o the element to be inserted at the end of this list.
*/
public void addLast(Object o);
-
+
/**
* Removes and returns the first element from this list.
- *
+ *
* @return the first element from this list.
- * @throws NoSuchElementException if this list is empty.
+ * @throws NoSuchElementException if this list is empty.
*/
public Object removeFirst();
/**
* Removes and returns the last element from this list.
- *
+ *
* @return the last element from this list.
- * @throws NoSuchElementException if this list is empty.
+ * @throws NoSuchElementException if this list is empty.
*/
public Object removeLast();
-
-
+
/**
* remove an objecr from the list without retrieving the old value from the store
+ *
* @param position
* @return true if successful
*/
public boolean doRemove(int position);
-
+
/**
* @return the maximumCacheSize
*/
@@ -111,46 +108,87 @@
* @param maximumCacheSize the maximumCacheSize to set
*/
public void setMaximumCacheSize(int maximumCacheSize);
-
+
/**
* clear any cached values
*/
public void clearCache();
-
+
/**
* add an Object to the list but get a StoreEntry of its position
+ *
* @param object
* @return the entry in the Store
*/
public StoreEntry placeLast(Object object);
-
+
/**
* insert an Object in first position int the list but get a StoreEntry of its position
+ *
* @param object
* @return the location in the Store
*/
public StoreEntry placeFirst(Object object);
-
+
/**
* Advanced feature = must ensure the object written doesn't overwrite other objects in the container
- * @param entry
- * @param object
+ *
+ * @param entry
+ * @param object
*/
- public void update(StoreEntry entry, Object object);
-
+ public void update(StoreEntry entry,Object object);
+
/**
* Retrieve an Object from the Store by its location
+ *
* @param entry
* @return the Object at that entry
*/
public Object get(StoreEntry entry);
-
+
+ /**
+ * Get the StoreEntry for the first item of the list
+ *
+ * @return the first StoreEntry or null if the list is empty
+ */
+ public StoreEntry getFirst();
+
+ /**
+ * Get yjr StoreEntry for the last item of the list
+ *
+ * @return the last StoreEntry or null if the list is empty
+ */
+ public StoreEntry getLast();
+
+ /**
+ * Get the next StoreEntry from the list
+ *
+ * @param entry
+ * @return the next StoreEntry or null
+ */
+ public StoreEntry getNext(StoreEntry entry);
+
+ /**
+ * Get the previous StoreEntry from the list
+ *
+ * @param entry
+ * @return the previous store entry or null
+ */
+ public StoreEntry getPrevious(StoreEntry entry);
+
/**
* remove the Object at the StoreEntry
+ *
* @param entry
* @return true if successful
*/
public boolean remove(StoreEntry entry);
-
+ /**
+ * It's possible that a StoreEntry could be come stale
+ * this will return an upto date entry for the StoreEntry position
+ * @param entry old entry
+ * @return a refreshed StoreEntry
+ */
+ public StoreEntry refresh(StoreEntry entry);
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java Mon Oct 9 06:05:20 2006
@@ -381,7 +381,7 @@
if(type==null||(!type.equals(IndexTypes.DISK_INDEX)&&!type.equals(IndexTypes.IN_MEMORY_INDEX))){
throw new RuntimeException("Unknown IndexType: "+type);
}
- this.indexType=indexType;
+ this.indexType=type;
}
public synchronized void initialize() throws IOException{
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/CachedContainerListIterator.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/CachedContainerListIterator.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/CachedContainerListIterator.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/CachedContainerListIterator.java Mon Oct 9 06:05:20 2006
@@ -18,9 +18,7 @@
package org.apache.activemq.kaha.impl.container;
import java.util.ListIterator;
-
import org.apache.activemq.kaha.StoreEntry;
-import org.apache.activemq.kaha.impl.index.IndexLinkedList;
/**
* @version $Revision$
@@ -28,14 +26,12 @@
public class CachedContainerListIterator implements ListIterator{
protected ListContainerImpl container;
- protected IndexLinkedList list;
protected int pos = 0;
protected int nextPos =0;
protected StoreEntry currentItem;
protected CachedContainerListIterator(ListContainerImpl container,int start){
this.container=container;
- this.list=list;
this.pos=start;
this.nextPos = this.pos;
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java Mon Oct 9 06:05:20 2006
@@ -1,19 +1,15 @@
/**
*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.kaha.impl.container;
@@ -52,8 +48,8 @@
protected int maximumCacheSize=100;
protected IndexItem lastCached;
- public ListContainerImpl(ContainerId id,IndexItem root,IndexManager indexManager,DataManager dataManager,String indexType)
- throws IOException{
+ public ListContainerImpl(ContainerId id,IndexItem root,IndexManager indexManager,DataManager dataManager,
+ String indexType) throws IOException{
super(id,root,indexManager,dataManager,indexType);
}
@@ -462,15 +458,15 @@
indexList.add(index,item);
itemAdded(item,index,element);
}
-
- protected StoreEntry internalAddLast(Object o) {
+
+ protected StoreEntry internalAddLast(Object o){
load();
IndexItem item=writeLast(o);
indexList.addLast(item);
itemAdded(item,indexList.size()-1,o);
return item;
}
-
+
protected StoreEntry internalAddFirst(Object o){
load();
IndexItem item=writeFirst(o);
@@ -486,8 +482,6 @@
itemAdded(item,index,element);
return item;
}
-
-
protected StoreEntry internalGet(int index){
load();
@@ -623,27 +617,29 @@
}
return result;
}
-
+
/**
* add an Object to the list but get a StoreEntry of its position
+ *
* @param object
* @return the entry in the Store
*/
- public synchronized StoreEntry placeLast(Object object) {
- StoreEntry item = internalAddLast(object);
+ public synchronized StoreEntry placeLast(Object object){
+ StoreEntry item=internalAddLast(object);
return item;
}
-
+
/**
* insert an Object in first position int the list but get a StoreEntry of its position
+ *
* @param object
* @return the location in the Store
*/
- public synchronized StoreEntry placeFirst(Object object) {
- StoreEntry item = internalAddFirst(object);
+ public synchronized StoreEntry placeFirst(Object object){
+ StoreEntry item=internalAddFirst(object);
return item;
}
-
+
/**
* @param entry
* @param object
@@ -651,41 +647,90 @@
*/
public void update(StoreEntry entry,Object object){
try{
- dataManager.updateItem(entry.getValueDataItem(),marshaller, object);
+ dataManager.updateItem(entry.getValueDataItem(),marshaller,object);
}catch(IOException e){
throw new RuntimeException(e);
}
-
+ }
+
+ /**
+ * Retrieve an Object from the Store by its location
+ *
+ * @param entry
+ * @return the Object at that entry
+ */
+ public synchronized Object get(StoreEntry entry){
+ load();
+ return getValue(entry);
+ }
+
+ /**
+ * remove the Object at the StoreEntry
+ *
+ * @param entry
+ * @return true if successful
+ */
+ public synchronized boolean remove(StoreEntry entry){
+ IndexItem item=(IndexItem)entry;
+ load();
+ boolean result=false;
+ if(item!=null){
+ clearCache();
+ remove(item);
+ result = true;
+ }
+ return result;
+ }
+
+ /**
+ * Get the StoreEntry for the first item of the list
+ *
+ * @return the first StoreEntry or null if the list is empty
+ */
+ public synchronized StoreEntry getFirst(){
+ return indexList.getFirst();
+ }
+
+ /**
+ * Get yjr StoreEntry for the last item of the list
+ *
+ * @return the last StoreEntry or null if the list is empty
+ */
+ public synchronized StoreEntry getLast(){
+ return indexList.getLast();
+ }
+
+ /**
+ * Get the next StoreEntry from the list
+ *
+ * @param entry
+ * @return the next StoreEntry or null
+ */
+ public synchronized StoreEntry getNext(StoreEntry entry){
+ IndexItem item=(IndexItem)entry;
+ return indexList.getNextEntry(item);
+ }
+
+ /**
+ * Get the previous StoreEntry from the list
+ *
+ * @param entry
+ * @return the previous store entry or null
+ */
+ public synchronized StoreEntry getPrevious(StoreEntry entry){
+ IndexItem item=(IndexItem)entry;
+ return indexList.getPrevEntry(item);
}
/**
- * Retrieve an Object from the Store by its location
- * @param entry
- * @return the Object at that entry
- */
- public synchronized Object get(StoreEntry entry) {
- load();
- return getValue(entry);
- }
-
- /**
- * remove the Object at the StoreEntry
- * @param entry
- * @return true if successful
- */
- public synchronized boolean remove(StoreEntry entry) {
- IndexItem item = (IndexItem)entry;
- load();
- boolean result = false;
- if(item!=null){
- clearCache();
- IndexItem prev=indexList.getPrevEntry(item);
- prev=prev!=null?prev:root;
- IndexItem next=indexList.getNextEntry(item);
- delete(item,prev,next);
- }
- return result;
- }
+ * It's possible that a StoreEntry could be come stale
+ * this will return an upto date entry for the StoreEntry position
+ * @param entry old entry
+ * @return a refreshed StoreEntry
+ */
+ public synchronized StoreEntry refresh(StoreEntry entry) {
+ return indexList.getEntry(entry);
+ }
protected IndexItem writeLast(Object value){
IndexItem index=null;
@@ -782,7 +827,7 @@
if(item!=null){
try{
// ensure it's up to date
- //item=indexList.getEntry(item);
+ // item=indexList.getEntry(item);
StoreLocation data=item.getValueDataItem();
result=dataManager.readItem(marshaller,data);
}catch(IOException e){
@@ -903,8 +948,7 @@
}
/**
- * @param cacheList
- * the cacheList to set
+ * @param cacheList the cacheList to set
*/
public synchronized void setCacheList(LinkedList cacheList){
this.cacheList=cacheList;
@@ -918,8 +962,7 @@
}
/**
- * @param lastCached
- * the lastCached to set
+ * @param lastCached the lastCached to set
*/
public synchronized void setLastCached(IndexItem lastCached){
this.lastCached=lastCached;
@@ -933,8 +976,7 @@
}
/**
- * @param maximumCacheSize
- * the maximumCacheSize to set
+ * @param maximumCacheSize the maximumCacheSize to set
*/
public synchronized void setMaximumCacheSize(int maximumCacheSize){
this.maximumCacheSize=maximumCacheSize;
@@ -948,12 +990,9 @@
}
/**
- * @param offset
- * the offset to set
+ * @param offset the offset to set
*/
public synchronized void setOffset(int offset){
this.offset=offset;
}
-
-
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java Mon Oct 9 06:05:20 2006
@@ -86,8 +86,16 @@
delegate.recoverNextMessages(clientId, subscriptionName, lastMessageId,maxReturned,listener);
}
- public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{
- return delegate.getNextMessageToDeliver(clientId,subscriptionName);
+ public void resetBatching(String clientId,String subscriptionName,MessageId id) {
+ delegate.resetBatching(clientId,subscriptionName,id);
+ }
+
+ public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
+ return delegate.getNextMessageIdToDeliver(clientId,subscriptionName,id);
+ }
+
+ public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
+ return delegate.getPreviousMessageIdToDeliver(clientId,subscriptionName,id);
}
public ActiveMQDestination getDestination() {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java Mon Oct 9 06:05:20 2006
@@ -78,15 +78,37 @@
public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,
MessageRecoveryListener listener) throws Exception;
+ /**
+ * A hint to the Store to reset any batching state for a durable subsriber
+ * @param clientId
+ * @param subscriptionName
+ * @param nextToDispatch
+ *
+ */
+ public void resetBatching(String clientId,String subscriptionName,MessageId nextToDispatch);
+
+ /**
+ * Get the next messageId to deliver to a subscriber after the MessageId provided
+ * @param clientId
+ * @param subscriptionName
+ * @param id
+ * @return the next messageId or null
+ * @throws IOException
+ * @throws Exception
+ */
+ public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception;
+
/**
- * Get the next un-acknowledged message to deliver to a subscriber
+ * Get the previous messageId to deliver to a subscriber before the MessageId provided
* @param clientId
* @param subscriptionName
- * @return the next message or null
+ * @param id
+ * @return the next messageId or null
* @throws IOException
+ * @throws Exception
*/
- public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException;
+ public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception;
/**
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java Mon Oct 9 06:05:20 2006
@@ -1,26 +1,22 @@
/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
+
package org.apache.activemq.store.jdbc;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Set;
-
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
@@ -28,62 +24,69 @@
/**
* @version $Revision: 1.5 $
*/
-public interface JDBCAdapter {
-
+public interface JDBCAdapter{
+
public void setStatements(Statements statementProvider);
-
- public abstract void doCreateTables(TransactionContext c) throws SQLException, IOException;
- public abstract void doDropTables(TransactionContext c) throws SQLException, IOException;
+ public abstract void doCreateTables(TransactionContext c) throws SQLException,IOException;
+
+ public abstract void doDropTables(TransactionContext c) throws SQLException,IOException;
- public abstract void doAddMessage(TransactionContext c, MessageId messageID, ActiveMQDestination destination, byte[] data,
- long expiration) throws SQLException, IOException;
- public abstract void doAddMessageReference(TransactionContext c, MessageId messageId, ActiveMQDestination destination, long expirationTime, String messageRef) throws SQLException, IOException;
+ public abstract void doAddMessage(TransactionContext c,MessageId messageID,ActiveMQDestination destination,
+ byte[] data,long expiration) throws SQLException,IOException;
- public abstract byte[] doGetMessage(TransactionContext c, long seq) throws SQLException, IOException;
- public abstract String doGetMessageReference(TransactionContext c, long id) throws SQLException, IOException;
+ public abstract void doAddMessageReference(TransactionContext c,MessageId messageId,
+ ActiveMQDestination destination,long expirationTime,String messageRef) throws SQLException,IOException;
- public abstract void doRemoveMessage(TransactionContext c, long seq) throws SQLException, IOException;
+ public abstract byte[] doGetMessage(TransactionContext c,long seq) throws SQLException,IOException;
- public abstract void doRecover(TransactionContext c, ActiveMQDestination destination, JDBCMessageRecoveryListener listener)
- throws Exception;
+ public abstract String doGetMessageReference(TransactionContext c,long id) throws SQLException,IOException;
- public abstract void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, long seq) throws SQLException,
- IOException;
+ public abstract void doRemoveMessage(TransactionContext c,long seq) throws SQLException,IOException;
- public abstract void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
- String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception;
-
- public abstract void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId,
- String subscriptionName, long seq,int maxReturned,JDBCMessageRecoveryListener listener) throws Exception;
+ public abstract void doRecover(TransactionContext c,ActiveMQDestination destination,
+ JDBCMessageRecoveryListener listener) throws Exception;
- public abstract void doSetSubscriberEntry(TransactionContext c, ActiveMQDestination destination, String clientId,
- String subscriptionName, String selector, boolean retroactive) throws SQLException, IOException;
+ public abstract void doSetLastAck(TransactionContext c,ActiveMQDestination destination,String clientId,
+ String subscriptionName,long seq) throws SQLException,IOException;
- public abstract SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination,
- String clientId, String subscriptionName)
- throws SQLException, IOException;
+ public abstract void doRecoverSubscription(TransactionContext c,ActiveMQDestination destination,String clientId,
+ String subscriptionName,JDBCMessageRecoveryListener listener) throws Exception;
- public abstract long getBrokerSequenceId(TransactionContext c, MessageId messageID) throws SQLException, IOException;
+ public abstract void doRecoverNextMessages(TransactionContext c,ActiveMQDestination destination,String clientId,
+ String subscriptionName,long seq,int maxReturned,JDBCMessageRecoveryListener listener) throws Exception;
- public abstract void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException, IOException;
+ public abstract void doSetSubscriberEntry(TransactionContext c,ActiveMQDestination destination,String clientId,
+ String subscriptionName,String selector,boolean retroactive) throws SQLException,IOException;
- public abstract void doDeleteSubscription(TransactionContext c, ActiveMQDestination destinationName, String clientId, String subscriptionName)
- throws SQLException, IOException;
+ public abstract SubscriptionInfo doGetSubscriberEntry(TransactionContext c,ActiveMQDestination destination,
+ String clientId,String subscriptionName) throws SQLException,IOException;
- public abstract void doDeleteOldMessages(TransactionContext c)
- throws SQLException, IOException;
+ public abstract long getBrokerSequenceId(TransactionContext c,MessageId messageID) throws SQLException,IOException;
- public abstract long doGetLastMessageBrokerSequenceId(TransactionContext c) throws SQLException, IOException;
+ public abstract void doRemoveAllMessages(TransactionContext c,ActiveMQDestination destinationName)
+ throws SQLException,IOException;
- public abstract Set doGetDestinations(TransactionContext c) throws SQLException, IOException;
+ public abstract void doDeleteSubscription(TransactionContext c,ActiveMQDestination destinationName,String clientId,
+ String subscriptionName) throws SQLException,IOException;
+
+ public abstract void doDeleteOldMessages(TransactionContext c) throws SQLException,IOException;
+
+ public abstract long doGetLastMessageBrokerSequenceId(TransactionContext c) throws SQLException,IOException;
+
+ public abstract Set doGetDestinations(TransactionContext c) throws SQLException,IOException;
public abstract void setUseExternalMessageReferences(boolean useExternalMessageReferences);
- public abstract SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException;
+ public abstract SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c,ActiveMQDestination destination)
+ throws SQLException,IOException;
+
+ public int doGetDurableSubscriberMessageCount(TransactionContext c,ActiveMQDestination destination,String clientId,
+ String subscriptionName) throws SQLException,IOException;
- public byte[] doGetNextDurableSubscriberMessageStatement(TransactionContext c, ActiveMQDestination destination,String clientId,String subscriberName) throws SQLException, IOException;
-
- public int doGetDurableSubscriberMessageCount(TransactionContext c,ActiveMQDestination destination,String clientId,String subscriptionName) throws SQLException, IOException;
+ public void doGetPrevDurableSubscriberMessageIdStatement(TransactionContext c,ActiveMQDestination destination,
+ String clientId,String subscriberName,long id,JDBCMessageRecoveryListener listener) throws Exception;
+ public void doGetNextDurableSubscriberMessageIdStatement(TransactionContext c,ActiveMQDestination destination,
+ String clientId,String subscriberName,long id,JDBCMessageRecoveryListener listener) throws Exception;
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Mon Oct 9 06:05:20 2006
@@ -30,6 +30,7 @@
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
/**
* @version $Revision: 1.6 $
@@ -110,12 +111,16 @@
});
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ",e);
- throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e);
+
} finally {
c.close();
}
}
+
+ public void resetBatching(String clientId,String subscriptionName,MessageId id) {
+ }
+
/**
* @see org.apache.activemq.store.TopicMessageStore#storeSubsciption(org.apache.activemq.service.SubscriptionInfo,
* boolean)
@@ -175,21 +180,75 @@
}
}
- public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{
- Message result = null;
-
+ public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
+
+ final MessageId result = new MessageId();
+ final AtomicBoolean initalized = new AtomicBoolean();
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
- byte[] data = adapter.doGetNextDurableSubscriberMessageStatement(c, destination, clientId, subscriptionName);
- result = (Message) wireFormat.unmarshal(new ByteSequence(data));
+ long sequence = id != null ? id.getBrokerSequenceId() : -1;
+ adapter.doGetNextDurableSubscriberMessageIdStatement(c, destination, clientId, subscriptionName,sequence,new JDBCMessageRecoveryListener() {
+ public void recoverMessage(long sequenceId, byte[] data) throws Exception {
+ Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
+ msg.getMessageId().setBrokerSequenceId(sequenceId);
+ result.setBrokerSequenceId(msg.getMessageId().getBrokerSequenceId());
+ initalized.set(true);
+
+ }
+ public void recoverMessageReference(String reference) throws Exception {
+ result.setValue(reference);
+ initalized.set(true);
+
+ }
+
+ public void finished(){
+ }
+ });
+
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ",e);
- throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e);
+ throw IOExceptionSupport.create("Failed to get next MessageId to deliver: " + clientId + ". Reason: " + e, e);
} finally {
c.close();
}
- return result;
+ return initalized.get () ? result : null;
+ }
+
+ public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
+ final MessageId result = new MessageId();
+ final AtomicBoolean initalized = new AtomicBoolean();
+ TransactionContext c = persistenceAdapter.getTransactionContext();
+ try {
+ long sequence = id != null ? id.getBrokerSequenceId() : -1;
+ adapter.doGetPrevDurableSubscriberMessageIdStatement(c, destination, clientId, subscriptionName,sequence,new JDBCMessageRecoveryListener() {
+ public void recoverMessage(long sequenceId, byte[] data) throws Exception {
+ Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
+ msg.getMessageId().setBrokerSequenceId(sequenceId);
+ result.setProducerId(msg.getMessageId().getProducerId());
+ result.setProducerSequenceId(msg.getMessageId().getProducerSequenceId());
+ result.setBrokerSequenceId(msg.getMessageId().getBrokerSequenceId());
+ initalized.set(true);
+
+ }
+ public void recoverMessageReference(String reference) throws Exception {
+ result.setValue(reference);
+ initalized.set(true);
+
+ }
+
+ public void finished(){
+ }
+ });
+
+
+ } catch (SQLException e) {
+ JDBCPersistenceAdapter.log("JDBC Failure: ",e);
+ throw IOExceptionSupport.create("Failed to get next MessageId to deliver: " + clientId + ". Reason: " + e, e);
+ } finally {
+ c.close();
+ }
+ return initalized.get () ? result : null;
}
public int getMessageCount(String clientId,String subscriberName) throws IOException{
@@ -200,7 +259,7 @@
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ",e);
- throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e);
+ throw IOExceptionSupport.create("Failed to get Message Count: " + clientId + ". Reason: " + e, e);
} finally {
c.close();
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java Mon Oct 9 06:05:20 2006
@@ -64,6 +64,8 @@
private String lockUpdateStatement;
private String nextDurableSubscriberMessageStatement;
private String durableSubscriberMessageCountStatement;
+ private String nextDurableSubscriberMessageIdStatement;
+ private String prevDurableSubscriberMessageIdStatement;
private boolean useLockCreateWhereClause;
public String[] getCreateSchemaStatements() {
@@ -210,10 +212,9 @@
public String getFindDurableSubMessagesStatement(){
if(findDurableSubMessagesStatement==null){
- findDurableSubMessagesStatement="SELECT M.ID, M.MSG FROM "+getFullMessageTableName()+" M, "
- +getFullAckTableName()+" D "+" WHERE ?>= ( SELECT COUNT(*) FROM "
- +getFullMessageTableName()+" M, " + getFullAckTableName() + " D WHERE (D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
- +" AND M.CONTAINER=D.CONTAINER AND M.ID > ?)"+" ORDER BY M.ID)";
+ findDurableSubMessagesStatement="SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, "
+ + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+ + " AND M.CONTAINER=D.CONTAINER AND M.ID > ?" + " ORDER BY M.ID";
}
return findDurableSubMessagesStatement;
}
@@ -229,10 +230,9 @@
public String getNextDurableSubscriberMessageStatement(){
if (nextDurableSubscriberMessageStatement == null){
- nextDurableSubscriberMessageStatement = "SELECT M.ID, M.MSG FROM "+getFullMessageTableName()+" M, "
- +getFullAckTableName()+" D "+" WHERE 1 >= ( SELECT COUNT(*) FROM "
- +getFullMessageTableName()+" M, WHERE (D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
- +" AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"+") ORDER BY M.ID)";
+ nextDurableSubscriberMessageStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, "
+ + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+ + " AND M.CONTAINER=D.CONTAINER AND M.ID > ?" + " ORDER BY M.ID ";
}
return nextDurableSubscriberMessageStatement;
}
@@ -240,14 +240,55 @@
/**
* @return the durableSubscriberMessageCountStatement
*/
+
+
public String getDurableSubscriberMessageCountStatement(){
if (durableSubscriberMessageCountStatement==null){
- durableSubscriberMessageCountStatement = "SELECT COUNT(*) FROM "
- +getFullMessageTableName()+" M, where D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
- +" AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID";
+ durableSubscriberMessageCountStatement = "SELECT COUNT(*) FROM " + getFullMessageTableName() + " M, "
+ + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+ + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID";
}
return durableSubscriberMessageCountStatement;
}
+
+ /**
+ * @return the nextDurableSubscriberMessageIdStatement
+ */
+ public String getNextDurableSubscriberMessageIdStatement(){
+ if (nextDurableSubscriberMessageIdStatement==null) {
+ nextDurableSubscriberMessageIdStatement =
+ "SELECT M.ID FROM " + getFullMessageTableName() + " M, "
+ + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+ + " AND M.CONTAINER=D.CONTAINER AND M.ID > ?" + " ORDER BY M.ID ";
+ }
+ return nextDurableSubscriberMessageIdStatement;
+ }
+
+ /**
+ * @return the prevDurableSubscriberMessageIdStatement
+ */
+ /*
+ public String getPrevDurableSubscriberMessageIdStatement(){
+ if(prevDurableSubscriberMessageIdStatement==null) {
+ prevDurableSubscriberMessageIdStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, "
+ + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+ + " AND M.CONTAINER=D.CONTAINER AND M.ID < ?" + " ORDER BY M.ID ";
+ }
+ return prevDurableSubscriberMessageIdStatement;
+ }
+ */
+
+
+ public String getPrevDurableSubscriberMessageIdStatement(){
+ if(prevDurableSubscriberMessageIdStatement==null) {
+ prevDurableSubscriberMessageIdStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M "
+ + " WHERE M.CONTAINER=? "
+ + " AND M.ID <?" + " ORDER BY M.ID DESC ";
+ }
+ return prevDurableSubscriberMessageIdStatement;
+ }
+
+
public String getFindAllDestinationsStatement() {
if (findAllDestinationsStatement == null) {
@@ -564,5 +605,26 @@
*/
public void setDurableSubscriberMessageCountStatement(String durableSubscriberMessageCountStatement){
this.durableSubscriberMessageCountStatement=durableSubscriberMessageCountStatement;
+ }
+
+
+
+
+
+ /**
+ * @param nextDurableSubscriberMessageIdStatement the nextDurableSubscriberMessageIdStatement to set
+ */
+ public void setNextDurableSubscriberMessageIdStatement(String nextDurableSubscriberMessageIdStatement){
+ this.nextDurableSubscriberMessageIdStatement=nextDurableSubscriberMessageIdStatement;
+ }
+
+
+
+
+ /**
+ * @param prevDurableSubscriberMessageIdStatement the prevDurableSubscriberMessageIdStatement to set
+ */
+ public void setPrevDurableSubscriberMessageIdStatement(String prevDurableSubscriberMessageIdStatement){
+ this.prevDurableSubscriberMessageIdStatement=prevDurableSubscriberMessageIdStatement;
}
}