You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/08/08 20:58:13 UTC

svn commit: r563982 [19/32] - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/advisory/ main/java/org/apache/activemq/blob/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/jm...

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java Wed Aug  8 11:56:59 2007
@@ -60,62 +60,63 @@
 
     /** A MessageStore that we can use to retrieve messages quickly. */
     private LinkedHashMap cpAddedMessageIds;
-    
+
     protected RecordLocation lastLocation;
     protected HashSet inFlightTxLocations = new HashSet();
 
     private UsageManager usageManager;
-    
-    public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore, ActiveMQDestination destination) {
+
+    public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore,
+                               ActiveMQDestination destination) {
         this.peristenceAdapter = adapter;
         this.transactionStore = adapter.getTransactionStore();
         this.longTermStore = checkpointStore;
         this.destination = destination;
         this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext());
     }
-    
+
     public void setUsageManager(UsageManager usageManager) {
         this.usageManager = usageManager;
         longTermStore.setUsageManager(usageManager);
     }
 
-
     /**
      * Not synchronized since the Journal has better throughput if you increase
      * the number of concurrent writes that it is doing.
      */
     public void addMessage(ConnectionContext context, final Message message) throws IOException {
-        
+
         final MessageId id = message.getMessageId();
-        
+
         final boolean debug = log.isDebugEnabled();
         message.incrementReferenceCount();
-        
+
         final RecordLocation location = peristenceAdapter.writeCommand(message, message.isResponseRequired());
-        if( !context.isInTransaction() ) {
-            if( debug )
-                log.debug("Journalled message add for: "+id+", at: "+location);
+        if (!context.isInTransaction()) {
+            if (debug)
+                log.debug("Journalled message add for: " + id + ", at: " + location);
             addMessage(message, location);
         } else {
-            if( debug )
-                log.debug("Journalled transacted message add for: "+id+", at: "+location);
-            synchronized( this ) {
+            if (debug)
+                log.debug("Journalled transacted message add for: " + id + ", at: " + location);
+            synchronized (this) {
                 inFlightTxLocations.add(location);
             }
             transactionStore.addMessage(this, message, location);
-            context.getTransaction().addSynchronization(new Synchronization(){
-                public void afterCommit() throws Exception {                    
-                    if( debug )
-                        log.debug("Transacted message add commit for: "+id+", at: "+location);
-                    synchronized( JournalMessageStore.this ) {
+            context.getTransaction().addSynchronization(new Synchronization() {
+                public void afterCommit() throws Exception {
+                    if (debug)
+                        log.debug("Transacted message add commit for: " + id + ", at: " + location);
+                    synchronized (JournalMessageStore.this) {
                         inFlightTxLocations.remove(location);
                         addMessage(message, location);
                     }
                 }
-                public void afterRollback() throws Exception {                    
-                    if( debug )
-                        log.debug("Transacted message add rollback for: "+id+", at: "+location);
-                    synchronized( JournalMessageStore.this ) {
+
+                public void afterRollback() throws Exception {
+                    if (debug)
+                        log.debug("Transacted message add rollback for: " + id + ", at: " + location);
+                    synchronized (JournalMessageStore.this) {
                         inFlightTxLocations.remove(location);
                     }
                     message.decrementReferenceCount();
@@ -131,17 +132,17 @@
             messages.put(id, message);
         }
     }
-    
+
     public void replayAddMessage(ConnectionContext context, Message message) {
         try {
             // Only add the message if it has not already been added.
             Message t = longTermStore.getMessage(message.getMessageId());
-            if( t==null ) {
+            if (t == null) {
                 longTermStore.addMessage(context, message);
             }
-        }
-        catch (Throwable e) {
-            log.warn("Could not replay add for message '" + message.getMessageId() + "'.  Message may have already been added. reason: " + e);
+        } catch (Throwable e) {
+            log.warn("Could not replay add for message '" + message.getMessageId()
+                     + "'.  Message may have already been added. reason: " + e);
         }
     }
 
@@ -152,32 +153,36 @@
         JournalQueueAck remove = new JournalQueueAck();
         remove.setDestination(destination);
         remove.setMessageAck(ack);
-        
+
         final RecordLocation location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired());
-        if( !context.isInTransaction() ) {
-            if( debug )
-                log.debug("Journalled message remove for: "+ack.getLastMessageId()+", at: "+location);
+        if (!context.isInTransaction()) {
+            if (debug)
+                log.debug("Journalled message remove for: " + ack.getLastMessageId() + ", at: " + location);
             removeMessage(ack, location);
         } else {
-            if( debug )
-                log.debug("Journalled transacted message remove for: "+ack.getLastMessageId()+", at: "+location);
-            synchronized( this ) {
+            if (debug)
+                log.debug("Journalled transacted message remove for: " + ack.getLastMessageId() + ", at: "
+                          + location);
+            synchronized (this) {
                 inFlightTxLocations.add(location);
             }
             transactionStore.removeMessage(this, ack, location);
-            context.getTransaction().addSynchronization(new Synchronization(){
-                public void afterCommit() throws Exception {                    
-                    if( debug )
-                        log.debug("Transacted message remove commit for: "+ack.getLastMessageId()+", at: "+location);
-                    synchronized( JournalMessageStore.this ) {
+            context.getTransaction().addSynchronization(new Synchronization() {
+                public void afterCommit() throws Exception {
+                    if (debug)
+                        log.debug("Transacted message remove commit for: " + ack.getLastMessageId()
+                                  + ", at: " + location);
+                    synchronized (JournalMessageStore.this) {
                         inFlightTxLocations.remove(location);
                         removeMessage(ack, location);
                     }
                 }
-                public void afterRollback() throws Exception {                    
-                    if( debug )
-                        log.debug("Transacted message remove rollback for: "+ack.getLastMessageId()+", at: "+location);
-                    synchronized( JournalMessageStore.this ) {
+
+                public void afterRollback() throws Exception {
+                    if (debug)
+                        log.debug("Transacted message remove rollback for: " + ack.getLastMessageId()
+                                  + ", at: " + location);
+                    synchronized (JournalMessageStore.this) {
                         inFlightTxLocations.remove(location);
                     }
                 }
@@ -185,12 +190,12 @@
 
         }
     }
-    
+
     final void removeMessage(final MessageAck ack, final RecordLocation location) {
         synchronized (this) {
             lastLocation = location;
             MessageId id = ack.getLastMessageId();
-            Message message = (Message) messages.remove(id);
+            Message message = (Message)messages.remove(id);
             if (message == null) {
                 messageAcks.add(ack);
             } else {
@@ -198,17 +203,17 @@
             }
         }
     }
-    
+
     public void replayRemoveMessage(ConnectionContext context, MessageAck messageAck) {
         try {
             // Only remove the message if it has not already been removed.
             Message t = longTermStore.getMessage(messageAck.getLastMessageId());
-            if( t!=null ) {
+            if (t != null) {
                 longTermStore.removeMessage(context, messageAck);
             }
-        }
-        catch (Throwable e) {
-            log.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() + "'.  Message may have already been acknowledged. reason: " + e);
+        } catch (Throwable e) {
+            log.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId()
+                     + "'.  Message may have already been acknowledged. reason: " + e);
         }
     }
 
@@ -219,14 +224,13 @@
     public RecordLocation checkpoint() throws IOException {
         return checkpoint(null);
     }
-    
+
     /**
      * @return
      * @throws IOException
      */
     public RecordLocation checkpoint(final Callback postCheckpointTest) throws IOException {
 
-        
         RecordLocation rc;
         final ArrayList cpRemovedMessageLocations;
         final ArrayList cpActiveJournalLocations;
@@ -237,37 +241,37 @@
             cpAddedMessageIds = this.messages;
             cpRemovedMessageLocations = this.messageAcks;
 
-            cpActiveJournalLocations=new ArrayList(inFlightTxLocations);
-            
+            cpActiveJournalLocations = new ArrayList(inFlightTxLocations);
+
             this.messages = new LinkedHashMap();
-            this.messageAcks = new ArrayList();            
+            this.messageAcks = new ArrayList();
         }
 
         transactionTemplate.run(new Callback() {
             public void execute() throws Exception {
 
                 int size = 0;
-                
+
                 PersistenceAdapter persitanceAdapter = transactionTemplate.getPersistenceAdapter();
                 ConnectionContext context = transactionTemplate.getContext();
-                
+
                 // Checkpoint the added messages.
-                synchronized(JournalMessageStore.this){
-                    Iterator iterator=cpAddedMessageIds.values().iterator();
-                    while(iterator.hasNext()){
-                        Message message=(Message)iterator.next();
-                        try{
-                            longTermStore.addMessage(context,message);
-                        }catch(Throwable e){
-                            log.warn("Message could not be added to long term store: "+e.getMessage(),e);
+                synchronized (JournalMessageStore.this) {
+                    Iterator iterator = cpAddedMessageIds.values().iterator();
+                    while (iterator.hasNext()) {
+                        Message message = (Message)iterator.next();
+                        try {
+                            longTermStore.addMessage(context, message);
+                        } catch (Throwable e) {
+                            log.warn("Message could not be added to long term store: " + e.getMessage(), e);
                         }
-                        size+=message.getSize();
+                        size += message.getSize();
                         message.decrementReferenceCount();
                         // Commit the batch if it's getting too big
-                        if(size>=maxCheckpointMessageAddSize){
+                        if (size >= maxCheckpointMessageAddSize) {
                             persitanceAdapter.commitTransaction(context);
                             persitanceAdapter.beginTransaction(context);
-                            size=0;
+                            size = 0;
                         }
                     }
                 }
@@ -279,14 +283,14 @@
                 Iterator iterator = cpRemovedMessageLocations.iterator();
                 while (iterator.hasNext()) {
                     try {
-                        MessageAck ack = (MessageAck) iterator.next();
+                        MessageAck ack = (MessageAck)iterator.next();
                         longTermStore.removeMessage(transactionTemplate.getContext(), ack);
                     } catch (Throwable e) {
                         log.debug("Message could not be removed from long term store: " + e.getMessage(), e);
                     }
                 }
-                
-                if( postCheckpointTest!= null ) {
+
+                if (postCheckpointTest != null) {
                     postCheckpointTest.execute();
                 }
             }
@@ -296,12 +300,12 @@
         synchronized (this) {
             cpAddedMessageIds = null;
         }
-        
-        if( cpActiveJournalLocations.size() > 0 ) {
+
+        if (cpActiveJournalLocations.size() > 0) {
             Collections.sort(cpActiveJournalLocations);
-            return (RecordLocation) cpActiveJournalLocations.get(0);
+            return (RecordLocation)cpActiveJournalLocations.get(0);
         }
-        synchronized (this){
+        synchronized (this) {
             return lastLocation;
         }
     }
@@ -314,15 +318,15 @@
 
         synchronized (this) {
             // Do we have a still have it in the journal?
-            answer = (Message) messages.get(identity);
-            if( answer==null && cpAddedMessageIds!=null )
-                answer = (Message) cpAddedMessageIds.get(identity);
+            answer = (Message)messages.get(identity);
+            if (answer == null && cpAddedMessageIds != null)
+                answer = (Message)cpAddedMessageIds.get(identity);
         }
-        
-        if (answer != null ) {
+
+        if (answer != null) {
             return answer;
         }
-        
+
         // If all else fails try the long term message store.
         return longTermStore.getMessage(identity);
     }
@@ -333,7 +337,7 @@
      * updated.
      * 
      * @param listener
-     * @throws Exception 
+     * @throws Exception
      */
     public void recover(final MessageRecoveryListener listener) throws Exception {
         peristenceAdapter.checkpoint(true, true);
@@ -341,14 +345,14 @@
     }
 
     public void start() throws Exception {
-        if( this.usageManager != null )
+        if (this.usageManager != null)
             this.usageManager.addUsageListener(peristenceAdapter);
         longTermStore.start();
     }
 
     public void stop() throws Exception {
         longTermStore.stop();
-        if( this.usageManager != null )
+        if (this.usageManager != null)
             this.usageManager.removeUsageListener(peristenceAdapter);
     }
 
@@ -366,12 +370,13 @@
         peristenceAdapter.checkpoint(true, true);
         longTermStore.removeAllMessages(context);
     }
-    
+
     public ActiveMQDestination getDestination() {
         return destination;
     }
 
-    public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
+    public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime,
+                                    String messageRef) throws IOException {
         throw new IOException("The journal does not support message references.");
     }
 
@@ -381,25 +386,23 @@
 
     /**
      * @return
-     * @throws IOException 
+     * @throws IOException
      * @see org.apache.activemq.store.MessageStore#getMessageCount()
      */
-    public int getMessageCount() throws IOException{
+    public int getMessageCount() throws IOException {
         peristenceAdapter.checkpoint(true, true);
         return longTermStore.getMessageCount();
     }
 
-   
-    public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
+    public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
         peristenceAdapter.checkpoint(true, true);
-        longTermStore.recoverNextMessages(maxReturned,listener);
-        
+        longTermStore.recoverNextMessages(maxReturned, listener);
+
     }
 
-    
-    public void resetBatching(){
+    public void resetBatching() {
         longTermStore.resetBatching();
-        
+
     }
 
-}
\ No newline at end of file
+}

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java Wed Aug  8 11:56:59 2007
@@ -75,7 +75,6 @@
  * other long term persistent storage.
  * 
  * @org.apache.xbean.XBean
- * 
  * @version $Revision: 1.17 $
  */
 public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener {
@@ -89,45 +88,45 @@
 
     private final ConcurrentHashMap queues = new ConcurrentHashMap();
     private final ConcurrentHashMap topics = new ConcurrentHashMap();
-    
+
     private UsageManager usageManager;
     long checkpointInterval = 1000 * 60 * 5;
     long lastCheckpointRequest = System.currentTimeMillis();
     private long lastCleanup = System.currentTimeMillis();
     private int maxCheckpointWorkers = 10;
-    private int maxCheckpointMessageAddSize = 1024*1024;
+    private int maxCheckpointMessageAddSize = 1024 * 1024;
 
     private JournalTransactionStore transactionStore = new JournalTransactionStore(this);
     private ThreadPoolExecutor checkpointExecutor;
-    
+
     private TaskRunner checkpointTask;
     private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
     private boolean fullCheckPoint;
-    
+
     private AtomicBoolean started = new AtomicBoolean(false);
 
-    private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask(); 
-    	
+    private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask();
+
     final Runnable createPeriodicCheckpointTask() {
-    	return new Runnable() {
-	        public void run() {
+        return new Runnable() {
+            public void run() {
                 long lastTime = 0;
-                synchronized(this) {
+                synchronized (this) {
                     lastTime = lastCheckpointRequest;
                 }
-	            if( System.currentTimeMillis()>lastTime+checkpointInterval ) {
-	                checkpoint(false, true);
-	            }
-	        }
-	    };
+                if (System.currentTimeMillis() > lastTime + checkpointInterval) {
+                    checkpoint(false, true);
+                }
+            }
+        };
     }
-    
+
     public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException {
 
         this.journal = journal;
         journal.setJournalEventListener(this);
-        
-        checkpointTask = taskRunnerFactory.createTaskRunner(new Task(){
+
+        checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
             public boolean iterate() {
                 return doCheckpoint();
             }
@@ -137,7 +136,8 @@
     }
 
     /**
-     * @param usageManager The UsageManager that is controlling the destination's memory usage.
+     * @param usageManager The UsageManager that is controlling the
+     *                destination's memory usage.
      */
     public void setUsageManager(UsageManager usageManager) {
         this.usageManager = usageManager;
@@ -153,15 +153,14 @@
 
     private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
         if (destination.isQueue()) {
-            return createQueueMessageStore((ActiveMQQueue) destination);
-        }
-        else {
-            return createTopicMessageStore((ActiveMQTopic) destination);
+            return createQueueMessageStore((ActiveMQQueue)destination);
+        } else {
+            return createTopicMessageStore((ActiveMQTopic)destination);
         }
     }
 
     public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
-        JournalMessageStore store = (JournalMessageStore) queues.get(destination);
+        JournalMessageStore store = (JournalMessageStore)queues.get(destination);
         if (store == null) {
             MessageStore checkpointStore = longTermPersistence.createQueueMessageStore(destination);
             store = new JournalMessageStore(this, checkpointStore, destination);
@@ -171,7 +170,7 @@
     }
 
     public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
-        JournalTopicMessageStore store = (JournalTopicMessageStore) topics.get(destinationName);
+        JournalTopicMessageStore store = (JournalTopicMessageStore)topics.get(destinationName);
         if (store == null) {
             TopicMessageStore checkpointStore = longTermPersistence.createTopicMessageStore(destinationName);
             store = new JournalTopicMessageStore(this, checkpointStore, destinationName);
@@ -201,24 +200,25 @@
     }
 
     public synchronized void start() throws Exception {
-        if( !started.compareAndSet(false, true) )
+        if (!started.compareAndSet(false, true)) {
             return;
-        
+        }
+
         checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() {
             public Thread newThread(Runnable runable) {
                 Thread t = new Thread(runable, "Journal checkpoint worker");
                 t.setPriority(7);
                 return t;
-            }            
+            }
         });
-        //checkpointExecutor.allowCoreThreadTimeOut(true);
-        
+        // checkpointExecutor.allowCoreThreadTimeOut(true);
+
         this.usageManager.addUsageListener(this);
 
         if (longTermPersistence instanceof JDBCPersistenceAdapter) {
             // Disabled periodic clean up as it deadlocks with the checkpoint
             // operations.
-            ((JDBCPersistenceAdapter) longTermPersistence).setCleanupPeriod(0);
+            ((JDBCPersistenceAdapter)longTermPersistence).setCleanupPeriod(0);
         }
 
         longTermPersistence.start();
@@ -226,23 +226,23 @@
         recover();
 
         // Do a checkpoint periodically.
-        Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval/10);
+        Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval / 10);
 
     }
 
     public void stop() throws Exception {
-        
+
         this.usageManager.removeUsageListener(this);
-        if( !started.compareAndSet(true, false) )
+        if (!started.compareAndSet(true, false))
             return;
-        
+
         Scheduler.cancel(periodicCheckpointTask);
 
         // Take one final checkpoint and stop checkpoint processing.
         checkpoint(true, true);
-        checkpointTask.shutdown();        
+        checkpointTask.shutdown();
         checkpointExecutor.shutdown();
-        
+
         queues.clear();
         topics.clear();
 
@@ -253,7 +253,7 @@
             firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
         }
         longTermPersistence.stop();
-        
+
         if (firstException != null) {
             throw firstException;
         }
@@ -287,83 +287,86 @@
 
     /**
      * When we checkpoint we move all the journalled data to long term storage.
-     * @param stopping 
      * 
+     * @param stopping
      * @param b
      */
     public void checkpoint(boolean sync, boolean fullCheckpoint) {
         try {
-            if (journal == null )
+            if (journal == null)
                 throw new IllegalStateException("Journal is closed.");
-            
+
             long now = System.currentTimeMillis();
             CountDownLatch latch = null;
-            synchronized(this) {
+            synchronized (this) {
                 latch = nextCheckpointCountDownLatch;
                 lastCheckpointRequest = now;
-                if( fullCheckpoint ) {
-                    this.fullCheckPoint = true; 
+                if (fullCheckpoint) {
+                    this.fullCheckPoint = true;
                 }
             }
-            
+
             checkpointTask.wakeup();
-            
+
             if (sync) {
                 log.debug("Waking for checkpoint to complete.");
                 latch.await();
             }
-        }
-        catch (InterruptedException e) {
+        } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             log.warn("Request to start checkpoint failed: " + e, e);
         }
     }
-    
+
     public void checkpoint(boolean sync) {
-        checkpoint(sync,sync);
+        checkpoint(sync, sync);
     }
-        
+
     /**
      * This does the actual checkpoint.
-     * @return 
+     * 
+     * @return
      */
     public boolean doCheckpoint() {
         CountDownLatch latch = null;
         boolean fullCheckpoint;
-        synchronized(this) {                       
+        synchronized (this) {
             latch = nextCheckpointCountDownLatch;
             nextCheckpointCountDownLatch = new CountDownLatch(1);
             fullCheckpoint = this.fullCheckPoint;
-            this.fullCheckPoint=false;            
-        }        
+            this.fullCheckPoint = false;
+        }
         try {
 
             log.debug("Checkpoint started.");
             RecordLocation newMark = null;
 
-            ArrayList futureTasks = new ArrayList(queues.size()+topics.size());
-            
+            ArrayList futureTasks = new ArrayList(queues.size() + topics.size());
+
             //
-            // We do many partial checkpoints (fullCheckpoint==false) to move topic messages
-            // to long term store as soon as possible.  
+            // We do many partial checkpoints (fullCheckpoint==false) to move
+            // topic messages
+            // to long term store as soon as possible.
             // 
-            // We want to avoid doing that for queue messages since removes the come in the same
-            // checkpoint cycle will nullify the previous message add.  Therefore, we only
+            // We want to avoid doing that for queue messages since removes the
+            // come in the same
+            // checkpoint cycle will nullify the previous message add.
+            // Therefore, we only
             // checkpoint queues on the fullCheckpoint cycles.
             //
-            if( fullCheckpoint ) {                
+            if (fullCheckpoint) {
                 Iterator iterator = queues.values().iterator();
                 while (iterator.hasNext()) {
                     try {
-                        final JournalMessageStore ms = (JournalMessageStore) iterator.next();
+                        final JournalMessageStore ms = (JournalMessageStore)iterator.next();
                         FutureTask task = new FutureTask(new Callable() {
                             public Object call() throws Exception {
                                 return ms.checkpoint();
-                            }});
+                            }
+                        });
                         futureTasks.add(task);
-                        checkpointExecutor.execute(task);                        
-                    }
-                    catch (Exception e) {
+                        checkpointExecutor.execute(task);
+                    } catch (Exception e) {
                         log.error("Failed to checkpoint a message store: " + e, e);
                     }
                 }
@@ -372,25 +375,25 @@
             Iterator iterator = topics.values().iterator();
             while (iterator.hasNext()) {
                 try {
-                    final JournalTopicMessageStore ms = (JournalTopicMessageStore) iterator.next();
+                    final JournalTopicMessageStore ms = (JournalTopicMessageStore)iterator.next();
                     FutureTask task = new FutureTask(new Callable() {
                         public Object call() throws Exception {
                             return ms.checkpoint();
-                        }});
+                        }
+                    });
                     futureTasks.add(task);
-                    checkpointExecutor.execute(task);                        
-                }
-                catch (Exception e) {
+                    checkpointExecutor.execute(task);
+                } catch (Exception e) {
                     log.error("Failed to checkpoint a message store: " + e, e);
                 }
             }
 
             try {
                 for (Iterator iter = futureTasks.iterator(); iter.hasNext();) {
-                    FutureTask ft = (FutureTask) iter.next();
-                    RecordLocation mark = (RecordLocation) ft.get();
+                    FutureTask ft = (FutureTask)iter.next();
+                    RecordLocation mark = (RecordLocation)ft.get();
                     // We only set a newMark on full checkpoints.
-                    if( fullCheckpoint ) {
+                    if (fullCheckpoint) {
                         if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
                             newMark = mark;
                         }
@@ -399,38 +402,36 @@
             } catch (Throwable e) {
                 log.error("Failed to checkpoint a message store: " + e, e);
             }
-            
 
-            if( fullCheckpoint ) {
+            if (fullCheckpoint) {
                 try {
                     if (newMark != null) {
                         log.debug("Marking journal at: " + newMark);
                         journal.setMark(newMark, true);
                     }
-                }
-                catch (Exception e) {
+                } catch (Exception e) {
                     log.error("Failed to mark the Journal: " + e, e);
                 }
-    
+
                 if (longTermPersistence instanceof JDBCPersistenceAdapter) {
-                    // We may be check pointing more often than the checkpointInterval if under high use
+                    // We may be checkpointing more often than the
+                    // checkpointInterval if under high use
                     // But we don't want to clean up the db that often.
                     long now = System.currentTimeMillis();
-                    if( now > lastCleanup+checkpointInterval ) {
+                    if (now > lastCleanup + checkpointInterval) {
                         lastCleanup = now;
-                        ((JDBCPersistenceAdapter) longTermPersistence).cleanup();
+                        ((JDBCPersistenceAdapter)longTermPersistence).cleanup();
                     }
                 }
             }
 
             log.debug("Checkpoint done.");
-        }
-        finally {
+        } finally {
             latch.countDown();
         }
-        synchronized(this) {
+        synchronized (this) {
             return this.fullCheckPoint;
-        }        
+        }
 
     }
 
@@ -441,13 +442,11 @@
      */
     public DataStructure readCommand(RecordLocation location) throws IOException {
         try {
-        	Packet packet = journal.read(location);
-            return (DataStructure) wireFormat.unmarshal(toByteSequence(packet));
-        }
-        catch (InvalidRecordLocationException e) {
+            Packet packet = journal.read(location);
+            return (DataStructure)wireFormat.unmarshal(toByteSequence(packet));
+        } catch (InvalidRecordLocationException e) {
             throw createReadException(location, e);
-        }
-        catch (IOException e) {
+        } catch (IOException e) {
             throw createReadException(location, e);
         }
     }
@@ -472,49 +471,43 @@
         // While we have records in the journal.
         while ((pos = journal.getNextRecordLocation(pos)) != null) {
             Packet data = journal.read(pos);
-            DataStructure c = (DataStructure) wireFormat.unmarshal(toByteSequence(data));
+            DataStructure c = (DataStructure)wireFormat.unmarshal(toByteSequence(data));
 
-            if (c instanceof Message ) {
-                Message message = (Message) c;
-                JournalMessageStore store = (JournalMessageStore) createMessageStore(message.getDestination());
-                if ( message.isInTransaction()) {
+            if (c instanceof Message) {
+                Message message = (Message)c;
+                JournalMessageStore store = (JournalMessageStore)createMessageStore(message.getDestination());
+                if (message.isInTransaction()) {
                     transactionStore.addMessage(store, message, pos);
-                }
-                else {
+                } else {
                     store.replayAddMessage(context, message);
                     transactionCounter++;
                 }
             } else {
                 switch (c.getDataStructureType()) {
-                case JournalQueueAck.DATA_STRUCTURE_TYPE:
-                {
-                    JournalQueueAck command = (JournalQueueAck) c;
-                    JournalMessageStore store = (JournalMessageStore) createMessageStore(command.getDestination());
+                case JournalQueueAck.DATA_STRUCTURE_TYPE: {
+                    JournalQueueAck command = (JournalQueueAck)c;
+                    JournalMessageStore store = (JournalMessageStore)createMessageStore(command.getDestination());
                     if (command.getMessageAck().isInTransaction()) {
                         transactionStore.removeMessage(store, command.getMessageAck(), pos);
-                    }
-                    else {
+                    } else {
                         store.replayRemoveMessage(context, command.getMessageAck());
                         transactionCounter++;
                     }
                 }
-                break;
-                case JournalTopicAck.DATA_STRUCTURE_TYPE: 
-                {
-                    JournalTopicAck command = (JournalTopicAck) c;
-                    JournalTopicMessageStore store = (JournalTopicMessageStore) createMessageStore(command.getDestination());
+                    break;
+                case JournalTopicAck.DATA_STRUCTURE_TYPE: {
+                    JournalTopicAck command = (JournalTopicAck)c;
+                    JournalTopicMessageStore store = (JournalTopicMessageStore)createMessageStore(command.getDestination());
                     if (command.getTransactionId() != null) {
                         transactionStore.acknowledge(store, command, pos);
-                    }
-                    else {
+                    } else {
                         store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId());
                         transactionCounter++;
                     }
                 }
-                break;
-                case JournalTransaction.DATA_STRUCTURE_TYPE:
-                {
-                    JournalTransaction command = (JournalTransaction) c;
+                    break;
+                case JournalTransaction.DATA_STRUCTURE_TYPE: {
+                    JournalTransaction command = (JournalTransaction)c;
                     try {
                         // Try to replay the packet.
                         switch (command.getType()) {
@@ -525,23 +518,23 @@
                         case JournalTransaction.LOCAL_COMMIT:
                             Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
                             if (tx == null)
-                                break; // We may be trying to replay a commit that
-                                        // was already committed.
+                                break; // We may be trying to replay a commit
+                            // that was already committed; if so, skip
+                            // replaying its operations.
 
                             // Replay the committed operations.
                             tx.getOperations();
                             for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
-                                TxOperation op = (TxOperation) iter.next();
+                                TxOperation op = (TxOperation)iter.next();
                                 if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
-                                    op.store.replayAddMessage(context, (Message) op.data);
+                                    op.store.replayAddMessage(context, (Message)op.data);
                                 }
                                 if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
-                                    op.store.replayRemoveMessage(context, (MessageAck) op.data);
+                                    op.store.replayRemoveMessage(context, (MessageAck)op.data);
                                 }
                                 if (op.operationType == TxOperation.ACK_OPERATION_TYPE) {
-                                    JournalTopicAck ack = (JournalTopicAck) op.data;
-                                    ((JournalTopicMessageStore) op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack
-                                            .getMessageId());
+                                    JournalTopicAck ack = (JournalTopicAck)op.data;
+                                    ((JournalTopicMessageStore)op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack.getMessageId());
                                 }
                             }
                             transactionCounter++;
@@ -551,14 +544,13 @@
                             transactionStore.replayRollback(command.getTransactionId());
                             break;
                         }
-                    }
-                    catch (IOException e) {
+                    } catch (IOException e) {
                         log.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
                     }
                 }
-                break;
+                    break;
                 case JournalTrace.DATA_STRUCTURE_TYPE:
-                    JournalTrace trace = (JournalTrace) c;
+                    JournalTrace trace = (JournalTrace)c;
                     log.debug("TRACE Entry: " + trace.getMessage());
                     break;
                 default:
@@ -590,14 +582,13 @@
     }
 
     /**
-     * 
      * @param command
      * @param sync
      * @return
      * @throws IOException
      */
     public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException {
-        if( started.get() )
+        if (started.get())
             return journal.write(toPacket(wireFormat.marshal(command)), sync);
         throw new IOException("closed");
     }
@@ -609,19 +600,19 @@
     }
 
     public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) {
-        newPercentUsage = ((newPercentUsage)/10)*10;
-        oldPercentUsage = ((oldPercentUsage)/10)*10;
+        newPercentUsage = ((newPercentUsage) / 10) * 10;
+        oldPercentUsage = ((oldPercentUsage) / 10) * 10;
         if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
             boolean sync = newPercentUsage >= 90;
             checkpoint(sync, true);
         }
     }
-    
+
     public JournalTransactionStore getTransactionStore() {
         return transactionStore;
     }
 
-    public void deleteAllMessages() throws IOException {        
+    public void deleteAllMessages() throws IOException {
         try {
             JournalTrace trace = new JournalTrace();
             trace.setMessage("DELETED");
@@ -661,28 +652,28 @@
     }
 
     public void setUseExternalMessageReferences(boolean enable) {
-        if( enable )
+        if (enable)
             throw new IllegalArgumentException("The journal does not support message references.");
     }
-    
+
     public Packet toPacket(ByteSequence sequence) {
-    	return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data, sequence.offset, sequence.length));
+        return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data, sequence.offset, sequence.length));
     }
-    
+
     public ByteSequence toByteSequence(Packet packet) {
-    	org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence();
-    	return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
+        org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence();
+        return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
     }
-    
-    public void setBrokerName(String brokerName){
+
+    public void setBrokerName(String brokerName) {
         longTermPersistence.setBrokerName(brokerName);
     }
-    
-    public String toString(){
+
+    public String toString() {
         return "JournalPersistenceAdapator(" + longTermPersistence + ")";
     }
 
-    public void setDirectory(File dir){        
+    public void setDirectory(File dir) {
     }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java Wed Aug  8 11:56:59 2007
@@ -34,34 +34,34 @@
 
 /**
  * Factory class that can create PersistenceAdapter objects.
- *
+ * 
  * @version $Revision: 1.4 $
  */
 public class JournalPersistenceAdapterFactory extends DataSourceSupport implements PersistenceAdapterFactory {
-    
-    private static final int JOURNAL_LOCKED_WAIT_DELAY = 10*1000;
+
+    private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000;
 
     private static final Log log = LogFactory.getLog(JournalPersistenceAdapterFactory.class);
-    
-    private int journalLogFileSize = 1024*1024*20;
+
+    private int journalLogFileSize = 1024 * 1024 * 20;
     private int journalLogFiles = 2;
     private TaskRunnerFactory taskRunnerFactory;
     private Journal journal;
-    private boolean useJournal=true;
-    private boolean useQuickJournal=false;
+    private boolean useJournal = true;
+    private boolean useQuickJournal = false;
     private File journalArchiveDirectory;
-    private boolean failIfJournalIsLocked=false;
+    private boolean failIfJournalIsLocked = false;
     private int journalThreadPriority = Thread.MAX_PRIORITY;
     private JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
-    
+
     public PersistenceAdapter createPersistenceAdapter() throws IOException {
         jdbcPersistenceAdapter.setDataSource(getDataSource());
-        
-        if( !useJournal ) {
+
+        if (!useJournal) {
             return jdbcPersistenceAdapter;
         }
         return new JournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getTaskRunnerFactory());
-        
+
     }
 
     public int getJournalLogFiles() {
@@ -81,13 +81,13 @@
 
     /**
      * Sets the size of the journal log files
-     *
+     * 
      * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
      */
     public void setJournalLogFileSize(int journalLogFileSize) {
         this.journalLogFileSize = journalLogFileSize;
     }
-    
+
     public JDBCPersistenceAdapter getJdbcAdapter() {
         return jdbcPersistenceAdapter;
     }
@@ -101,8 +101,9 @@
     }
 
     /**
-     * Enables or disables the use of the journal. The default is to use the journal
-     *
+     * Enables or disables the use of the journal. The default is to use the
+     * journal.
+     * 
      * @param useJournal
      */
     public void setUseJournal(boolean useJournal) {
@@ -110,8 +111,9 @@
     }
 
     public TaskRunnerFactory getTaskRunnerFactory() {
-        if( taskRunnerFactory == null ) {
-            taskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", journalThreadPriority, true, 1000);
+        if (taskRunnerFactory == null) {
+            taskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", journalThreadPriority,
+                                                      true, 1000);
         }
         return taskRunnerFactory;
     }
@@ -121,7 +123,7 @@
     }
 
     public Journal getJournal() throws IOException {
-        if( journal == null ) {
+        if (journal == null) {
             createJournal();
         }
         return journal;
@@ -132,7 +134,7 @@
     }
 
     public File getJournalArchiveDirectory() {
-        if( journalArchiveDirectory == null && useQuickJournal ) {
+        if (journalArchiveDirectory == null && useQuickJournal) {
             journalArchiveDirectory = new File(getDataDirectoryFile(), "journal");
         }
         return journalArchiveDirectory;
@@ -142,15 +144,14 @@
         this.journalArchiveDirectory = journalArchiveDirectory;
     }
 
-
     public boolean isUseQuickJournal() {
         return useQuickJournal;
     }
 
     /**
-     * Enables or disables the use of quick journal, which keeps messages in the journal and just
-     * stores a reference to the messages in JDBC. Defaults to false so that messages actually reside
-     * long term in the JDBC database.
+     * Enables or disables the use of quick journal, which keeps messages in the
+     * journal and just stores a reference to the messages in JDBC. Defaults to
+     * false so that messages actually reside long term in the JDBC database.
      */
     public void setUseQuickJournal(boolean useQuickJournal) {
         this.useQuickJournal = useQuickJournal;
@@ -167,6 +168,7 @@
     public Statements getStatements() {
         return jdbcPersistenceAdapter.getStatements();
     }
+
     public void setStatements(Statements statements) {
         jdbcPersistenceAdapter.setStatements(statements);
     }
@@ -176,7 +178,8 @@
     }
 
     /**
-     * Sets whether or not an exclusive database lock should be used to enable JDBC Master/Slave. Enabled by default.
+     * Sets whether or not an exclusive database lock should be used to enable
+     * JDBC Master/Slave. Enabled by default.
      */
     public void setUseDatabaseLock(boolean useDatabaseLock) {
         jdbcPersistenceAdapter.setUseDatabaseLock(useDatabaseLock);
@@ -192,16 +195,16 @@
     public void setCreateTablesOnStartup(boolean createTablesOnStartup) {
         jdbcPersistenceAdapter.setCreateTablesOnStartup(createTablesOnStartup);
     }
-    
-    public int getJournalThreadPriority(){
+
+    public int getJournalThreadPriority() {
         return journalThreadPriority;
     }
 
     /**
      * Sets the thread priority of the journal thread
      */
-    public void setJournalThreadPriority(int journalThreadPriority){
-        this.journalThreadPriority=journalThreadPriority;
+    public void setJournalThreadPriority(int journalThreadPriority) {
+        this.journalThreadPriority = journalThreadPriority;
     }
 
     /**
@@ -209,15 +212,18 @@
      */
     protected void createJournal() throws IOException {
         File journalDir = new File(getDataDirectoryFile(), "journal").getCanonicalFile();
-        if( failIfJournalIsLocked ) {
-            journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize, getJournalArchiveDirectory());
+        if (failIfJournalIsLocked) {
+            journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize,
+                                      getJournalArchiveDirectory());
         } else {
-            while( true ) {
+            while (true) {
                 try {
-                    journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize, getJournalArchiveDirectory());
+                    journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize,
+                                              getJournalArchiveDirectory());
                     break;
                 } catch (JournalLockedException e) {
-                    log.info("Journal is locked... waiting "+(JOURNAL_LOCKED_WAIT_DELAY/1000)+" seconds for the journal to be unlocked.");
+                    log.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000)
+                             + " seconds for the journal to be unlocked.");
                     try {
                         Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY);
                     } catch (InterruptedException e1) {
@@ -226,7 +232,5 @@
             }
         }
     }
-
-    
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java Wed Aug  8 11:56:59 2007
@@ -41,26 +41,29 @@
  * @version $Revision: 1.13 $
  */
 public class JournalTopicMessageStore extends JournalMessageStore implements TopicMessageStore {
-    
+
     private static final Log log = LogFactory.getLog(JournalTopicMessageStore.class);
 
     private TopicMessageStore longTermStore;
-	private HashMap ackedLastAckLocations = new HashMap();
-    
-    public JournalTopicMessageStore(JournalPersistenceAdapter adapter, TopicMessageStore checkpointStore, ActiveMQTopic destinationName) {
+    private HashMap ackedLastAckLocations = new HashMap();
+
+    public JournalTopicMessageStore(JournalPersistenceAdapter adapter, TopicMessageStore checkpointStore,
+                                    ActiveMQTopic destinationName) {
         super(adapter, checkpointStore, destinationName);
         this.longTermStore = checkpointStore;
     }
-    
-    public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
+
+    public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener)
+        throws Exception {
         this.peristenceAdapter.checkpoint(true, true);
         longTermStore.recoverSubscription(clientId, subscriptionName, listener);
     }
-    
-    public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,MessageRecoveryListener listener) throws Exception{
+
+    public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
+                                    MessageRecoveryListener listener) throws Exception {
         this.peristenceAdapter.checkpoint(true, true);
-        longTermStore.recoverNextMessages(clientId, subscriptionName, maxReturned,listener);
-        
+        longTermStore.recoverNextMessages(clientId, subscriptionName, maxReturned, listener);
+
     }
 
     public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
@@ -75,66 +78,69 @@
     public void addMessage(ConnectionContext context, Message message) throws IOException {
         super.addMessage(context, message);
     }
-    
+
     /**
      */
-    public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, final MessageId messageId) throws IOException {
+    public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
+                            final MessageId messageId) throws IOException {
         final boolean debug = log.isDebugEnabled();
-        
+
         JournalTopicAck ack = new JournalTopicAck();
         ack.setDestination(destination);
         ack.setMessageId(messageId);
         ack.setMessageSequenceId(messageId.getBrokerSequenceId());
         ack.setSubscritionName(subscriptionName);
         ack.setClientId(clientId);
-        ack.setTransactionId( context.getTransaction()!=null ? context.getTransaction().getTransactionId():null);
+        ack.setTransactionId(context.getTransaction() != null
+            ? context.getTransaction().getTransactionId() : null);
         final RecordLocation location = peristenceAdapter.writeCommand(ack, false);
-        
-        final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);        
-        if( !context.isInTransaction() ) {
-            if( debug )
-                log.debug("Journalled acknowledge for: "+messageId+", at: "+location);
+
+        final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
+        if (!context.isInTransaction()) {
+            if (debug)
+                log.debug("Journalled acknowledge for: " + messageId + ", at: " + location);
             acknowledge(messageId, location, key);
         } else {
-            if( debug )
-                log.debug("Journalled transacted acknowledge for: "+messageId+", at: "+location);
+            if (debug)
+                log.debug("Journalled transacted acknowledge for: " + messageId + ", at: " + location);
             synchronized (this) {
                 inFlightTxLocations.add(location);
             }
             transactionStore.acknowledge(this, ack, location);
-            context.getTransaction().addSynchronization(new Synchronization(){
-                public void afterCommit() throws Exception {                    
-                    if( debug )
-                        log.debug("Transacted acknowledge commit for: "+messageId+", at: "+location);
+            context.getTransaction().addSynchronization(new Synchronization() {
+                public void afterCommit() throws Exception {
+                    if (debug)
+                        log.debug("Transacted acknowledge commit for: " + messageId + ", at: " + location);
                     synchronized (JournalTopicMessageStore.this) {
                         inFlightTxLocations.remove(location);
                         acknowledge(messageId, location, key);
                     }
                 }
-                public void afterRollback() throws Exception {                    
-                    if( debug )
-                        log.debug("Transacted acknowledge rollback for: "+messageId+", at: "+location);
+
+                public void afterRollback() throws Exception {
+                    if (debug)
+                        log.debug("Transacted acknowledge rollback for: " + messageId + ", at: " + location);
                     synchronized (JournalTopicMessageStore.this) {
                         inFlightTxLocations.remove(location);
                     }
                 }
             });
         }
-        
+
     }
-    
-    public void replayAcknowledge(ConnectionContext context, String clientId, String subscritionName, MessageId messageId) {
+
+    public void replayAcknowledge(ConnectionContext context, String clientId, String subscritionName,
+                                  MessageId messageId) {
         try {
             SubscriptionInfo sub = longTermStore.lookupSubscription(clientId, subscritionName);
-            if( sub != null ) {
+            if (sub != null) {
                 longTermStore.acknowledge(context, clientId, subscritionName, messageId);
             }
-        }
-        catch (Throwable e) {
-            log.debug("Could not replay acknowledge for message '" + messageId + "'.  Message may have already been acknowledged. reason: " + e);
+        } catch (Throwable e) {
+            log.debug("Could not replay acknowledge for message '" + messageId
+                      + "'.  Message may have already been acknowledged. reason: " + e);
         }
     }
-        
 
     /**
      * @param messageId
@@ -142,15 +148,15 @@
      * @param key
      */
     protected void acknowledge(MessageId messageId, RecordLocation location, SubscriptionKey key) {
-        synchronized(this) {
-		    lastLocation = location;
-		    ackedLastAckLocations.put(key, messageId);
-		}
+        synchronized (this) {
+            lastLocation = location;
+            ackedLastAckLocations.put(key, messageId);
+        }
     }
-    
+
     public RecordLocation checkpoint() throws IOException {
-        
-		final HashMap cpAckedLastAckLocations;
+
+        final HashMap cpAckedLastAckLocations;
 
         // swap out the hash maps..
         synchronized (this) {
@@ -158,15 +164,16 @@
             this.ackedLastAckLocations = new HashMap();
         }
 
-        return super.checkpoint( new Callback() {
+        return super.checkpoint(new Callback() {
             public void execute() throws Exception {
 
                 // Checkpoint the acknowledged messages.
                 Iterator iterator = cpAckedLastAckLocations.keySet().iterator();
                 while (iterator.hasNext()) {
-                    SubscriptionKey subscriptionKey = (SubscriptionKey) iterator.next();
-                    MessageId identity = (MessageId) cpAckedLastAckLocations.get(subscriptionKey);
-                    longTermStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId, subscriptionKey.subscriptionName, identity);
+                    SubscriptionKey subscriptionKey = (SubscriptionKey)iterator.next();
+                    MessageId identity = (MessageId)cpAckedLastAckLocations.get(subscriptionKey);
+                    longTermStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId,
+                                              subscriptionKey.subscriptionName, identity);
                 }
 
             }
@@ -175,30 +182,27 @@
     }
 
     /**
-	 * @return Returns the longTermStore.
-	 */
-	public TopicMessageStore getLongTermTopicMessageStore() {
-		return longTermStore;
-	}
+     * @return Returns the longTermStore.
+     */
+    public TopicMessageStore getLongTermTopicMessageStore() {
+        return longTermStore;
+    }
 
     public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
         longTermStore.deleteSubscription(clientId, subscriptionName);
     }
-    
+
     public SubscriptionInfo[] getAllSubscriptions() throws IOException {
         return longTermStore.getAllSubscriptions();
     }
 
-    
-    public int getMessageCount(String clientId,String subscriberName) throws IOException{
+    public int getMessageCount(String clientId, String subscriberName) throws IOException {
         this.peristenceAdapter.checkpoint(true, true);
-        return longTermStore.getMessageCount(clientId,subscriberName);
-    }
-    
-    public void resetBatching(String clientId,String subscriptionName) {
-        longTermStore.resetBatching(clientId,subscriptionName);
+        return longTermStore.getMessageCount(clientId, subscriberName);
     }
 
-    
+    public void resetBatching(String clientId, String subscriptionName) {
+        longTermStore.resetBatching(clientId, subscriptionName);
+    }
 
-}
\ No newline at end of file
+}

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java Wed Aug  8 11:56:59 2007
@@ -36,7 +36,6 @@
 import org.apache.activemq.store.TransactionRecoveryListener;
 import org.apache.activemq.store.TransactionStore;
 
-
 /**
  */
 public class JournalTransactionStore implements TransactionStore {
@@ -46,26 +45,27 @@
     Map preparedTransactions = new LinkedHashMap();
     private boolean doingRecover;
 
-    
     public static class TxOperation {
-        
-        static final byte ADD_OPERATION_TYPE       = 0;
-        static final byte REMOVE_OPERATION_TYPE    = 1;
-        static final byte ACK_OPERATION_TYPE       = 3;
-        
+
+        static final byte ADD_OPERATION_TYPE = 0;
+        static final byte REMOVE_OPERATION_TYPE = 1;
+        static final byte ACK_OPERATION_TYPE = 3;
+
         public byte operationType;
         public JournalMessageStore store;
         public Object data;
-        
+
         public TxOperation(byte operationType, JournalMessageStore store, Object data) {
-            this.operationType=operationType;
-            this.store=store;
-            this.data=data;
+            this.operationType = operationType;
+            this.store = store;
+            this.data = data;
         }
-        
+
     }
+
     /**
      * Operations
+     * 
      * @version $Revision: 1.6 $
      */
     public static class Tx {
@@ -74,7 +74,7 @@
         private ArrayList operations = new ArrayList();
 
         public Tx(RecordLocation location) {
-            this.location=location;
+            this.location = location;
         }
 
         public void add(JournalMessageStore store, Message msg) {
@@ -88,12 +88,12 @@
         public void add(JournalTopicMessageStore store, JournalTopicAck ack) {
             operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack));
         }
-        
+
         public Message[] getMessages() {
             ArrayList list = new ArrayList();
             for (Iterator iter = operations.iterator(); iter.hasNext();) {
-                TxOperation op = (TxOperation) iter.next();
-                if( op.operationType==TxOperation.ADD_OPERATION_TYPE ) {
+                TxOperation op = (TxOperation)iter.next();
+                if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
                     list.add(op.data);
                 }
             }
@@ -105,8 +105,8 @@
         public MessageAck[] getAcks() {
             ArrayList list = new ArrayList();
             for (Iterator iter = operations.iterator(); iter.hasNext();) {
-                TxOperation op = (TxOperation) iter.next();
-                if( op.operationType==TxOperation.REMOVE_OPERATION_TYPE ) {
+                TxOperation op = (TxOperation)iter.next();
+                if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
                     list.add(op.data);
                 }
             }
@@ -129,43 +129,44 @@
      * @throws IOException
      * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
      */
-    public void prepare(TransactionId txid) throws IOException{
-        Tx tx=null;
-        synchronized(inflightTransactions){
-            tx=(Tx)inflightTransactions.remove(txid);
+    public void prepare(TransactionId txid) throws IOException {
+        Tx tx = null;
+        synchronized (inflightTransactions) {
+            tx = (Tx)inflightTransactions.remove(txid);
         }
-        if(tx==null)
+        if (tx == null)
             return;
-        peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE,txid,false),true);
-        synchronized(preparedTransactions){
-            preparedTransactions.put(txid,tx);
+        peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE, txid, false),
+                                       true);
+        synchronized (preparedTransactions) {
+            preparedTransactions.put(txid, tx);
         }
     }
-    
+
     /**
      * @throws IOException
      * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
      */
-    public void replayPrepare(TransactionId txid) throws IOException{
-        Tx tx=null;
-        synchronized(inflightTransactions){
-            tx=(Tx)inflightTransactions.remove(txid);
+    public void replayPrepare(TransactionId txid) throws IOException {
+        Tx tx = null;
+        synchronized (inflightTransactions) {
+            tx = (Tx)inflightTransactions.remove(txid);
         }
-        if(tx==null)
+        if (tx == null)
             return;
-        synchronized(preparedTransactions){
-            preparedTransactions.put(txid,tx);
+        synchronized (preparedTransactions) {
+            preparedTransactions.put(txid, tx);
         }
     }
 
-    public Tx getTx(Object txid,RecordLocation location){
-        Tx tx=null;
-        synchronized(inflightTransactions){
-            tx=(Tx)inflightTransactions.get(txid);
+    public Tx getTx(Object txid, RecordLocation location) {
+        Tx tx = null;
+        synchronized (inflightTransactions) {
+            tx = (Tx)inflightTransactions.get(txid);
         }
-        if(tx==null){
-            tx=new Tx(location);
-            inflightTransactions.put(txid,tx);
+        if (tx == null) {
+            tx = new Tx(location);
+            inflightTransactions.put(txid, tx);
         }
         return tx;
     }
@@ -174,24 +175,25 @@
      * @throws XAException
      * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
      */
-    public void commit(TransactionId txid,boolean wasPrepared) throws IOException{
+    public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
         Tx tx;
-        if(wasPrepared){
-            synchronized(preparedTransactions){
-                tx=(Tx)preparedTransactions.remove(txid);
+        if (wasPrepared) {
+            synchronized (preparedTransactions) {
+                tx = (Tx)preparedTransactions.remove(txid);
             }
-        }else{
-            synchronized(inflightTransactions){
-                tx=(Tx)inflightTransactions.remove(txid);
+        } else {
+            synchronized (inflightTransactions) {
+                tx = (Tx)inflightTransactions.remove(txid);
             }
         }
-        if(tx==null)
+        if (tx == null)
             return;
-        if(txid.isXATransaction()){
-            peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT,txid,wasPrepared),true);
-        }else{
-            peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT,txid,wasPrepared),
-                    true);
+        if (txid.isXATransaction()) {
+            peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT, txid,
+                                                                  wasPrepared), true);
+        } else {
+            peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid,
+                                                                  wasPrepared), true);
         }
     }
 
@@ -199,13 +201,13 @@
      * @throws XAException
      * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
      */
-    public Tx replayCommit(TransactionId txid,boolean wasPrepared) throws IOException{
-        if(wasPrepared){
-            synchronized(preparedTransactions){
+    public Tx replayCommit(TransactionId txid, boolean wasPrepared) throws IOException {
+        if (wasPrepared) {
+            synchronized (preparedTransactions) {
                 return (Tx)preparedTransactions.remove(txid);
             }
-        }else{
-            synchronized(inflightTransactions){
+        } else {
+            synchronized (inflightTransactions) {
                 return (Tx)inflightTransactions.remove(txid);
             }
         }
@@ -215,21 +217,22 @@
      * @throws IOException
      * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
      */
-    public void rollback(TransactionId txid) throws IOException{
-        Tx tx=null;
-        synchronized(inflightTransactions){
-            tx=(Tx)inflightTransactions.remove(txid);
-        }
-        if(tx!=null)
-            synchronized(preparedTransactions){
-                tx=(Tx)preparedTransactions.remove(txid);
-            }
-        if(tx!=null){
-            if(txid.isXATransaction()){
-                peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK,txid,false),true);
-            }else{
-                peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK,txid,false),
-                        true);
+    public void rollback(TransactionId txid) throws IOException {
+        Tx tx = null;
+        synchronized (inflightTransactions) {
+            tx = (Tx)inflightTransactions.remove(txid);
+        }
+        if (tx != null)
+            synchronized (preparedTransactions) {
+                tx = (Tx)preparedTransactions.remove(txid);
+            }
+        if (tx != null) {
+            if (txid.isXATransaction()) {
+                peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK, txid,
+                                                                      false), true);
+            } else {
+                peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK,
+                                                                      txid, false), true);
             }
         }
     }
@@ -238,42 +241,42 @@
      * @throws IOException
      * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
      */
-    public void replayRollback(TransactionId txid) throws IOException{
-        boolean inflight=false;
-        synchronized(inflightTransactions){
-            inflight=inflightTransactions.remove(txid)!=null;
+    public void replayRollback(TransactionId txid) throws IOException {
+        boolean inflight = false;
+        synchronized (inflightTransactions) {
+            inflight = inflightTransactions.remove(txid) != null;
         }
-        if(inflight){
-            synchronized(preparedTransactions){
+        if (inflight) {
+            synchronized (preparedTransactions) {
                 preparedTransactions.remove(txid);
             }
         }
     }
-    
+
     public void start() throws Exception {
     }
 
     public void stop() throws Exception {
     }
-    
-    synchronized public void recover(TransactionRecoveryListener listener) throws IOException{
+
+    synchronized public void recover(TransactionRecoveryListener listener) throws IOException {
         // All the in-flight transactions get rolled back..
-        synchronized(inflightTransactions){
+        synchronized (inflightTransactions) {
             inflightTransactions.clear();
         }
-        this.doingRecover=true;
-        try{
-            Map txs=null;
-            synchronized(preparedTransactions){
-                txs=new LinkedHashMap(preparedTransactions);
-            }
-            for(Iterator iter=txs.keySet().iterator();iter.hasNext();){
-                Object txid=(Object)iter.next();
-                Tx tx=(Tx)txs.get(txid);
-                listener.recover((XATransactionId)txid,tx.getMessages(),tx.getAcks());
+        this.doingRecover = true;
+        try {
+            Map txs = null;
+            synchronized (preparedTransactions) {
+                txs = new LinkedHashMap(preparedTransactions);
+            }
+            for (Iterator iter = txs.keySet().iterator(); iter.hasNext();) {
+                Object txid = (Object)iter.next();
+                Tx tx = (Tx)txs.get(txid);
+                listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks());
             }
-        }finally{
-            this.doingRecover=false;
+        } finally {
+            this.doingRecover = false;
         }
     }
 
@@ -290,40 +293,40 @@
      * @param ack
      * @throws IOException
      */
-    public void removeMessage(JournalMessageStore store, MessageAck ack, RecordLocation location) throws IOException {
+    public void removeMessage(JournalMessageStore store, MessageAck ack, RecordLocation location)
+        throws IOException {
         Tx tx = getTx(ack.getTransactionId(), location);
         tx.add(store, ack);
     }
-    
-    
+
     public void acknowledge(JournalTopicMessageStore store, JournalTopicAck ack, RecordLocation location) {
         Tx tx = getTx(ack.getTransactionId(), location);
         tx.add(store, ack);
     }
 
-
-    public RecordLocation checkpoint() throws IOException{
+    public RecordLocation checkpoint() throws IOException {
         // Nothing really to checkpoint.. since, we don't
-        // checkpoint tx operations in to long term store until they are committed.
+        // checkpoint tx operations in to long term store until they are
+        // committed.
         // But we keep track of the first location of an operation
         // that was associated with an active tx. The journal can not
         // roll over active tx records.
-        RecordLocation rc=null;
-        synchronized(inflightTransactions){
-            for(Iterator iter=inflightTransactions.values().iterator();iter.hasNext();){
-                Tx tx=(Tx)iter.next();
-                RecordLocation location=tx.location;
-                if(rc==null||rc.compareTo(location)<0){
-                    rc=location;
+        RecordLocation rc = null;
+        synchronized (inflightTransactions) {
+            for (Iterator iter = inflightTransactions.values().iterator(); iter.hasNext();) {
+                Tx tx = (Tx)iter.next();
+                RecordLocation location = tx.location;
+                if (rc == null || rc.compareTo(location) < 0) {
+                    rc = location;
                 }
             }
         }
-        synchronized(preparedTransactions){
-            for(Iterator iter=preparedTransactions.values().iterator();iter.hasNext();){
-                Tx tx=(Tx)iter.next();
-                RecordLocation location=tx.location;
-                if(rc==null||rc.compareTo(location)<0){
-                    rc=location;
+        synchronized (preparedTransactions) {
+            for (Iterator iter = preparedTransactions.values().iterator(); iter.hasNext();) {
+                Tx tx = (Tx)iter.next();
+                RecordLocation location = tx.location;
+                if (rc == null || rc.compareTo(location) < 0) {
+                    rc = location;
                 }
             }
             return rc;
@@ -333,6 +336,5 @@
     public boolean isDoingRecover() {
         return doingRecover;
     }
-
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AMQTxMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AMQTxMarshaller.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AMQTxMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AMQTxMarshaller.java Wed Aug  8 11:56:59 2007
@@ -28,35 +28,36 @@
 
 /**
  * Marshall an AMQTx
+ * 
  * @version $Revision: 1.10 $
  */
-public class AMQTxMarshaller implements Marshaller<AMQTx>{
+public class AMQTxMarshaller implements Marshaller<AMQTx> {
 
     private WireFormat wireFormat;
 
-    public AMQTxMarshaller(WireFormat wireFormat){
-        this.wireFormat=wireFormat;
+    public AMQTxMarshaller(WireFormat wireFormat) {
+        this.wireFormat = wireFormat;
     }
 
-    public AMQTx readPayload(DataInput dataIn) throws IOException{
-        Location location=new Location();
+    public AMQTx readPayload(DataInput dataIn) throws IOException {
+        Location location = new Location();
         location.readExternal(dataIn);
-        AMQTx result=new AMQTx(location);
-        int size=dataIn.readInt();
-        for(int i=0;i<size;i++){
-            AMQTxOperation op=new AMQTxOperation();
-            op.readExternal(wireFormat,dataIn);
+        AMQTx result = new AMQTx(location);
+        int size = dataIn.readInt();
+        for (int i = 0; i < size; i++) {
+            AMQTxOperation op = new AMQTxOperation();
+            op.readExternal(wireFormat, dataIn);
             result.getOperations().add(op);
         }
         return result;
     }
 
-    public void writePayload(AMQTx amqtx,DataOutput dataOut) throws IOException{
+    public void writePayload(AMQTx amqtx, DataOutput dataOut) throws IOException {
         amqtx.getLocation().writeExternal(dataOut);
-        List<AMQTxOperation> list=amqtx.getOperations();
+        List<AMQTxOperation> list = amqtx.getOperations();
         dataOut.writeInt(list.size());
-        for(AMQTxOperation op:list){
-            op.writeExternal(wireFormat,dataOut);
+        for (AMQTxOperation op : list) {
+            op.writeExternal(wireFormat, dataOut);
         }
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AtomicIntegerMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AtomicIntegerMarshaller.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AtomicIntegerMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AtomicIntegerMarshaller.java Wed Aug  8 11:56:59 2007
@@ -22,20 +22,19 @@
 import org.apache.activemq.kaha.Marshaller;
 import java.util.concurrent.atomic.AtomicInteger;
 
-
 /**
  * Marshall an AtomicInteger
+ * 
  * @version $Revision: 1.10 $
  */
-public class AtomicIntegerMarshaller implements Marshaller<AtomicInteger>{
-   
+public class AtomicIntegerMarshaller implements Marshaller<AtomicInteger> {
+
+    public void writePayload(AtomicInteger ai, DataOutput dataOut) throws IOException {
+        dataOut.writeInt(ai.get());
 
-    public void writePayload(AtomicInteger ai,DataOutput dataOut) throws IOException{
-       dataOut.writeInt(ai.get());
-       
     }
 
-    public AtomicInteger readPayload(DataInput dataIn) throws IOException{
+    public AtomicInteger readPayload(DataInput dataIn) throws IOException {
         int value = dataIn.readInt();
         return new AtomicInteger(value);
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRef.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRef.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRef.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRef.java Wed Aug  8 11:56:59 2007
@@ -22,59 +22,56 @@
  * 
  * @version $Revision: 1.10 $
  */
-public class ConsumerMessageRef{
+public class ConsumerMessageRef {
 
     private MessageId messageId;
     private StoreEntry messageEntry;
     private StoreEntry ackEntry;
-    
+
     /**
      * @return the ackEntry
      */
-    public StoreEntry getAckEntry(){
+    public StoreEntry getAckEntry() {
         return this.ackEntry;
     }
-    
+
     /**
      * @param ackEntry the ackEntry to set
      */
-    public void setAckEntry(StoreEntry ackEntry){
-        this.ackEntry=ackEntry;
+    public void setAckEntry(StoreEntry ackEntry) {
+        this.ackEntry = ackEntry;
     }
-    
+
     /**
      * @return the messageEntry
      */
-    public StoreEntry getMessageEntry(){
+    public StoreEntry getMessageEntry() {
         return this.messageEntry;
     }
-    
+
     /**
      * @param messageEntry the messageEntry to set
      */
-    public void setMessageEntry(StoreEntry messageEntry){
-        this.messageEntry=messageEntry;
+    public void setMessageEntry(StoreEntry messageEntry) {
+        this.messageEntry = messageEntry;
     }
 
-    
     /**
      * @return the messageId
      */
-    public MessageId getMessageId(){
+    public MessageId getMessageId() {
         return this.messageId;
     }
 
-    
     /**
      * @param messageId the messageId to set
      */
-    public void setMessageId(MessageId messageId){
-        this.messageId=messageId;
+    public void setMessageId(MessageId messageId) {
+        this.messageId = messageId;
     }
-    
+
     public String toString() {
-        return "ConsumerMessageRef[" + messageId +"]";
+        return "ConsumerMessageRef[" + messageId + "]";
     }
 
-       
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRefMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRefMarshaller.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRefMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRefMarshaller.java Wed Aug  8 11:56:59 2007
@@ -23,31 +23,30 @@
 import org.apache.activemq.kaha.Marshaller;
 import org.apache.activemq.kaha.impl.index.IndexItem;
 
-
 /**
  * Marshall a TopicSubAck
+ * 
  * @version $Revision: 1.10 $
  */
-public class ConsumerMessageRefMarshaller implements Marshaller{
-   
+public class ConsumerMessageRefMarshaller implements Marshaller {
 
     /**
      * @param object
      * @param dataOut
      * @throws IOException
-     * @see org.apache.activemq.kaha.Marshaller#writePayload(java.lang.Object, java.io.DataOutput)
+     * @see org.apache.activemq.kaha.Marshaller#writePayload(java.lang.Object,
+     *      java.io.DataOutput)
      */
-    public void writePayload(Object object,DataOutput dataOut) throws IOException{
-       ConsumerMessageRef ref = (ConsumerMessageRef) object;
-       dataOut.writeUTF(ref.getMessageId().toString());
-       IndexItem item = (IndexItem)ref.getMessageEntry();
-       dataOut.writeLong(item.getOffset());
-       item.write(dataOut);
-       item = (IndexItem)ref.getAckEntry();
-       dataOut.writeLong(item.getOffset());
-       item.write(dataOut);
-       
-       
+    public void writePayload(Object object, DataOutput dataOut) throws IOException {
+        ConsumerMessageRef ref = (ConsumerMessageRef)object;
+        dataOut.writeUTF(ref.getMessageId().toString());
+        IndexItem item = (IndexItem)ref.getMessageEntry();
+        dataOut.writeLong(item.getOffset());
+        item.write(dataOut);
+        item = (IndexItem)ref.getAckEntry();
+        dataOut.writeLong(item.getOffset());
+        item.write(dataOut);
+
     }
 
     /**
@@ -56,7 +55,7 @@
      * @throws IOException
      * @see org.apache.activemq.kaha.Marshaller#readPayload(java.io.DataInput)
      */
-    public Object readPayload(DataInput dataIn) throws IOException{
+    public Object readPayload(DataInput dataIn) throws IOException {
         ConsumerMessageRef ref = new ConsumerMessageRef();
         ref.setMessageId(new MessageId(dataIn.readUTF()));
         IndexItem item = new IndexItem();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/IntegerMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/IntegerMarshaller.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/IntegerMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/IntegerMarshaller.java Wed Aug  8 11:56:59 2007
@@ -22,18 +22,18 @@
 
 import org.apache.activemq.kaha.Marshaller;
 
-
 /**
  * Marshall an Integer
+ * 
  * @version $Revision: 1.10 $
  */
 public class IntegerMarshaller implements Marshaller<Integer> {
-   
-    public void writePayload(Integer object,DataOutput dataOut) throws IOException{
-       dataOut.writeInt(object.intValue());
+
+    public void writePayload(Integer object, DataOutput dataOut) throws IOException {
+        dataOut.writeInt(object.intValue());
     }
 
-    public Integer readPayload(DataInput dataIn) throws IOException{
+    public Integer readPayload(DataInput dataIn) throws IOException {
         return dataIn.readInt();
     }
 }