You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/05/24 16:48:55 UTC

svn commit: r947657 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/store/ activemq-core/src/main/java/org/apache/activemq/store/amq/ activemq-core/src/main/java/org/apache/activemq/store/journal/ activemq-core/src/main/java/org/a...

Author: gtully
Date: Mon May 24 14:48:55 2010
New Revision: 947657

URL: http://svn.apache.org/viewvc?rev=947657&view=rev
Log:
Allow journal write batching: resolve the global transaction lock and the journal lock, and let writes accumulate on the DataFileAppender rather than at the store. Also tie transaction completion and after-commit processing together with a callback rather than with a global lock, so that concurrent commits can batch their writes. This is a rework of the fix for https://issues.apache.org/activemq/browse/AMQ-2594

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TransactionStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/LocalTransaction.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/embedded/ThreadExplorer.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TransactionStore.java?rev=947657&r1=947656&r2=947657&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TransactionStore.java Mon May 24 14:48:55 2010
@@ -17,6 +17,7 @@
 package org.apache.activemq.store;
 
 import java.io.IOException;
+import java.util.concurrent.FutureTask;
 
 import org.apache.activemq.Service;
 import org.apache.activemq.command.TransactionId;
@@ -31,7 +32,7 @@ public interface TransactionStore extend
 
     void prepare(TransactionId txid) throws IOException;
 
-    void commit(TransactionId txid, boolean wasPrepared) throws IOException;
+    void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException;
 
     void rollback(TransactionId txid) throws IOException;
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java?rev=947657&r1=947656&r2=947657&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java Mon May 24 14:48:55 2010
@@ -99,7 +99,7 @@ public class AMQTransactionStore impleme
      * @throws XAException
      * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
      */
-    public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
+    public void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException {
         AMQTx tx;
         if (wasPrepared) {
             synchronized (preparedTransactions) {
@@ -111,6 +111,7 @@ public class AMQTransactionStore impleme
             }
         }
         if (tx == null) {
+            done.run();
             return;
         }
         if (txid.isXATransaction()) {
@@ -118,6 +119,7 @@ public class AMQTransactionStore impleme
         } else {
             peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid, wasPrepared), true,true);
         }
+        done.run();
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java?rev=947657&r1=947656&r2=947657&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java Mon May 24 14:48:55 2010
@@ -176,7 +176,7 @@ public class JournalTransactionStore imp
      * @throws XAException
      * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
      */
-    public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
+    public void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException {
         Tx tx;
         if (wasPrepared) {
             synchronized (preparedTransactions) {
@@ -188,6 +188,7 @@ public class JournalTransactionStore imp
             }
         }
         if (tx == null) {
+            done.run();
             return;
         }
         if (txid.isXATransaction()) {
@@ -197,6 +198,7 @@ public class JournalTransactionStore imp
             peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid,
                                                                   wasPrepared), true);
         }
+        done.run();
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java?rev=947657&r1=947656&r2=947657&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java Mon May 24 14:48:55 2010
@@ -101,12 +101,13 @@ public class KahaTransactionStore implem
      * @throws XAException
      * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
      */
-    public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
+    public void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException {
         KahaTransaction tx = getTx(txid);
         if (tx != null) {
             tx.commit(this);
             removeTx(txid);
         }
+        done.run();
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=947657&r1=947656&r2=947657&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Mon May 24 14:48:55 2010
@@ -241,14 +241,14 @@ public class KahaDBStore extends Message
     public TransactionStore createTransactionStore() throws IOException {
         return new TransactionStore() {
 
-            public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
-                store(new KahaCommitCommand().setTransactionInfo(createTransactionInfo(txid)), true);
+            public void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException {
+                store(new KahaCommitCommand().setTransactionInfo(createTransactionInfo(txid)), true, done);
             }
             public void prepare(TransactionId txid) throws IOException {
-                store(new KahaPrepareCommand().setTransactionInfo(createTransactionInfo(txid)), true);
+                store(new KahaPrepareCommand().setTransactionInfo(createTransactionInfo(txid)), true, null);
             }
             public void rollback(TransactionId txid) throws IOException {
-                store(new KahaRollbackCommand().setTransactionInfo(createTransactionInfo(txid)), false);
+                store(new KahaRollbackCommand().setTransactionInfo(createTransactionInfo(txid)), false, null);
             }
             public void recover(TransactionRecoveryListener listener) throws IOException {
                 for (Map.Entry<TransactionId, ArrayList<Operation>> entry : preparedTransactions.entrySet()) {
@@ -333,7 +333,7 @@ public class KahaDBStore extends Message
 
             org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
             command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
-            store(command, isEnableJournalDiskSyncs() && message.isResponseRequired());
+            store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null);
 
         }
 
@@ -345,13 +345,13 @@ public class KahaDBStore extends Message
 
             org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
             command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
-            store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired());
+            store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null);
         }
 
         public void removeAllMessages(ConnectionContext context) throws IOException {
             KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
             command.setDestination(dest);
-            store(command, true);
+            store(command, true, null);
         }
 
         public Message getMessage(MessageId identity) throws IOException {
@@ -519,7 +519,6 @@ public class KahaDBStore extends Message
             if (isConcurrentStoreAndDispatchTopics()) {
                 StoreTopicTask task = asyncTopicMap.get(messageId);
                 if (task != null) {
-
                     if (task.addSubscriptionKey(subscriptionKey)) {
                         removeTopicTask(messageId);
                         task.cancel();
@@ -538,7 +537,7 @@ public class KahaDBStore extends Message
             command.setDestination(dest);
             command.setSubscriptionKey(subscriptionKey);
             command.setMessageId(messageId.toString());
-            store(command, false);
+            store(command, false, null);
         }
 
         public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
@@ -550,7 +549,7 @@ public class KahaDBStore extends Message
             command.setRetroactive(retroactive);
             org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
             command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
-            store(command, isEnableJournalDiskSyncs() && true);
+            store(command, isEnableJournalDiskSyncs() && true, null);
             this.subscriptionCount.incrementAndGet();
         }
 
@@ -558,7 +557,7 @@ public class KahaDBStore extends Message
             KahaSubscriptionCommand command = new KahaSubscriptionCommand();
             command.setDestination(dest);
             command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
-            store(command, isEnableJournalDiskSyncs() && true);
+            store(command, isEnableJournalDiskSyncs() && true, null);
             this.subscriptionCount.decrementAndGet();
         }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=947657&r1=947656&r2=947657&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Mon May 24 14:48:55 2010
@@ -630,7 +630,7 @@ public class MessageDatabase extends Ser
     // Methods call by the broker to update and query the store.
     // /////////////////////////////////////////////////////////////////
     public Location store(JournalCommand data) throws IOException {
-        return store(data, false);
+        return store(data, false, null);
     }
 
     /**
@@ -638,8 +638,9 @@ public class MessageDatabase extends Ser
      * to a JournalMessage which is logged to the journal and then the data from
      * the JournalMessage is used to update the index just like it would be done
      * during a recovery process.
+     * @param done 
      */
-    public Location store(JournalCommand data, boolean sync) throws IOException {
+    public Location store(JournalCommand data, boolean sync, Runnable done) throws IOException {
     	try {
             int size = data.serializedSizeFramed();
             DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
@@ -662,6 +663,9 @@ public class MessageDatabase extends Ser
                 LOG.info("KahaDB: Recovering checkpoint thread after exception");
                 startCheckpoint();
             }
+            if (done != null) {
+                done.run();
+            }
             return location;
     	} catch (IOException ioe) {
             LOG.error("KahaDB failed to store to Journal", ioe);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java?rev=947657&r1=947656&r2=947657&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java Mon May 24 14:48:55 2010
@@ -72,8 +72,9 @@ public class TempKahaDBStore extends Tem
     public TransactionStore createTransactionStore() throws IOException {
         return new TransactionStore(){
             
-            public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
+            public void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException {
                 processCommit(txid);
+                done.run();
             }
             public void prepare(TransactionId txid) throws IOException {
             	processPrepare(txid);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java?rev=947657&r1=947656&r2=947657&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java Mon May 24 14:48:55 2010
@@ -194,7 +194,7 @@ public class MemoryTransactionStore impl
      * @throws XAException
      * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
      */
-    public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
+    public void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException {
 
         Tx tx;
         if (wasPrepared) {
@@ -204,9 +204,11 @@ public class MemoryTransactionStore impl
         }
 
         if (tx == null) {
+            done.run();
             return;
         }
         tx.commit();
+        done.run();
 
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/LocalTransaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/LocalTransaction.java?rev=947657&r1=947656&r2=947657&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/LocalTransaction.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/LocalTransaction.java Mon May 24 14:48:55 2010
@@ -69,21 +69,8 @@ public class LocalTransaction extends Tr
         context.getTransactions().remove(xid);
         // Sync on transaction store to avoid out of order messages in the cursor
         // https://issues.apache.org/activemq/browse/AMQ-2594
-        synchronized (transactionStore) {
-            transactionStore.commit(getTransactionId(), false);
-
-            try {
-                fireAfterCommit();
-            } catch (Throwable e) {
-                // I guess this could happen. Post commit task failed
-                // to execute properly.
-                LOG.warn("POST COMMIT FAILED: ", e);
-                XAException xae = new XAException("POST COMMIT FAILED");
-                xae.errorCode = XAException.XAER_RMERR;
-                xae.initCause(e);
-                throw xae;
-            }
-        }
+        transactionStore.commit(getTransactionId(), false, postCommitTask);
+        this.waitPostCommitDone(postCommitTask);
     }
 
     public void rollback() throws XAException, IOException {
@@ -120,5 +107,9 @@ public class LocalTransaction extends Tr
     public TransactionId getTransactionId() {
         return xid;
     }
-
+    
+    @Override
+    public Log getLog() {
+        return LOG;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java?rev=947657&r1=947656&r2=947657&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java Mon May 24 14:48:55 2010
@@ -17,13 +17,18 @@
 package org.apache.activemq.transaction;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
 
 import javax.transaction.xa.XAException;
 
 import org.apache.activemq.command.TransactionId;
+import org.apache.commons.logging.Log;
 
 /**
  * Keeps track of all the actions the need to be done when a transaction does a
@@ -31,7 +36,7 @@ import org.apache.activemq.command.Trans
  * 
  * @version $Revision: 1.5 $
  */
-public abstract class Transaction {
+public abstract class Transaction implements Callable {
 
     public static final byte START_STATE = 0; // can go to: 1,2,3
     public static final byte IN_USE_STATE = 1; // can go to: 2,3
@@ -40,7 +45,8 @@ public abstract class Transaction {
 
     private ArrayList<Synchronization> synchronizations = new ArrayList<Synchronization>();
     private byte state = START_STATE;
-
+    protected FutureTask<?> postCommitTask = new FutureTask(this);
+    
     public byte getState() {
         return state;
     }
@@ -108,6 +114,8 @@ public abstract class Transaction {
 
     public abstract TransactionId getTransactionId();
 
+    public abstract Log getLog();
+    
     public boolean isPrepared() {
         return getState() == PREPARED_STATE;
     }
@@ -115,4 +123,41 @@ public abstract class Transaction {
     public int size() {
         return synchronizations.size();
     }
+    
+    protected void waitPostCommitDone(FutureTask<?> postCommitTask) throws XAException, IOException {
+        try {
+            postCommitTask.get();
+        } catch (InterruptedException e) {
+            throw new InterruptedIOException(e.toString());
+        } catch (ExecutionException e) {
+            Throwable t = e.getCause();
+            if (t instanceof XAException) {
+                throw (XAException) t;
+            } else if (t instanceof IOException) {
+                throw (IOException) t;
+            } else {
+                throw new XAException(e.toString());
+            }
+        }    
+    }
+
+    protected void doPostCommit() throws XAException {
+        try {
+            fireAfterCommit();
+        } catch (Throwable e) {
+            // I guess this could happen. Post commit task failed
+            // to execute properly.
+            getLog().warn("POST COMMIT FAILED: ", e);
+            XAException xae = new XAException("POST COMMIT FAILED");
+            xae.errorCode = XAException.XAER_RMERR;
+            xae.initCause(e);
+            throw xae;
+        }
+    }
+    
+    public Object call() throws Exception {
+        doPostCommit();
+        return null;
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java?rev=947657&r1=947656&r2=947657&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java Mon May 24 14:48:55 2010
@@ -64,15 +64,15 @@ public class XATransaction extends Trans
             checkForPreparedState(onePhase);
             doPrePrepare();
             setStateFinished();
-            transactionStore.commit(getTransactionId(), false);
-            doPostCommit();
+            transactionStore.commit(getTransactionId(), false, postCommitTask);
+            waitPostCommitDone(postCommitTask);
             break;
         case PREPARED_STATE:
             // 2 phase commit, work done.
             // We would record commit here.
             setStateFinished();
-            transactionStore.commit(getTransactionId(), true);
-            doPostCommit();
+            transactionStore.commit(getTransactionId(), true, postCommitTask);
+            waitPostCommitDone(postCommitTask);
             break;
         default:
             illegalStateTransition("commit");
@@ -108,20 +108,6 @@ public class XATransaction extends Trans
         }
     }
 
-    private void doPostCommit() throws XAException {
-        try {
-            fireAfterCommit();
-        } catch (Throwable e) {
-            // I guess this could happen. Post commit task failed
-            // to execute properly.
-            LOG.warn("POST COMMIT FAILED: ", e);
-            XAException xae = new XAException("POST COMMIT FAILED");
-            xae.errorCode = XAException.XAER_RMERR;
-            xae.initCause(e);
-            throw xae;
-        }
-    }
-
     public void rollback() throws XAException, IOException {
 
         if (LOG.isDebugEnabled()) {
@@ -195,4 +181,9 @@ public class XATransaction extends Trans
     public TransactionId getTransactionId() {
         return xid;
     }
+    
+    @Override
+    public Log getLog() {
+        return LOG;
+    }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/embedded/ThreadExplorer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/embedded/ThreadExplorer.java?rev=947657&r1=947656&r2=947657&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/embedded/ThreadExplorer.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/embedded/ThreadExplorer.java Mon May 24 14:48:55 2010
@@ -62,6 +62,7 @@ public class ThreadExplorer
      * @param isStarredExp
      *            (regular expressions with *)
      */
+    @SuppressWarnings("deprecation")
     public static int kill(String threadName, boolean isStarredExp, String motivation)
     {
         String me = "ThreadExplorer.kill: ";

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java?rev=947657&r1=947656&r2=947657&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java Mon May 24 14:48:55 2010
@@ -35,12 +35,15 @@ import org.apache.activemq.broker.Transp
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.network.NetworkConnector;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * @version $Revision: 1.1 $
  */
 public class ConnectorXBeanConfigTest extends TestCase {
 
+    private static final Log LOG = LogFactory.getLog(ConnectorXBeanConfigTest.class);
     protected BrokerService brokerService;
 
     public void testConnectorConfiguredCorrectly() throws Exception {
@@ -76,6 +79,7 @@ public class ConnectorXBeanConfigTest ex
     	brokerService.start(true); // force restart
     	brokerService.waitUntilStarted();
     	
+    	LOG.info("try and connect to restarted broker");
     	//send and receive a message from a restarted broker
     	ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61636");
     	Connection conn = factory.createConnection();

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java?rev=947657&r1=947656&r2=947657&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java Mon May 24 14:48:55 2010
@@ -596,12 +596,12 @@ public class Journal {
         return rc;
     }
 
-    public synchronized Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
+    public Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
         Location loc = appender.storeItem(data, Location.USER_TYPE, sync);
         return loc;
     }
 
-    public synchronized Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
+    public Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
         Location loc = appender.storeItem(data, Location.USER_TYPE, onComplete);
         return loc;
     }