You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2014/04/30 17:19:51 UTC
git commit: https://issues.apache.org/jira/browse/AMQ-5077 - reduce
reader thread work when client uses async send;
async store updates can now queue up to the destination memory limit b/c they
don't block the send thread. Pending store writes are now tracked in memory usage.
Repository: activemq
Updated Branches:
refs/heads/trunk 8498136f5 -> ad1f751a4
https://issues.apache.org/jira/browse/AMQ-5077 - reduce reader thread work when client uses async send; async store updates can now queue up to the destination memory limit b/c they don't block the send thread. Pending store writes are now tracked in memory usage. This allows a client to quickly provide a burst of messages to fill the destination cache bounded only by network bandwidth
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ad1f751a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ad1f751a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ad1f751a
Branch: refs/heads/trunk
Commit: ad1f751a412ddb258256c1e8d2e72858b52f43ca
Parents: 8498136
Author: gtully <ga...@gmail.com>
Authored: Wed Apr 30 16:10:18 2014 +0100
Committer: gtully <ga...@gmail.com>
Committed: Wed Apr 30 16:13:18 2014 +0100
----------------------------------------------------------------------
.../region/PendingMarshalUsageTracker.java | 39 +++++++++
.../apache/activemq/broker/region/Queue.java | 10 ++-
.../activemq/store/AbstractMessageStore.java | 24 ++----
.../activemq/store/InlineListenableFuture.java | 58 +++++++++++++
.../apache/activemq/store/ListenableFuture.java | 28 +++++++
.../org/apache/activemq/store/MessageStore.java | 13 ++-
.../activemq/store/ProxyMessageStore.java | 8 +-
.../activemq/store/ProxyTopicMessageStore.java | 8 +-
.../store/memory/MemoryTransactionStore.java | 20 ++---
.../org/apache/activemq/ActiveMQConnection.java | 11 ++-
.../activemq/store/kahadb/KahaDBStore.java | 34 ++++++--
.../store/kahadb/KahaDBTransactionStore.java | 14 ++--
.../kahadb/MultiKahaDBTransactionStore.java | 13 ++-
.../org/apache/activemq/leveldb/DBManager.scala | 26 +++++-
.../apache/activemq/leveldb/LevelDBStore.scala | 7 +-
.../activemq/JmsMultipleClientsTestSupport.java | 6 ++
.../activemq/broker/QueueSubscriptionTest.java | 8 ++
.../activemq/broker/TopicSubscriptionTest.java | 6 +-
.../broker/virtual/VirtualDestPerfTest.java | 88 +++++++++++++++++---
.../activemq/usecases/QueueBrowsingTest.java | 2 +-
20 files changed, 334 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PendingMarshalUsageTracker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PendingMarshalUsageTracker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PendingMarshalUsageTracker.java
new file mode 100644
index 0000000..78acb53
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PendingMarshalUsageTracker.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region;
+
+import org.apache.activemq.command.Message;
+import org.apache.activemq.usage.MemoryUsage;
+
+public class PendingMarshalUsageTracker implements Runnable {
+ final MemoryUsage usage;
+ int messageSize;
+ public PendingMarshalUsageTracker(final Message message) {
+ usage = message.getMemoryUsage();
+ if (usage != null) {
+ messageSize = message.getSize();
+ usage.increaseUsage(messageSize);
+ }
+ }
+
+ @Override
+ public void run() {
+ if (usage != null) {
+ usage.decreaseUsage(messageSize);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 2f3d8bd..06c74db 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -81,6 +81,7 @@ import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.state.ProducerState;
+import org.apache.activemq.store.ListenableFuture;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.thread.Task;
@@ -637,8 +638,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
if (isProducerFlowControl() && context.isProducerFlowControl()) {
if (warnOnProducerFlowControl) {
warnOnProducerFlowControl = false;
- LOG.info("Usage Manager Memory Limit ({}) reached on {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
- memoryUsage.getLimit(), getActiveMQDestination().getQualifiedName());
+ LOG.info("Usage Manager Memory Limit ({}) reached on {}, size {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
+ memoryUsage.getLimit(), getActiveMQDestination().getQualifiedName(), destinationStatistics.getMessages().getCount());
}
if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
@@ -895,7 +896,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException,
Exception {
final ConnectionContext context = producerExchange.getConnectionContext();
- Future<Object> result = null;
+ ListenableFuture<Object> result = null;
boolean needsOrderingWithTransactions = context.isInTransaction();
producerExchange.incrementSend();
@@ -907,6 +908,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
if (messages.isCacheEnabled()) {
result = store.asyncAddQueueMessage(context, message, isOptimizeStorage());
+ result.addListener(new PendingMarshalUsageTracker(message));
} else {
store.addMessage(context, message);
}
@@ -942,7 +944,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
if (!needsOrderingWithTransactions) {
messageSent(context, message);
}
- if (result != null && !result.isCancelled()) {
+ if (result != null && message.isResponseRequired() && !result.isCancelled()) {
try {
result.get();
} catch (CancellationException e) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
index cd8d0f9..df8658f 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
@@ -17,9 +17,6 @@
package org.apache.activemq.store;
import java.io.IOException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
@@ -29,7 +26,7 @@ import org.apache.activemq.command.MessageId;
import org.apache.activemq.usage.MemoryUsage;
abstract public class AbstractMessageStore implements MessageStore {
- public static final FutureTask<Object> FUTURE;
+ public static final ListenableFuture<Object> FUTURE;
protected final ActiveMQDestination destination;
protected boolean prioritizedMessages;
@@ -89,27 +86,27 @@ abstract public class AbstractMessageStore implements MessageStore {
}
@Override
- public Future<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message) throws IOException {
+ public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message) throws IOException {
addMessage(context, message);
return FUTURE;
}
@Override
- public Future<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message,final boolean canOptimizeHint) throws IOException {
+ public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message, final boolean canOptimizeHint) throws IOException {
addMessage(context, message, canOptimizeHint);
return FUTURE;
}
@Override
- public Future<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message,final boolean canOptimizeHint) throws IOException {
+ public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message, final boolean canOptimizeHint) throws IOException {
addMessage(context, message, canOptimizeHint);
return FUTURE;
}
@Override
- public Future<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message) throws IOException {
+ public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message) throws IOException {
addMessage(context, message);
- return FUTURE;
+ return new InlineListenableFuture();
}
@Override
@@ -121,14 +118,7 @@ abstract public class AbstractMessageStore implements MessageStore {
throw new IOException("update is not supported by: " + this);
}
- static class CallableImplementation implements Callable<Object> {
- public Object call() throws Exception {
- return null;
- }
- }
-
static {
- FUTURE = new FutureTask<Object>(new CallableImplementation());
- FUTURE.run();
+ FUTURE = new InlineListenableFuture();
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-broker/src/main/java/org/apache/activemq/store/InlineListenableFuture.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/InlineListenableFuture.java b/activemq-broker/src/main/java/org/apache/activemq/store/InlineListenableFuture.java
new file mode 100644
index 0000000..7c2b873
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/InlineListenableFuture.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.store;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class InlineListenableFuture implements ListenableFuture<Object> {
+ public Object call() throws Exception {
+ return null;
+ }
+
+ @Override
+ public void addListener(Runnable listener) {
+ listener.run();
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ return false;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return false;
+ }
+
+ @Override
+ public boolean isDone() {
+ return true;
+ }
+
+ @Override
+ public Object get() throws InterruptedException, ExecutionException {
+ return null;
+ }
+
+ @Override
+ public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ return null;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-broker/src/main/java/org/apache/activemq/store/ListenableFuture.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/ListenableFuture.java b/activemq-broker/src/main/java/org/apache/activemq/store/ListenableFuture.java
new file mode 100644
index 0000000..9072558
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/ListenableFuture.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store;
+
+import java.util.concurrent.Future;
+
+public interface ListenableFuture<T> extends Future<T> {
+ /**
+ * register a listener to be run on completion or immediately if complete
+ * any exceptions will be caught and logged
+ * @param listener
+ */
+ void addListener(Runnable listener);
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java
index d465bc5..400245a 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java
@@ -17,7 +17,6 @@
package org.apache.activemq.store;
import java.io.IOException;
-import java.util.concurrent.Future;
import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext;
@@ -62,7 +61,7 @@ public interface MessageStore extends Service {
* @throws IOException
* @throws IOException
*/
- Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException;
+ ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException;
/**
* Adds a message to the message store
@@ -74,18 +73,18 @@ public interface MessageStore extends Service {
* @throws IOException
* @throws IOException
*/
- Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException;
+ ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException;
/**
* Adds a message to the message store
*
* @param context context
* @param message
- * @return a Future to track when this is complete
+ * @return a ListenableFuture to track when this is complete
* @throws IOException
* @throws IOException
*/
- Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException;
+ ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException;
/**
* Adds a message to the message store
@@ -93,11 +92,11 @@ public interface MessageStore extends Service {
* @param context context
* @param message
* @param canOptimizeHint - give a hint to the store that the message may be consumed before it hits the disk
- * @return a Future to track when this is complete
+ * @return a ListenableFuture to track when this is complete
* @throws IOException
* @throws IOException
*/
- Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException;
+ ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException;
/**
* Looks up a message using either the String messageID or the
http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
index e79229b..3ddfadb 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
@@ -122,22 +122,22 @@ public class ProxyMessageStore implements MessageStore {
}
@Override
- public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
+ public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
return delegate.asyncAddQueueMessage(context, message);
}
@Override
- public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
+ public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
return delegate.asyncAddQueueMessage(context,message,canOptimizeHint);
}
@Override
- public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
+ public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
return delegate.asyncAddTopicMessage(context, message);
}
@Override
- public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
+ public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
return asyncAddTopicMessage(context,message,canOptimizeHint);
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
index c0635fa..de4d195 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
@@ -171,22 +171,22 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
}
@Override
- public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
+ public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
return delegate.asyncAddTopicMessage(context, message);
}
@Override
- public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
+ public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
return delegate.asyncAddTopicMessage(context,message, canOptimizeHint);
}
@Override
- public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
+ public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
return delegate.asyncAddQueueMessage(context, message);
}
@Override
- public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
+ public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
return delegate.asyncAddQueueMessage(context,message, canOptimizeHint);
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
index 7e02694..c3d1b8a 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
@@ -22,7 +22,8 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
-import org.apache.activemq.store.AbstractMessageStore;
+import org.apache.activemq.store.InlineListenableFuture;
+import org.apache.activemq.store.ListenableFuture;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.ProxyMessageStore;
@@ -38,7 +39,6 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
/**
* Provides a TransactionStore implementation that can create transaction aware
@@ -149,15 +149,15 @@ public class MemoryTransactionStore implements TransactionStore {
}
@Override
- public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
+ public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
MemoryTransactionStore.this.addMessage(getDelegate(), message);
- return AbstractMessageStore.FUTURE;
+ return new InlineListenableFuture();
}
@Override
- public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canoptimize) throws IOException {
+ public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canoptimize) throws IOException {
MemoryTransactionStore.this.addMessage(getDelegate(), message);
- return AbstractMessageStore.FUTURE;
+ return new InlineListenableFuture();
}
@Override
@@ -190,15 +190,15 @@ public class MemoryTransactionStore implements TransactionStore {
}
@Override
- public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
+ public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
MemoryTransactionStore.this.addMessage(getDelegate(), message);
- return AbstractMessageStore.FUTURE;
+ return new InlineListenableFuture();
}
@Override
- public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
+ public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
MemoryTransactionStore.this.addMessage(getDelegate(), message);
- return AbstractMessageStore.FUTURE;
+ return new InlineListenableFuture();
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
index e3ce9ae..326310c 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
@@ -94,6 +94,7 @@ import org.apache.activemq.state.CommandVisitorAdapter;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.FutureResponse;
+import org.apache.activemq.transport.RequestTimedOutIOException;
import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener;
@@ -696,7 +697,15 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
// know that the connection is being shutdown.
RemoveInfo removeCommand = info.createRemoveCommand();
removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
- doSyncSendPacket(info.createRemoveCommand(), closeTimeout);
+ try {
+ doSyncSendPacket(info.createRemoveCommand(), closeTimeout);
+ } catch (JMSException e) {
+ if (e.getCause() instanceof RequestTimedOutIOException) {
+ // expected
+ } else {
+ throw e;
+ }
+ }
doAsyncSendPacket(new ShutdownInfo());
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index 1e84642..74425f1 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -30,7 +30,6 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
@@ -60,7 +59,6 @@ import org.apache.activemq.store.*;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination;
import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
-import org.apache.activemq.store.kahadb.data.KahaEntryType;
import org.apache.activemq.store.kahadb.data.KahaLocation;
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
@@ -370,7 +368,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
}
@Override
- public Future<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message)
+ public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message)
throws IOException {
if (isConcurrentStoreAndDispatchQueues()) {
StoreQueueTask result = new StoreQueueTask(this, context, message);
@@ -712,7 +710,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
}
@Override
- public Future<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message)
+ public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message)
throws IOException {
if (isConcurrentStoreAndDispatchTopics()) {
StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get());
@@ -1238,7 +1236,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
this.future = new InnerFutureTask(this);
}
- public Future<Object> getFuture() {
+ public ListenableFuture<Object> getFuture() {
return this.future;
}
@@ -1295,8 +1293,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
return this.message;
}
- private class InnerFutureTask extends FutureTask<Object> {
+ private class InnerFutureTask extends FutureTask<Object> implements ListenableFuture<Object> {
+ private Runnable listener;
public InnerFutureTask(Runnable runnable) {
super(runnable, null);
@@ -1309,6 +1308,29 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public void complete() {
super.set(null);
}
+
+ @Override
+ public void done() {
+ fireListener();
+ }
+
+ @Override
+ public void addListener(Runnable listener) {
+ this.listener = listener;
+ if (isDone()) {
+ fireListener();
+ }
+ }
+
+ private void fireListener() {
+ if (listener != null) {
+ try {
+ listener.run();
+ } catch (Exception ignored) {
+ LOG.warn("Unexpected exception from future {} listener callback {}", this, listener, ignored);
+ }
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
index 12e5f00..47a9c34 100755
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
@@ -33,9 +33,9 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
-import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.AbstractMessageStore;
+import org.apache.activemq.store.ListenableFuture;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.ProxyMessageStore;
import org.apache.activemq.store.ProxyTopicMessageStore;
@@ -166,12 +166,12 @@ public class KahaDBTransactionStore implements TransactionStore {
}
@Override
- public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
+ public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
return KahaDBTransactionStore.this.asyncAddQueueMessage(context, getDelegate(), message);
}
@Override
- public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
+ public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
return KahaDBTransactionStore.this.asyncAddQueueMessage(context, getDelegate(), message);
}
@@ -200,12 +200,12 @@ public class KahaDBTransactionStore implements TransactionStore {
}
@Override
- public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
+ public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
return KahaDBTransactionStore.this.asyncAddTopicMessage(context, getDelegate(), message);
}
@Override
- public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
+ public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
return KahaDBTransactionStore.this.asyncAddTopicMessage(context, getDelegate(), message);
}
@@ -389,7 +389,7 @@ public class KahaDBTransactionStore implements TransactionStore {
}
}
- Future<Object> asyncAddQueueMessage(ConnectionContext context, final MessageStore destination, final Message message)
+ ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, final MessageStore destination, final Message message)
throws IOException {
if (message.getTransactionId() != null) {
@@ -416,7 +416,7 @@ public class KahaDBTransactionStore implements TransactionStore {
}
}
- Future<Object> asyncAddTopicMessage(ConnectionContext context, final MessageStore destination, final Message message)
+ ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, final MessageStore destination, final Message message)
throws IOException {
if (message.getTransactionId() != null) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
index a7d09f1..c7ece83 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
@@ -23,7 +23,6 @@ import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
@@ -71,12 +70,12 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
}
@Override
- public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
+ public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message);
}
@Override
- public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
+ public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message);
}
@@ -105,12 +104,12 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
}
@Override
- public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
+ public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message);
}
@Override
- public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
+ public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message);
}
@@ -384,7 +383,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
destination.addMessage(context, message);
}
- Future<Object> asyncAddQueueMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
+ ListenableFuture<Object> asyncAddQueueMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
throws IOException {
if (message.getTransactionId() != null) {
getTx(message.getTransactionId()).trackStore(transactionStore);
@@ -395,7 +394,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
}
}
- Future<Object> asyncAddTopicMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
+ ListenableFuture<Object> asyncAddTopicMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
throws IOException {
if (message.getTransactionId() != null) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
index b02cf0f..775b99a 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
@@ -22,7 +22,7 @@ import org.fusesource.hawtdispatch.BaseRetained
import java.util.concurrent._
import atomic._
import org.fusesource.hawtbuf.Buffer
-import org.apache.activemq.store.MessageRecoveryListener
+import org.apache.activemq.store.{ListenableFuture, MessageRecoveryListener}
import java.lang.ref.WeakReference
import scala.Option._
import org.fusesource.hawtbuf.Buffer._
@@ -97,12 +97,13 @@ object UowCompleted extends UowState {
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class CountDownFuture[T <: AnyRef]() extends java.util.concurrent.Future[T] {
+class CountDownFuture[T <: AnyRef]() extends ListenableFuture[T] {
private val latch:CountDownLatch=new CountDownLatch(1)
@volatile
var value:T = _
var error:Throwable = _
+ var listener:Runnable = _
def cancel(mayInterruptIfRunning: Boolean) = false
def isCancelled = false
@@ -115,10 +116,12 @@ class CountDownFuture[T <: AnyRef]() extends java.util.concurrent.Future[T] {
def set(v:T) = {
value = v
latch.countDown()
+ fireListener
}
def failed(v:Throwable) = {
error = v
latch.countDown()
+ fireListener
}
def get() = {
@@ -141,6 +144,25 @@ class CountDownFuture[T <: AnyRef]() extends java.util.concurrent.Future[T] {
}
def isDone = latch.await(0, TimeUnit.SECONDS);
+
+ def fireListener = {
+ if (listener != null) {
+ try {
+ listener.run()
+ } catch {
+ case e : Throwable => {
+ LevelDBStore.warn(e, "unexpected exception on future listener " +listener)
+ }
+ }
+ }
+ }
+
+ def addListener(l: Runnable) = {
+ listener = l
+ if (isDone) {
+ fireListener
+ }
+ }
}
object UowManagerConstants {
http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
index 9256bb5..d1f8f6b 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
@@ -47,8 +47,7 @@ object LevelDBStore extends Log {
}
})
- val DONE = new CountDownFuture[AnyRef]();
- DONE.set(null)
+ val DONE = new InlineListenableFuture;
def toIOException(e: Throwable): IOException = {
if (e.isInstanceOf[ExecutionException]) {
@@ -681,7 +680,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
}
override def asyncAddQueueMessage(context: ConnectionContext, message: Message) = asyncAddQueueMessage(context, message, false)
- override def asyncAddQueueMessage(context: ConnectionContext, message: Message, delay: Boolean): Future[AnyRef] = {
+ override def asyncAddQueueMessage(context: ConnectionContext, message: Message, delay: Boolean): ListenableFuture[AnyRef] = {
check_running
message.getMessageId.setEntryLocator(null)
if( message.getTransactionId!=null ) {
@@ -800,7 +799,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
def subscription_with_key(key:Long) = subscriptions.find(_._2.subKey == key).map(_._2)
- override def asyncAddQueueMessage(context: ConnectionContext, message: Message, delay: Boolean): Future[AnyRef] = {
+ override def asyncAddQueueMessage(context: ConnectionContext, message: Message, delay: Boolean): ListenableFuture[AnyRef] = {
super.asyncAddQueueMessage(context, message, false)
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
index aa92926..5eaab8d 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
@@ -312,6 +312,12 @@ public class JmsMultipleClientsTestSupport {
return currentTestName;
}
+ public void assertDestinationMemoryUsageGoesToZero() throws Exception {
+ assertEquals("destination memory is back to 0", 0,
+ TestSupport.getDestination(broker, ActiveMQDestination.transform(destination)).getMemoryUsage().getPercentUsage());
+ }
+
+
/*
* This is copied from AutoFailTestSupport. We may want to move it to someplace where more
http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java
index 625ed92..6c3dc15 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java
@@ -56,6 +56,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
doMultipleClientsTest();
assertTotalMessagesReceived(messageCount * producerCount);
+ assertDestinationMemoryUsageGoesToZero();
}
@Test(timeout = 60 * 1000)
@@ -69,6 +70,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
doMultipleClientsTest();
assertTotalMessagesReceived(messageCount * producerCount);
+ assertDestinationMemoryUsageGoesToZero();
}
@Test(timeout = 60 * 1000)
@@ -82,6 +84,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
doMultipleClientsTest();
assertTotalMessagesReceived(messageCount * producerCount);
+ assertDestinationMemoryUsageGoesToZero();
}
@Test(timeout = 2 * 60 * 1000)
@@ -95,6 +98,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
doMultipleClientsTest();
assertTotalMessagesReceived(messageCount * producerCount);
+ assertDestinationMemoryUsageGoesToZero();
}
@Test(timeout = 60 * 1000)
@@ -108,6 +112,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
doMultipleClientsTest();
assertTotalMessagesReceived(messageCount * producerCount);
+ assertDestinationMemoryUsageGoesToZero();
}
@Test(timeout = 60 * 1000)
@@ -121,6 +126,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
doMultipleClientsTest();
assertTotalMessagesReceived(messageCount * producerCount);
+ assertDestinationMemoryUsageGoesToZero();
}
@Test(timeout = 60 * 1000)
@@ -134,6 +140,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
doMultipleClientsTest();
assertTotalMessagesReceived(messageCount * producerCount);
+ assertDestinationMemoryUsageGoesToZero();
}
@Test(timeout = 2 * 60 * 1000)
@@ -147,6 +154,7 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
doMultipleClientsTest();
assertTotalMessagesReceived(messageCount * producerCount);
+ assertDestinationMemoryUsageGoesToZero();
}
protected void configurePrefetchOfOne() {
http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
index 2c530a7..61ba79c 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker;
+import java.util.concurrent.TimeUnit;
import org.apache.activemq.TestSupport;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.util.ThreadTracker;
@@ -155,10 +156,5 @@ public class TopicSubscriptionTest extends QueueSubscriptionTest {
assertTotalMessagesReceived(messageCount * producerCount * consumerCount);
assertDestinationMemoryUsageGoesToZero();
}
-
- private void assertDestinationMemoryUsageGoesToZero() throws Exception {
- assertEquals("destination memory is back to 0", 0,
- TestSupport.getDestination(broker, ActiveMQDestination.transform(destination)).getMemoryUsage().getPercentUsage());
- }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualDestPerfTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualDestPerfTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualDestPerfTest.java
index 1f80473..b822f5d 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualDestPerfTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualDestPerfTest.java
@@ -20,12 +20,18 @@ package org.apache.activemq.broker.virtual;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Session;
+import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
@@ -33,6 +39,7 @@ import org.apache.activemq.broker.region.virtual.CompositeTopic;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
@@ -43,14 +50,73 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
public class VirtualDestPerfTest {
private static final Logger LOG = LoggerFactory.getLogger(VirtualDestPerfTest.class);
+ public int messageSize = 5*1024;
+ public int messageCount = 10000;
ActiveMQTopic target = new ActiveMQTopic("target");
BrokerService brokerService;
ActiveMQConnectionFactory connectionFactory;
@Test
+ @Ignore("comparison test - 'new' no wait on future with async send broker side is always on")
+ public void testAsyncSendBurstToFillCache() throws Exception {
+ startBroker(4, true, true);
+ connectionFactory.setUseAsyncSend(true);
+
+ // a burst of messages to fill the cache
+ messageCount = 22000;
+ messageSize = 10*1024;
+
+ LinkedHashMap<Integer, Long> results = new LinkedHashMap<Integer, Long>();
+
+ final ActiveMQQueue queue = new ActiveMQQueue("targetQ");
+ for (Integer numThreads : new Integer[]{1, 2}) {
+ ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+ final AtomicLong numMessagesToSend = new AtomicLong(messageCount);
+ purge();
+ long startTime = System.currentTimeMillis();
+ for (int i=0;i<numThreads;i++) {
+ executor.execute(new Runnable(){
+ @Override
+ public void run() {
+ try {
+ produceMessages(numMessagesToSend, queue);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+ executor.shutdown();
+ executor.awaitTermination(5, TimeUnit.MINUTES);
+ long endTime = System.currentTimeMillis();
+ long seconds = (endTime - startTime) / 1000;
+ LOG.info("For numThreads {} duration {}", numThreads.intValue(), seconds);
+ results.put(numThreads, seconds);
+ LOG.info("Broker got {} messages", brokerService.getAdminView().getTotalEnqueueCount());
+ }
+
+ brokerService.stop();
+ brokerService.waitUntilStopped();
+ LOG.info("Results: {}", results);
+ }
+
+ private void purge() throws Exception {
+ ObjectName[] queues = brokerService.getAdminView().getQueues();
+ if (queues.length == 1) {
+ QueueViewMBean queueViewMBean = (QueueViewMBean)
+ brokerService.getManagementContext().newProxyInstance(queues[0], QueueViewMBean.class, false);
+ queueViewMBean.purge();
+ }
+ }
+
+ @Test
@Ignore("comparison test - takes too long and really needs a peek at the graph")
public void testPerf() throws Exception {
LinkedHashMap<Integer, Long> resultsT = new LinkedHashMap<Integer, Long>();
@@ -58,10 +124,10 @@ public class VirtualDestPerfTest {
for (int i=2;i<11;i++) {
for (Boolean concurrent : new Boolean[]{true, false}) {
- startBroker(i, concurrent);
+ startBroker(i, concurrent, false);
long startTime = System.currentTimeMillis();
- produceMessages();
+ produceMessages(new AtomicLong(messageCount), target);
long endTime = System.currentTimeMillis();
long seconds = (endTime - startTime) / 1000;
LOG.info("For routes {} duration {}", i, seconds);
@@ -89,20 +155,20 @@ public class VirtualDestPerfTest {
return set.toString().replace(",","%0D%0A").replace("[","").replace("]","").replace(" ", "");
}
-
- protected void produceMessages() throws Exception {
+ protected void produceMessages(AtomicLong messageCount, ActiveMQDestination destination) throws Exception {
+ final ByteSequence payLoad = new ByteSequence(new byte[messageSize]);
Connection connection = connectionFactory.createConnection();
- MessageProducer messageProducer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createProducer(target);
+ MessageProducer messageProducer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createProducer(destination);
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
ActiveMQBytesMessage message = new ActiveMQBytesMessage();
- message.setContent(new ByteSequence(new byte[5*1024]));
- for (int i=0; i<10000; i++) {
+ message.setContent(payLoad);
+ while (messageCount.decrementAndGet() >= 0) {
messageProducer.send(message);
}
connection.close();
}
- private void startBroker(int fanoutCount, boolean concurrentSend) throws Exception {
+ private void startBroker(int fanoutCount, boolean concurrentSend, boolean concurrentStoreAndDispatchQueues) throws Exception {
brokerService = new BrokerService();
brokerService.setDeleteAllMessagesOnStartup(true);
brokerService.setUseVirtualTopics(true);
@@ -111,6 +177,8 @@ public class VirtualDestPerfTest {
PolicyMap destPolicyMap = new PolicyMap();
PolicyEntry defaultEntry = new PolicyEntry();
defaultEntry.setExpireMessagesPeriod(0);
+ defaultEntry.setOptimizedDispatch(true);
+ defaultEntry.setCursorMemoryHighWaterMark(110);
destPolicyMap.setDefaultEntry(defaultEntry);
brokerService.setDestinationPolicy(destPolicyMap);
@@ -129,13 +197,13 @@ public class VirtualDestPerfTest {
brokerService.start();
connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getPublishableConnectString());
- connectionFactory.setUseAsyncSend(false);
+ connectionFactory.setWatchTopicAdvisories(false);
if (brokerService.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
// with parallel sends and no consumers, concurrentStoreAndDispatch, which uses a single thread by default,
// will stop/impede write batching. The number of threads will need tweaking when consumers are in the mix, but that may introduce
// ordering issues
- ((KahaDBPersistenceAdapter)brokerService.getPersistenceAdapter()).setConcurrentStoreAndDispatchQueues(false);
+ ((KahaDBPersistenceAdapter)brokerService.getPersistenceAdapter()).setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatchQueues);
}
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/ad1f751a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java
index b0c2b50..2c54455 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java
@@ -180,7 +180,7 @@ public class QueueBrowsingTest {
@Test
public void testMemoryLimit() throws Exception {
- broker.getSystemUsage().getMemoryUsage().setLimit(10 * 1024);
+ broker.getSystemUsage().getMemoryUsage().setLimit(16 * 1024);
int messageToSend = 370;