You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/09/12 12:07:35 UTC
svn commit: r442550 - in /incubator/activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/broker/region/cursors/
main/java/org/apache/activemq/store/
main/java/org/apache/activemq/store/jdbc/ main/...
Author: rajdavies
Date: Tue Sep 12 03:07:34 2006
New Revision: 442550
URL: http://svn.apache.org/viewvc?view=rev&rev=442550
Log:
More foundation work to resolve: http://issues.apache.org/activemq/browse/AMQ-845
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (with props)
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (with props)
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (with props)
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/test/eclipse-resources/log4j.properties
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java Tue Sep 12 03:07:34 2006
@@ -155,4 +155,8 @@
public int getPrefetchSize() {
return info.getPrefetchSize();
}
+
+ public boolean isRecoveryRequired(){
+ return true;
+ }
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Tue Sep 12 03:07:34 2006
@@ -24,6 +24,7 @@
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
+import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
@@ -40,7 +41,8 @@
private boolean active=false;
public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) throws InvalidSelectorException {
- //super(broker,context, info, new FilePendingMessageCursor(context.getClientId() + info.getConsumerId().toString(),broker.getTempDataStore()));
+ //super(broker,context, info, new StoreDurableSubscriberCursor(context.getClientId(),info.getSubcriptionName()));
+ // super(broker,context, info, new FilePendingMessageCursor(context.getClientId() + info.getConsumerId().toString(),broker.getTempDataStore()));
super(broker,context,info);
this.keepDurableSubsActive = keepDurableSubsActive;
subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubcriptionName());
@@ -78,12 +80,14 @@
topic.activate(context, this);
}
}
+ pending.start();
dispatchMatched();
}
}
synchronized public void deactivate(boolean keepDurableSubsActive) throws Exception {
active=false;
+ pending.stop();
if( !keepDurableSubsActive ) {
for (Iterator iter = destinations.values().iterator(); iter.hasNext();) {
Topic topic = (Topic) iter.next();
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Tue Sep 12 03:07:34 2006
@@ -188,7 +188,7 @@
if(context.isInTransaction()) {
// extend prefetch window only if not a pulling consumer
if (getPrefetchSize() != 0) {
- prefetchExtension=Math.max(prefetchExtension,index+1);
+ prefetchExtension=Math.max(prefetchExtension,index+1);
}
}
else {
@@ -316,6 +316,10 @@
return enqueueCounter;
}
+ public boolean isRecoveryRequired(){
+ return pending.isRecoveryRequired();
+ }
+
/**
* optimize message consumer prefetch if the consumer supports it
*
@@ -336,7 +340,16 @@
*/
}
-
+ public void add(ConnectionContext context, Destination destination) throws Exception {
+ super.add(context,destination);
+ pending.add(context,destination);
+ }
+
+ public void remove(ConnectionContext context, Destination destination) throws Exception {
+ super.remove(context,destination);
+ pending.remove(context,destination);
+
+ }
protected void dispatchMatched() throws IOException{
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java Tue Sep 12 03:07:34 2006
@@ -190,5 +190,13 @@
* @return the prefetch size that is configured for the subscription
*/
int getPrefetchSize();
+
+ /**
+ * Informs the Broker if the subscription needs to intervention to recover it's state
+ * e.g. DurableTopicSubscriber may do
+ * @see org.apache.activemq.region.cursors.PendingMessageCursor
+ * @return true if recovery required
+ */
+ public boolean isRecoveryRequired();
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Tue Sep 12 03:07:34 2006
@@ -180,31 +180,30 @@
final MessageEvaluationContext msgContext = new MessageEvaluationContext();
msgContext.setDestination(destination);
- store.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {
- public void recoverMessage(Message message) throws Exception {
- message.setRegionDestination(Topic.this);
- try {
- msgContext.setMessageReference(message);
- if (subscription.matches(message, msgContext)) {
- subscription.add(message);
+ if(subscription.isRecoveryRequired()){
+ store.recoverSubscription(clientId,subscriptionName,new MessageRecoveryListener(){
+ public void recoverMessage(Message message) throws Exception{
+ message.setRegionDestination(Topic.this);
+ try{
+ msgContext.setMessageReference(message);
+ if(subscription.matches(message,msgContext)){
+ subscription.add(message);
+ }
+ }catch(InterruptedException e){
+ Thread.currentThread().interrupt();
+ }catch(IOException e){
+ // TODO: Need to handle this better.
+ e.printStackTrace();
}
}
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+
+ public void recoverMessageReference(String messageReference) throws Exception{
+ throw new RuntimeException("Should not be called.");
}
- catch (IOException e) {
- // TODO: Need to handle this better.
- e.printStackTrace();
- }
- }
-
- public void recoverMessageReference(String messageReference) throws Exception {
- throw new RuntimeException("Should not be called.");
- }
-
- public void finished(){
- }
- });
+
+ public void finished(){}
+ });
+ }
if( true && subscription.getConsumerInfo().isRetroactive() ) {
// If nothing was in the persistent store, then try to use the recovery policy.
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Tue Sep 12 03:07:34 2006
@@ -24,6 +24,7 @@
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
+import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.ActiveMQDestination;
@@ -216,6 +217,9 @@
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
if (info.isDurable()) {
+ if (AdvisorySupport.isAdvisoryTopic(info.getDestination())){
+ throw new JMSException("Cannot create a durable subscription for an advisory Topic");
+ }
SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubcriptionName());
DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
if (sub == null) {
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?view=auto&rev=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Tue Sep 12 03:07:34 2006
@@ -0,0 +1,43 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.Destination;
+
+
+/**
+ * Default method holder for pending message (messages awaiting disptach to a consumer) cursor
+ *
+ * @version $Revision$
+ */
+public abstract class AbstractPendingMessageCursor implements PendingMessageCursor{
+
+ public void start() throws Exception{
+ }
+
+ public void stop() throws Exception{
+ }
+
+ public void add(ConnectionContext context, Destination destination) throws Exception{
+ }
+
+ public void remove(ConnectionContext context, Destination destination) throws Exception{
+ }
+
+
+ public boolean isRecoveryRequired(){
+ return true;
+ }
+}
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
------------------------------------------------------------------------------
svn:executable = *
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Tue Sep 12 03:07:34 2006
@@ -12,12 +12,14 @@
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.broker.region.cursors;
+
import java.io.IOException;
-import java.util.*;
+import java.util.Iterator;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.Message;
-import org.apache.activemq.kaha.*;
+import org.apache.activemq.kaha.ListContainer;
+import org.apache.activemq.kaha.Store;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.kahadaptor.CommandMarshaller;
/**
@@ -25,25 +27,26 @@
*
* @version $Revision$
*/
-public class FilePendingMessageCursor implements PendingMessageCursor{
+public class FilePendingMessageCursor extends AbstractPendingMessageCursor{
private ListContainer list;
- private Iterator iter = null;
+ private Iterator iter=null;
private Destination regionDestination;
-
+
/**
* @param name
* @param store
* @throws IOException
*/
- public FilePendingMessageCursor(String name, Store store) {
+ public FilePendingMessageCursor(String name,Store store){
try{
- list = store.getListContainer(name);
+ list=store.getListContainer(name);
list.setMarshaller(new CommandMarshaller(new OpenWireFormat()));
list.setMaximumCacheSize(0);
}catch(IOException e){
throw new RuntimeException(e);
}
}
+
/**
* @return true if there are no pending messages
*/
@@ -53,12 +56,12 @@
/**
* reset the cursor
- *
+ *
*/
public void reset(){
- iter = list.listIterator();
+ iter=list.listIterator();
}
-
+
/**
* add message to await dispatch
*
@@ -66,42 +69,42 @@
*/
public void addMessageLast(MessageReference node){
try{
- regionDestination = node.getMessage().getRegionDestination();
+ regionDestination=node.getMessage().getRegionDestination();
node.decrementReferenceCount();
}catch(IOException e){
- throw new RuntimeException(e);
+ throw new RuntimeException(e);
}
list.addLast(node);
}
-
+
/**
* add message to await dispatch
- * @param position
+ *
+ * @param position
* @param node
*/
public void addMessageFirst(MessageReference node){
try{
- regionDestination = node.getMessage().getRegionDestination();
+ regionDestination=node.getMessage().getRegionDestination();
node.decrementReferenceCount();
}catch(IOException e){
- throw new RuntimeException(e);
+ throw new RuntimeException(e);
}
list.addFirst(node);
}
-
/**
* @return true if there pending messages to dispatch
*/
public boolean hasNext(){
- return iter.hasNext();
+ return iter.hasNext();
}
/**
* @return the next pending message
*/
public MessageReference next(){
- Message message = (Message) iter.next();
+ Message message=(Message) iter.next();
message.setRegionDestination(regionDestination);
message.incrementReferenceCount();
return message;
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java Tue Sep 12 03:07:34 2006
@@ -13,6 +13,9 @@
*/
package org.apache.activemq.broker.region.cursors;
+import org.apache.activemq.Service;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
/**
@@ -20,7 +23,24 @@
*
* @version $Revision$
*/
-public interface PendingMessageCursor{
+public interface PendingMessageCursor extends Service{
+
+
+ /**
+ * Add a destination
+ * @param context
+ * @param destination
+ * @throws Exception
+ */
+ public void add(ConnectionContext context, Destination destination) throws Exception;
+
+ /**
+ * remove a destination
+ * @param context
+ * @param destination
+ * @throws Exception
+ */
+ public void remove(ConnectionContext context, Destination destination) throws Exception;
/**
* @return true if there are no pending messages
*/
@@ -70,4 +90,12 @@
*
*/
public void clear();
+
+ /**
+ * Informs the Broker if the subscription needs to intervention to recover it's state
+ * e.g. DurableTopicSubscriber may do
+ * @see org.apache.activemq.region.cursors.PendingMessageCursor
+ * @return true if recovery required
+ */
+ public boolean isRecoveryRequired();
}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?view=auto&rev=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Tue Sep 12 03:07:34 2006
@@ -0,0 +1,180 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Topic;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+/**
+ * perist pending messages pending message (messages awaiting disptach to a consumer) cursor
+ *
+ * @version $Revision$
+ */
+public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{
+ static private final Log log=LogFactory.getLog(StoreDurableSubscriberCursor.class);
+ private int pendingCount=0;
+ private String clientId;
+ private String subscriberName;
+ private int maxBatchSize=10;
+ private LinkedList batchList=new LinkedList();
+ private Map topics=new HashMap();
+ private LinkedList storePrefetches=new LinkedList();
+ private AtomicBoolean started=new AtomicBoolean();
+
+ /**
+ * @param topic
+ * @param clientId
+ * @param subscriberName
+ * @throws IOException
+ */
+ public StoreDurableSubscriberCursor(String clientId,String subscriberName){
+ this.clientId=clientId;
+ this.subscriberName=subscriberName;
+ }
+
+ public synchronized void start() throws Exception{
+ started.set(true);
+ for(Iterator i=storePrefetches.iterator();i.hasNext();){
+ TopicStorePrefetch tsp=(TopicStorePrefetch) i.next();
+ tsp.start();
+ pendingCount+=tsp.size();
+ }
+ }
+
+ public synchronized void stop() throws Exception{
+ started.set(false);
+ for(Iterator i=storePrefetches.iterator();i.hasNext();){
+ TopicStorePrefetch tsp=(TopicStorePrefetch) i.next();
+ tsp.stop();
+ }
+ pendingCount=0;
+ }
+
+ /**
+ * Add a destination
+ *
+ * @param context
+ * @param destination
+ * @throws Exception
+ */
+ public synchronized void add(ConnectionContext context,Destination destination) throws Exception{
+ TopicStorePrefetch tsp=new TopicStorePrefetch((Topic) destination,batchList,clientId,subscriberName);
+ topics.put(destination,tsp);
+ storePrefetches.add(tsp);
+ if(started.get()){
+ tsp.start();
+ pendingCount+=tsp.size();
+ }
+ }
+
+ /**
+ * remove a destination
+ *
+ * @param context
+ * @param destination
+ * @throws Exception
+ */
+ public synchronized void remove(ConnectionContext context,Destination destination) throws Exception{
+ TopicStorePrefetch tsp=(TopicStorePrefetch) topics.remove(destination);
+ if(tsp!=null){
+ storePrefetches.remove(tsp);
+ }
+ }
+
+ /**
+ * @return true if there are no pending messages
+ */
+ public synchronized boolean isEmpty(){
+ return pendingCount<=0;
+ }
+
+ /**
+ * Informs the Broker if the subscription needs to intervention to recover it's state e.g. DurableTopicSubscriber
+ * may do
+ *
+ * @see org.apache.activemq.region.cursors.PendingMessageCursor
+ * @return true if recovery required
+ */
+ public boolean isRecoveryRequired(){
+ return false;
+ }
+
+ public synchronized void addMessageFirst(MessageReference node){
+ pendingCount++;
+ }
+
+ public synchronized void addMessageLast(MessageReference node){
+ pendingCount++;
+ }
+
+ public void clear(){
+ pendingCount=0;
+ }
+
+ public synchronized boolean hasNext(){
+ return !isEmpty();
+ }
+
+ public synchronized MessageReference next(){
+ MessageReference result=null;
+ if(!isEmpty()){
+ if(batchList.isEmpty()){
+ try{
+ fillBatch();
+ }catch(Exception e){
+ log.error("Couldn't fill batch from store ",e);
+ throw new RuntimeException(e);
+ }
+ }
+ if(!batchList.isEmpty()){
+ result=(MessageReference) batchList.removeFirst();
+ }
+ }
+ return result;
+ }
+
+ public synchronized void remove(){
+ pendingCount--;
+ }
+
+ public void reset(){
+ batchList.clear();
+ }
+
+ public int size(){
+ return pendingCount;
+ }
+
+ private synchronized void fillBatch() throws Exception{
+ for(Iterator i=storePrefetches.iterator();i.hasNext();){
+ TopicStorePrefetch tsp=(TopicStorePrefetch) i.next();
+ tsp.fillBatch();
+ if(batchList.size()>=maxBatchSize){
+ break;
+ }
+ }
+ // round-robin
+ Object obj=storePrefetches.removeFirst();
+ storePrefetches.addLast(obj);
+ }
+}
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
------------------------------------------------------------------------------
svn:executable = *
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?view=auto&rev=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Tue Sep 12 03:07:34 2006
@@ -0,0 +1,156 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Topic;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+/**
+ * perist pending messages pending message (messages awaiting disptach to a consumer) cursor
+ *
+ * @version $Revision$
+ */
+class TopicStorePrefetch extends AbstractPendingMessageCursor implements MessageRecoveryListener{
+ static private final Log log=LogFactory.getLog(TopicStorePrefetch.class);
+ private Topic topic;
+ private TopicMessageStore store;
+ private LinkedList batchList;
+ private String clientId;
+ private String subscriberName;
+ private int pendingCount=0;
+ private MessageId lastMessageId;
+ private int maxBatchSize=10;
+
+ /**
+ * @param topic
+ * @param batchList
+ * @param clientId
+ * @param subscriberName
+ * @throws IOException
+ */
+ public TopicStorePrefetch(Topic topic,LinkedList batchList,String clientId,String subscriberName){
+ this.topic=topic;
+ this.store=(TopicMessageStore) topic.getMessageStore();
+ this.batchList=batchList;
+ this.clientId=clientId;
+ this.subscriberName=subscriberName;
+ }
+
+ public void start() throws Exception{
+ pendingCount=store.getMessageCount(clientId,subscriberName);
+ System.err.println("Pending count = "+pendingCount);
+ }
+
+ public void stop() throws Exception{
+ pendingCount=0;
+ lastMessageId=null;
+ }
+
+ /**
+ * @return true if there are no pending messages
+ */
+ public boolean isEmpty(){
+ return pendingCount==0;
+ }
+
+ /**
+ * Informs the Broker if the subscription needs to intervention to recover it's state e.g. DurableTopicSubscriber
+ * may do
+ *
+ * @see org.apache.activemq.region.cursors.PendingMessageCursor
+ * @return true if recovery required
+ */
+ public boolean isRecoveryRequired(){
+ return false;
+ }
+
+ public synchronized void addMessageFirst(MessageReference node){
+ pendingCount++;
+ }
+
+ public synchronized void addMessageLast(MessageReference node){
+ pendingCount++;
+ }
+
+ public void clear(){
+ pendingCount=0;
+ lastMessageId=null;
+ }
+
+ public synchronized boolean hasNext(){
+ return !isEmpty();
+ }
+
+ public synchronized MessageReference next(){
+ MessageReference result=null;
+ if(!isEmpty()){
+ if(batchList.isEmpty()){
+ try{
+ fillBatch();
+ }catch(Exception e){
+ log.error(topic.getDestination()+" Couldn't fill batch from store ",e);
+ throw new RuntimeException(e);
+ }
+ }
+ result=(MessageReference) batchList.removeFirst();
+ }
+ return result;
+ }
+
+ public synchronized void remove(){
+ pendingCount--;
+ }
+
+ public void reset(){
+ batchList.clear();
+ }
+
+ public int size(){
+ return pendingCount;
+ }
+
+ // MessageRecoveryListener implementation
+ public void finished(){}
+
+ public void recoverMessage(Message message) throws Exception{
+ batchList.addLast(message);
+ }
+
+ public void recoverMessageReference(String messageReference) throws Exception{
+ // shouldn't get called
+ throw new RuntimeException("Not supported");
+ }
+
+ // implementation
+ protected void fillBatch() throws Exception{
+ if(pendingCount<=0){
+ pendingCount=store.getMessageCount(clientId,subscriberName);
+ }
+ if(pendingCount>0){
+ store.recoverNextMessages(clientId,subscriberName,lastMessageId,maxBatchSize,this);
+ // this will add more messages to the batch list
+ if(!batchList.isEmpty()){
+ Message message=(Message) batchList.getLast();
+ lastMessageId=message.getMessageId();
+ }
+ }
+ }
+}
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
------------------------------------------------------------------------------
svn:executable = *
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java Tue Sep 12 03:07:34 2006
@@ -20,7 +20,7 @@
*
* @version $Revision$
*/
-public class VMPendingMessageCursor implements PendingMessageCursor{
+public class VMPendingMessageCursor extends AbstractPendingMessageCursor{
private LinkedList list = new LinkedList();
private Iterator iter = null;
/**
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java Tue Sep 12 03:07:34 2006
@@ -82,6 +82,14 @@
delegate.recoverSubscription(clientId, subscriptionName, listener);
}
+ public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,MessageRecoveryListener listener) throws Exception{
+ delegate.recoverNextMessages(clientId, subscriptionName, lastMessageId,maxReturned,listener);
+ }
+
+ public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{
+ return delegate.getNextMessageToDeliver(clientId,subscriptionName);
+ }
+
public ActiveMQDestination getDestination() {
return delegate.getDestination();
}
@@ -100,4 +108,8 @@
public void setUsageManager(UsageManager usageManager) {
delegate.setUsageManager(usageManager);
}
+
+ public int getMessageCount(String clientId,String subscriberName) throws IOException{
+ return delegate.getMessageCount(clientId,subscriberName);
+ }
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java Tue Sep 12 03:07:34 2006
@@ -1,95 +1,134 @@
/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.store;
import java.io.IOException;
-
import javax.jms.JMSException;
-
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
-
/**
* A MessageStore for durable topic subscriptions
- *
+ *
* @version $Revision: 1.4 $
*/
-public interface TopicMessageStore extends MessageStore {
-
+public interface TopicMessageStore extends MessageStore{
/**
- * Stores the last acknowledged messgeID for the given subscription
- * so that we can recover and commence dispatching messages from the last
- * checkpoint
- * @param context TODO
+ * Stores the last acknowledged messgeID for the given subscription so that we can recover and commence dispatching
+ * messages from the last checkpoint
+ *
+ * @param context
+ * @param clientId
+ * @param subscriptionName
* @param messageId
* @param subscriptionPersistentId
+ * @throws IOException
*/
- public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException;
+ public void acknowledge(ConnectionContext context,String clientId,String subscriptionName,MessageId messageId)
+ throws IOException;
/**
+ * @param clientId
+ * @param subscriptionName
* @param sub
- * @throws JMSException
+ * @throws IOException
+ * @throws JMSException
*/
- public void deleteSubscription(String clientId, String subscriptionName) throws IOException;
-
+ public void deleteSubscription(String clientId,String subscriptionName) throws IOException;
+
/**
- * For the new subscription find the last acknowledged message ID
- * and then find any new messages since then and dispatch them
- * to the subscription.
- * <p/>
- * e.g. if we dispatched some messages to a new durable topic subscriber, then went down before
- * acknowledging any messages, we need to know the correct point from which to recover from.
+ * For the new subscription find the last acknowledged message ID and then find any new messages since then and
+ * dispatch them to the subscription. <p/> e.g. if we dispatched some messages to a new durable topic subscriber,
+ * then went down before acknowledging any messages, we need to know the correct point from which to recover from.
+ *
+ * @param clientId
+ * @param subscriptionName
+ * @param listener
* @param subscription
- *
- * @throws Exception
+ *
+ * @throws Exception
*/
- public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception;
+ public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
+ throws Exception;
/**
+ * For an active subscription - retrieve messages from the store for the subscriber after the lastMessageId
+ * messageId <p/>
+ *
+ * @param clientId
+ * @param subscriptionName
+ * @param lastMessageId
+ * @param maxReturned
+ * @param listener
+ *
+ * @throws Exception
+ */
+ public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,
+ MessageRecoveryListener listener) throws Exception;
+
+
+ /**
+ * Get the next un-acknowledged message to deliver to a subscriber
+ * @param clientId
+ * @param subscriptionName
+ * @return the next message or null
+ * @throws IOException
+ */
+ public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException;
+
+
+ /**
+ * Get the number of messages ready to deliver from the store to a durable subscriber
+ * @param clientId
+ * @param subscriberName
+ * @return the outstanding message count
+ * @throws IOException
+ */
+ public int getMessageCount(String clientId,String subscriberName) throws IOException;
+
+ /**
* Finds the subscriber entry for the given consumer info
*
- * @param clientId TODO
- * @param subscriptionName TODO
- * @return
+ * @param clientId
+ * @param subscriptionName
+ * @return the SubscriptionInfo
+ * @throws IOException
*/
- public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException;
+ public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException;
/**
* Lists all the durable subscirptions for a given destination.
*
- * @param clientId TODO
- * @param subscriptionName TODO
- * @return
+ * @return an array SubscriptionInfos
+ * @throws IOException
*/
public SubscriptionInfo[] getAllSubscriptions() throws IOException;
/**
- * Inserts the subscriber info due to a subscription change
- * <p/>
- * If this is a new subscription and the retroactive is false, then the last
- * message sent to the topic should be set as the last message acknowledged by they new
- * subscription. Otherwise, if retroactive is true, then create the subscription without
- * it having an acknowledged message so that on recovery, all message recorded for the
- * topic get replayed.
- * @param retroactive TODO
- *
+ * Inserts the subscriber info due to a subscription change <p/> If this is a new subscription and the retroactive
+ * is false, then the last message sent to the topic should be set as the last message acknowledged by they new
+ * subscription. Otherwise, if retroactive is true, then create the subscription without it having an acknowledged
+ * message so that on recovery, all message recorded for the topic get replayed.
+ *
+ * @param clientId
+ * @param subscriptionName
+ * @param selector
+ * @param retroactive
+ * @throws IOException
+ *
*/
- public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException;
-
+ public void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
+ throws IOException;
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java Tue Sep 12 03:07:34 2006
@@ -53,6 +53,9 @@
public abstract void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception;
+
+ public abstract void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId,
+ String subscriptionName, long seq,int maxReturned,JDBCMessageRecoveryListener listener) throws Exception;
public abstract void doSetSubscriberEntry(TransactionContext c, ActiveMQDestination destination, String clientId,
String subscriptionName, String selector, boolean retroactive) throws SQLException, IOException;
@@ -79,5 +82,8 @@
public abstract SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException;
+ public byte[] doGetNextDurableSubscriberMessageStatement(TransactionContext c, ActiveMQDestination destination,String clientId,String subscriberName) throws SQLException, IOException;
+
+ public int doGetDurableSubscriberMessageCount(TransactionContext c,ActiveMQDestination destination,String clientId,String subscriptionName) throws SQLException, IOException;
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Tue Sep 12 03:07:34 2006
@@ -89,6 +89,33 @@
}
}
+ public void recoverNextMessages(final String clientId,final String subscriptionName, final MessageId lastMessageId,final int maxReturned,final MessageRecoveryListener listener) throws Exception{
+ TransactionContext c = persistenceAdapter.getTransactionContext();
+ try {
+ long lastSequence = lastMessageId != null ? lastMessageId.getBrokerSequenceId() : -1;
+ adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName,lastSequence,maxReturned,
+ new JDBCMessageRecoveryListener() {
+ public void recoverMessage(long sequenceId, byte[] data) throws Exception {
+ Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
+ msg.getMessageId().setBrokerSequenceId(sequenceId);
+ listener.recoverMessage(msg);
+ }
+ public void recoverMessageReference(String reference) throws Exception {
+ listener.recoverMessageReference(reference);
+ }
+
+ public void finished(){
+ listener.finished();
+ }
+ });
+ } catch (SQLException e) {
+ JDBCPersistenceAdapter.log("JDBC Failure: ",e);
+ throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e);
+ } finally {
+ c.close();
+ }
+
+ }
/**
* @see org.apache.activemq.store.TopicMessageStore#storeSubsciption(org.apache.activemq.service.SubscriptionInfo,
* boolean)
@@ -147,5 +174,41 @@
c.close();
}
}
+
+ public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{
+ Message result = null;
+
+ TransactionContext c = persistenceAdapter.getTransactionContext();
+ try {
+ byte[] data = adapter.doGetNextDurableSubscriberMessageStatement(c, destination, clientId, subscriptionName);
+ result = (Message) wireFormat.unmarshal(new ByteSequence(data));
+
+ } catch (SQLException e) {
+ JDBCPersistenceAdapter.log("JDBC Failure: ",e);
+ throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e);
+ } finally {
+ c.close();
+ }
+ return result;
+ }
+
+ public int getMessageCount(String clientId,String subscriberName) throws IOException{
+ int result = 0;
+ TransactionContext c = persistenceAdapter.getTransactionContext();
+ try {
+ result = adapter.doGetDurableSubscriberMessageCount(c, destination, clientId, subscriberName);
+
+ } catch (SQLException e) {
+ JDBCPersistenceAdapter.log("JDBC Failure: ",e);
+ throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e);
+ } finally {
+ c.close();
+ }
+ return result;
+ }
+
+
+
+
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java Tue Sep 12 03:07:34 2006
@@ -53,6 +53,7 @@
private String updateLastAckOfDurableSubStatement;
private String deleteSubscriptionStatement;
private String findAllDurableSubMessagesStatement;
+ private String findDurableSubMessagesStatement;
private String findAllDestinationsStatement;
private String removeAllMessagesStatement;
private String removeAllSubscriptionsStatement;
@@ -61,6 +62,8 @@
private String[] dropSchemaStatements;
private String lockCreateStatement;
private String lockUpdateStatement;
+ private String nextDurableSubscriberMessageStatement;
+ private String durableSubscriberMessageCountStatement;
private boolean useLockCreateWhereClause;
public String[] getCreateSchemaStatements() {
@@ -204,6 +207,47 @@
}
return findAllDurableSubMessagesStatement;
}
+
+ public String getFindDurableSubMessagesStatement(){
+ if(findDurableSubMessagesStatement==null){
+ findDurableSubMessagesStatement="SELECT M.ID, M.MSG FROM "+getFullMessageTableName()+" M, "
+ +getFullAckTableName()+" D "+" WHERE ? >= ( select count(*) from "
+ +getFullMessageTableName()+" M, where D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+ +" AND M.CONTAINER=D.CONTAINER AND M.ID > ?"+" ORDER BY M.ID)";
+ }
+ return findDurableSubMessagesStatement;
+ }
+
+ public String findAllDurableSubMessagesStatement() {
+ if (findAllDurableSubMessagesStatement == null) {
+ findAllDurableSubMessagesStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, "
+ + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+ + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" + " ORDER BY M.ID";
+ }
+ return findAllDurableSubMessagesStatement;
+ }
+
+ public String getNextDurableSubscriberMessageStatement(){
+ if (nextDurableSubscriberMessageStatement == null){
+ nextDurableSubscriberMessageStatement = "SELECT M.ID, M.MSG FROM "+getFullMessageTableName()+" M, "
+ +getFullAckTableName()+" D "+" WHERE 1 >= ( select count(*) from "
+ +getFullMessageTableName()+" M, where D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+ +" AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"+" ORDER BY M.ID)";
+ }
+ return nextDurableSubscriberMessageStatement;
+ }
+
+ /**
+ * @return the durableSubscriberMessageCountStatement
+ */
+ public String getDurableSubscriberMessageCountStatement(){
+ if (durableSubscriberMessageCountStatement==null){
+ durableSubscriberMessageCountStatement = "select count(*) from "
+ +getFullMessageTableName()+" M, where D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+ +" AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID";
+ }
+ return durableSubscriberMessageCountStatement;
+ }
public String getFindAllDestinationsStatement() {
if (findAllDestinationsStatement == null) {
@@ -498,5 +542,27 @@
public void setLockUpdateStatement(String lockUpdateStatement) {
this.lockUpdateStatement = lockUpdateStatement;
+ }
+
+ /**
+ * @param findDurableSubMessagesStatement the findDurableSubMessagesStatement to set
+ */
+ public void setFindDurableSubMessagesStatement(String findDurableSubMessagesStatement){
+ this.findDurableSubMessagesStatement=findDurableSubMessagesStatement;
+ }
+
+ /**
+ * @param nextDurableSubscriberMessageStatement the nextDurableSubscriberMessageStatement to set
+ */
+ public void setNextDurableSubscriberMessageStatement(String nextDurableSubscriberMessageStatement){
+ this.nextDurableSubscriberMessageStatement=nextDurableSubscriberMessageStatement;
+ }
+
+
+ /**
+ * @param durableSubscriberMessageCountStatement the durableSubscriberMessageCountStatement to set
+ */
+ public void setDurableSubscriberMessageCountStatement(String durableSubscriberMessageCountStatement){
+ this.durableSubscriberMessageCountStatement=durableSubscriberMessageCountStatement;
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Tue Sep 12 03:07:34 2006
@@ -408,6 +408,58 @@
}
}
+ public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, long seq, int maxReturned,JDBCMessageRecoveryListener listener) throws Exception {
+// dumpTables(c, destination.getQualifiedName(),clientId,subscriptionName);
+
+ PreparedStatement s = null;
+ ResultSet rs = null;
+ try {
+
+ s = c.getConnection().prepareStatement(statements.getFindDurableSubMessagesStatement());
+ s.setString(1, destination.getQualifiedName());
+ s.setString(2, clientId);
+ s.setString(3, subscriptionName);
+ s.setLong(4,seq);
+ s.setInt(5,maxReturned);
+ rs = s.executeQuery();
+
+ if( statements.isUseExternalMessageReferences() ) {
+ while (rs.next()) {
+ listener.recoverMessageReference(rs.getString(2));
+ }
+ } else {
+ while (rs.next()) {
+ listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2));
+ }
+ }
+
+ }
+ finally {
+ close(rs);
+ close(s);
+ listener.finished();
+ }
+
+ }
+
+ public int doGetDurableSubscriberMessageCount(TransactionContext c,ActiveMQDestination destination,String clientId,
+ String subscriptionName) throws SQLException, IOException{
+ PreparedStatement s=null;
+ ResultSet rs=null;
+ int result = 0;
+ try{
+ s=c.getConnection().prepareStatement(statements.getDurableSubscriberMessageCountStatement());
+ s.setString(1,destination.getQualifiedName());
+ s.setString(2,clientId);
+ s.setString(3,subscriptionName);
+ rs=s.executeQuery();
+ result = rs.getInt(1);
+ }finally{
+ close(rs);
+ close(s);
+ }
+ return result;
+ }
/**
* @see org.apache.activemq.store.jdbc.JDBCAdapter#doSetSubscriberEntry(java.sql.Connection, java.lang.Object, org.apache.activemq.service.SubscriptionInfo)
@@ -607,6 +659,29 @@
public void setStatements(Statements statements) {
this.statements = statements;
+ }
+
+ public byte[] doGetNextDurableSubscriberMessageStatement(TransactionContext c,ActiveMQDestination destination,String clientId,String subscriberName) throws SQLException,IOException{
+ PreparedStatement s = null;
+ ResultSet rs = null;
+ try {
+
+ s = c.getConnection().prepareStatement(statements.getNextDurableSubscriberMessageStatement());
+ s.setString(1, destination.getQualifiedName());
+ s.setString(2, clientId);
+ s.setString(3, subscriberName);
+ rs = s.executeQuery();
+
+ if (!rs.next()) {
+ return null;
+ }
+ return getBinaryData(rs, 1);
+
+ }
+ finally {
+ close(rs);
+ close(s);
+ }
}
/*
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java Tue Sep 12 03:07:34 2006
@@ -604,11 +604,14 @@
}
public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) {
- if (newPercentUsage > 80 && oldPercentUsage < newPercentUsage) {
- checkpoint(false, true);
+ newPercentUsage = ((newPercentUsage)/10)*10;
+ oldPercentUsage = ((oldPercentUsage)/10)*10;
+ if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
+ boolean sync = newPercentUsage >= 90;
+ checkpoint(sync, true);
}
}
-
+
public JournalTransactionStore getTransactionStore() {
return transactionStore;
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java Tue Sep 12 03:07:34 2006
@@ -57,6 +57,12 @@
this.peristenceAdapter.checkpoint(true, true);
longTermStore.recoverSubscription(clientId, subscriptionName, listener);
}
+
+ public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,MessageRecoveryListener listener) throws Exception{
+ this.peristenceAdapter.checkpoint(true, true);
+ longTermStore.recoverNextMessages(clientId, subscriptionName, lastMessageId,maxReturned,listener);
+
+ }
public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
return longTermStore.lookupSubscription(clientId, subscriptionName);
@@ -183,5 +189,17 @@
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
return longTermStore.getAllSubscriptions();
}
+
+ public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{
+ this.peristenceAdapter.checkpoint(true, true);
+ return longTermStore.getNextMessageToDeliver(clientId,subscriptionName);
+ }
+
+ public int getMessageCount(String clientId,String subscriberName) throws IOException{
+ this.peristenceAdapter.checkpoint(true, true);
+ return longTermStore.getMessageCount(clientId,subscriberName);
+ }
+
+
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java Tue Sep 12 03:07:34 2006
@@ -71,6 +71,25 @@
});
}
+
+ public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId, int maxReturned,final MessageRecoveryListener listener) throws Exception{
+ this.peristenceAdapter.checkpoint(true, true);
+ longTermStore.recoverNextMessages(clientId, subscriptionName, lastMessageId,maxReturned,new MessageRecoveryListener() {
+ public void recoverMessage(Message message) throws Exception {
+ throw new IOException("Should not get called.");
+ }
+ public void recoverMessageReference(String messageReference) throws Exception {
+ RecordLocation loc = toRecordLocation(messageReference);
+ Message message = (Message) peristenceAdapter.readCommand(loc);
+ listener.recoverMessage(message);
+ }
+
+ public void finished(){
+ listener.finished();
+ }
+ });
+
+ }
public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
return longTermStore.lookupSubscription(clientId, subscriptionName);
@@ -197,5 +216,17 @@
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
return longTermStore.getAllSubscriptions();
}
+
+ public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{
+ this.peristenceAdapter.checkpoint(true, true);
+ return longTermStore.getNextMessageToDeliver(clientId,subscriptionName);
+ }
+
+ public int getMessageCount(String clientId,String subscriberName) throws IOException{
+ this.peristenceAdapter.checkpoint(true, true);
+ return longTermStore.getMessageCount(clientId,subscriberName);
+ }
+
+
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java Tue Sep 12 03:07:34 2006
@@ -148,6 +148,42 @@
listener.finished();
}
}
+
+ public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,
+ MessageRecoveryListener listener) throws Exception{
+ String key=getSubscriptionKey(clientId,subscriptionName);
+ ListContainer list=(ListContainer) subscriberAcks.get(key);
+ if(list!=null){
+ boolean startFound=false;
+ int count = 0;
+ for(Iterator i=list.iterator();i.hasNext() && count < maxReturned;){
+ Object msg=messageContainer.get(i.next());
+ if(msg!=null){
+ if(msg.getClass()==String.class){
+ String ref=msg.toString();
+ if (startFound || lastMessageId == null){
+ listener.recoverMessageReference(ref);
+ count++;
+ }
+ else if(startFound||ref.equals(lastMessageId.toString())){
+ startFound=true;
+ }
+ }else{
+ Message message=(Message) msg;
+ if(startFound||message.getMessageId().equals(lastMessageId)){
+ startFound=true;
+ }else{
+ listener.recoverMessage(message);
+ count++;
+ }
+ }
+ }
+ listener.finished();
+ }
+ }else{
+ listener.finished();
+ }
+ }
public void delete(){
super.delete();
@@ -172,4 +208,20 @@
container.setMarshaller(marshaller);
subscriberAcks.put(key,container);
}
+
+ public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{
+ String key=getSubscriptionKey(clientId,subscriptionName);
+ ListContainer list=(ListContainer) subscriberAcks.get(key);
+ Iterator iter = list.iterator();
+ return (Message) (iter.hasNext() ? iter.next() : null);
+
+ }
+
+ public int getMessageCount(String clientId,String subscriberName) throws IOException{
+ String key=getSubscriptionKey(clientId,subscriberName);
+ ListContainer list=(ListContainer) subscriberAcks.get(key);
+ return list.size();
+ }
+
+
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java Tue Sep 12 03:07:34 2006
@@ -111,6 +111,40 @@
listener.finished();
}
}
+
+ public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,MessageRecoveryListener listener) throws Exception{
+ MessageId lastAck=(MessageId) ackDatabase.get(new SubscriptionKey(clientId,subscriptionName));
+ boolean startFound=false;
+ // the message table is a synchronizedMap - so just have to synchronize here
+ synchronized(messageTable){
+ int count = 0;
+ for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext() && count < maxReturned;){
+ Map.Entry entry=(Entry) iter.next();
+
+ Object msg=entry.getValue();
+ if(msg.getClass()==String.class){
+ String ref=msg.toString();
+ if(startFound||ref.equals(lastMessageId.toString())){
+ startFound=true;
+ }else if (startFound){
+ listener.recoverMessageReference(ref);
+ count++;
+ }
+ }else{
+ Message message=(Message) msg;
+ if(startFound||message.getMessageId().equals(lastMessageId)){
+ startFound=true;
+ }else if (startFound){
+ listener.recoverMessage(message);
+ count++;
+ }
+ }
+
+ }
+ listener.finished();
+ }
+
+ }
public void delete() {
super.delete();
@@ -122,4 +156,34 @@
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
return (SubscriptionInfo[]) subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]);
}
+ public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{
+ MessageId lastAck=(MessageId) ackDatabase.get(new SubscriptionKey(clientId,subscriptionName));
+ // the message table is a synchronizedMap - so just have to synchronize here
+ synchronized(messageTable){
+ for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){
+ Map.Entry entry=(Entry) iter.next();
+ if(entry.getKey().equals(lastAck)){
+ return (Message) entry.getValue();
+ }
+ }
+ }
+ return null;
+ }
+ public int getMessageCount(String clientId,String subscriberName) throws IOException{
+ int result = 0;
+ MessageId lastAck=(MessageId) ackDatabase.get(new SubscriptionKey(clientId,subscriberName));
+ // the message table is a synchronizedMap - so just have to synchronize here
+ synchronized(messageTable){
+ result = messageTable.size();
+ for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){
+ Map.Entry entry=(Entry) iter.next();
+ if(entry.getKey().equals(lastAck)){
+ break;
+ }
+ result--;
+ }
+ }
+ return result;
+ }
+
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java Tue Sep 12 03:07:34 2006
@@ -94,6 +94,42 @@
}
}
+
+ public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,
+ MessageRecoveryListener listener) throws Exception{
+ String key=getSubscriptionKey(clientId,subscriptionName);
+ ListContainer list=(ListContainer) subscriberAcks.get(key);
+ if(list!=null){
+ boolean startFound=false;
+ int count = 0;
+ for(Iterator i=list.iterator();i.hasNext() && count < maxReturned;){
+ Object msg=messageContainer.get(i.next());
+ if(msg!=null){
+ if(msg.getClass()==String.class){
+ String ref=msg.toString();
+ if (startFound || lastMessageId == null){
+ listener.recoverMessageReference(ref);
+ count++;
+ }
+ else if(startFound||ref.equals(lastMessageId.toString())){
+ startFound=true;
+ }
+ }else{
+ Message message=(Message) msg;
+ if(startFound||message.getMessageId().equals(lastMessageId)){
+ startFound=true;
+ }else{
+ listener.recoverMessage(message);
+ count++;
+ }
+ }
+ }
+ listener.finished();
+ }
+ }else{
+ listener.finished();
+ }
+ }
public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
return (SubscriptionInfo) subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName));
@@ -290,6 +326,20 @@
Marshaller marshaller=new StringMarshaller();
container.setMarshaller(marshaller);
subscriberAcks.put(key,container);
+ }
+
+ public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{
+ String key=getSubscriptionKey(clientId,subscriptionName);
+ ListContainer list=(ListContainer) subscriberAcks.get(key);
+ Iterator iter = list.iterator();
+ return (Message) (iter.hasNext() ? iter.next() : null);
+
+ }
+
+ public int getMessageCount(String clientId,String subscriberName) throws IOException{
+ String key=getSubscriptionKey(clientId,subscriberName);
+ ListContainer list=(ListContainer) subscriberAcks.get(key);
+ return list.size();
}
}
Modified: incubator/activemq/trunk/activemq-core/src/test/eclipse-resources/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/eclipse-resources/log4j.properties?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/eclipse-resources/log4j.properties (original)
+++ incubator/activemq/trunk/activemq-core/src/test/eclipse-resources/log4j.properties Tue Sep 12 03:07:34 2006
@@ -1,26 +1,9 @@
-## ---------------------------------------------------------------------------
-## Licensed to the Apache Software Foundation (ASF) under one or more
-## contributor license agreements. See the NOTICE file distributed with
-## this work for additional information regarding copyright ownership.
-## The ASF licenses this file to You under the Apache License, Version 2.0
-## (the "License"); you may not use this file except in compliance with
-## the License. You may obtain a copy of the License at
-##
-## http://www.apache.org/licenses/LICENSE-2.0
-##
-## Unless required by applicable law or agreed to in writing, software
-## distributed under the License is distributed on an "AS IS" BASIS,
-## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-## See the License for the specific language governing permissions and
-## limitations under the License.
-## ---------------------------------------------------------------------------
-
#
# The logging properties used for eclipse testing, We want to see debug output on the console.
#
log4j.rootLogger=WARN, out
-log4j.logger.org.apache.activemq=DEBUG
+log4j.logger.org.apache.activemq=INFO
# CONSOLE appender not used by default
log4j.appender.out=org.apache.log4j.ConsoleAppender
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java?view=diff&rev=442550&r1=442549&r2=442550
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java Tue Sep 12 03:07:34 2006
@@ -28,6 +28,7 @@
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.DefaultPersistenceAdapterFactory;
import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
/**
* @version $Revision$
@@ -52,7 +53,14 @@
super.setUp();
broker=new BrokerService();
- broker.setPersistenceAdapter(new KahaPersistenceAdapter(new File ("TEST_STUFD")));
+ //broker.setPersistenceAdapter(new KahaPersistenceAdapter(new File ("TEST_STUFD")));
+ /*
+ DefaultPersistenceAdapterFactory factory = new DefaultPersistenceAdapterFactory();
+ factory.setDataDirectoryFile(broker.getDataDirectory());
+ factory.setTaskRunnerFactory(broker.getTaskRunnerFactory());
+ factory.setUseJournal(false);
+ broker.setPersistenceFactory(factory);
+ */
broker.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_URL);
broker.start();
connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_BROKER_URL);