You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/03/06 11:25:50 UTC

svn commit: r515054 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store: ./ amq/ jdbc/ journal/ kahadaptor/ memory/ quick/ rapid/

Author: rajdavies
Date: Tue Mar  6 02:25:48 2007
New Revision: 515054

URL: http://svn.apache.org/viewvc?view=rev&rev=515054
Log:
Deleted store implementations rapid and quick, as they are replaced by the AMQStore implementation

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java   (with props)
Removed:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/DefaultPersistenceAdapterFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapterFactoryBean.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java?view=diff&rev=515054&r1=515053&r2=515054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java Tue Mar  6 02:25:48 2007
@@ -24,6 +24,7 @@
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.memory.UsageManager;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Set;
 
@@ -38,22 +39,30 @@
      * Returns a set of all the {@link org.apache.activemq.command.ActiveMQDestination}
      * objects that the persistence store is aware exist.
      *
-     * @return
+     * @return active destinations
      */
     public Set<ActiveMQDestination> getDestinations();
 
     /**
      * Factory method to create a new queue message store with the given destination name
+     * @param destination
+     * @return the message store
+     * @throws IOException 
      */
     public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException;
 
     /**
      * Factory method to create a new topic message store with the given destination name
+     * @param destination 
+     * @return the topic message store
+     * @throws IOException 
      */
     public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException;
 
     /**
      * Factory method to create a new persistent prepared transaction store for XA recovery
+     * @return transaction store
+     * @throws IOException 
      */
     public TransactionStore createTransactionStore() throws IOException;
 
@@ -66,27 +75,33 @@
      * real high performance its usually faster to perform many writes within the same
      * transaction to minimize latency caused by disk synchronization. This is especially
      * true when using tools like Berkeley Db or embedded JDBC servers.
+     * @param context 
+     * @throws IOException 
      */
     public void beginTransaction(ConnectionContext context) throws IOException;
 
 
     /**
      * Commit a persistence transaction
+     * @param context 
+     * @throws IOException 
      *
-     * @see PersistenceAdapter#beginTransaction()
+     * @see PersistenceAdapter#beginTransaction(ConnectionContext context)
      */
     public void commitTransaction(ConnectionContext context) throws IOException;
 
     /**
      * Rollback a persistence transaction
+     * @param context 
+     * @throws IOException 
      *
-     * @see PersistenceAdapter#beginTransaction()
+     * @see PersistenceAdapter#beginTransaction(ConnectionContext context)
      */
     public void rollbackTransaction(ConnectionContext context) throws IOException;
     
     /**
      * 
-     * @return
+     * @return last broker sequence
      * @throws IOException
      */
     public long getLastMessageBrokerSequenceId() throws IOException;
@@ -102,4 +117,24 @@
      * @param usageManager The UsageManager that is controlling the broker's memory usage.
      */
     public void setUsageManager(UsageManager usageManager);
+    
+    /**
+     * Set the name of the broker using the adapter
+     * @param brokerName
+     */
+    public void setBrokerName(String brokerName);
+    
+    /**
+     * Set the directory where any data files should be created
+     * @param dir
+     */
+    public void setDirectory(File dir);
+    
+    /**
+     * checkpoint any
+     * @param sync 
+     * @throws IOException 
+     *
+     */
+    public void checkpoint(boolean sync) throws IOException;
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapterFactoryBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapterFactoryBean.java?view=diff&rev=515054&r1=515053&r2=515054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapterFactoryBean.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapterFactoryBean.java Tue Mar  6 02:25:48 2007
@@ -17,6 +17,7 @@
  */
 package org.apache.activemq.store;
 
+import org.apache.activemq.store.journal.JournalPersistenceAdapterFactory;
 import org.springframework.beans.factory.FactoryBean;
 
 /**
@@ -26,7 +27,7 @@
  * 
  * @version $Revision: 1.1 $
  */
-public class PersistenceAdapterFactoryBean extends DefaultPersistenceAdapterFactory implements FactoryBean {
+public class PersistenceAdapterFactoryBean extends JournalPersistenceAdapterFactory implements FactoryBean {
 
     private PersistenceAdapter persistenceAdaptor;
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?view=diff&rev=515054&r1=515053&r2=515054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Tue Mar  6 02:25:48 2007
@@ -1,20 +1,17 @@
 /**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.activemq.store.amq;
 
 import java.io.File;
@@ -26,7 +23,6 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.activeio.journal.Journal;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -54,7 +50,6 @@
 import org.apache.activemq.store.amq.AMQTransactionStore.Tx;
 import org.apache.activemq.store.amq.AMQTransactionStore.TxOperation;
 import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter;
-
 import org.apache.activemq.thread.DefaultThreadPools;
 import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.thread.Task;
@@ -68,261 +63,231 @@
 import org.apache.commons.logging.LogFactory;
 
 /**
- * An implementation of {@link PersistenceAdapter} designed for use with a
- * {@link Journal} and then check pointing asynchronously on a timeout with some
- * other long term persistent storage.
+ * An implementation of {@link PersistenceAdapter} designed for use with a {@link Journal} and then check pointing
+ * asynchronously on a timeout with some other long term persistent storage.
  * 
  * @org.apache.xbean.XBean
  * 
  * @version $Revision: 1.17 $
  */
-public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener {
+public class AMQPersistenceAdapter implements PersistenceAdapter,UsageListener{
 
-    private static final Log log = LogFactory.getLog(AMQPersistenceAdapter.class);
-
-    private final ConcurrentHashMap<ActiveMQQueue, AMQMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, AMQMessageStore>();
-    private final ConcurrentHashMap<ActiveMQTopic, AMQMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, AMQMessageStore>();
-    
+    private static final Log log=LogFactory.getLog(AMQPersistenceAdapter.class);
+    private final ConcurrentHashMap<ActiveMQQueue,AMQMessageStore> queues=new ConcurrentHashMap<ActiveMQQueue,AMQMessageStore>();
+    private final ConcurrentHashMap<ActiveMQTopic,AMQMessageStore> topics=new ConcurrentHashMap<ActiveMQTopic,AMQMessageStore>();
     private AsyncDataManager asyncDataManager;
-    private KahaReferenceStoreAdapter referenceStoreAdapter;
-	private TaskRunnerFactory taskRunnerFactory; 
-    private WireFormat wireFormat = new OpenWireFormat();
-
+    private ReferenceStoreAdapter referenceStoreAdapter;
+    private TaskRunnerFactory taskRunnerFactory;
+    private WireFormat wireFormat=new OpenWireFormat();
     private UsageManager usageManager;
-
-    private long cleanupInterval = 1000 * 60;
-    private long checkpointInterval = 1000 * 10;
-    
-    private int maxCheckpointWorkers = 1;
-    private int maxCheckpointMessageAddSize = 1024*4;
-
-    private AMQTransactionStore transactionStore = new AMQTransactionStore(this);
-    
+    private long cleanupInterval=1000*60;
+    private long checkpointInterval=1000*10;
+    private int maxCheckpointWorkers=1;
+    private int maxCheckpointMessageAddSize=1024*4;
+    private AMQTransactionStore transactionStore=new AMQTransactionStore(this);
     private TaskRunner checkpointTask;
-    private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
-    
-    private final AtomicBoolean started = new AtomicBoolean(false);
+    private CountDownLatch nextCheckpointCountDownLatch=new CountDownLatch(1);
+    private final AtomicBoolean started=new AtomicBoolean(false);
     private Runnable periodicCheckpointTask;
-
-	private Runnable periodicCleanupTask;
-	private boolean deleteAllMessages;
+    private Runnable periodicCleanupTask;
+    private boolean deleteAllMessages;
     private boolean syncOnWrite;
-    private String brokerName;
-	private File directory;
-   
+    private String brokerName="";
+    private File directory;
 
-     public AMQPersistenceAdapter() {
-            this("localhost");
+    public String getBrokerName(){
+        return this.brokerName;
+    }
+
+    public void setBrokerName(String brokerName){
+        this.brokerName=brokerName;
+        if(this.referenceStoreAdapter!=null){
+            this.referenceStoreAdapter.setBrokerName(brokerName);
         }
-    public AMQPersistenceAdapter(String brokerName) {
-        this.brokerName = brokerName;
-        this.directory=new File(IOHelper.getDefaultDataDirectory(),brokerName + "-amqstore");
     }
 
-    
-    public synchronized void start() throws Exception {
-        if( !started.compareAndSet(false, true) )
+    public synchronized void start() throws Exception{
+        if(!started.compareAndSet(false,true))
             return;
-        if (this.usageManager!=null) {
+        if(this.directory==null){
+            this.directory=new File(IOHelper.getDefaultDataDirectory(),brokerName);
+        }
+        this.directory=new File(directory,"amqstore");
+        this.directory.mkdirs();
+        if(this.usageManager!=null){
             this.usageManager.addUsageListener(this);
         }
-
-        if( asyncDataManager == null ) {
-        	asyncDataManager = createAsyncDataManager();
+        if(asyncDataManager==null){
+            asyncDataManager=createAsyncDataManager();
         }
-        
-        if( referenceStoreAdapter==null ) {
-        	referenceStoreAdapter = createReferenceStoreAdapter();
+        if(referenceStoreAdapter==null){
+            referenceStoreAdapter=createReferenceStoreAdapter();
         }
+        referenceStoreAdapter.setDirectory(new File(directory,"kaha-reference-store"));
+        referenceStoreAdapter.setBrokerName(getBrokerName());
         referenceStoreAdapter.setUsageManager(usageManager);
-
-        if( taskRunnerFactory==null ) {
-        	taskRunnerFactory = createTaskRunnerFactory();
+        if(taskRunnerFactory==null){
+            taskRunnerFactory=createTaskRunnerFactory();
         }
-        
-    	asyncDataManager.start();    	
-    	if( deleteAllMessages ) {
-    		asyncDataManager.delete();
-	        try {
-	            JournalTrace trace = new JournalTrace();
-	            trace.setMessage("DELETED "+new Date());
-	            Location location = asyncDataManager.write(wireFormat.marshal(trace), false);
-	            asyncDataManager.setMark(location, true);
-	            log.info("Journal deleted: ");
-	            deleteAllMessages=false;
-	        } catch (IOException e) {
-	            throw e;
-	        } catch (Throwable e) {
-	            throw IOExceptionSupport.create(e);
-	        }
-
-	        referenceStoreAdapter.deleteAllMessages();
+        asyncDataManager.start();
+        if(deleteAllMessages){
+            asyncDataManager.delete();
+            try{
+                JournalTrace trace=new JournalTrace();
+                trace.setMessage("DELETED "+new Date());
+                Location location=asyncDataManager.write(wireFormat.marshal(trace),false);
+                asyncDataManager.setMark(location,true);
+                log.info("Journal deleted: ");
+                deleteAllMessages=false;
+            }catch(IOException e){
+                throw e;
+            }catch(Throwable e){
+                throw IOExceptionSupport.create(e);
+            }
+            referenceStoreAdapter.deleteAllMessages();
         }
         referenceStoreAdapter.start();
-    	
-    	Set<Integer> files = referenceStoreAdapter.getReferenceFileIdsInUse();
-    	log.info("Active data files: "+files);
-        
-        checkpointTask = taskRunnerFactory.createTaskRunner(new Task(){
-            public boolean iterate() {
+        Set<Integer> files=referenceStoreAdapter.getReferenceFileIdsInUse();
+        log.info("Active data files: "+files);
+        checkpointTask=taskRunnerFactory.createTaskRunner(new Task(){
+
+            public boolean iterate(){
                 doCheckpoint();
                 return false;
             }
-        }, "ActiveMQ Journal Checkpoint Worker");
-                
+        },"ActiveMQ Journal Checkpoint Worker");
         createTransactionStore();
         recover();
-
         // Do a checkpoint periodically.
-        periodicCheckpointTask = new Runnable() {
-	        public void run() {
+        periodicCheckpointTask=new Runnable(){
+
+            public void run(){
                 checkpoint(false);
-	        }
-	    };	    
-        Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval);
-        
-        periodicCleanupTask = new Runnable() {
-	        public void run() {
-	        	cleanup();
-	        }
-	    };
-        Scheduler.executePeriodically(periodicCleanupTask, cleanupInterval);
+            }
+        };
+        Scheduler.executePeriodically(periodicCheckpointTask,checkpointInterval);
+        periodicCleanupTask=new Runnable(){
 
+            public void run(){
+                cleanup();
+            }
+        };
+        Scheduler.executePeriodically(periodicCleanupTask,cleanupInterval);
     }
 
-
-	public void stop() throws Exception {
-        
-        if( !started.compareAndSet(true, false) )
+    public void stop() throws Exception{
+        if(!started.compareAndSet(true,false))
             return;
-        
-        this.usageManager.removeUsageListener(this);        
+        this.usageManager.removeUsageListener(this);
         Scheduler.cancel(periodicCheckpointTask);
         Scheduler.cancel(periodicCleanupTask);
-
-        
-        Iterator<AMQMessageStore> iterator = queues.values().iterator();
-        while (iterator.hasNext()) {
-            AMQMessageStore ms = iterator.next();
+        Iterator<AMQMessageStore> iterator=queues.values().iterator();
+        while(iterator.hasNext()){
+            AMQMessageStore ms=iterator.next();
             ms.stop();
         }
-
-        iterator = topics.values().iterator();
-        while (iterator.hasNext()) {
-            final AMQTopicMessageStore ms = (AMQTopicMessageStore) iterator.next();
+        iterator=topics.values().iterator();
+        while(iterator.hasNext()){
+            final AMQTopicMessageStore ms=(AMQTopicMessageStore)iterator.next();
             ms.stop();
         }
-        
         // Take one final checkpoint and stop checkpoint processing.
         checkpoint(true);
-        checkpointTask.shutdown();   
-        
+        checkpointTask.shutdown();
         queues.clear();
         topics.clear();
-
-        IOException firstException = null;
+        IOException firstException=null;
         referenceStoreAdapter.stop();
-        try {
+        try{
             log.debug("Journal close");
             asyncDataManager.close();
-        } catch (Exception e) {
-            firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
+        }catch(Exception e){
+            firstException=IOExceptionSupport.create("Failed to close journals: "+e,e);
         }
-        
-        if (firstException != null) {
+        if(firstException!=null){
             throw firstException;
         }
     }
-    
 
     /**
      * When we checkpoint we move all the journalled data to long term storage.
-     * @param stopping 
+     * 
+     * @param stopping
      * 
      * @param b
      */
-    public void checkpoint(boolean sync) {
-        try {
-            if (asyncDataManager == null )
+    public void checkpoint(boolean sync){
+        try{
+            if(asyncDataManager==null)
                 throw new IllegalStateException("Journal is closed.");
-            
-            CountDownLatch latch = null;
-            synchronized(this) {
-                latch = nextCheckpointCountDownLatch;
+            CountDownLatch latch=null;
+            synchronized(this){
+                latch=nextCheckpointCountDownLatch;
             }
-            
             checkpointTask.wakeup();
-            
-            if (sync) {
+            if(sync){
                 if(log.isDebugEnabled()){
                     log.debug("Waitng for checkpoint to complete.");
                 }
                 latch.await();
             }
-        }
-        catch (InterruptedException e) {
+            referenceStoreAdapter.checkpoint(sync);
+        }catch(InterruptedException e){
             Thread.currentThread().interrupt();
-            log.warn("Request to start checkpoint failed: " + e, e);
+            log.warn("Request to start checkpoint failed: "+e,e);
+        }catch(IOException e){
+            log.error("checkpoint failed: "+e,e);
         }
     }
-        
+
     /**
      * This does the actual checkpoint.
-     * @return 
+     * 
+     * @return
      */
-    public boolean doCheckpoint() {
-        CountDownLatch latch = null;
-        synchronized(this) {                       
-            latch = nextCheckpointCountDownLatch;
-            nextCheckpointCountDownLatch = new CountDownLatch(1);
-        }        
-        try {
-
+    public boolean doCheckpoint(){
+        CountDownLatch latch=null;
+        synchronized(this){
+            latch=nextCheckpointCountDownLatch;
+            nextCheckpointCountDownLatch=new CountDownLatch(1);
+        }
+        try{
             if(log.isDebugEnabled()){
                 log.debug("Checkpoint started.");
             }
-            referenceStoreAdapter.sync();
-            Location newMark = null;
-
-            Iterator<AMQMessageStore> iterator = queues.values().iterator();
-            while (iterator.hasNext()) {
-                final AMQMessageStore ms = iterator.next();
-                Location mark = (Location) ms.getMark();
-                if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
-                    newMark = mark;
+           
+            Location newMark=null;
+            Iterator<AMQMessageStore> iterator=queues.values().iterator();
+            while(iterator.hasNext()){
+                final AMQMessageStore ms=iterator.next();
+                Location mark=(Location)ms.getMark();
+                if(mark!=null&&(newMark==null||newMark.compareTo(mark)<0)){
+                    newMark=mark;
                 }
             }
-
-            iterator = topics.values().iterator();
-            while (iterator.hasNext()) {
-                final AMQTopicMessageStore ms = (AMQTopicMessageStore) iterator.next();
-                Location mark = (Location) ms.getMark();
-                if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
-                    newMark = mark;
+            iterator=topics.values().iterator();
+            while(iterator.hasNext()){
+                final AMQTopicMessageStore ms=(AMQTopicMessageStore)iterator.next();
+                Location mark=(Location)ms.getMark();
+                if(mark!=null&&(newMark==null||newMark.compareTo(mark)<0)){
+                    newMark=mark;
                 }
             }
-
-            try {
-                if (newMark != null) {
+            try{
+                if(newMark!=null){
                     if(log.isDebugEnabled()){
-                        log.debug("Marking journal at: " + newMark);
+                        log.debug("Marking journal at: "+newMark);
                     }
-                    asyncDataManager.setMark(newMark, false);
-                    writeTraceMessage("CHECKPOINT "+new Date(), true);
+                    asyncDataManager.setMark(newMark,false);
+                    writeTraceMessage("CHECKPOINT "+new Date(),true);
                 }
+            }catch(Exception e){
+                log.error("Failed to mark the Journal: "+e,e);
             }
-            catch (Exception e) {
-                log.error("Failed to mark the Journal: " + e, e);
-            }
-    
             if(log.isDebugEnabled()){
                 log.debug("Checkpoint done.");
             }
-        }
-        catch(IOException e) {
-            log.error("Failed to sync reference store",e);
-        }
-        finally {
+        }finally{
             latch.countDown();
         }
         return true;
@@ -330,197 +295,183 @@
 
     /**
      * Cleans up the data files
-     * @return 
-     * @throws IOException 
+     * 
+     * @return
+     * @throws IOException
      */
-    public void cleanup() {
-    	
-    	try {
-    		Set<Integer> inUse = referenceStoreAdapter.getReferenceFileIdsInUse();
-			asyncDataManager.consolidateDataFilesNotIn(inUse);
-		} catch (IOException e) {
-            log.error("Could not cleanup data files: "+e, e);
-		}
-    	
+    public void cleanup(){
+        try{
+            Set<Integer> inUse=referenceStoreAdapter.getReferenceFileIdsInUse();
+            asyncDataManager.consolidateDataFilesNotIn(inUse);
+        }catch(IOException e){
+            log.error("Could not cleanup data files: "+e,e);
+        }
     }
-    
 
-    public Set<ActiveMQDestination> getDestinations() {
-        Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(referenceStoreAdapter.getDestinations());
+    public Set<ActiveMQDestination> getDestinations(){
+        Set<ActiveMQDestination> destinations=new HashSet<ActiveMQDestination>(referenceStoreAdapter.getDestinations());
         destinations.addAll(queues.keySet());
         destinations.addAll(topics.keySet());
         return destinations;
     }
 
-    private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
-        if (destination.isQueue()) {
-            return createQueueMessageStore((ActiveMQQueue) destination);
-        }
-        else {
-            return createTopicMessageStore((ActiveMQTopic) destination);
+    private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException{
+        if(destination.isQueue()){
+            return createQueueMessageStore((ActiveMQQueue)destination);
+        }else{
+            return createTopicMessageStore((ActiveMQTopic)destination);
         }
     }
 
-    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
-        AMQMessageStore store = queues.get(destination);
-        if (store == null) {
-        	ReferenceStore checkpointStore = referenceStoreAdapter.createQueueReferenceStore(destination);
-            store = new AMQMessageStore(this, checkpointStore, destination);
-            try {
-				store.start();
-			} catch (Exception e) {
-				throw IOExceptionSupport.create(e);
-			}
-            queues.put(destination, store);
+    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{
+        AMQMessageStore store=queues.get(destination);
+        if(store==null){
+            ReferenceStore checkpointStore=referenceStoreAdapter.createQueueReferenceStore(destination);
+            store=new AMQMessageStore(this,checkpointStore,destination);
+            try{
+                store.start();
+            }catch(Exception e){
+                throw IOExceptionSupport.create(e);
+            }
+            queues.put(destination,store);
         }
         return store;
     }
 
-    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
-        AMQTopicMessageStore store = (AMQTopicMessageStore) topics.get(destinationName);
-        if (store == null) {
-        	TopicReferenceStore checkpointStore = referenceStoreAdapter.createTopicReferenceStore(destinationName);
-            store = new AMQTopicMessageStore(this, checkpointStore, destinationName);
-            try {
-				store.start();
-			} catch (Exception e) {
-				throw IOExceptionSupport.create(e);
-			}
-            topics.put(destinationName, store);
+    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException{
+        AMQTopicMessageStore store=(AMQTopicMessageStore)topics.get(destinationName);
+        if(store==null){
+            TopicReferenceStore checkpointStore=referenceStoreAdapter.createTopicReferenceStore(destinationName);
+            store=new AMQTopicMessageStore(this,checkpointStore,destinationName);
+            try{
+                store.start();
+            }catch(Exception e){
+                throw IOExceptionSupport.create(e);
+            }
+            topics.put(destinationName,store);
         }
         return store;
     }
 
-    public TransactionStore createTransactionStore() throws IOException {
+    public TransactionStore createTransactionStore() throws IOException{
         return transactionStore;
     }
 
-    public long getLastMessageBrokerSequenceId() throws IOException {
+    public long getLastMessageBrokerSequenceId() throws IOException{
         return referenceStoreAdapter.getLastMessageBrokerSequenceId();
     }
 
-    public void beginTransaction(ConnectionContext context) throws IOException {
+    public void beginTransaction(ConnectionContext context) throws IOException{
         referenceStoreAdapter.beginTransaction(context);
     }
 
-    public void commitTransaction(ConnectionContext context) throws IOException {
+    public void commitTransaction(ConnectionContext context) throws IOException{
         referenceStoreAdapter.commitTransaction(context);
     }
 
-    public void rollbackTransaction(ConnectionContext context) throws IOException {
+    public void rollbackTransaction(ConnectionContext context) throws IOException{
         referenceStoreAdapter.rollbackTransaction(context);
     }
 
-
     /**
      * @param location
      * @return
      * @throws IOException
      */
-    public DataStructure readCommand(Location location) throws IOException {
-        try {
-        	ByteSequence packet = asyncDataManager.read(location);
-            return (DataStructure) wireFormat.unmarshal(packet);
-        } catch (IOException e) {
-            throw createReadException(location, e);
+    public DataStructure readCommand(Location location) throws IOException{
+        try{
+            ByteSequence packet=asyncDataManager.read(location);
+            return (DataStructure)wireFormat.unmarshal(packet);
+        }catch(IOException e){
+            throw createReadException(location,e);
         }
     }
 
     /**
-     * Move all the messages that were in the journal into long term storage. We
-     * just replay and do a checkpoint.
+     * Move all the messages that were in the journal into long term storage. We just replay and do a checkpoint.
      * 
      * @throws IOException
      * @throws IOException
      * @throws InvalidLocationException
      * @throws IllegalStateException
      */
-    private void recover() throws IllegalStateException, IOException {
-
-        Location pos = null;
-        int redoCounter = 0;
-
-        log.info("Journal Recovery Started from: " + asyncDataManager);
-        long start = System.currentTimeMillis();
-        ConnectionContext context = new ConnectionContext();
-
+    private void recover() throws IllegalStateException,IOException{
+        Location pos=null;
+        int redoCounter=0;
+        log.info("Journal Recovery Started from: "+asyncDataManager);
+        long start=System.currentTimeMillis();
+        ConnectionContext context=new ConnectionContext();
         // While we have records in the journal.
-        while ((pos = asyncDataManager.getNextLocation(pos)) != null) {
-            ByteSequence data = asyncDataManager.read(pos);
-            DataStructure c = (DataStructure) wireFormat.unmarshal(data);
-
-            if (c instanceof Message ) {
-                Message message = (Message) c;
-                AMQMessageStore store = (AMQMessageStore) createMessageStore(message.getDestination());
-                if ( message.isInTransaction()) {
-                    transactionStore.addMessage(store, message, pos);
-                }
-                else {
-                    if( store.replayAddMessage(context, message, pos) ) {
-                    	redoCounter++;
+        while((pos=asyncDataManager.getNextLocation(pos))!=null){
+            ByteSequence data=asyncDataManager.read(pos);
+            DataStructure c=(DataStructure)wireFormat.unmarshal(data);
+            if(c instanceof Message){
+                Message message=(Message)c;
+                AMQMessageStore store=(AMQMessageStore)createMessageStore(message.getDestination());
+                if(message.isInTransaction()){
+                    transactionStore.addMessage(store,message,pos);
+                }else{
+                    if(store.replayAddMessage(context,message,pos)){
+                        redoCounter++;
                     }
                 }
-            } else {
-                switch (c.getDataStructureType()) {
-                case JournalQueueAck.DATA_STRUCTURE_TYPE:
-                {
-                    JournalQueueAck command = (JournalQueueAck) c;
-                    AMQMessageStore store = (AMQMessageStore) createMessageStore(command.getDestination());
-                    if (command.getMessageAck().isInTransaction()) {
-                        transactionStore.removeMessage(store, command.getMessageAck(), pos);
-                    }
-                    else {
-                        if( store.replayRemoveMessage(context, command.getMessageAck()) ) {
-                        	redoCounter++;
+            }else{
+                switch(c.getDataStructureType()){
+                case JournalQueueAck.DATA_STRUCTURE_TYPE: {
+                    JournalQueueAck command=(JournalQueueAck)c;
+                    AMQMessageStore store=(AMQMessageStore)createMessageStore(command.getDestination());
+                    if(command.getMessageAck().isInTransaction()){
+                        transactionStore.removeMessage(store,command.getMessageAck(),pos);
+                    }else{
+                        if(store.replayRemoveMessage(context,command.getMessageAck())){
+                            redoCounter++;
                         }
                     }
                 }
-                break;
-                case JournalTopicAck.DATA_STRUCTURE_TYPE: 
-                {
-                    JournalTopicAck command = (JournalTopicAck) c;
-                    AMQTopicMessageStore store = (AMQTopicMessageStore) createMessageStore(command.getDestination());
-                    if (command.getTransactionId() != null) {
-                        transactionStore.acknowledge(store, command, pos);
-                    }
-                    else {
-                        if( store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId()) ) {
-                        	redoCounter++;
+                    break;
+                case JournalTopicAck.DATA_STRUCTURE_TYPE: {
+                    JournalTopicAck command=(JournalTopicAck)c;
+                    AMQTopicMessageStore store=(AMQTopicMessageStore)createMessageStore(command.getDestination());
+                    if(command.getTransactionId()!=null){
+                        transactionStore.acknowledge(store,command,pos);
+                    }else{
+                        if(store.replayAcknowledge(context,command.getClientId(),command.getSubscritionName(),command
+                                .getMessageId())){
+                            redoCounter++;
                         }
                     }
                 }
-                break;
-                case JournalTransaction.DATA_STRUCTURE_TYPE:
-                {
-                    JournalTransaction command = (JournalTransaction) c;
-                    try {
+                    break;
+                case JournalTransaction.DATA_STRUCTURE_TYPE: {
+                    JournalTransaction command=(JournalTransaction)c;
+                    try{
                         // Try to replay the packet.
-                        switch (command.getType()) {
+                        switch(command.getType()){
                         case JournalTransaction.XA_PREPARE:
                             transactionStore.replayPrepare(command.getTransactionId());
                             break;
                         case JournalTransaction.XA_COMMIT:
                         case JournalTransaction.LOCAL_COMMIT:
-                            Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
-                            if (tx == null)
+                            Tx tx=transactionStore.replayCommit(command.getTransactionId(),command.getWasPrepared());
+                            if(tx==null)
                                 break; // We may be trying to replay a commit that
-                                        // was already committed.
-
+                            // was already committed.
                             // Replay the committed operations.
                             tx.getOperations();
-                            for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
-                                TxOperation op = (TxOperation) iter.next();
-                                if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
-                                    if( op.store.replayAddMessage(context, (Message)op.data, op.location) )
+                            for(Iterator iter=tx.getOperations().iterator();iter.hasNext();){
+                                TxOperation op=(TxOperation)iter.next();
+                                if(op.operationType==TxOperation.ADD_OPERATION_TYPE){
+                                    if(op.store.replayAddMessage(context,(Message)op.data,op.location))
                                         redoCounter++;
                                 }
-                                if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
-                                    if( op.store.replayRemoveMessage(context, (MessageAck) op.data) )
+                                if(op.operationType==TxOperation.REMOVE_OPERATION_TYPE){
+                                    if(op.store.replayRemoveMessage(context,(MessageAck)op.data))
                                         redoCounter++;
                                 }
-                                if (op.operationType == TxOperation.ACK_OPERATION_TYPE) {
-                                    JournalTopicAck ack = (JournalTopicAck) op.data;
-                                    if( ((AMQTopicMessageStore) op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack.getMessageId()) ) {
+                                if(op.operationType==TxOperation.ACK_OPERATION_TYPE){
+                                    JournalTopicAck ack=(JournalTopicAck)op.data;
+                                    if(((AMQTopicMessageStore)op.store).replayAcknowledge(context,ack.getClientId(),ack
+                                            .getSubscritionName(),ack.getMessageId())){
                                         redoCounter++;
                                     }
                                 }
@@ -531,42 +482,40 @@
                             transactionStore.replayRollback(command.getTransactionId());
                             break;
                         }
-                    }
-                    catch (IOException e) {
-                        log.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
+                    }catch(IOException e){
+                        log.error("Recovery Failure: Could not replay: "+c+", reason: "+e,e);
                     }
                 }
-                break;
+                    break;
                 case JournalTrace.DATA_STRUCTURE_TYPE:
-                    JournalTrace trace = (JournalTrace) c;
-                    log.debug("TRACE Entry: " + trace.getMessage());
+                    JournalTrace trace=(JournalTrace)c;
+                    log.debug("TRACE Entry: "+trace.getMessage());
                     break;
                 default:
-                    log.error("Unknown type of record in transaction log which will be discarded: " + c);
+                    log.error("Unknown type of record in transaction log which will be discarded: "+c);
                 }
             }
         }
-        Location location = writeTraceMessage("RECOVERED "+new Date(), true);
-        asyncDataManager.setMark(location, true);
-        long end = System.currentTimeMillis();
-
-        log.info("Recovered " + redoCounter + " operations from redo log in "+((end-start)/1000.0f)+" seconds.");
+        Location location=writeTraceMessage("RECOVERED "+new Date(),true);
+        asyncDataManager.setMark(location,true);
+        long end=System.currentTimeMillis();
+        log.info("Recovered "+redoCounter+" operations from redo log in "+((end-start)/1000.0f)+" seconds.");
     }
 
-    private IOException createReadException(Location location, Exception e) {
-        return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e);
+    private IOException createReadException(Location location,Exception e){
+        return IOExceptionSupport.create("Failed to read to journal for: "+location+". Reason: "+e,e);
     }
 
-    protected IOException createWriteException(DataStructure packet, Exception e) {
-        return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e);
+    protected IOException createWriteException(DataStructure packet,Exception e){
+        return IOExceptionSupport.create("Failed to write to journal for: "+packet+". Reason: "+e,e);
     }
 
-    protected IOException createWriteException(String command, Exception e) {
-        return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e);
+    protected IOException createWriteException(String command,Exception e){
+        return IOExceptionSupport.create("Failed to write to journal for command: "+command+". Reason: "+e,e);
     }
 
-    protected IOException createRecoveryFailedException(Exception e) {
-        return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e);
+    protected IOException createRecoveryFailedException(Exception e){
+        return IOExceptionSupport.create("Failed to recover from journal. Reason: "+e,e);
     }
 
     /**
@@ -576,118 +525,119 @@
      * @return
      * @throws IOException
      */
-    public Location writeCommand(DataStructure command, boolean syncHint) throws IOException {
-        return asyncDataManager.write(wireFormat.marshal(command), (syncHint && syncOnWrite));
+    public Location writeCommand(DataStructure command,boolean syncHint) throws IOException{
+        return asyncDataManager.write(wireFormat.marshal(command),(syncHint&&syncOnWrite));
     }
 
-    private Location writeTraceMessage(String message, boolean sync) throws IOException {
-        JournalTrace trace = new JournalTrace();
+    private Location writeTraceMessage(String message,boolean sync) throws IOException{
+        JournalTrace trace=new JournalTrace();
         trace.setMessage(message);
-        return writeCommand(trace, sync);
+        return writeCommand(trace,sync);
     }
 
-    public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) {
-        newPercentUsage = ((newPercentUsage)/10)*10;
-        oldPercentUsage = ((oldPercentUsage)/10)*10;
-        if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
+    public void onMemoryUseChanged(UsageManager memoryManager,int oldPercentUsage,int newPercentUsage){
+        newPercentUsage=((newPercentUsage)/10)*10;
+        oldPercentUsage=((oldPercentUsage)/10)*10;
+        if(newPercentUsage>=70&&oldPercentUsage<newPercentUsage){
             checkpoint(false);
         }
     }
-    
-    public AMQTransactionStore getTransactionStore() {
+
+    public AMQTransactionStore getTransactionStore(){
         return transactionStore;
     }
 
-    public void deleteAllMessages() throws IOException {
-    	deleteAllMessages=true;
+    public void deleteAllMessages() throws IOException{
+        deleteAllMessages=true;
     }
 
-
-
     public String toString(){
-        return "AMQPersistenceAdapter(" + directory + ")";
+        return "AMQPersistenceAdapter("+directory+")";
     }
 
-    ///////////////////////////////////////////////////////////////////
+    // /////////////////////////////////////////////////////////////////
     // Subclass overridables
-    ///////////////////////////////////////////////////////////////////
-    protected AsyncDataManager createAsyncDataManager() {
-    	AsyncDataManager manager = new AsyncDataManager();
-    	manager.setDirectory(new File(directory, "journal"));
-		return manager;
-	}
-    
-    protected KahaReferenceStoreAdapter createReferenceStoreAdapter() throws IOException {
-    	KahaReferenceStoreAdapter adaptor = new KahaReferenceStoreAdapter(directory); 
-		return adaptor;
-	}
-
-    protected TaskRunnerFactory createTaskRunnerFactory() {
-		return DefaultThreadPools.getDefaultTaskRunnerFactory();
-	}
+    // /////////////////////////////////////////////////////////////////
+    protected AsyncDataManager createAsyncDataManager(){
+        AsyncDataManager manager=new AsyncDataManager();
+        manager.setDirectory(new File(directory,"journal"));
+        return manager;
+    }
 
+    protected KahaReferenceStoreAdapter createReferenceStoreAdapter() throws IOException{
+        KahaReferenceStoreAdapter adaptor=new KahaReferenceStoreAdapter();
+        return adaptor;
+    }
+
+    protected TaskRunnerFactory createTaskRunnerFactory(){
+        return DefaultThreadPools.getDefaultTaskRunnerFactory();
+    }
 
-    ///////////////////////////////////////////////////////////////////
+    // /////////////////////////////////////////////////////////////////
     // Property Accessors
-    ///////////////////////////////////////////////////////////////////
-    
-	public AsyncDataManager getAsyncDataManager() {
-		return asyncDataManager;
-	}
-	public void setAsyncDataManager(AsyncDataManager asyncDataManager) {
-		this.asyncDataManager = asyncDataManager;
-	}
-
-	public ReferenceStoreAdapter getReferenceStoreAdapter() {
-		return referenceStoreAdapter;
-	}
-	
-
-	public TaskRunnerFactory getTaskRunnerFactory() {
-		return taskRunnerFactory;
-	}
-	public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
-		this.taskRunnerFactory = taskRunnerFactory;
-	}
+    // /////////////////////////////////////////////////////////////////
+    public AsyncDataManager getAsyncDataManager(){
+        return asyncDataManager;
+    }
+
+    public void setAsyncDataManager(AsyncDataManager asyncDataManager){
+        this.asyncDataManager=asyncDataManager;
+    }
+
+    public ReferenceStoreAdapter getReferenceStoreAdapter(){
+        return referenceStoreAdapter;
+    }
+
+    public TaskRunnerFactory getTaskRunnerFactory(){
+        return taskRunnerFactory;
+    }
+
+    public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory){
+        this.taskRunnerFactory=taskRunnerFactory;
+    }
 
     /**
      * @return Returns the wireFormat.
      */
-    public WireFormat getWireFormat() {
+    public WireFormat getWireFormat(){
         return wireFormat;
     }
-	public void setWireFormat(WireFormat wireFormat) {
-		this.wireFormat = wireFormat;
-	}
 
-    public UsageManager getUsageManager() {
+    public void setWireFormat(WireFormat wireFormat){
+        this.wireFormat=wireFormat;
+    }
+
+    public UsageManager getUsageManager(){
         return usageManager;
     }
-    public void setUsageManager(UsageManager usageManager) {
-        this.usageManager = usageManager;
+
+    public void setUsageManager(UsageManager usageManager){
+        this.usageManager=usageManager;
     }
 
-    public int getMaxCheckpointMessageAddSize() {
+    public int getMaxCheckpointMessageAddSize(){
         return maxCheckpointMessageAddSize;
     }
-    public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
-        this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
+
+    public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize){
+        this.maxCheckpointMessageAddSize=maxCheckpointMessageAddSize;
     }
 
-    public int getMaxCheckpointWorkers() {
+    public int getMaxCheckpointWorkers(){
         return maxCheckpointWorkers;
     }
-    public void setMaxCheckpointWorkers(int maxCheckpointWorkers) {
-        this.maxCheckpointWorkers = maxCheckpointWorkers;
+
+    public void setMaxCheckpointWorkers(int maxCheckpointWorkers){
+        this.maxCheckpointWorkers=maxCheckpointWorkers;
     }
 
-	public File getDirectory() {
-		return directory;
-	}
+    public File getDirectory(){
+        return directory;
+    }
 
-	public void setDirectory(File directory) {
-		this.directory = directory;
-	}
+    public void setDirectory(File directory){
+        this.directory=directory;
+    }
 
     public boolean isSyncOnWrite(){
         return this.syncOnWrite;
@@ -696,7 +646,4 @@
     public void setSyncOnWrite(boolean syncOnWrite){
         this.syncOnWrite=syncOnWrite;
     }
-    
-
-
 }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java?view=auto&rev=515054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java Tue Mar  6 02:25:48 2007
@@ -0,0 +1,114 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.store.amq;
+
+import java.io.File;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.PersistenceAdapterFactory;
+import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.util.IOHelper;
+
+/**
+ * An implementation of {@link PersistenceAdapterFactory}
+ * 
+ * @org.apache.xbean.XBean
+ * 
+ * @version $Revision: 1.17 $
+ */
+public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory{
+
+    private TaskRunnerFactory taskRunnerFactory;
+    private File dataDirectory;
+    private int journalThreadPriority = Thread.MAX_PRIORITY;
+    private String brokerName="localhost";
+    
+    /**
+     * @return a AMQPersistenceAdapter
+     * @see org.apache.activemq.store.PersistenceAdapterFactory#createPersistenceAdapter()
+     */
+    public PersistenceAdapter createPersistenceAdapter(){
+        AMQPersistenceAdapter result =  new AMQPersistenceAdapter();
+        result.setDirectory(getDataDirectory());
+        result.setTaskRunnerFactory(getTaskRunnerFactory());
+        result.setBrokerName(getBrokerName());
+        return result;
+    }
+    
+    /**
+     * @return the dataDirectory
+     */
+    public File getDataDirectory(){
+        if(this.dataDirectory==null){
+            this.dataDirectory=new File(IOHelper.getDefaultDataDirectory(),brokerName);
+        }
+        return this.dataDirectory;
+    }
+    
+    /**
+     * @param dataDirectory the dataDirectory to set
+     */
+    public void setDataDirectory(File dataDirectory){
+        this.dataDirectory=dataDirectory;
+    }
+    
+    /**
+     * @return the taskRunnerFactory
+     */
+    public TaskRunnerFactory getTaskRunnerFactory(){
+        if( taskRunnerFactory == null ) {
+            taskRunnerFactory = new TaskRunnerFactory("AMQPersistenceAdaptor Task", journalThreadPriority, true, 1000);
+        }
+        return taskRunnerFactory;
+    }
+    
+    /**
+     * @param taskRunnerFactory the taskRunnerFactory to set
+     */
+    public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory){
+        this.taskRunnerFactory=taskRunnerFactory;
+    }
+
+    
+    /**
+     * @return the journalThreadPriority
+     */
+    public int getJournalThreadPriority(){
+        return this.journalThreadPriority;
+    }
+
+    
+    /**
+     * @param journalThreadPriority the journalThreadPriority to set
+     */
+    public void setJournalThreadPriority(int journalThreadPriority){
+        this.journalThreadPriority=journalThreadPriority;
+    }
+
+    
+    /**
+     * @return the brokerName
+     */
+    public String getBrokerName(){
+        return this.brokerName;
+    }
+
+    
+    /**
+     * @param brokerName the brokerName to set
+     */
+    public void setBrokerName(String brokerName){
+        this.brokerName=brokerName;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?view=diff&rev=515054&r1=515053&r2=515054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java Tue Mar  6 02:25:48 2007
@@ -43,6 +43,7 @@
 
 import javax.sql.DataSource;
 
+import java.io.File;
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.Collections;
@@ -483,7 +484,16 @@
         return new DefaultDatabaseLocker(getDataSource(), getStatements());
     }
     
+    public void setBrokerName(String brokerName){
+    }
+    
     public String toString(){
         return "JDBCPersistenceAdaptor("+super.toString()+")";
+    }
+
+    public void setDirectory(File dir){        
+    }
+
+    public void checkpoint(boolean sync) throws IOException{        
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java?view=diff&rev=515054&r1=515053&r2=515054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java Tue Mar  6 02:25:48 2007
@@ -17,6 +17,7 @@
  */
 package org.apache.activemq.store.journal;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -314,6 +315,10 @@
             log.warn("Request to start checkpoint failed: " + e, e);
         }
     }
+    
+    public void checkpoint(boolean sync) {
+        checkpoint(sync,sync);
+    }
         
     /**
      * This does the actual checkpoint.
@@ -666,8 +671,15 @@
     	return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
     }
     
+    public void setBrokerName(String brokerName){
+        longTermPersistence.setBrokerName(brokerName);
+    }
+    
     public String toString(){
         return "JournalPersistenceAdapator(" + longTermPersistence + ")";
+    }
+
+    public void setDirectory(File dir){        
     }
 
 }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java?view=auto&rev=515054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java Tue Mar  6 02:25:48 2007
@@ -0,0 +1,233 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.journal;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.activeio.journal.Journal;
+import org.apache.activeio.journal.active.JournalImpl;
+import org.apache.activeio.journal.active.JournalLockedException;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.PersistenceAdapterFactory;
+import org.apache.activemq.store.amq.AMQPersistenceAdapter;
+import org.apache.activemq.store.jdbc.DataSourceSupport;
+import org.apache.activemq.store.jdbc.JDBCAdapter;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.store.jdbc.Statements;
+import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Factory class that can create PersistenceAdapter objects.
+ *
+ * @version $Revision: 1.4 $
+ */
+public class JournalPersistenceAdapterFactory extends DataSourceSupport implements PersistenceAdapterFactory {
+    
+    private static final int JOURNAL_LOCKED_WAIT_DELAY = 10*1000;
+
+    private static final Log log = LogFactory.getLog(JournalPersistenceAdapterFactory.class);
+    
+    private int journalLogFileSize = 1024*1024*20;
+    private int journalLogFiles = 2;
+    private TaskRunnerFactory taskRunnerFactory;
+    private Journal journal;
+    private boolean useJournal=true;
+    private boolean useQuickJournal=false;
+    private File journalArchiveDirectory;
+    private boolean failIfJournalIsLocked=false;
+    private int journalThreadPriority = Thread.MAX_PRIORITY;
+    private JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
+    
+    public PersistenceAdapter createPersistenceAdapter() throws IOException {
+        jdbcPersistenceAdapter.setDataSource(getDataSource());
+        
+        if( !useJournal ) {
+            return jdbcPersistenceAdapter;
+        }
+        return new JournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getTaskRunnerFactory());
+        
+    }
+
+    public int getJournalLogFiles() {
+        return journalLogFiles;
+    }
+
+    /**
+     * Sets the number of journal log files to use
+     */
+    public void setJournalLogFiles(int journalLogFiles) {
+        this.journalLogFiles = journalLogFiles;
+    }
+
+    public int getJournalLogFileSize() {
+        return journalLogFileSize;
+    }
+
+    /**
+     * Sets the size of the journal log files
+     *
+     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
+     */
+    public void setJournalLogFileSize(int journalLogFileSize) {
+        this.journalLogFileSize = journalLogFileSize;
+    }
+    
+    public JDBCPersistenceAdapter getJdbcAdapter() {
+        return jdbcPersistenceAdapter;
+    }
+
+    public void setJdbcAdapter(JDBCPersistenceAdapter jdbcAdapter) {
+        this.jdbcPersistenceAdapter = jdbcAdapter;
+    }
+
+    public boolean isUseJournal() {
+        return useJournal;
+    }
+
+    /**
+     * Enables or disables the use of the journal. The default is to use the journal
+     *
+     * @param useJournal
+     */
+    public void setUseJournal(boolean useJournal) {
+        this.useJournal = useJournal;
+    }
+
+    public TaskRunnerFactory getTaskRunnerFactory() {
+        if( taskRunnerFactory == null ) {
+            taskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", journalThreadPriority, true, 1000);
+        }
+        return taskRunnerFactory;
+    }
+
+    public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
+        this.taskRunnerFactory = taskRunnerFactory;
+    }
+
+    public Journal getJournal() throws IOException {
+        if( journal == null ) {
+            createJournal();
+        }
+        return journal;
+    }
+
+    public void setJournal(Journal journal) {
+        this.journal = journal;
+    }
+
+    public File getJournalArchiveDirectory() {
+        if( journalArchiveDirectory == null && useQuickJournal ) {
+            journalArchiveDirectory = new File(getDataDirectoryFile(), "journal");
+        }
+        return journalArchiveDirectory;
+    }
+
+    public void setJournalArchiveDirectory(File journalArchiveDirectory) {
+        this.journalArchiveDirectory = journalArchiveDirectory;
+    }
+
+
+    public boolean isUseQuickJournal() {
+        return useQuickJournal;
+    }
+
+    /**
+     * Enables or disables the use of quick journal, which keeps messages in the journal and just
+     * stores a reference to the messages in JDBC. Defaults to false so that messages actually reside
+     * long term in the JDBC database.
+     */
+    public void setUseQuickJournal(boolean useQuickJournal) {
+        this.useQuickJournal = useQuickJournal;
+    }
+
+    public JDBCAdapter getAdapter() throws IOException {
+        return jdbcPersistenceAdapter.getAdapter();
+    }
+
+    public void setAdapter(JDBCAdapter adapter) {
+        jdbcPersistenceAdapter.setAdapter(adapter);
+    }
+
+    public Statements getStatements() {
+        return jdbcPersistenceAdapter.getStatements();
+    }
+    public void setStatements(Statements statements) {
+        jdbcPersistenceAdapter.setStatements(statements);
+    }
+
+    public boolean isUseDatabaseLock() {
+        return jdbcPersistenceAdapter.isUseDatabaseLock();
+    }
+
+    /**
+     * Sets whether or not an exclusive database lock should be used to enable JDBC Master/Slave. Enabled by default.
+     */
+    public void setUseDatabaseLock(boolean useDatabaseLock) {
+        jdbcPersistenceAdapter.setUseDatabaseLock(useDatabaseLock);
+    }
+
+    public boolean isCreateTablesOnStartup() {
+        return jdbcPersistenceAdapter.isCreateTablesOnStartup();
+    }
+
+    /**
+     * Sets whether or not tables are created on startup
+     */
+    public void setCreateTablesOnStartup(boolean createTablesOnStartup) {
+        jdbcPersistenceAdapter.setCreateTablesOnStartup(createTablesOnStartup);
+    }
+    
+    public int getJournalThreadPriority(){
+        return journalThreadPriority;
+    }
+
+    /**
+     * Sets the thread priority of the journal thread
+     */
+    public void setJournalThreadPriority(int journalThreadPriority){
+        this.journalThreadPriority=journalThreadPriority;
+    }
+
+    /**
+     * @throws IOException
+     */
+    protected void createJournal() throws IOException {
+        File journalDir = new File(getDataDirectoryFile(), "journal").getCanonicalFile();
+        if( failIfJournalIsLocked ) {
+            journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize, getJournalArchiveDirectory());
+        } else {
+            while( true ) {
+                try {
+                    journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize, getJournalArchiveDirectory());
+                    break;
+                } catch (JournalLockedException e) {
+                    log.info("Journal is locked... waiting "+(JOURNAL_LOCKED_WAIT_DELAY/1000)+" seconds for the journal to be unlocked.");
+                    try {
+                        Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY);
+                    } catch (InterruptedException e1) {
+                    }
+                }
+            }
+        }
+    }
+
+    
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java?view=diff&rev=515054&r1=515053&r2=515054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java Tue Mar  6 02:25:48 2007
@@ -74,10 +74,15 @@
         removeMessage(ack.getLastMessageId());
     }
 
+    
+    
     public synchronized void removeMessage(MessageId msgId) throws IOException{
-        messageContainer.remove(msgId);
-        if(messageContainer.isEmpty()){
-            resetBatching();
+        StoreEntry entry=messageContainer.getEntry(msgId);
+        if(entry!=null){
+            messageContainer.remove(entry);
+            if(messageContainer.isEmpty()||(batchEntry!=null&&batchEntry.equals(entry))){
+                resetBatching();
+            }
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java?view=diff&rev=515054&r1=515053&r2=515054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java Tue Mar  6 02:25:48 2007
@@ -41,38 +41,30 @@
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.util.IOHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-
 /**
  * @org.apache.xbean.XBean
  * 
  * @version $Revision: 1.4 $
  */
 public class KahaPersistenceAdapter implements PersistenceAdapter{
-    private static final int STORE_LOCKED_WAIT_DELAY = 10*1000;
+
+    private static final int STORE_LOCKED_WAIT_DELAY=10*1000;
     private static final Log log=LogFactory.getLog(KahaPersistenceAdapter.class);
     static final String PREPARED_TRANSACTIONS_NAME="PreparedTransactions";
     KahaTransactionStore transactionStore;
-    ConcurrentHashMap<ActiveMQTopic, TopicMessageStore> topics=new ConcurrentHashMap<ActiveMQTopic, TopicMessageStore>();
-    ConcurrentHashMap<ActiveMQQueue, MessageStore> queues=new ConcurrentHashMap<ActiveMQQueue, MessageStore>();
-    ConcurrentHashMap<ActiveMQDestination, MessageStore> messageStores=new ConcurrentHashMap<ActiveMQDestination, MessageStore>();
+    ConcurrentHashMap<ActiveMQTopic,TopicMessageStore> topics=new ConcurrentHashMap<ActiveMQTopic,TopicMessageStore>();
+    ConcurrentHashMap<ActiveMQQueue,MessageStore> queues=new ConcurrentHashMap<ActiveMQQueue,MessageStore>();
+    ConcurrentHashMap<ActiveMQDestination,MessageStore> messageStores=new ConcurrentHashMap<ActiveMQDestination,MessageStore>();
     protected OpenWireFormat wireFormat=new OpenWireFormat();
     private long maxDataFileLength=32*1024*1024;
-    
-   
-    private File dir;
+    private File directory;
+    private String brokerName;
     private Store theStore;
-
-    public KahaPersistenceAdapter(File dir) throws IOException{
-        if(!dir.exists()){
-            dir.mkdirs();
-        }
-        this.dir=dir;
-        wireFormat.setCacheEnabled(false);
-        wireFormat.setTightEncodingEnabled(true);
-    }
+    private boolean initialized;
 
     public Set<ActiveMQDestination> getDestinations(){
         Set<ActiveMQDestination> rc=new HashSet<ActiveMQDestination>();
@@ -81,7 +73,7 @@
             for(Iterator i=store.getMapContainerIds().iterator();i.hasNext();){
                 Object obj=i.next();
                 if(obj instanceof ActiveMQDestination){
-                    rc.add((ActiveMQDestination) obj);
+                    rc.add((ActiveMQDestination)obj);
                 }
             }
         }catch(IOException e){
@@ -127,25 +119,25 @@
     }
 
     public TransactionStore createTransactionStore() throws IOException{
-       
         if(transactionStore==null){
-            while (true) {
-                try {
-            Store store=getStore();
-            MapContainer container=store.getMapContainer(PREPARED_TRANSACTIONS_NAME,"transactions");
-            container.setKeyMarshaller(new CommandMarshaller(wireFormat));
-            container.setValueMarshaller(new TransactionMarshaller(wireFormat));
-            container.load();
-            transactionStore=new KahaTransactionStore(this,container);
-            break;
-                }catch(StoreLockedExcpetion e) {
-                    log.info("Store is locked... waiting "+(STORE_LOCKED_WAIT_DELAY/1000)+" seconds for the Store to be unlocked.");
+            while(true){
+                try{
+                    Store store=getStore();
+                    MapContainer container=store.getMapContainer(PREPARED_TRANSACTIONS_NAME,"transactions");
+                    container.setKeyMarshaller(new CommandMarshaller(wireFormat));
+                    container.setValueMarshaller(new TransactionMarshaller(wireFormat));
+                    container.load();
+                    transactionStore=new KahaTransactionStore(this,container);
+                    break;
+                }catch(StoreLockedExcpetion e){
+                    log.info("Store is locked... waiting "+(STORE_LOCKED_WAIT_DELAY/1000)
+                            +" seconds for the Store to be unlocked.");
                     try{
                         Thread.sleep(STORE_LOCKED_WAIT_DELAY);
                     }catch(InterruptedException e1){
                     }
                 }
-        }
+            }
         }
         return transactionStore;
     }
@@ -163,6 +155,7 @@
     }
 
     public void start() throws Exception{
+        initialize();
     }
 
     public void stop() throws Exception{
@@ -182,37 +175,37 @@
             }else{
                 theStore.delete();
             }
-        }else {
+        }else{
             StoreFactory.delete(getStoreName());
         }
     }
 
     protected MapContainer<MessageId,Message> getMapContainer(Object id,String containerName) throws IOException{
         Store store=getStore();
-        MapContainer<MessageId, Message> container=store.getMapContainer(id,containerName);
+        MapContainer<MessageId,Message> container=store.getMapContainer(id,containerName);
         container.setKeyMarshaller(new MessageIdMarshaller());
-        container.setValueMarshaller(new MessageMarshaller(wireFormat));        
+        container.setValueMarshaller(new MessageMarshaller(wireFormat));
         container.load();
         return container;
     }
-    
+
     protected MapContainer<String,Object> getSubsMapContainer(Object id,String containerName) throws IOException{
         Store store=getStore();
-        MapContainer<String, Object> container=store.getMapContainer(id,containerName);
+        MapContainer<String,Object> container=store.getMapContainer(id,containerName);
         container.setKeyMarshaller(Store.StringMarshaller);
-        container.setValueMarshaller(createMessageMarshaller());        
+        container.setValueMarshaller(createMessageMarshaller());
         container.load();
         return container;
     }
 
-    protected Marshaller<Object> createMessageMarshaller() {
-		return new CommandMarshaller(wireFormat);
-	}
+    protected Marshaller<Object> createMessageMarshaller(){
+        return new CommandMarshaller(wireFormat);
+    }
 
-	protected ListContainer getListContainer(Object id,String containerName) throws IOException{
+    protected ListContainer getListContainer(Object id,String containerName) throws IOException{
         Store store=getStore();
         ListContainer container=store.getListContainer(id,containerName);
-        container.setMarshaller(createMessageMarshaller());        
+        container.setMarshaller(createMessageMarshaller());
         container.load();
         return container;
     }
@@ -239,8 +232,6 @@
         this.maxDataFileLength=maxDataFileLength;
     }
 
-      
-
     protected synchronized Store getStore() throws IOException{
         if(theStore==null){
             theStore=StoreFactory.open(getStoreName(),"rw");
@@ -248,13 +239,50 @@
         }
         return theStore;
     }
-    
+
     private String getStoreName(){
-        String name=dir.getAbsolutePath()+File.separator+"kaha.db";
-        return name;
+        initialize();
+        return directory.getAbsolutePath();
     }
-    
+
     public String toString(){
-        return "KahaPersistenceAdapter(" + getStoreName() +")";
+        return "KahaPersistenceAdapter("+getStoreName()+")";
+    }
+
+    public void setBrokerName(String brokerName){
+        this.brokerName=brokerName;
+    }
+    
+    public String getBrokerName(){
+        return brokerName;
+    }
+
+    public File getDirectory(){
+        return this.directory;
+    }
+
+    public void setDirectory(File directory){
+        this.directory=directory;
+    }
+  
+    public void checkpoint(boolean sync) throws IOException{
+        if(sync){
+            getStore().force();
+        }
+    }
+
+    private void initialize(){
+        if(!initialized){
+            initialized=true;
+            if(this.directory==null){
+                this.directory=new File(IOHelper.getDefaultDataDirectory());
+                this.directory=new File(this.directory,brokerName+"-kahastore");
+            }         
+            this.directory.mkdirs();
+            wireFormat.setCacheEnabled(false);
+            wireFormat.setTightEncodingEnabled(true);
+        }
     }
+
+   
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?view=diff&rev=515054&r1=515053&r2=515054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java Tue Mar  6 02:25:48 2007
@@ -77,7 +77,9 @@
             entry=messageContainer.getFirst();
         }else{
             entry=messageContainer.refresh(entry);
+            if (entry != null) {
             entry=messageContainer.getNext(entry);
+            }
         }
         if(entry!=null){
             int count=0;
@@ -120,11 +122,14 @@
     }
 
     public synchronized void removeMessage(MessageId msgId) throws IOException{
-        ReferenceRecord rr=messageContainer.remove(msgId);
-        if(rr!=null){
-            removeInterest(rr);
-            if(messageContainer.isEmpty()){
-                resetBatching();
+        StoreEntry entry=messageContainer.getEntry(msgId);
+        if(entry!=null){
+            ReferenceRecord rr=messageContainer.remove(msgId);
+            if(rr!=null){
+                removeInterest(rr);
+                if(messageContainer.isEmpty()||(batchEntry!=null&&batchEntry.equals(entry))){
+                    resetBatching();
+                }
             }
         }
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?view=diff&rev=515054&r1=515053&r2=515054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java Tue Mar  6 02:25:48 2007
@@ -17,10 +17,8 @@
  */
 package org.apache.activemq.store.kahadaptor;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -29,7 +27,6 @@
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.MessageId;
-import org.apache.activemq.kaha.ContainerId;
 import org.apache.activemq.kaha.ListContainer;
 import org.apache.activemq.kaha.MapContainer;
 import org.apache.activemq.kaha.MessageIdMarshaller;
@@ -39,7 +36,6 @@
 import org.apache.activemq.store.ReferenceStoreAdapter;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.TopicReferenceStore;
-import org.apache.activemq.store.amq.AMQPersistenceAdapter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -51,10 +47,7 @@
 	private Map<Integer,AtomicInteger>recordReferences = new HashMap<Integer,AtomicInteger>();
     private boolean storeValid;
 
-	public KahaReferenceStoreAdapter(File dir) throws IOException {
-		super(dir);
-	}
-
+	
     public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{
     	throw new RuntimeException("Use createQueueReferenceStore instead");
     }
@@ -164,10 +157,7 @@
         }		
 	}
     
-    public void sync() throws IOException {
-        getStore().force();
-    }
-    
+        
     protected MapContainer<MessageId,ReferenceRecord> getMapReferenceContainer(Object id,String containerName) throws IOException{
         Store store=getStore();
         MapContainer<MessageId, ReferenceRecord> container=store.getMapContainer(id,containerName);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?view=diff&rev=515054&r1=515053&r2=515054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java Tue Mar  6 02:25:48 2007
@@ -119,8 +119,9 @@
         }
         // add the subscriber
         ListContainer container=addSubscriberMessageContainer(key);
+        /*
         if(retroactive){
-            for(StoreEntry entry=ackContainer.getFirst();entry!=null;){
+            for(StoreEntry entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
                 TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry);
                 ConsumerMessageRef ref=new ConsumerMessageRef();
                 ref.setAckEntry(entry);
@@ -128,6 +129,7 @@
                 container.add(ref);
             }
         }
+        */
     }
 
     public synchronized void deleteSubscription(String clientId,String subscriptionName) throws IOException{

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?view=diff&rev=515054&r1=515053&r2=515054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java Tue Mar  6 02:25:48 2007
@@ -17,7 +17,6 @@
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -85,7 +84,8 @@
                 ConsumerMessageRef ref=new ConsumerMessageRef();
                 ref.setAckEntry(ackEntry);
                 ref.setMessageEntry(messageEntry);
-                container.add(ref);
+                StoreEntry listEntry = container.add(ref);
+                
             }
         }
     }
@@ -118,8 +118,8 @@
 
     public synchronized void acknowledge(ConnectionContext context,String clientId,String subscriptionName,
             MessageId messageId) throws IOException{
-        String subcriberId=getSubscriptionKey(clientId,subscriptionName);
-        TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(subcriberId);
+        String key=getSubscriptionKey(clientId,subscriptionName);
+        TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
         if(container!=null){
             ConsumerMessageRef ref=container.remove();
             if(container.isEmpty()){
@@ -140,6 +140,7 @@
                             removeInterest(rr);
                         }
                     }else{
+                       
                         ackContainer.update(ref.getAckEntry(),tsa);
                     }
                 }
@@ -163,13 +164,15 @@
         // add the subscriber
         ListContainer container=addSubscriberMessageContainer(key);
         if(retroactive){
-            for(StoreEntry entry=ackContainer.getFirst();entry!=null;){
+            /*
+            for(StoreEntry entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
                 TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry);
                 ConsumerMessageRef ref=new ConsumerMessageRef();
                 ref.setAckEntry(entry);
                 ref.setMessageEntry(tsa.getMessageEntry());
                 container.add(ref);
             }
+            */
         }
     }
 
@@ -186,7 +189,7 @@
     public int getMessageCount(String clientId,String subscriberName) throws IOException{
         String key=getSubscriptionKey(clientId,subscriberName);
         TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
-        return container.size();
+        return  container != null ? container.size() : 0;
     }
 
     public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException{
@@ -226,6 +229,7 @@
 
     public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
             throws Exception{
+        
         String key=getSubscriptionKey(clientId,subscriptionName);
         TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
         if(container!=null){

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java?view=diff&rev=515054&r1=515053&r2=515054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java Tue Mar  6 02:25:48 2007
@@ -54,14 +54,20 @@
         return listContainer.isEmpty();
     }
     
-    public void add(ConsumerMessageRef ref) {
-        listContainer.add(ref);
+    public StoreEntry add(ConsumerMessageRef ref) {
+        return listContainer.placeLast(ref);
     }
     
-    public ConsumerMessageRef remove() {
-        ConsumerMessageRef result =  (ConsumerMessageRef)listContainer.removeFirst();
-        if (listContainer.isEmpty()) {
-            reset();
+    public ConsumerMessageRef remove(){
+        ConsumerMessageRef result=null;
+        if(!listContainer.isEmpty()){
+            StoreEntry entry=listContainer.getFirst();
+            if(entry!=null){
+                result=(ConsumerMessageRef)listContainer.removeFirst();
+                if(listContainer.isEmpty()||(batchEntry!=null&&batchEntry.equals(entry))){
+                    reset();
+                }
+            }
         }
         return result;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java?view=diff&rev=515054&r1=515053&r2=515054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java Tue Mar  6 02:25:48 2007
@@ -158,4 +158,13 @@
     public String toString(){
         return "MemoryPersistenceAdapter";
     }
+
+    public void setBrokerName(String brokerName){        
+    }
+
+    public void setDirectory(File dir){        
+    }
+
+    public void checkpoint(boolean sync) throws IOException{        
+    }
 }