You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/05/24 16:48:55 UTC
svn commit: r947657 - in /activemq/trunk:
activemq-core/src/main/java/org/apache/activemq/store/
activemq-core/src/main/java/org/apache/activemq/store/amq/
activemq-core/src/main/java/org/apache/activemq/store/journal/
activemq-core/src/main/java/org/a...
Author: gtully
Date: Mon May 24 14:48:55 2010
New Revision: 947657
URL: http://svn.apache.org/viewvc?rev=947657&view=rev
Log:
Allow journal write batching: resolve the global transaction lock and the journal lock, and let writes accumulate on the DataFileAppender rather than at the store. Also tie transaction completion and after-commit processing together with a callback rather than with a global lock, so that concurrent commits can batch their writes. This is a rework of the fix for https://issues.apache.org/activemq/browse/AMQ-2594
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TransactionStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/LocalTransaction.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/embedded/ThreadExplorer.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TransactionStore.java?rev=947657&r1=947656&r2=947657&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TransactionStore.java Mon May 24 14:48:55 2010
@@ -17,6 +17,7 @@
package org.apache.activemq.store;
import java.io.IOException;
+import java.util.concurrent.FutureTask;
import org.apache.activemq.Service;
import org.apache.activemq.command.TransactionId;
@@ -31,7 +32,7 @@ public interface TransactionStore extend
void prepare(TransactionId txid) throws IOException;
- void commit(TransactionId txid, boolean wasPrepared) throws IOException;
+ void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException;
void rollback(TransactionId txid) throws IOException;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java?rev=947657&r1=947656&r2=947657&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java Mon May 24 14:48:55 2010
@@ -99,7 +99,7 @@ public class AMQTransactionStore impleme
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
- public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
+ public void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException {
AMQTx tx;
if (wasPrepared) {
synchronized (preparedTransactions) {
@@ -111,6 +111,7 @@ public class AMQTransactionStore impleme
}
}
if (tx == null) {
+ done.run();
return;
}
if (txid.isXATransaction()) {
@@ -118,6 +119,7 @@ public class AMQTransactionStore impleme
} else {
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid, wasPrepared), true,true);
}
+ done.run();
}
/**
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java?rev=947657&r1=947656&r2=947657&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java Mon May 24 14:48:55 2010
@@ -176,7 +176,7 @@ public class JournalTransactionStore imp
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
- public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
+ public void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException {
Tx tx;
if (wasPrepared) {
synchronized (preparedTransactions) {
@@ -188,6 +188,7 @@ public class JournalTransactionStore imp
}
}
if (tx == null) {
+ done.run();
return;
}
if (txid.isXATransaction()) {
@@ -197,6 +198,7 @@ public class JournalTransactionStore imp
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid,
wasPrepared), true);
}
+ done.run();
}
/**
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java?rev=947657&r1=947656&r2=947657&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java Mon May 24 14:48:55 2010
@@ -101,12 +101,13 @@ public class KahaTransactionStore implem
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
- public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
+ public void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException {
KahaTransaction tx = getTx(txid);
if (tx != null) {
tx.commit(this);
removeTx(txid);
}
+ done.run();
}
/**
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=947657&r1=947656&r2=947657&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Mon May 24 14:48:55 2010
@@ -241,14 +241,14 @@ public class KahaDBStore extends Message
public TransactionStore createTransactionStore() throws IOException {
return new TransactionStore() {
- public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
- store(new KahaCommitCommand().setTransactionInfo(createTransactionInfo(txid)), true);
+ public void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException {
+ store(new KahaCommitCommand().setTransactionInfo(createTransactionInfo(txid)), true, done);
}
public void prepare(TransactionId txid) throws IOException {
- store(new KahaPrepareCommand().setTransactionInfo(createTransactionInfo(txid)), true);
+ store(new KahaPrepareCommand().setTransactionInfo(createTransactionInfo(txid)), true, null);
}
public void rollback(TransactionId txid) throws IOException {
- store(new KahaRollbackCommand().setTransactionInfo(createTransactionInfo(txid)), false);
+ store(new KahaRollbackCommand().setTransactionInfo(createTransactionInfo(txid)), false, null);
}
public void recover(TransactionRecoveryListener listener) throws IOException {
for (Map.Entry<TransactionId, ArrayList<Operation>> entry : preparedTransactions.entrySet()) {
@@ -333,7 +333,7 @@ public class KahaDBStore extends Message
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
- store(command, isEnableJournalDiskSyncs() && message.isResponseRequired());
+ store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null);
}
@@ -345,13 +345,13 @@ public class KahaDBStore extends Message
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
- store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired());
+ store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null);
}
public void removeAllMessages(ConnectionContext context) throws IOException {
KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
command.setDestination(dest);
- store(command, true);
+ store(command, true, null);
}
public Message getMessage(MessageId identity) throws IOException {
@@ -519,7 +519,6 @@ public class KahaDBStore extends Message
if (isConcurrentStoreAndDispatchTopics()) {
StoreTopicTask task = asyncTopicMap.get(messageId);
if (task != null) {
-
if (task.addSubscriptionKey(subscriptionKey)) {
removeTopicTask(messageId);
task.cancel();
@@ -538,7 +537,7 @@ public class KahaDBStore extends Message
command.setDestination(dest);
command.setSubscriptionKey(subscriptionKey);
command.setMessageId(messageId.toString());
- store(command, false);
+ store(command, false, null);
}
public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
@@ -550,7 +549,7 @@ public class KahaDBStore extends Message
command.setRetroactive(retroactive);
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
- store(command, isEnableJournalDiskSyncs() && true);
+ store(command, isEnableJournalDiskSyncs() && true, null);
this.subscriptionCount.incrementAndGet();
}
@@ -558,7 +557,7 @@ public class KahaDBStore extends Message
KahaSubscriptionCommand command = new KahaSubscriptionCommand();
command.setDestination(dest);
command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
- store(command, isEnableJournalDiskSyncs() && true);
+ store(command, isEnableJournalDiskSyncs() && true, null);
this.subscriptionCount.decrementAndGet();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=947657&r1=947656&r2=947657&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Mon May 24 14:48:55 2010
@@ -630,7 +630,7 @@ public class MessageDatabase extends Ser
// Methods called by the broker to update and query the store.
// /////////////////////////////////////////////////////////////////
public Location store(JournalCommand data) throws IOException {
- return store(data, false);
+ return store(data, false, null);
}
/**
@@ -638,8 +638,9 @@ public class MessageDatabase extends Ser
* to a JournalMessage which is logged to the journal and then the data from
* the JournalMessage is used to update the index just like it would be done
* during a recovery process.
+ * @param done
*/
- public Location store(JournalCommand data, boolean sync) throws IOException {
+ public Location store(JournalCommand data, boolean sync, Runnable done) throws IOException {
try {
int size = data.serializedSizeFramed();
DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
@@ -662,6 +663,9 @@ public class MessageDatabase extends Ser
LOG.info("KahaDB: Recovering checkpoint thread after exception");
startCheckpoint();
}
+ if (done != null) {
+ done.run();
+ }
return location;
} catch (IOException ioe) {
LOG.error("KahaDB failed to store to Journal", ioe);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java?rev=947657&r1=947656&r2=947657&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java Mon May 24 14:48:55 2010
@@ -72,8 +72,9 @@ public class TempKahaDBStore extends Tem
public TransactionStore createTransactionStore() throws IOException {
return new TransactionStore(){
- public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
+ public void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException {
processCommit(txid);
+ done.run();
}
public void prepare(TransactionId txid) throws IOException {
processPrepare(txid);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java?rev=947657&r1=947656&r2=947657&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java Mon May 24 14:48:55 2010
@@ -194,7 +194,7 @@ public class MemoryTransactionStore impl
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
- public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
+ public void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException {
Tx tx;
if (wasPrepared) {
@@ -204,9 +204,11 @@ public class MemoryTransactionStore impl
}
if (tx == null) {
+ done.run();
return;
}
tx.commit();
+ done.run();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/LocalTransaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/LocalTransaction.java?rev=947657&r1=947656&r2=947657&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/LocalTransaction.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/LocalTransaction.java Mon May 24 14:48:55 2010
@@ -69,21 +69,8 @@ public class LocalTransaction extends Tr
context.getTransactions().remove(xid);
// Sync on transaction store to avoid out of order messages in the cursor
// https://issues.apache.org/activemq/browse/AMQ-2594
- synchronized (transactionStore) {
- transactionStore.commit(getTransactionId(), false);
-
- try {
- fireAfterCommit();
- } catch (Throwable e) {
- // I guess this could happen. Post commit task failed
- // to execute properly.
- LOG.warn("POST COMMIT FAILED: ", e);
- XAException xae = new XAException("POST COMMIT FAILED");
- xae.errorCode = XAException.XAER_RMERR;
- xae.initCause(e);
- throw xae;
- }
- }
+ transactionStore.commit(getTransactionId(), false, postCommitTask);
+ this.waitPostCommitDone(postCommitTask);
}
public void rollback() throws XAException, IOException {
@@ -120,5 +107,9 @@ public class LocalTransaction extends Tr
public TransactionId getTransactionId() {
return xid;
}
-
+
+ @Override
+ public Log getLog() {
+ return LOG;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java?rev=947657&r1=947656&r2=947657&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java Mon May 24 14:48:55 2010
@@ -17,13 +17,18 @@
package org.apache.activemq.transaction;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
import javax.transaction.xa.XAException;
import org.apache.activemq.command.TransactionId;
+import org.apache.commons.logging.Log;
/**
* Keeps track of all the actions that need to be done when a transaction does a
@@ -31,7 +36,7 @@ import org.apache.activemq.command.Trans
*
* @version $Revision: 1.5 $
*/
-public abstract class Transaction {
+public abstract class Transaction implements Callable {
public static final byte START_STATE = 0; // can go to: 1,2,3
public static final byte IN_USE_STATE = 1; // can go to: 2,3
@@ -40,7 +45,8 @@ public abstract class Transaction {
private ArrayList<Synchronization> synchronizations = new ArrayList<Synchronization>();
private byte state = START_STATE;
-
+ protected FutureTask<?> postCommitTask = new FutureTask(this);
+
public byte getState() {
return state;
}
@@ -108,6 +114,8 @@ public abstract class Transaction {
public abstract TransactionId getTransactionId();
+ public abstract Log getLog();
+
public boolean isPrepared() {
return getState() == PREPARED_STATE;
}
@@ -115,4 +123,41 @@ public abstract class Transaction {
public int size() {
return synchronizations.size();
}
+
+ protected void waitPostCommitDone(FutureTask<?> postCommitTask) throws XAException, IOException {
+ try {
+ postCommitTask.get();
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException(e.toString());
+ } catch (ExecutionException e) {
+ Throwable t = e.getCause();
+ if (t instanceof XAException) {
+ throw (XAException) t;
+ } else if (t instanceof IOException) {
+ throw (IOException) t;
+ } else {
+ throw new XAException(e.toString());
+ }
+ }
+ }
+
+ protected void doPostCommit() throws XAException {
+ try {
+ fireAfterCommit();
+ } catch (Throwable e) {
+ // I guess this could happen. Post commit task failed
+ // to execute properly.
+ getLog().warn("POST COMMIT FAILED: ", e);
+ XAException xae = new XAException("POST COMMIT FAILED");
+ xae.errorCode = XAException.XAER_RMERR;
+ xae.initCause(e);
+ throw xae;
+ }
+ }
+
+ public Object call() throws Exception {
+ doPostCommit();
+ return null;
+ }
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java?rev=947657&r1=947656&r2=947657&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java Mon May 24 14:48:55 2010
@@ -64,15 +64,15 @@ public class XATransaction extends Trans
checkForPreparedState(onePhase);
doPrePrepare();
setStateFinished();
- transactionStore.commit(getTransactionId(), false);
- doPostCommit();
+ transactionStore.commit(getTransactionId(), false, postCommitTask);
+ waitPostCommitDone(postCommitTask);
break;
case PREPARED_STATE:
// 2 phase commit, work done.
// We would record commit here.
setStateFinished();
- transactionStore.commit(getTransactionId(), true);
- doPostCommit();
+ transactionStore.commit(getTransactionId(), true, postCommitTask);
+ waitPostCommitDone(postCommitTask);
break;
default:
illegalStateTransition("commit");
@@ -108,20 +108,6 @@ public class XATransaction extends Trans
}
}
- private void doPostCommit() throws XAException {
- try {
- fireAfterCommit();
- } catch (Throwable e) {
- // I guess this could happen. Post commit task failed
- // to execute properly.
- LOG.warn("POST COMMIT FAILED: ", e);
- XAException xae = new XAException("POST COMMIT FAILED");
- xae.errorCode = XAException.XAER_RMERR;
- xae.initCause(e);
- throw xae;
- }
- }
-
public void rollback() throws XAException, IOException {
if (LOG.isDebugEnabled()) {
@@ -195,4 +181,9 @@ public class XATransaction extends Trans
public TransactionId getTransactionId() {
return xid;
}
+
+ @Override
+ public Log getLog() {
+ return LOG;
+ }
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/embedded/ThreadExplorer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/embedded/ThreadExplorer.java?rev=947657&r1=947656&r2=947657&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/embedded/ThreadExplorer.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/embedded/ThreadExplorer.java Mon May 24 14:48:55 2010
@@ -62,6 +62,7 @@ public class ThreadExplorer
* @param isStarredExp
* (regular expressions with *)
*/
+ @SuppressWarnings("deprecation")
public static int kill(String threadName, boolean isStarredExp, String motivation)
{
String me = "ThreadExplorer.kill: ";
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java?rev=947657&r1=947656&r2=947657&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java Mon May 24 14:48:55 2010
@@ -35,12 +35,15 @@ import org.apache.activemq.broker.Transp
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.NetworkConnector;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* @version $Revision: 1.1 $
*/
public class ConnectorXBeanConfigTest extends TestCase {
+ private static final Log LOG = LogFactory.getLog(ConnectorXBeanConfigTest.class);
protected BrokerService brokerService;
public void testConnectorConfiguredCorrectly() throws Exception {
@@ -76,6 +79,7 @@ public class ConnectorXBeanConfigTest ex
brokerService.start(true); // force restart
brokerService.waitUntilStarted();
+ LOG.info("try and connect to restarted broker");
//send and receive a message from a restarted broker
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61636");
Connection conn = factory.createConnection();
Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java?rev=947657&r1=947656&r2=947657&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java Mon May 24 14:48:55 2010
@@ -596,12 +596,12 @@ public class Journal {
return rc;
}
- public synchronized Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
+ public Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
Location loc = appender.storeItem(data, Location.USER_TYPE, sync);
return loc;
}
- public synchronized Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
+ public Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
Location loc = appender.storeItem(data, Location.USER_TYPE, onComplete);
return loc;
}