You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2010/06/15 22:33:41 UTC
svn commit: r955039 [2/2] - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq:
broker/region/ command/ store/ store/amq/ store/journal/ store/kahadaptor/
store/kahadb/ store/memory/ transaction/
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=955039&r1=955038&r2=955039&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Tue Jun 15 20:33:41 2010
@@ -37,9 +37,6 @@ import java.util.concurrent.ThreadPoolEx
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.InvalidSelectorException;
-
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
@@ -63,17 +60,13 @@ import org.apache.activemq.store.Message
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
-import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
-import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination;
import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
import org.apache.activemq.store.kahadb.data.KahaLocation;
-import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
-import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
@@ -88,26 +81,28 @@ import org.apache.commons.logging.LogFac
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Transaction;
-public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
- private static final Log LOG = LogFactory.getLog(KahaDBStore.class);
+public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{
+ static final Log LOG = LogFactory.getLog(KahaDBStore.class);
private static final int MAX_ASYNC_JOBS = 10000;
protected ExecutorService queueExecutor;
protected ExecutorService topicExecutor;
protected final Map<AsyncJobKey, StoreQueueTask> asyncQueueMap = new HashMap<AsyncJobKey, StoreQueueTask>();
- protected final Map<MessageId, StoreTopicTask> asyncTopicMap = new HashMap<MessageId, StoreTopicTask>();
+ protected final Map<AsyncJobKey, StoreTopicTask> asyncTopicMap = new HashMap<AsyncJobKey, StoreTopicTask>();
private final WireFormat wireFormat = new OpenWireFormat();
private SystemUsage usageManager;
private LinkedBlockingQueue<Runnable> asyncQueueJobQueue;
private LinkedBlockingQueue<Runnable> asyncTopicJobQueue;
- private Semaphore queueSemaphore;
- private Semaphore topicSemaphore;
+ Semaphore globalQueueSemaphore;
+ Semaphore globalTopicSemaphore;
private boolean concurrentStoreAndDispatchQueues = true;
private boolean concurrentStoreAndDispatchTopics = true;
private int maxAsyncJobs = MAX_ASYNC_JOBS;
-
+ private final KahaDBTransactionStore transactionStore;
+
public KahaDBStore() {
+ this.transactionStore = new KahaDBTransactionStore(this);
}
-
+
public void setBrokerName(String brokerName) {
}
@@ -166,8 +161,8 @@ public class KahaDBStore extends Message
@Override
public void doStart() throws Exception {
super.doStart();
- this.queueSemaphore = new Semaphore(getMaxAsyncJobs());
- this.topicSemaphore = new Semaphore(getMaxAsyncJobs());
+ this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs());
+ this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs());
this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
this.queueExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, asyncQueueJobQueue,
@@ -190,11 +185,23 @@ public class KahaDBStore extends Message
@Override
public void doStop(ServiceStopper stopper) throws Exception {
- if (this.queueSemaphore != null) {
- this.queueSemaphore.drainPermits();
+ synchronized (this.asyncQueueMap) {
+ for (StoreQueueTask task : this.asyncQueueMap.values()) {
+ task.cancel();
+ }
+ this.asyncQueueMap.clear();
+ }
+ synchronized (this.asyncTopicMap) {
+ for (StoreTopicTask task : this.asyncTopicMap.values()) {
+ task.cancel();
+ }
+ this.asyncTopicMap.clear();
+ }
+ if (this.globalQueueSemaphore != null) {
+ this.globalQueueSemaphore.drainPermits();
}
- if (this.topicSemaphore != null) {
- this.topicSemaphore.drainPermits();
+ if (this.globalTopicSemaphore != null) {
+ this.globalTopicSemaphore.drainPermits();
}
if (this.queueExecutor != null) {
this.queueExecutor.shutdownNow();
@@ -205,99 +212,50 @@ public class KahaDBStore extends Message
super.doStop(stopper);
}
- protected StoreQueueTask removeQueueTask(ActiveMQDestination activeMQDestination, MessageId id) {
- StoreQueueTask task = this.asyncQueueMap.remove(new AsyncJobKey(id, activeMQDestination));
- if (task != null) {
- task.getMessage().decrementReferenceCount();
- this.queueSemaphore.release();
+ protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) {
+ StoreQueueTask task = null;
+ synchronized (this.asyncQueueMap) {
+ task = this.asyncQueueMap.remove(new AsyncJobKey(id, store.getDestination()));
}
return task;
}
- protected void addQueueTask(ActiveMQDestination activeMQDestination, StoreQueueTask task) throws IOException {
- try {
- this.queueSemaphore.acquire();
-
- } catch (InterruptedException e) {
- throw new InterruptedIOException(e.getMessage());
+ protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException {
+ synchronized (this.asyncQueueMap) {
+ this.asyncQueueMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
}
- this.asyncQueueMap.put(new AsyncJobKey(task.getMessage().getMessageId(), activeMQDestination), task);
- task.getMessage().incrementReferenceCount();
this.queueExecutor.execute(task);
}
- protected StoreTopicTask removeTopicTask(MessageId id) {
- StoreTopicTask task = this.asyncTopicMap.remove(id);
- if (task != null) {
- task.getMessage().decrementReferenceCount();
- this.topicSemaphore.release();
+ protected StoreTopicTask removeTopicTask(KahaDBMessageStore store, MessageId id) {
+ StoreTopicTask task = null;
+ synchronized (this.asyncTopicMap) {
+ task = this.asyncTopicMap.remove(new AsyncJobKey(id, store.getDestination()));
}
return task;
}
- protected void addTopicTask(StoreTopicTask task) throws IOException {
- try {
- this.topicSemaphore.acquire();
- } catch (InterruptedException e) {
- throw new InterruptedIOException(e.getMessage());
+ protected void addTopicTask(KahaDBMessageStore store, StoreTopicTask task) throws IOException {
+ synchronized (this.asyncTopicMap) {
+ this.asyncTopicMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
}
- this.asyncTopicMap.put(task.getMessage().getMessageId(), task);
- task.getMessage().incrementReferenceCount();
this.topicExecutor.execute(task);
}
public TransactionStore createTransactionStore() throws IOException {
- return new TransactionStore() {
-
- public void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException {
- store(new KahaCommitCommand().setTransactionInfo(createTransactionInfo(txid)), true, done);
- }
- public void prepare(TransactionId txid) throws IOException {
- store(new KahaPrepareCommand().setTransactionInfo(createTransactionInfo(txid)), true, null);
- }
- public void rollback(TransactionId txid) throws IOException {
- store(new KahaRollbackCommand().setTransactionInfo(createTransactionInfo(txid)), false, null);
- }
- public void recover(TransactionRecoveryListener listener) throws IOException {
- for (Map.Entry<TransactionId, ArrayList<Operation>> entry : preparedTransactions.entrySet()) {
- XATransactionId xid = (XATransactionId) entry.getKey();
- ArrayList<Message> messageList = new ArrayList<Message>();
- ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();
-
- for (Operation op : entry.getValue()) {
- if (op.getClass() == AddOpperation.class) {
- AddOpperation addOp = (AddOpperation) op;
- Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addOp.getCommand()
- .getMessage().newInput()));
- messageList.add(msg);
- } else {
- RemoveOpperation rmOp = (RemoveOpperation) op;
- Buffer ackb = rmOp.getCommand().getAck();
- MessageAck ack = (MessageAck) wireFormat.unmarshal(new DataInputStream(ackb.newInput()));
- ackList.add(ack);
- }
- }
-
- Message[] addedMessages = new Message[messageList.size()];
- MessageAck[] acks = new MessageAck[ackList.size()];
- messageList.toArray(addedMessages);
- ackList.toArray(acks);
- listener.recover(xid, addedMessages, acks);
- }
- }
- public void start() throws Exception {
- }
- public void stop() throws Exception {
- }
- };
+ return this.transactionStore;
}
public class KahaDBMessageStore extends AbstractMessageStore {
protected KahaDestination dest;
+ private final int maxAsyncJobs;
+ private final Semaphore localDestinationSemaphore;
public KahaDBMessageStore(ActiveMQDestination destination) {
super(destination);
this.dest = convert(destination);
+ this.maxAsyncJobs = getMaxAsyncJobs();
+ this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs);
}
@Override
@@ -310,7 +268,8 @@ public class KahaDBStore extends Message
throws IOException {
if (isConcurrentStoreAndDispatchQueues()) {
StoreQueueTask result = new StoreQueueTask(this, context, message);
- addQueueTask(destination, result);
+ result.aquireLocks();
+ addQueueTask(this, result);
return result.getFuture();
} else {
return super.asyncAddQueueMessage(context, message);
@@ -320,10 +279,15 @@ public class KahaDBStore extends Message
@Override
public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
if (isConcurrentStoreAndDispatchQueues()) {
- StoreQueueTask task = removeQueueTask(destination, ack.getLastMessageId());
+ AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination());
+ StoreQueueTask task = null;
+ synchronized (asyncQueueMap) {
+ task = asyncQueueMap.get(key);
+ }
if (task != null) {
if (!task.cancel()) {
try {
+
task.future.get();
} catch (InterruptedException e) {
throw new InterruptedIOException(e.toString());
@@ -331,6 +295,10 @@ public class KahaDBStore extends Message
LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored);
}
removeMessage(context, ack);
+ } else {
+ synchronized (asyncQueueMap) {
+ asyncQueueMap.remove(key);
+ }
}
} else {
removeMessage(context, ack);
@@ -348,7 +316,7 @@ public class KahaDBStore extends Message
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
- store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null);
+ store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null, null);
}
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
@@ -359,13 +327,13 @@ public class KahaDBStore extends Message
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
- store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null);
+ store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null);
}
public void removeAllMessages(ConnectionContext context) throws IOException {
KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
command.setDestination(dest);
- store(command, true, null);
+ store(command, true, null, null);
}
public Message getMessage(MessageId identity) throws IOException {
@@ -395,21 +363,27 @@ public class KahaDBStore extends Message
}
public int getMessageCount() throws IOException {
- synchronized (indexMutex) {
- return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
- public Integer execute(Transaction tx) throws IOException {
- // Iterate through all index entries to get a count of
- // messages in the destination.
- StoredDestination sd = getStoredDestination(dest, tx);
- int rc = 0;
- for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator
- .hasNext();) {
- iterator.next();
- rc++;
+ try {
+ lockAsyncJobQueue();
+ synchronized (indexMutex) {
+ return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
+ public Integer execute(Transaction tx) throws IOException {
+ // Iterate through all index entries to get a count
+ // of
+ // messages in the destination.
+ StoredDestination sd = getStoredDestination(dest, tx);
+ int rc = 0;
+ for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator
+ .hasNext();) {
+ iterator.next();
+ rc++;
+ }
+ return rc;
}
- return rc;
- }
- });
+ });
+ }
+ } finally {
+ unlockAsyncJobQueue();
}
}
@@ -452,11 +426,12 @@ public class KahaDBStore extends Message
Entry<Long, MessageKeys> entry = null;
int counter = 0;
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator
- .hasNext();) {
+ .hasNext()
+ && listener.hasSpace();) {
entry = iterator.next();
listener.recoverMessage(loadMessage(entry.getValue().location));
counter++;
- if (counter >= maxReturned) {
+ if (counter >= maxReturned || listener.hasSpace() == false) {
break;
}
}
@@ -474,22 +449,27 @@ public class KahaDBStore extends Message
@Override
public void setBatch(MessageId identity) throws IOException {
- final String key = identity.toString();
+ try {
+ final String key = identity.toString();
+ lockAsyncJobQueue();
- // Hopefully one day the page file supports concurrent read
- // operations... but for now we must
- // externally synchronize...
- Long location;
- synchronized (indexMutex) {
- location = pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>() {
- public Long execute(Transaction tx) throws IOException {
- StoredDestination sd = getStoredDestination(dest, tx);
- return sd.messageIdIndex.get(tx, key);
- }
- });
- }
- if (location != null) {
- cursorPos = location + 1;
+ // Hopefully one day the page file supports concurrent read
+ // operations... but for now we must
+ // externally synchronize...
+ Long location;
+ synchronized (indexMutex) {
+ location = pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>() {
+ public Long execute(Transaction tx) throws IOException {
+ StoredDestination sd = getStoredDestination(dest, tx);
+ return sd.messageIdIndex.get(tx, key);
+ }
+ });
+ }
+ if (location != null) {
+ cursorPos = location + 1;
+ }
+ } finally {
+ unlockAsyncJobQueue();
}
}
@@ -506,6 +486,30 @@ public class KahaDBStore extends Message
super.stop();
}
+ /**
+ * Tries to take all maxAsyncJobs permits of this destination's semaphore, waiting
+ * up to 60 seconds. Async store tasks each take one permit from the same semaphore
+ * (see acquireLocalAsyncLock), so holding every permit keeps them out.
+ * NOTE(review): the boolean result of tryAcquire is discarded and any exception is
+ * only logged, so this method returns normally even when the permits were NOT
+ * obtained; callers still release maxAsyncJobs permits in their finally blocks.
+ */
+ protected void lockAsyncJobQueue() {
+ try {
+ this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOG.error("Failed to lock async jobs for " + this.destination, e);
+ }
+ }
+
+ /**
+ * Returns maxAsyncJobs permits to this destination's semaphore; the counterpart of
+ * lockAsyncJobQueue(). The release is unconditional.
+ */
+ protected void unlockAsyncJobQueue() {
+ this.localDestinationSemaphore.release(this.maxAsyncJobs);
+ }
+
+ /**
+ * Takes one permit from this destination's semaphore, blocking until one is free.
+ * NOTE(review): an InterruptedException is logged and swallowed, so the caller
+ * cannot tell that no permit was taken, and the interrupt status is not restored.
+ */
+ protected void acquireLocalAsyncLock() {
+ try {
+ this.localDestinationSemaphore.acquire();
+ } catch (InterruptedException e) {
+ LOG.error("Failed to aquire async lock for " + this.destination, e);
+ }
+ }
+
+ /** Returns one permit to this destination's semaphore. */
+ protected void releaseLocalAsyncLock() {
+ this.localDestinationSemaphore.release();
+ }
+
}
class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
@@ -520,7 +524,8 @@ public class KahaDBStore extends Message
throws IOException {
if (isConcurrentStoreAndDispatchTopics()) {
StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get());
- addTopicTask(result);
+ result.aquireLocks();
+ addTopicTask(this, result);
return result.getFuture();
} else {
return super.asyncAddTopicMessage(context, message);
@@ -531,11 +536,19 @@ public class KahaDBStore extends Message
throws IOException {
String subscriptionKey = subscriptionKey(clientId, subscriptionName);
if (isConcurrentStoreAndDispatchTopics()) {
- StoreTopicTask task = asyncTopicMap.get(messageId);
+ AsyncJobKey key = new AsyncJobKey(messageId, getDestination());
+ StoreTopicTask task = null;
+ synchronized (asyncTopicMap) {
+ task = asyncTopicMap.get(key);
+ }
if (task != null) {
if (task.addSubscriptionKey(subscriptionKey)) {
- removeTopicTask(messageId);
- task.cancel();
+ removeTopicTask(this, messageId);
+ if (task.cancel()) {
+ synchronized (asyncTopicMap) {
+ asyncTopicMap.remove(key);
+ }
+ }
}
} else {
doAcknowledge(context, subscriptionKey, messageId);
@@ -551,7 +564,7 @@ public class KahaDBStore extends Message
command.setDestination(dest);
command.setSubscriptionKey(subscriptionKey);
command.setMessageId(messageId.toString());
- store(command, false, null);
+ store(command, false, null, null);
}
public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
@@ -563,7 +576,7 @@ public class KahaDBStore extends Message
command.setRetroactive(retroactive);
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
- store(command, isEnableJournalDiskSyncs() && true, null);
+ store(command, isEnableJournalDiskSyncs() && true, null, null);
this.subscriptionCount.incrementAndGet();
}
@@ -571,7 +584,7 @@ public class KahaDBStore extends Message
KahaSubscriptionCommand command = new KahaSubscriptionCommand();
command.setDestination(dest);
command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
- store(command, isEnableJournalDiskSyncs() && true, null);
+ store(command, isEnableJournalDiskSyncs() && true, null, null);
this.subscriptionCount.decrementAndGet();
}
@@ -698,7 +711,7 @@ public class KahaDBStore extends Message
entry = iterator.next();
listener.recoverMessage(loadMessage(entry.getValue().location));
counter++;
- if (counter >= maxReturned) {
+ if (counter >= maxReturned || listener.hasSpace() == false) {
break;
}
}
@@ -732,11 +745,11 @@ public class KahaDBStore extends Message
}
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
- return new KahaDBMessageStore(destination);
+ return this.transactionStore.proxy(new KahaDBMessageStore(destination));
}
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
- return new KahaDBTopicMessageStore(destination);
+ return this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
}
/**
@@ -928,20 +941,19 @@ public class KahaDBStore extends Message
static class AsyncJobKey {
MessageId id;
ActiveMQDestination destination;
-
+
AsyncJobKey(MessageId id, ActiveMQDestination destination) {
this.id = id;
this.destination = destination;
}
-
+
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
- return obj instanceof AsyncJobKey &&
- id.equals(((AsyncJobKey)obj).id) &&
- destination.equals(((AsyncJobKey)obj).destination);
+ return obj instanceof AsyncJobKey && id.equals(((AsyncJobKey) obj).id)
+ && destination.equals(((AsyncJobKey) obj).destination);
}
@Override
@@ -949,24 +961,25 @@ public class KahaDBStore extends Message
return id.hashCode() + destination.hashCode();
}
+ @Override
public String toString() {
- return destination.getPhysicalName() + "-" + id;
+ return destination.getPhysicalName() + "-" + id;
}
}
-
+
class StoreQueueTask implements Runnable {
protected final Message message;
protected final ConnectionContext context;
- protected final MessageStore store;
+ protected final KahaDBMessageStore store;
protected final InnerFutureTask future;
protected final AtomicBoolean done = new AtomicBoolean();
+ protected final AtomicBoolean locked = new AtomicBoolean();
- public StoreQueueTask(MessageStore store, ConnectionContext context, Message message) {
+ public StoreQueueTask(KahaDBMessageStore store, ConnectionContext context, Message message) {
this.store = store;
this.context = context;
this.message = message;
this.future = new InnerFutureTask(this);
-
}
public Future<Object> getFuture() {
@@ -974,21 +987,45 @@ public class KahaDBStore extends Message
}
public boolean cancel() {
+ releaseLocks();
if (this.done.compareAndSet(false, true)) {
return this.future.cancel(false);
}
return false;
}
+        /**
+         * Takes one permit from the global queue semaphore and one from the owning
+         * store's destination semaphore, then increments the message reference count.
+         * The {@code locked} flag makes repeated calls a no-op.
+         * <p>
+         * If the global acquire is interrupted nothing has been taken yet, so the flag
+         * is cleared again; otherwise a later {@code releaseLocks()} would release
+         * permits and a reference count that were never acquired. The thread's
+         * interrupt status is restored for the caller.
+         */
+        void aquireLocks() {
+            if (this.locked.compareAndSet(false, true)) {
+                try {
+                    globalQueueSemaphore.acquire();
+                    store.acquireLocalAsyncLock();
+                    message.incrementReferenceCount();
+                } catch (InterruptedException e) {
+                    // Interrupted before any permit was obtained: undo the flag.
+                    this.locked.set(false);
+                    Thread.currentThread().interrupt();
+                    LOG.warn("Failed to aquire lock", e);
+                }
+            }
+        }
+
+ /**
+ * Gives back what aquireLocks() took: the store's destination permit, the global
+ * queue permit and the message reference count. Guarded by the locked flag, so
+ * only the first call after a successful flag set performs the release.
+ */
+ void releaseLocks() {
+ if (this.locked.compareAndSet(true, false)) {
+ store.releaseLocalAsyncLock();
+ globalQueueSemaphore.release();
+ message.decrementReferenceCount();
+ }
+ }
+
public void run() {
try {
if (this.done.compareAndSet(false, true)) {
this.store.addMessage(context, message);
- removeQueueTask(this.store.getDestination(), this.message.getMessageId());
+ removeQueueTask(this.store, this.message.getMessageId());
this.future.complete();
}
} catch (Exception e) {
this.future.setException(e);
+ } finally {
+ releaseLocks();
}
}
@@ -1025,6 +1062,29 @@ public class KahaDBStore extends Message
}
+        /**
+         * Topic variant of the lock acquisition: takes one permit from the global
+         * topic semaphore and one from the owning store's destination semaphore, then
+         * increments the message reference count. The {@code locked} flag makes
+         * repeated calls a no-op.
+         * <p>
+         * If the global acquire is interrupted nothing has been taken yet, so the flag
+         * is cleared again; otherwise a later {@code releaseLocks()} would release
+         * permits and a reference count that were never acquired. The thread's
+         * interrupt status is restored for the caller.
+         */
+        @Override
+        void aquireLocks() {
+            if (this.locked.compareAndSet(false, true)) {
+                try {
+                    globalTopicSemaphore.acquire();
+                    store.acquireLocalAsyncLock();
+                    message.incrementReferenceCount();
+                } catch (InterruptedException e) {
+                    // Interrupted before any permit was obtained: undo the flag.
+                    this.locked.set(false);
+                    Thread.currentThread().interrupt();
+                    LOG.warn("Failed to aquire lock", e);
+                }
+            }
+        }
+
+ /**
+ * Topic variant of the release: decrements the message reference count, then
+ * returns the store's destination permit and the global topic permit. Guarded by
+ * the locked flag, so only the first call after a successful flag set releases.
+ */
+ @Override
+ void releaseLocks() {
+ if (this.locked.compareAndSet(true, false)) {
+ message.decrementReferenceCount();
+ store.releaseLocalAsyncLock();
+ globalTopicSemaphore.release();
+ }
+ }
+
/**
* add a key
*
@@ -1047,13 +1107,16 @@ public class KahaDBStore extends Message
synchronized (this.subscriptionKeys) {
for (String key : this.subscriptionKeys) {
this.topicStore.doAcknowledge(context, key, this.message.getMessageId());
+
}
}
- removeTopicTask(this.message.getMessageId());
+ removeTopicTask(this.topicStore, this.message.getMessageId());
this.future.complete();
}
} catch (Exception e) {
this.future.setException(e);
+ } finally {
+ releaseLocks();
}
}
}
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java?rev=955039&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java Tue Jun 15 20:33:41 2010
@@ -0,0 +1,460 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import javax.transaction.xa.XAException;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.store.AbstractMessageStore;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.ProxyMessageStore;
+import org.apache.activemq.store.ProxyTopicMessageStore;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.TransactionRecoveryListener;
+import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.kahadb.MessageDatabase.AddOpperation;
+import org.apache.activemq.store.kahadb.MessageDatabase.Operation;
+import org.apache.activemq.store.kahadb.MessageDatabase.RemoveOpperation;
+import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
+import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
+import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
+import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ * Provides a TransactionStore implementation that can create transaction aware
+ * MessageStore objects from non transaction aware MessageStore objects.
+ *
+ * @version $Revision: 1.4 $
+ */
+public class KahaDBTransactionStore implements TransactionStore {
+
+ ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
+ private final WireFormat wireFormat = new OpenWireFormat();
+ private final KahaDBStore theStore;
+
+ /**
+ * @param theStore the KahaDBStore through which prepare/commit/rollback commands
+ * are stored and whose prepared transactions are read during recovery
+ */
+ public KahaDBTransactionStore(KahaDBStore theStore) {
+ this.theStore = theStore;
+ }
+
+    /**
+     * Buffers the message adds and acks recorded against one in-flight transaction
+     * until {@link #commit()} replays them against their stores.
+     */
+    public class Tx {
+        private final ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
+
+        private final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
+
+        /** Records a message add to be executed on commit. */
+        public void add(AddMessageCommand msg) {
+            messages.add(msg);
+        }
+
+        /** Records a message removal (ack) to be executed on commit. */
+        public void add(RemoveMessageCommand ack) {
+            acks.add(ack);
+        }
+
+        /**
+         * @return the messages of all buffered adds, in the order they were recorded
+         */
+        public Message[] getMessages() {
+            Message[] rc = new Message[messages.size()];
+            int count = 0;
+            for (AddMessageCommand cmd : messages) {
+                rc[count++] = cmd.getMessage();
+            }
+            return rc;
+        }
+
+        /**
+         * @return the acks of all buffered removals, in the order they were recorded
+         */
+        public MessageAck[] getAcks() {
+            MessageAck[] rc = new MessageAck[acks.size()];
+            int count = 0;
+            for (RemoveMessageCommand cmd : acks) {
+                rc[count++] = cmd.getMessageAck();
+            }
+            return rc;
+        }
+
+        /**
+         * Executes every buffered add, then every buffered removal, each exactly once.
+         *
+         * @return the futures returned by the executed commands, adds first; empty if
+         *         nothing was buffered
+         * @throws IOException if any command fails to execute
+         */
+        public List<Future<Object>> commit() throws IOException {
+            List<Future<Object>> results = new ArrayList<Future<Object>>();
+            // Do all the message adds.
+            for (AddMessageCommand cmd : messages) {
+                results.add(cmd.run());
+            }
+            // And removes.. (previously each removal was run twice; run it once and
+            // keep that single invocation's future)
+            for (RemoveMessageCommand cmd : acks) {
+                results.add(cmd.run());
+            }
+
+            return results;
+        }
+    }
+
+ /**
+ * A deferred message add. It captures the ConnectionContext at creation time;
+ * run() later invokes the subclass's run(ConnectionContext) with that context.
+ */
+ public abstract class AddMessageCommand {
+ private final ConnectionContext ctx;
+ AddMessageCommand(ConnectionContext ctx) {
+ this.ctx = ctx;
+ }
+ abstract Message getMessage();
+ Future<Object> run() throws IOException {
+ return run(this.ctx);
+ }
+ abstract Future<Object> run(ConnectionContext ctx) throws IOException;
+ }
+
+ /**
+ * A deferred message removal (ack). It captures the ConnectionContext at creation
+ * time; run() later invokes the subclass's run(ConnectionContext) with that context.
+ */
+ public abstract class RemoveMessageCommand {
+
+ private final ConnectionContext ctx;
+ RemoveMessageCommand(ConnectionContext ctx) {
+ this.ctx = ctx;
+ }
+ abstract MessageAck getMessageAck();
+ Future<Object> run() throws IOException {
+ return run(this.ctx);
+ }
+ abstract Future<Object> run(ConnectionContext context) throws IOException;
+ }
+
+ /**
+ * Wraps a MessageStore so that its add/remove operations (sync and async) are
+ * routed through this transaction store's methods of the same name, with the
+ * wrapped store passed along as the delegate.
+ *
+ * @param messageStore the store to wrap
+ * @return a ProxyMessageStore overriding the four add/remove entry points
+ */
+ public MessageStore proxy(MessageStore messageStore) {
+ return new ProxyMessageStore(messageStore) {
+ @Override
+ public void addMessage(ConnectionContext context, final Message send) throws IOException {
+ KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
+ }
+
+ @Override
+ public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
+ return KahaDBTransactionStore.this.asyncAddQueueMessage(context, getDelegate(), message);
+ }
+
+ @Override
+ public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
+ KahaDBTransactionStore.this.removeMessage(context, getDelegate(), ack);
+ }
+
+ @Override
+ public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
+ KahaDBTransactionStore.this.removeAsyncMessage(context, getDelegate(), ack);
+ }
+ };
+ }
+
+ /**
+ * Topic counterpart of proxy(MessageStore): wraps a TopicMessageStore so that its
+ * add/remove operations are routed through this transaction store.
+ *
+ * @param messageStore the topic store to wrap
+ * @return a ProxyTopicMessageStore overriding the four add/remove entry points
+ */
+ public TopicMessageStore proxy(TopicMessageStore messageStore) {
+ return new ProxyTopicMessageStore(messageStore) {
+ @Override
+ public void addMessage(ConnectionContext context, final Message send) throws IOException {
+ KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
+ }
+
+ @Override
+ public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
+ // NOTE(review): unlike the queue proxy, whatever the call below returns is
+ // discarded and the shared AbstractMessageStore.FUTURE is returned instead.
+ KahaDBTransactionStore.this.asyncAddTopicMessage(context, getDelegate(), message);
+ return AbstractMessageStore.FUTURE;
+ }
+
+ @Override
+ public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
+ KahaDBTransactionStore.this.removeMessage(context, getDelegate(), ack);
+ }
+
+ @Override
+ public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
+ KahaDBTransactionStore.this.removeAsyncMessage(context, getDelegate(), ack);
+ }
+ };
+ }
+
+ /**
+ * Drops any in-flight Tx buffered for txid and stores a KahaPrepareCommand for it
+ * through the underlying KahaDBStore.
+ *
+ * @param txid the transaction being prepared
+ * @throws IOException if storing the prepare command fails
+ * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
+ */
+ public void prepare(TransactionId txid) throws IOException {
+ inflightTransactions.remove(txid);
+ KahaTransactionInfo info = getTransactionInfo(txid);
+ theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
+ }
+
+    /**
+     * Returns the in-flight Tx registered for {@code txid}, creating and registering
+     * a new one if none exists yet.
+     * <p>
+     * Registration uses {@code putIfAbsent}, so when two threads race on the same
+     * txid both receive the same Tx instance and neither one's buffered work is lost.
+     *
+     * @param txid the transaction key
+     * @return the Tx associated with txid, never null
+     */
+    public Tx getTx(Object txid) {
+        Tx tx = inflightTransactions.get(txid);
+        if (tx == null) {
+            Tx created = new Tx();
+            tx = inflightTransactions.putIfAbsent(txid, created);
+            if (tx == null) {
+                // No other thread registered one first; ours is now the mapping.
+                tx = created;
+            }
+        }
+        return tx;
+    }
+
+ /**
+ * Commits a transaction. Does nothing when txid is null.
+ * <p>
+ * Non-XA: runs preCommit, removes the buffered Tx and executes its commands,
+ * waits on every returned future, runs postCommit, and stores a KahaCommitCommand
+ * only if at least one future was not cancelled. When no Tx is buffered, only
+ * postCommit is run.
+ * <p>
+ * XA: stores a KahaCommitCommand, handing preCommit and postCommit to the store.
+ *
+ * @param txid the transaction to commit; may be null
+ * @param wasPrepared not read by this method
+ * @param preCommit optional callback, may be null
+ * @param postCommit optional callback, may be null
+ * @throws IOException if executing the buffered commands or storing the commit fails
+ */
+ public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit)
+ throws IOException {
+ if (txid != null) {
+ if (!txid.isXATransaction()) {
+ if (preCommit != null) {
+ preCommit.run();
+ }
+ Tx tx = inflightTransactions.remove(txid);
+ if (tx != null) {
+ List<Future<Object>> results = tx.commit();
+ boolean doneSomething = false;
+ for (Future<Object> result : results) {
+ try {
+ result.get();
+ } catch (InterruptedException e) {
+ // NOTE(review): the thread's interrupt status is not restored here.
+ theStore.brokerService.handleIOException(new IOException(e));
+ } catch (ExecutionException e) {
+ theStore.brokerService.handleIOException(new IOException(e));
+ }catch(CancellationException e) {
+ // Deliberately ignored: cancellation is detected via isCancelled() below.
+ }
+ if (!result.isCancelled()) {
+ doneSomething = true;
+ }
+ }
+
+ // postCommit runs before the commit command is stored.
+ if (postCommit != null) {
+ postCommit.run();
+ }
+ if (doneSomething) {
+ KahaTransactionInfo info = getTransactionInfo(txid);
+ theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, null, null);
+ }
+ }else {
+ //The Tx will be null for failed over clients - lets run their post commits
+ if (postCommit != null) {
+ postCommit.run();
+ }
+ }
+
+ } else {
+ KahaTransactionInfo info = getTransactionInfo(txid);
+ theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, preCommit, postCommit);
+ }
+
+ }
+ }
+
+    /**
+     * Rolls back a transaction.
+     * <p>
+     * For an XA transaction a rollback command is written to the store (sync
+     * flag not set). For any other transaction the in-flight state is simply
+     * discarded: its operations are only queued in the {@code Tx} until commit,
+     * so dropping the {@code Tx} drops them.
+     *
+     * @param txid the transaction to roll back; must not be {@code null}
+     * @throws IOException if the store fails to record the rollback
+     * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
+     */
+    public void rollback(TransactionId txid) throws IOException {
+        if (txid.isXATransaction()) {
+            KahaTransactionInfo info = getTransactionInfo(txid);
+            theStore.store(new KahaRollbackCommand().setTransactionInfo(info), false, null, null);
+        } else {
+            inflightTransactions.remove(txid);
+        }
+    }
+
+    /**
+     * Lifecycle hook; this store has nothing to start.
+     */
+    public void start() throws Exception {
+        // intentionally empty
+    }
+
+    /**
+     * Lifecycle hook; this store has nothing to stop.
+     */
+    public void stop() throws Exception {
+        // intentionally empty
+    }
+
+    /**
+     * Reports every transaction found in {@code theStore.preparedTransactions}
+     * to the listener, together with the messages it added and the acks it
+     * recorded. Each operation's payload is unmarshalled with the wire format.
+     * {@code inflightTransactions} is not touched here.
+     *
+     * @param listener receives one {@code recover} call per prepared transaction
+     * @throws IOException if an operation payload cannot be unmarshalled
+     */
+    public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
+        for (Map.Entry<TransactionId, ArrayList<Operation>> prepared : theStore.preparedTransactions.entrySet()) {
+            final XATransactionId xid = (XATransactionId) prepared.getKey();
+            final ArrayList<Message> adds = new ArrayList<Message>();
+            final ArrayList<MessageAck> removes = new ArrayList<MessageAck>();
+
+            for (Operation op : prepared.getValue()) {
+                if (op.getClass() != AddOpperation.class) {
+                    // Anything that is not exactly an add is treated as a remove.
+                    final RemoveOpperation removal = (RemoveOpperation) op;
+                    final Buffer ackBytes = removal.getCommand().getAck();
+                    final DataInputStream ackIn = new DataInputStream(ackBytes.newInput());
+                    removes.add((MessageAck) wireFormat.unmarshal(ackIn));
+                    continue;
+                }
+                final AddOpperation addition = (AddOpperation) op;
+                final DataInputStream messageIn =
+                        new DataInputStream(addition.getCommand().getMessage().newInput());
+                adds.add((Message) wireFormat.unmarshal(messageIn));
+            }
+
+            final Message[] addedMessages = adds.toArray(new Message[adds.size()]);
+            final MessageAck[] acks = removes.toArray(new MessageAck[removes.size()]);
+            listener.recover(xid, addedMessages, acks);
+        }
+    }
+
+    /**
+     * Adds a message to a destination store, honouring its transaction.
+     * <p>
+     * A message with no transaction, or with an XA transaction, is handed to
+     * the destination immediately. A message in any other transaction is
+     * queued on that transaction's {@code Tx} and only handed to the
+     * destination when the queued command is run.
+     *
+     * @param context     the connection context of the sender
+     * @param destination the store that ultimately receives the message
+     * @param message     the message to add
+     * @throws IOException if the immediate add fails
+     */
+    void addMessage(ConnectionContext context, final MessageStore destination, final Message message)
+            throws IOException {
+
+        final boolean deferUntilCommit =
+                message.getTransactionId() != null && !message.getTransactionId().isXATransaction();
+        if (!deferUntilCommit) {
+            destination.addMessage(context, message);
+            return;
+        }
+
+        final Tx owner = getTx(message.getTransactionId());
+        owner.add(new AddMessageCommand(context) {
+            @Override
+            public Message getMessage() {
+                return message;
+            }
+
+            @Override
+            public Future<Object> run(ConnectionContext ctx) throws IOException {
+                destination.addMessage(ctx, message);
+                return AbstractMessageStore.FUTURE;
+            }
+        });
+    }
+
+    /**
+     * Asynchronously adds a queue message, honouring its transaction.
+     * <p>
+     * With no transaction, or an XA transaction, the call is forwarded to the
+     * destination and its future is returned. Otherwise the add is queued on
+     * the transaction's {@code Tx} and {@code AbstractMessageStore.FUTURE} is
+     * returned; the destination is only called when the queued command is run.
+     *
+     * @param context     the connection context of the sender
+     * @param destination the store that ultimately receives the message
+     * @param message     the message to add
+     * @return the destination's future, or {@code AbstractMessageStore.FUTURE} when deferred
+     * @throws IOException if the immediate add fails
+     */
+    Future<Object> asyncAddQueueMessage(ConnectionContext context, final MessageStore destination, final Message message)
+            throws IOException {
+
+        final boolean deferUntilCommit =
+                message.getTransactionId() != null && !message.getTransactionId().isXATransaction();
+        if (!deferUntilCommit) {
+            return destination.asyncAddQueueMessage(context, message);
+        }
+
+        final Tx owner = getTx(message.getTransactionId());
+        owner.add(new AddMessageCommand(context) {
+            @Override
+            public Message getMessage() {
+                return message;
+            }
+
+            @Override
+            public Future<Object> run(ConnectionContext ctx) throws IOException {
+                return destination.asyncAddQueueMessage(ctx, message);
+            }
+        });
+        return AbstractMessageStore.FUTURE;
+    }
+
+    /**
+     * Asynchronously adds a topic message, honouring its transaction.
+     * <p>
+     * With no transaction, or an XA transaction, the call is forwarded to the
+     * destination and its future is returned. Otherwise the add is queued on
+     * the transaction's {@code Tx} and {@code AbstractMessageStore.FUTURE} is
+     * returned; the destination is only called when the queued command is run.
+     *
+     * @param context     the connection context of the sender
+     * @param destination the store that ultimately receives the message
+     * @param message     the message to add
+     * @return the destination's future, or {@code AbstractMessageStore.FUTURE} when deferred
+     * @throws IOException if the immediate add fails
+     */
+    Future<Object> asyncAddTopicMessage(ConnectionContext context, final MessageStore destination, final Message message)
+            throws IOException {
+
+        if (message.getTransactionId() != null) {
+            if (message.getTransactionId().isXATransaction()) {
+                return destination.asyncAddTopicMessage(context, message);
+            } else {
+                Tx tx = getTx(message.getTransactionId());
+                tx.add(new AddMessageCommand(context) {
+                    @Override
+                    public Message getMessage() {
+                        return message;
+                    }
+
+                    // Parameterized return type, matching the other deferred commands
+                    // in this class (the raw Future produced an unchecked override).
+                    @Override
+                    public Future<Object> run(ConnectionContext ctx) throws IOException {
+                        return destination.asyncAddTopicMessage(ctx, message);
+                    }
+
+                });
+                return AbstractMessageStore.FUTURE;
+            }
+        } else {
+            return destination.asyncAddTopicMessage(context, message);
+        }
+    }
+
+    /**
+     * Removes (acknowledges) a message from a destination store, honouring the
+     * ack's transaction.
+     * <p>
+     * An ack outside a transaction, or inside an XA transaction, is passed to
+     * the destination immediately. An ack in any other transaction is queued on
+     * that transaction's {@code Tx} and only applied when the queued command is
+     * run.
+     *
+     * @param context     the connection context of the consumer
+     * @param destination the store the message is removed from
+     * @param ack         the acknowledgement describing the removal
+     * @throws IOException if the immediate removal fails
+     */
+    final void removeMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack)
+            throws IOException {
+
+        final boolean deferUntilCommit = ack.isInTransaction() && !ack.getTransactionId().isXATransaction();
+        if (!deferUntilCommit) {
+            destination.removeMessage(context, ack);
+            return;
+        }
+
+        final Tx owner = getTx(ack.getTransactionId());
+        owner.add(new RemoveMessageCommand(context) {
+            @Override
+            public MessageAck getMessageAck() {
+                return ack;
+            }
+
+            @Override
+            public Future<Object> run(ConnectionContext ctx) throws IOException {
+                destination.removeMessage(ctx, ack);
+                return AbstractMessageStore.FUTURE;
+            }
+        });
+    }
+
+    /**
+     * Asynchronous counterpart of the transactional remove.
+     * <p>
+     * An ack outside a transaction, or inside an XA transaction, is passed to
+     * the destination's {@code removeAsyncMessage} immediately. An ack in any
+     * other transaction is queued on that transaction's {@code Tx}; note that
+     * the queued command applies it through the destination's
+     * {@code removeMessage}, not {@code removeAsyncMessage}.
+     *
+     * @param context     the connection context of the consumer
+     * @param destination the store the message is removed from
+     * @param ack         the acknowledgement describing the removal
+     * @throws IOException if the immediate removal fails
+     */
+    final void removeAsyncMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack)
+            throws IOException {
+
+        final boolean deferUntilCommit = ack.isInTransaction() && !ack.getTransactionId().isXATransaction();
+        if (!deferUntilCommit) {
+            destination.removeAsyncMessage(context, ack);
+            return;
+        }
+
+        final Tx owner = getTx(ack.getTransactionId());
+        owner.add(new RemoveMessageCommand(context) {
+            @Override
+            public MessageAck getMessageAck() {
+                return ack;
+            }
+
+            @Override
+            public Future<Object> run(ConnectionContext ctx) throws IOException {
+                destination.removeMessage(ctx, ack);
+                return AbstractMessageStore.FUTURE;
+            }
+        });
+    }
+
+ private KahaTransactionInfo getTransactionInfo(TransactionId txid) {
+ return theStore.createTransactionInfo(txid);
+ }
+
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
------------------------------------------------------------------------------
svn:executable = *
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=955039&r1=955038&r2=955039&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Tue Jun 15 20:33:41 2010
@@ -631,7 +631,7 @@ public class MessageDatabase extends Ser
// Methods call by the broker to update and query the store.
// /////////////////////////////////////////////////////////////////
public Location store(JournalCommand data) throws IOException {
- return store(data, false, null);
+ return store(data, false, null,null);
}
/**
@@ -641,8 +641,11 @@ public class MessageDatabase extends Ser
* during a recovery process.
* @param done
*/
- public Location store(JournalCommand data, boolean sync, Runnable done) throws IOException {
- try {
+ public Location store(JournalCommand data, boolean sync, Runnable before,Runnable after) throws IOException {
+ if (before != null) {
+ before.run();
+ }
+ try {
int size = data.serializedSizeFramed();
DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
os.writeByte(data.type().getNumber());
@@ -664,8 +667,8 @@ public class MessageDatabase extends Ser
LOG.info("KahaDB: Recovering checkpoint thread after exception");
startCheckpoint();
}
- if (done != null) {
- done.run();
+ if (after != null) {
+ after.run();
}
return location;
} catch (IOException ioe) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java?rev=955039&r1=955038&r2=955039&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java Tue Jun 15 20:33:41 2010
@@ -24,7 +24,6 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
-
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
@@ -62,7 +61,7 @@ import org.apache.kahadb.page.Transactio
public class TempKahaDBStore extends TempMessageDatabase implements PersistenceAdapter {
- private WireFormat wireFormat = new OpenWireFormat();
+ private final WireFormat wireFormat = new OpenWireFormat();
public void setBrokerName(String brokerName) {
}
@@ -72,9 +71,14 @@ public class TempKahaDBStore extends Tem
public TransactionStore createTransactionStore() throws IOException {
return new TransactionStore(){
- public void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException {
+ public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
+ if (preCommit != null) {
+ preCommit.run();
+ }
processCommit(txid);
- done.run();
+ if (postCommit != null) {
+ postCommit.run();
+ }
}
public void prepare(TransactionId txid) throws IOException {
processPrepare(txid);
@@ -122,6 +126,7 @@ public class TempKahaDBStore extends Tem
this.dest = convert( destination );
}
+ @Override
public ActiveMQDestination getDestination() {
return destination;
}
@@ -254,10 +259,13 @@ public class TempKahaDBStore extends Tem
}
+ @Override
public void setMemoryUsage(MemoryUsage memoeyUSage) {
}
+ @Override
public void start() throws Exception {
}
+ @Override
public void stop() throws Exception {
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java?rev=955039&r1=955038&r2=955039&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java Tue Jun 15 20:33:41 2010
@@ -21,9 +21,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
-
import javax.transaction.xa.XAException;
-
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
@@ -53,9 +51,9 @@ public class MemoryTransactionStore impl
private boolean doingRecover;
public class Tx {
- private ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
+ private final ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
- private ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
+ private final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
public void add(AddMessageCommand msg) {
messages.add(msg);
@@ -130,19 +128,23 @@ public class MemoryTransactionStore impl
public MessageStore proxy(MessageStore messageStore) {
return new ProxyMessageStore(messageStore) {
+ @Override
public void addMessage(ConnectionContext context, final Message send) throws IOException {
MemoryTransactionStore.this.addMessage(getDelegate(), send);
}
+ @Override
public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
MemoryTransactionStore.this.addMessage(getDelegate(), message);
return AbstractMessageStore.FUTURE;
}
+ @Override
public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
}
+ @Override
public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
}
@@ -151,19 +153,23 @@ public class MemoryTransactionStore impl
public TopicMessageStore proxy(TopicMessageStore messageStore) {
return new ProxyTopicMessageStore(messageStore) {
+ @Override
public void addMessage(ConnectionContext context, final Message send) throws IOException {
MemoryTransactionStore.this.addMessage(getDelegate(), send);
}
+ @Override
public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
MemoryTransactionStore.this.addMessage(getDelegate(), message);
return AbstractMessageStore.FUTURE;
}
+ @Override
public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
}
+ @Override
public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
}
@@ -194,8 +200,10 @@ public class MemoryTransactionStore impl
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
- public void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException {
-
+ public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
+ if (preCommit != null) {
+ preCommit.run();
+ }
Tx tx;
if (wasPrepared) {
tx = preparedTransactions.remove(txid);
@@ -204,11 +212,15 @@ public class MemoryTransactionStore impl
}
if (tx == null) {
- done.run();
+ if (postCommit != null) {
+ postCommit.run();
+ }
return;
}
tx.commit();
- done.run();
+ if (postCommit != null) {
+ postCommit.run();
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/LocalTransaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/LocalTransaction.java?rev=955039&r1=955038&r2=955039&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/LocalTransaction.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/LocalTransaction.java Tue Jun 15 20:33:41 2010
@@ -17,9 +17,7 @@
package org.apache.activemq.transaction;
import java.io.IOException;
-
import javax.transaction.xa.XAException;
-
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.TransactionId;
@@ -44,6 +42,7 @@ public class LocalTransaction extends Tr
this.context = context;
}
+ @Override
public void commit(boolean onePhase) throws XAException, IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("commit: " + xid
@@ -69,10 +68,11 @@ public class LocalTransaction extends Tr
context.getTransactions().remove(xid);
// Sync on transaction store to avoid out of order messages in the cursor
// https://issues.apache.org/activemq/browse/AMQ-2594
- transactionStore.commit(getTransactionId(), false, postCommitTask);
+ transactionStore.commit(getTransactionId(), false,preCommitTask, postCommitTask);
this.waitPostCommitDone(postCommitTask);
}
+ @Override
public void rollback() throws XAException, IOException {
if (LOG.isDebugEnabled()) {
@@ -98,12 +98,14 @@ public class LocalTransaction extends Tr
}
}
+ @Override
public int prepare() throws XAException {
XAException xae = new XAException("Prepare not implemented on Local Transactions.");
xae.errorCode = XAException.XAER_RMERR;
throw xae;
}
+ @Override
public TransactionId getTransactionId() {
return xid;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Synchronization.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Synchronization.java?rev=955039&r1=955038&r2=955039&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Synchronization.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Synchronization.java Tue Jun 15 20:33:41 2010
@@ -23,6 +23,9 @@ public class Synchronization {
public void beforeEnd() throws Exception {
}
+
+ public void beforeCommit() throws Exception { // hook with an empty default body; subclasses override it to act before a commit
+ }
public void afterCommit() throws Exception {
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java?rev=955039&r1=955038&r2=955039&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java Tue Jun 15 20:33:41 2010
@@ -24,9 +24,7 @@ import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
-
import javax.transaction.xa.XAException;
-
import org.apache.activemq.command.TransactionId;
import org.apache.commons.logging.Log;
@@ -36,16 +34,27 @@ import org.apache.commons.logging.Log;
*
* @version $Revision: 1.5 $
*/
-public abstract class Transaction implements Callable {
+public abstract class Transaction {
public static final byte START_STATE = 0; // can go to: 1,2,3
public static final byte IN_USE_STATE = 1; // can go to: 2,3
public static final byte PREPARED_STATE = 2; // can go to: 3
public static final byte FINISHED_STATE = 3;
- private ArrayList<Synchronization> synchronizations = new ArrayList<Synchronization>();
+ private final ArrayList<Synchronization> synchronizations = new ArrayList<Synchronization>();
private byte state = START_STATE;
- protected FutureTask<?> postCommitTask = new FutureTask(this);
+ protected FutureTask<?> preCommitTask = new FutureTask<Object>(new Callable<Object>() {
+ public Object call() throws Exception {
+ doPreCommit();
+ return null;
+ }
+ });
+ protected FutureTask<?> postCommitTask = new FutureTask<Object>(new Callable<Object>() {
+ public Object call() throws Exception {
+ doPostCommit();
+ return null;
+ }
+ });
public byte getState() {
return state;
@@ -86,6 +95,13 @@ public abstract class Transaction implem
// r.execute();
// }
}
+
+ protected void fireBeforeCommit() throws Exception { // calls beforeCommit() on every registered Synchronization, in list order
+ for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
+ Synchronization s = iter.next();
+ s.beforeCommit(); // not caught here: an exception propagates and the remaining synchronizations are not called
+ }
+ }
protected void fireAfterCommit() throws Exception {
for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
@@ -102,6 +118,7 @@ public abstract class Transaction implem
}
}
+ @Override
public String toString() {
return super.toString() + "[synchronizations=" + synchronizations + "]";
}
@@ -140,6 +157,20 @@ public abstract class Transaction implem
}
}
}
+
+ protected void doPreCommit() throws XAException { // fires the beforeCommit hooks; any failure is logged and rethrown as an XAException
+ try {
+ fireBeforeCommit();
+ } catch (Throwable e) {
+ // I guess this could happen. Pre commit task failed
+ // to execute properly.
+ getLog().warn("PRE COMMIT FAILED: ", e);
+ XAException xae = new XAException("PRE COMMIT FAILED");
+ xae.errorCode = XAException.XAER_RMERR;
+ xae.initCause(e); // keep the original failure as the cause
+ throw xae;
+ }
+ }
protected void doPostCommit() throws XAException {
try {
@@ -154,10 +185,4 @@ public abstract class Transaction implem
throw xae;
}
}
-
- public Object call() throws Exception {
- doPostCommit();
- return null;
- }
-
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java?rev=955039&r1=955038&r2=955039&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java Tue Jun 15 20:33:41 2010
@@ -17,10 +17,8 @@
package org.apache.activemq.transaction;
import java.io.IOException;
-
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
-
import org.apache.activemq.broker.TransactionBroker;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
@@ -48,6 +46,7 @@ public class XATransaction extends Trans
}
}
+ @Override
public void commit(boolean onePhase) throws XAException, IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("XA Transaction commit: " + xid);
@@ -64,14 +63,14 @@ public class XATransaction extends Trans
checkForPreparedState(onePhase);
doPrePrepare();
setStateFinished();
- transactionStore.commit(getTransactionId(), false, postCommitTask);
+ transactionStore.commit(getTransactionId(), false, preCommitTask,postCommitTask);
waitPostCommitDone(postCommitTask);
break;
case PREPARED_STATE:
// 2 phase commit, work done.
// We would record commit here.
setStateFinished();
- transactionStore.commit(getTransactionId(), true, postCommitTask);
+ transactionStore.commit(getTransactionId(), true, preCommitTask,postCommitTask);
waitPostCommitDone(postCommitTask);
break;
default:
@@ -108,6 +107,7 @@ public class XATransaction extends Trans
}
}
+ @Override
public void rollback() throws XAException, IOException {
if (LOG.isDebugEnabled()) {
@@ -151,6 +151,7 @@ public class XATransaction extends Trans
}
}
+ @Override
public int prepare() throws XAException, IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("XA Transaction prepare: " + xid);
@@ -178,6 +179,7 @@ public class XATransaction extends Trans
broker.removeTransaction(xid);
}
+ @Override
public TransactionId getTransactionId() {
return xid;
}