You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/05/25 23:20:44 UTC
svn commit: r948209 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/store/kahadb/
test/java/org/apache/activemq/usecases/
Author: gtully
Date: Tue May 25 21:20:40 2010
New Revision: 948209
URL: http://svn.apache.org/viewvc?rev=948209&view=rev
Log:
resolve regression in ThreeBrokerVirtualTopicNetworkTest - async tasks need to use the destination in the key, as the message id is not unique with virtual topics. Also, on a failed cancel, we must wait for the write to complete so the ack/remove does not lag the write, leaving an outstanding message. Consequence of fixes for https://issues.apache.org/activemq/browse/AMQ-2620 and
https://issues.apache.org/activemq/browse/AMQ-2568
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerVirtualTopicNetworkTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=948209&r1=948208&r2=948209&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Tue May 25 21:20:40 2010
@@ -37,6 +37,7 @@ import java.util.concurrent.ThreadPoolEx
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
@@ -72,19 +73,21 @@ import org.apache.activemq.store.kahadb.
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
-import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Transaction;
public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
+ private static final Log LOG = LogFactory.getLog(KahaDBStore.class);
private static final int MAX_ASYNC_JOBS = 10000;
protected ExecutorService queueExecutor;
protected ExecutorService topicExecutor;
- protected final Map<MessageId, StoreQueueTask> asyncQueueMap = new HashMap<MessageId, StoreQueueTask>();
+ protected final Map<AsyncJobKey, StoreQueueTask> asyncQueueMap = new HashMap<AsyncJobKey, StoreQueueTask>();
protected final Map<MessageId, StoreTopicTask> asyncTopicMap = new HashMap<MessageId, StoreTopicTask>();
private final WireFormat wireFormat = new OpenWireFormat();
private SystemUsage usageManager;
@@ -95,11 +98,10 @@ public class KahaDBStore extends Message
private boolean concurrentStoreAndDispatchQueues = true;
private boolean concurrentStoreAndDispatchTopics = true;
private int maxAsyncJobs = MAX_ASYNC_JOBS;
- private Scheduler scheduler;
-
+
public KahaDBStore() {
-
}
+
public void setBrokerName(String brokerName) {
}
@@ -197,8 +199,8 @@ public class KahaDBStore extends Message
super.doStop(stopper);
}
- protected StoreQueueTask removeQueueTask(MessageId id) {
- StoreQueueTask task = this.asyncQueueMap.remove(id);
+ protected StoreQueueTask removeQueueTask(ActiveMQDestination activeMQDestination, MessageId id) {
+ StoreQueueTask task = this.asyncQueueMap.remove(new AsyncJobKey(id, activeMQDestination));
if (task != null) {
task.getMessage().decrementReferenceCount();
this.queueSemaphore.release();
@@ -206,14 +208,14 @@ public class KahaDBStore extends Message
return task;
}
- protected void addQueueTask(StoreQueueTask task) throws IOException {
+ protected void addQueueTask(ActiveMQDestination activeMQDestination, StoreQueueTask task) throws IOException {
try {
this.queueSemaphore.acquire();
} catch (InterruptedException e) {
throw new InterruptedIOException(e.getMessage());
}
- this.asyncQueueMap.put(task.getMessage().getMessageId(), task);
+ this.asyncQueueMap.put(new AsyncJobKey(task.getMessage().getMessageId(), activeMQDestination), task);
task.getMessage().incrementReferenceCount();
this.queueExecutor.execute(task);
}
@@ -302,7 +304,7 @@ public class KahaDBStore extends Message
throws IOException {
if (isConcurrentStoreAndDispatchQueues()) {
StoreQueueTask result = new StoreQueueTask(this, context, message);
- addQueueTask(result);
+ addQueueTask(destination, result);
return result.getFuture();
} else {
return super.asyncAddQueueMessage(context, message);
@@ -312,9 +314,16 @@ public class KahaDBStore extends Message
@Override
public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
if (isConcurrentStoreAndDispatchQueues()) {
- StoreQueueTask task = removeQueueTask(ack.getLastMessageId());
+ StoreQueueTask task = removeQueueTask(destination, ack.getLastMessageId());
if (task != null) {
if (!task.cancel()) {
+ try {
+ task.future.get();
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException(e.toString());
+ } catch (Exception ignored) {
+ LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored);
+ }
removeMessage(context, ack);
}
} else {
@@ -334,7 +343,6 @@ public class KahaDBStore extends Message
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null);
-
}
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
@@ -894,6 +902,35 @@ public class KahaDBStore extends Message
}
}
+ static class AsyncJobKey {
+ MessageId id;
+ ActiveMQDestination destination;
+
+ AsyncJobKey(MessageId id, ActiveMQDestination destination) {
+ this.id = id;
+ this.destination = destination;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ return obj instanceof AsyncJobKey &&
+ id.equals(((AsyncJobKey)obj).id) &&
+ destination.equals(((AsyncJobKey)obj).destination);
+ }
+
+ @Override
+ public int hashCode() {
+ return id.hashCode() + destination.hashCode();
+ }
+
+ public String toString() {
+ return destination.getPhysicalName() + "-" + id;
+ }
+ }
+
class StoreQueueTask implements Runnable {
protected final Message message;
protected final ConnectionContext context;
@@ -915,8 +952,7 @@ public class KahaDBStore extends Message
public boolean cancel() {
if (this.done.compareAndSet(false, true)) {
- this.future.cancel(false);
- return true;
+ return this.future.cancel(false);
}
return false;
}
@@ -925,7 +961,7 @@ public class KahaDBStore extends Message
try {
if (this.done.compareAndSet(false, true)) {
this.store.addMessage(context, message);
- removeQueueTask(this.message.getMessageId());
+ removeQueueTask(this.store.getDestination(), this.message.getMessageId());
this.future.complete();
}
} catch (Exception e) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=948209&r1=948208&r2=948209&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Tue May 25 21:20:40 2010
@@ -744,7 +744,6 @@ public class MessageDatabase extends Ser
synchronized (indexMutex) {
ArrayList<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
inflightTx.add(new AddOpperation(command, location));
- TransactionId key = key(command.getTransactionInfo());
}
} else {
synchronized (indexMutex) {
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerVirtualTopicNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerVirtualTopicNetworkTest.java?rev=948209&r1=948208&r2=948209&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerVirtualTopicNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerVirtualTopicNetworkTest.java Tue May 25 21:20:40 2010
@@ -114,6 +114,7 @@ public class ThreeBrokerVirtualTopicNetw
// ensure we don't get any more messages
Thread.sleep(2000);
+ LOG.info("MessagesA: " + msgsA.getMessageIds());
assertEquals(10, msgsA.getMessageCount());
assertEquals(11, msgsB.getMessageCount());
assertEquals(11, msgsC.getMessageCount());
@@ -141,6 +142,7 @@ public class ThreeBrokerVirtualTopicNetw
// ensure we don't get any more messages
Thread.sleep(5000);
+ LOG.info("Extra MessagesA: " + msgsA.getMessageIds());
assertEquals(0, msgsA.getMessageCount());
assertEquals(11, msgsB.getMessageCount());
assertEquals(11, msgsC.getMessageCount());