You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/09/01 20:26:13 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-6387
Repository: activemq
Updated Branches:
refs/heads/master a0d05f8ea -> bb8d32c04
https://issues.apache.org/jira/browse/AMQ-6387
Fix up the Memory Store such that it removes the references it adds to
messages when they are placed into the memory durable topic subscription
store.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/bb8d32c0
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/bb8d32c0
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/bb8d32c0
Branch: refs/heads/master
Commit: bb8d32c04aa06735d0036963685a4bc41fcbaad7
Parents: a0d05f8
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Sep 1 16:26:03 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Sep 1 16:26:03 2016 -0400
----------------------------------------------------------------------
.../store/memory/MemoryMessageStore.java | 53 +---
.../store/memory/MemoryPersistenceAdapter.java | 21 +-
.../store/memory/MemoryTopicMessageStore.java | 63 +++--
.../activemq/store/memory/MemoryTopicSub.java | 37 ++-
.../store/memory/MemoryTransactionStore.java | 36 ++-
.../cursors/MemoryPendingMessageCursorTest.java | 2 +-
.../org/apache/activemq/bugs/AMQ6387Test.java | 268 +++++++++++++++++++
7 files changed, 374 insertions(+), 106 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/bb8d32c0/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
index 736d912..8e72c48 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -18,26 +18,21 @@ package org.apache.activemq.store.memory;
import java.io.IOException;
import java.util.Collections;
-import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
-import java.util.Map.Entry;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.store.IndexListener;
import org.apache.activemq.store.MessageRecoveryListener;
-import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.store.MessageStoreStatistics;
/**
- * An implementation of {@link org.apache.activemq.store.MessageStore} which
- * uses a
- *
- *
+ * An implementation of {@link org.apache.activemq.store.MessageStore}
*/
public class MemoryMessageStore extends AbstractMessageStore {
@@ -67,23 +62,11 @@ public class MemoryMessageStore extends AbstractMessageStore {
}
}
- // public void addMessageReference(ConnectionContext context,MessageId
- // messageId,long expirationTime,String messageRef)
- // throws IOException{
- // synchronized(messageTable){
- // messageTable.put(messageId,messageRef);
- // }
- // }
-
@Override
public Message getMessage(MessageId identity) throws IOException {
return messageTable.get(identity);
}
- // public String getMessageReference(MessageId identity) throws IOException{
- // return (String)messageTable.get(identity);
- // }
-
@Override
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
removeMessage(ack.getLastMessageId());
@@ -92,7 +75,7 @@ public class MemoryMessageStore extends AbstractMessageStore {
public void removeMessage(MessageId msgId) throws IOException {
synchronized (messageTable) {
Message removed = messageTable.remove(msgId);
- if( removed !=null ) {
+ if (removed != null) {
removed.decrementReferenceCount();
decMessageStoreStatistics(getMessageStoreStatistics(), removed);
}
@@ -104,12 +87,10 @@ public class MemoryMessageStore extends AbstractMessageStore {
@Override
public void recover(MessageRecoveryListener listener) throws Exception {
- // the message table is a synchronizedMap - so just have to synchronize
- // here
+ // the message table is a synchronizedMap - so just have to synchronize here
synchronized (messageTable) {
- for (Iterator<Message> iter = messageTable.values().iterator(); iter.hasNext();) {
- Message msg = iter.next();
- listener.recoverMessage(msg);
+ for (Message message : messageTable.values()) {
+ listener.recoverMessage(message);
}
}
}
@@ -133,17 +114,14 @@ public class MemoryMessageStore extends AbstractMessageStore {
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
synchronized (messageTable) {
boolean pastLackBatch = lastBatchId == null;
- int count = 0;
- for (Iterator iter = messageTable.entrySet().iterator(); iter.hasNext();) {
- Map.Entry entry = (Entry)iter.next();
+ for (Map.Entry<MessageId, Message> entry : messageTable.entrySet()) {
if (pastLackBatch) {
- count++;
Object msg = entry.getValue();
- lastBatchId = (MessageId)entry.getKey();
+ lastBatchId = entry.getKey();
if (msg.getClass() == MessageId.class) {
- listener.recoverMessageReference((MessageId)msg);
+ listener.recoverMessageReference((MessageId) msg);
} else {
- listener.recoverMessage((Message)msg);
+ listener.recoverMessage((Message) msg);
}
} else {
pastLackBatch = entry.getKey().equals(lastBatchId);
@@ -167,7 +145,7 @@ public class MemoryMessageStore extends AbstractMessageStore {
synchronized (messageTable) {
Message original = messageTable.get(message.getMessageId());
- //if can't be found then increment count, else remove old size
+ // if can't be found then increment count, else remove old size
if (original == null) {
getMessageStoreStatistics().getMessageCount().increment();
} else {
@@ -183,10 +161,8 @@ public class MemoryMessageStore extends AbstractMessageStore {
synchronized (messageTable) {
long size = 0;
int count = 0;
- for (Iterator<Message> iter = messageTable.values().iterator(); iter
- .hasNext();) {
- Message msg = iter.next();
- size += msg.getSize();
+ for (Message message : messageTable.values()) {
+ size += message.getSize();
}
getMessageStoreStatistics().reset();
@@ -208,5 +184,4 @@ public class MemoryMessageStore extends AbstractMessageStore {
stats.getMessageSize().addSize(-message.getSize());
}
}
-
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/bb8d32c0/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
index 5655a48..c16ea14 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -41,7 +41,6 @@ import org.slf4j.LoggerFactory;
/**
* @org.apache.xbean.XBean
- *
*/
public class MemoryPersistenceAdapter implements PersistenceAdapter {
private static final Logger LOG = LoggerFactory.getLogger(MemoryPersistenceAdapter.class);
@@ -96,7 +95,8 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
/**
* Cleanup method to remove any state associated with the given destination
*
- * @param destination Destination to forget
+ * @param destination
+ * Destination to forget
*/
@Override
public void removeQueueMessageStore(ActiveMQQueue destination) {
@@ -106,7 +106,8 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
/**
* Cleanup method to remove any state associated with the given destination
*
- * @param destination Destination to forget
+ * @param destination
+ * Destination to forget
*/
@Override
public void removeTopicMessageStore(ActiveMQTopic destination) {
@@ -176,10 +177,10 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
protected MemoryMessageStore asMemoryMessageStore(Object value) {
if (value instanceof MemoryMessageStore) {
- return (MemoryMessageStore)value;
+ return (MemoryMessageStore) value;
}
if (value instanceof ProxyMessageStore) {
- MessageStore delegate = ((ProxyMessageStore)value).getDelegate();
+ MessageStore delegate = ((ProxyMessageStore) value).getDelegate();
if (delegate instanceof MemoryMessageStore) {
return (MemoryMessageStore) delegate;
}
@@ -189,8 +190,8 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
}
/**
- * @param usageManager The UsageManager that is controlling the broker's
- * memory usage.
+ * @param usageManager
+ * The UsageManager that is controlling the broker's memory usage.
*/
@Override
public void setUsageManager(SystemUsage usageManager) {
@@ -210,7 +211,7 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
}
@Override
- public File getDirectory(){
+ public File getDirectory() {
return null;
}
@@ -219,7 +220,7 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
}
@Override
- public long size(){
+ public long size() {
return 0;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/bb8d32c0/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
index 308ca59..dd8be2b 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -19,9 +19,8 @@ package org.apache.activemq.store.memory;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
+import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
@@ -36,9 +35,6 @@ import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.util.LRUCache;
import org.apache.activemq.util.SubscriptionKey;
-/**
- *
- */
public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore {
private Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase;
@@ -48,18 +44,20 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
public MemoryTopicMessageStore(ActiveMQDestination destination) {
this(destination, new MemoryTopicMessageStoreLRUCache(100, 100, 0.75f, false), makeSubscriptionInfoMap());
- //Set the messageStoreStatistics after the super class is initialized so that the stats can be
- //properly updated on cache eviction
+ // Set the messageStoreStatistics after the super class is initialized
+ // so that the stats can be properly updated on cache eviction
MemoryTopicMessageStoreLRUCache cache = (MemoryTopicMessageStoreLRUCache) originalMessageTable;
cache.setMessageStoreStatistics(messageStoreStatistics);
}
- public MemoryTopicMessageStore(ActiveMQDestination destination, Map<MessageId, Message> messageTable, Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase) {
+ public MemoryTopicMessageStore(ActiveMQDestination destination, Map<MessageId, Message> messageTable,
+ Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase) {
super(destination, messageTable);
this.subscriberDatabase = subscriberDatabase;
this.topicSubMap = makeSubMap();
- //this is only necessary so that messageStoreStatistics can be set if necessary
- //We need the original reference since messageTable is wrapped in a synchronized map in the parent class
+ // this is only necessary so that messageStoreStatistics can be set if
+ // necessary We need the original reference since messageTable is wrapped
+ // in a synchronized map in the parent class
this.originalMessageTable = messageTable;
}
@@ -74,15 +72,14 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
@Override
public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
super.addMessage(context, message);
- for (Iterator<MemoryTopicSub> i = topicSubMap.values().iterator(); i.hasNext();) {
- MemoryTopicSub sub = i.next();
+ for (MemoryTopicSub sub : topicSubMap.values()) {
sub.addMessage(message.getMessageId(), message);
}
}
@Override
- public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
- MessageId messageId, MessageAck ack) throws IOException {
+ public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
+ super.removeMessage(messageId);
SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
MemoryTopicSub sub = topicSubMap.get(key);
if (sub != null) {
@@ -98,12 +95,11 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
@Override
public synchronized void addSubscription(SubscriptionInfo info, boolean retroactive) throws IOException {
SubscriptionKey key = new SubscriptionKey(info);
- MemoryTopicSub sub = new MemoryTopicSub();
+ MemoryTopicSub sub = new MemoryTopicSub(key);
topicSubMap.put(key, sub);
if (retroactive) {
- for (Iterator i = messageTable.entrySet().iterator(); i.hasNext();) {
- Map.Entry entry = (Entry)i.next();
- sub.addMessage((MessageId)entry.getKey(), (Message)entry.getValue());
+ for (Map.Entry<MessageId, Message> entry : messageTable.entrySet()) {
+ sub.addMessage(entry.getKey(), entry.getValue());
}
}
subscriberDatabase.put(key, info);
@@ -111,7 +107,19 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
@Override
public synchronized void deleteSubscription(String clientId, String subscriptionName) {
- org.apache.activemq.util.SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
+ SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
+ subscriberDatabase.remove(key);
+ MemoryTopicSub subscription = topicSubMap.get(key);
+ if (subscription != null) {
+ List<Message> storedMessages = subscription.getStoredMessages();
+ for (Message message : storedMessages) {
+ try {
+ acknowledge(null, key.getClientId(), key.getSubscriptionName(), message.getMessageId(), null);
+ } catch (IOException e) {
+ }
+ }
+ }
+
subscriberDatabase.remove(key);
topicSubMap.remove(key);
}
@@ -172,7 +180,7 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
}
}
- //Disabled for the memory store, can be enabled later if necessary
+ // Disabled for the memory store, can be enabled later if necessary
private final MessageStoreSubscriptionStatistics stats = new MessageStoreSubscriptionStatistics(false);
@Override
@@ -181,27 +189,28 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
}
/**
- * Since we initialize the store with a LRUCache in some cases, we need to account for cache evictions
- * when computing the message store statistics.
+ * Since we initialize the store with a LRUCache in some cases, we need to
+ * account for cache evictions when computing the message store statistics.
*
*/
private static class MemoryTopicMessageStoreLRUCache extends LRUCache<MessageId, Message> {
private static final long serialVersionUID = -342098639681884413L;
private MessageStoreStatistics messageStoreStatistics;
- public MemoryTopicMessageStoreLRUCache(int initialCapacity, int maximumCacheSize,
- float loadFactor, boolean accessOrder) {
+ public MemoryTopicMessageStoreLRUCache(int initialCapacity, int maximumCacheSize, float loadFactor, boolean accessOrder) {
super(initialCapacity, maximumCacheSize, loadFactor, accessOrder);
}
- public void setMessageStoreStatistics(
- MessageStoreStatistics messageStoreStatistics) {
+ public void setMessageStoreStatistics(MessageStoreStatistics messageStoreStatistics) {
this.messageStoreStatistics = messageStoreStatistics;
}
@Override
protected void onCacheEviction(Map.Entry<MessageId, Message> eldest) {
decMessageStoreStatistics(messageStoreStatistics, eldest.getValue());
+
+ // We aren't tracking this anymore so remove our reference to it.
+ eldest.getValue().decrementReferenceCount();
}
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/bb8d32c0/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
index 3adf7b8..67f7546 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -16,26 +16,34 @@
*/
package org.apache.activemq.store.memory;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.util.SubscriptionKey;
/**
* A holder for a durable subscriber
- *
- *
*/
class MemoryTopicSub {
- private Map<MessageId, Message> map = new LinkedHashMap<MessageId, Message>();
+ private final Map<MessageId, Message> map = new LinkedHashMap<MessageId, Message>();
+ private final SubscriptionKey subscriptionKey;
+
private MessageId lastBatch;
+ public MemoryTopicSub(SubscriptionKey subscriptionKey) {
+ this.subscriptionKey = subscriptionKey;
+ }
+
void addMessage(MessageId id, Message message) {
- synchronized(this) {
+ synchronized (this) {
map.put(id, message);
}
message.incrementReferenceCount();
@@ -43,17 +51,23 @@ class MemoryTopicSub {
void removeMessage(MessageId id) {
Message removed;
- synchronized(this) {
+ synchronized (this) {
removed = map.remove(id);
if ((lastBatch != null && lastBatch.equals(id)) || map.isEmpty()) {
resetBatching();
}
}
- if( removed!=null ) {
+ if (removed != null) {
removed.decrementReferenceCount();
}
}
+ List<Message> getStoredMessages() {
+ synchronized (this) {
+ return new ArrayList<Message>(map.values());
+ }
+ }
+
synchronized int size() {
return map.size();
}
@@ -80,8 +94,7 @@ class MemoryTopicSub {
synchronized void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
boolean pastLackBatch = lastBatch == null;
MessageId lastId = null;
- // the message table is a synchronizedMap - so just have to synchronize
- // here
+ // the message table is a synchronizedMap - so just have to synchronize here
int count = 0;
for (Iterator<Entry<MessageId, Message>> iter = map.entrySet().iterator(); iter.hasNext() && count < maxReturned;) {
Entry<MessageId, Message> entry = iter.next();
@@ -94,13 +107,17 @@ class MemoryTopicSub {
pastLackBatch = entry.getKey().equals(lastBatch);
}
}
+
if (lastId != null) {
lastBatch = lastId;
}
-
}
synchronized void resetBatching() {
lastBatch = null;
}
+
+ SubscriptionKey getSubscriptionKey() {
+ return subscriptionKey;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/bb8d32c0/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
index e540bdc..30cd5b5 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -44,8 +44,6 @@ import org.apache.activemq.store.TransactionStore;
/**
* Provides a TransactionStore implementation that can create transaction aware
* MessageStore objects from non transaction aware MessageStore objects.
- *
- *
*/
public class MemoryTransactionStore implements TransactionStore {
@@ -56,6 +54,7 @@ public class MemoryTransactionStore implements TransactionStore {
private boolean doingRecover;
public class Tx {
+
public ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
public final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
@@ -107,7 +106,7 @@ public class MemoryTransactionStore implements TransactionStore {
cmd.run(ctx);
}
- } catch ( IOException e ) {
+ } catch (IOException e) {
persistenceAdapter.rollbackTransaction(ctx);
throw e;
}
@@ -134,7 +133,7 @@ public class MemoryTransactionStore implements TransactionStore {
}
public MemoryTransactionStore(PersistenceAdapter persistenceAdapter) {
- this.persistenceAdapter=persistenceAdapter;
+ this.persistenceAdapter = persistenceAdapter;
}
public MessageStore proxy(MessageStore messageStore) {
@@ -153,13 +152,13 @@ public class MemoryTransactionStore implements TransactionStore {
public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
MemoryTransactionStore.this.addMessage(context, getDelegate(), message);
return new InlineListenableFuture();
- }
+ }
@Override
public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canoptimize) throws IOException {
MemoryTransactionStore.this.addMessage(context, getDelegate(), message);
return new InlineListenableFuture();
- }
+ }
@Override
public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
@@ -194,13 +193,13 @@ public class MemoryTransactionStore implements TransactionStore {
public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
MemoryTransactionStore.this.addMessage(context, getDelegate(), message);
return new InlineListenableFuture();
- }
+ }
@Override
public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
MemoryTransactionStore.this.addMessage(context, getDelegate(), message);
return new InlineListenableFuture();
- }
+ }
@Override
public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
@@ -213,10 +212,9 @@ public class MemoryTransactionStore implements TransactionStore {
}
@Override
- public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
- MessageId messageId, MessageAck ack) throws IOException {
- MemoryTransactionStore.this.acknowledge((TopicMessageStore)getDelegate(), clientId,
- subscriptionName, messageId, ack);
+ public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack)
+ throws IOException {
+ MemoryTransactionStore.this.acknowledge((TopicMessageStore) getDelegate(), clientId, subscriptionName, messageId, ack);
}
};
onProxyTopicStore(proxyTopicMessageStore);
@@ -257,7 +255,7 @@ public class MemoryTransactionStore implements TransactionStore {
}
@Override
- public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
+ public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit) throws IOException {
if (preCommit != null) {
preCommit.run();
}
@@ -302,7 +300,7 @@ public class MemoryTransactionStore implements TransactionStore {
for (Iterator<TransactionId> iter = preparedTransactions.keySet().iterator(); iter.hasNext();) {
Object txid = iter.next();
Tx tx = preparedTransactions.get(txid);
- listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks());
+ listener.recover((XATransactionId) txid, tx.getMessages(), tx.getAcks());
onRecovered(tx);
}
} finally {
@@ -326,7 +324,9 @@ public class MemoryTransactionStore implements TransactionStore {
if (message.getTransactionId() != null) {
Tx tx = getTx(message.getTransactionId());
tx.add(new AddMessageCommand() {
+ @SuppressWarnings("unused")
MessageStore messageStore = destination;
+
@Override
public Message getMessage() {
return message;
@@ -385,8 +385,8 @@ public class MemoryTransactionStore implements TransactionStore {
}
}
- public void acknowledge(final TopicMessageStore destination, final String clientId, final String subscriptionName,
- final MessageId messageId, final MessageAck ack) throws IOException {
+ public void acknowledge(final TopicMessageStore destination, final String clientId, final String subscriptionName, final MessageId messageId,
+ final MessageAck ack) throws IOException {
if (doingRecover) {
return;
}
@@ -414,11 +414,9 @@ public class MemoryTransactionStore implements TransactionStore {
}
}
-
public void delete() {
inflightTransactions.clear();
preparedTransactions.clear();
doingRecover = false;
}
-
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/bb8d32c0/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MemoryPendingMessageCursorTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MemoryPendingMessageCursorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MemoryPendingMessageCursorTest.java
index 985fb94..8bd7b39 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MemoryPendingMessageCursorTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MemoryPendingMessageCursorTest.java
@@ -123,7 +123,7 @@ public class MemoryPendingMessageCursorTest extends AbstractPendingMessageCursor
//The expected value is only 100 because for durables a LRUCache is being used
//with a max size of 100
- verifyStoreStats(dest, 100, publishedMessageSize.get());
+ verifyStoreStats(dest, 0, publishedMessageSize.get());
connection.stop();
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/bb8d32c0/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6387Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6387Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6387Test.java
new file mode 100644
index 0000000..0db9561
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6387Test.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.usage.MemoryUsage;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Regression test for AMQ-6387: on a broker started with persistence disabled,
+ * the memory usage reported by a destination must drop back to zero once its
+ * messages have been received or purged, or once the durable subscription that
+ * was holding them has been unsubscribed.
+ */
+public class AMQ6387Test {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ6387Test.class);
+
+    // Genuine constants, so they are static rather than per-instance fields.
+    private static final String QUEUE_NAME = "testQueue";
+    private static final String TOPIC_NAME = "testTopic";
+    private static final String SUBSCRIPTION_NAME = "subscriberId";
+    private static final String CLIENT_ID = "client1";
+    private static final int MSG_COUNT = 150;
+    private static final long RECEIVE_TIMEOUT_MS = 5000;
+
+    private ActiveMQConnectionFactory connectionFactory;
+    private BrokerService brokerService;
+
+    @Rule
+    public TestName testName = new TestName();
+
+    /**
+     * Starts a fresh non-persistent broker (JMX on, advisories off, durable
+     * subscriptions not kept active) and a connection factory bound to its
+     * VM connector.
+     */
+    @Before
+    public void setUp() throws Exception {
+
+        LOG.info("=============== Starting test: {} ====================", testName.getMethodName());
+
+        brokerService = new BrokerService();
+        brokerService.setAdvisorySupport(false);
+        brokerService.setPersistent(false);
+        brokerService.setUseJmx(true);
+        brokerService.setKeepDurableSubsActive(false);
+        brokerService.start();
+        connectionFactory = new ActiveMQConnectionFactory(brokerService.getVmConnectorURI());
+    }
+
+    /**
+     * Stops the broker and waits for the shutdown to complete.
+     */
+    @After
+    public void tearDown() throws Exception {
+        try {
+            // @After also runs when setUp() failed, possibly before the broker
+            // was created; guard so the original failure is not masked by an NPE.
+            if (brokerService != null) {
+                brokerService.stop();
+                brokerService.waitUntilStopped();
+            }
+        } finally {
+            LOG.info("=============== Finished test: {} ====================", testName.getMethodName());
+        }
+    }
+
+    /**
+     * Sends MSG_COUNT messages to the queue, receives them all, and asserts that
+     * the queue's memory usage is back to zero.
+     */
+    @Test
+    public void testQueueMessagesKeptAfterDelivery() throws Exception {
+        createDurableSubscription();
+        assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(1, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        sendBytesMessage(Queue.class);
+
+        logBrokerMemoryUsage(Queue.class);
+
+        assertEquals(0, brokerService.getAdminView().getQueueSubscribers().length);
+
+        receiveMessages(Queue.class);
+
+        assertEquals(0, brokerService.getAdminView().getQueueSubscribers().length);
+
+        logBrokerMemoryUsage(Queue.class);
+
+        assertEquals(0, getCurrentMemoryUsage(Queue.class));
+    }
+
+    /**
+     * Sends MSG_COUNT messages to the queue, purges it through its JMX view, and
+     * asserts that the queue's memory usage is back to zero.
+     */
+    @Test
+    public void testQueueMessagesKeptAfterPurge() throws Exception {
+        createDurableSubscription();
+        assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(1, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        sendBytesMessage(Queue.class);
+
+        logBrokerMemoryUsage(Queue.class);
+
+        assertEquals(0, brokerService.getAdminView().getQueueSubscribers().length);
+
+        getProxyToQueue(QUEUE_NAME).purge();
+
+        assertEquals(0, brokerService.getAdminView().getQueueSubscribers().length);
+
+        logBrokerMemoryUsage(Queue.class);
+
+        assertEquals(0, getCurrentMemoryUsage(Queue.class));
+    }
+
+    /**
+     * Sends MSG_COUNT messages to the topic while the durable subscription is
+     * inactive, receives them through that subscription, and asserts that the
+     * topic's memory usage is back to zero.
+     */
+    @Test
+    public void testDurableTopicSubscriptionMessagesKeptAfterDelivery() throws Exception {
+        createDurableSubscription();
+        assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(1, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        sendBytesMessage(Topic.class);
+
+        logBrokerMemoryUsage(Topic.class);
+
+        assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(1, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        receiveMessages(Topic.class);
+
+        assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(1, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        logBrokerMemoryUsage(Topic.class);
+
+        assertEquals(0, getCurrentMemoryUsage(Topic.class));
+    }
+
+    /**
+     * Sends MSG_COUNT messages to the topic while the durable subscription is
+     * inactive, removes the subscription without consuming, and asserts that the
+     * topic's memory usage is back to zero.
+     */
+    @Test
+    public void testDurableTopicSubscriptionMessagesKeptAfterUnsubscribe() throws Exception {
+        createDurableSubscription();
+        assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(1, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        sendBytesMessage(Topic.class);
+
+        logBrokerMemoryUsage(Topic.class);
+
+        assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(1, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        unsubscribeDurableSubscription();
+
+        assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        logBrokerMemoryUsage(Topic.class);
+
+        assertEquals(0, getCurrentMemoryUsage(Topic.class));
+    }
+
+    /**
+     * Registers the durable subscription SUBSCRIPTION_NAME on the test topic for
+     * CLIENT_ID and then disconnects. The connection is closed in a finally
+     * block so a failure part-way through does not leak it.
+     */
+    private void createDurableSubscription() throws JMSException {
+        final Connection connection = connectionFactory.createConnection();
+        try {
+            connection.setClientID(CLIENT_ID);
+            final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final Topic topic = session.createTopic(TOPIC_NAME);
+            connection.start();
+
+            session.createDurableSubscriber(topic, SUBSCRIPTION_NAME, null, false);
+            LOG.info("Created durable subscription.");
+
+            connection.stop();
+        } finally {
+            connection.close();
+        }
+    }
+
+    /**
+     * Receives MSG_COUNT messages from the test queue (plain consumer) or the
+     * test topic (durable subscriber), failing if any receive returns null.
+     *
+     * @param destType Queue.class to read from the queue; any other type reads
+     *                 from the topic through the durable subscription
+     */
+    private void receiveMessages(Class<? extends Destination> destType) throws JMSException {
+        final Connection connection = connectionFactory.createConnection();
+        try {
+            connection.setClientID(CLIENT_ID);
+            final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final Destination destination = createDestination(session, destType);
+
+            final MessageConsumer consumer;
+            if (destType.equals(Queue.class)) {
+                consumer = session.createConsumer(destination);
+            } else {
+                consumer = session.createDurableSubscriber((Topic) destination, SUBSCRIPTION_NAME, null, false);
+            }
+
+            connection.start();
+
+            for (int i = 0; i < MSG_COUNT; ++i) {
+                assertNotNull("No message received within " + RECEIVE_TIMEOUT_MS + " ms for message "
+                    + (i + 1) + " of " + MSG_COUNT, consumer.receive(RECEIVE_TIMEOUT_MS));
+            }
+        } finally {
+            // Close even when an assertion fails so the consumer does not stay attached.
+            connection.close();
+        }
+    }
+
+    /**
+     * Sends the same 1 MiB persistent BytesMessage MSG_COUNT times to the test
+     * queue or the test topic.
+     *
+     * @param destType Queue.class to send to the queue; any other type sends to
+     *                 the topic
+     */
+    private void sendBytesMessage(Class<? extends Destination> destType) throws JMSException {
+        final Connection connection = connectionFactory.createConnection();
+        try {
+            final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final Destination destination = createDestination(session, destType);
+            final MessageProducer producer = session.createProducer(destination);
+            final BytesMessage bytesMessage = session.createBytesMessage();
+
+            bytesMessage.writeBytes(new byte[1024 * 1024]);
+
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+            for (int i = 0; i < MSG_COUNT; ++i) {
+                producer.send(bytesMessage);
+            }
+        } finally {
+            connection.close();
+        }
+    }
+
+    /**
+     * Removes the durable subscription SUBSCRIPTION_NAME belonging to CLIENT_ID.
+     */
+    private void unsubscribeDurableSubscription() throws JMSException {
+        final Connection connection = connectionFactory.createConnection();
+        try {
+            connection.setClientID(CLIENT_ID);
+            final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            session.unsubscribe(SUBSCRIPTION_NAME);
+            LOG.info("Unsubscribed durable subscription.");
+
+            connection.stop();
+        } finally {
+            connection.close();
+        }
+    }
+
+    /**
+     * Resolves the JMS destination used by the tests for the given type.
+     *
+     * @return the test queue when destType is Queue.class, otherwise the test topic
+     */
+    private Destination createDestination(Session session, Class<? extends Destination> destType) throws JMSException {
+        if (destType.equals(Queue.class)) {
+            return session.createQueue(QUEUE_NAME);
+        }
+        return session.createTopic(TOPIC_NAME);
+    }
+
+    /**
+     * Reads the current memory usage value reported by the broker-side
+     * destination for the test queue or the test topic.
+     *
+     * @param destType Queue.class for the queue; any other type for the topic
+     */
+    private long getCurrentMemoryUsage(Class<? extends Destination> destType) throws Exception {
+        final boolean isQueue = destType.equals(Queue.class);
+        final ActiveMQDestination destination = ActiveMQDestination.createDestination(
+            isQueue ? QUEUE_NAME : TOPIC_NAME,
+            isQueue ? ActiveMQDestination.QUEUE_TYPE : ActiveMQDestination.TOPIC_TYPE);
+        final MemoryUsage usage = brokerService.getDestination(destination).getMemoryUsage();
+
+        return usage.getUsage();
+    }
+
+    /**
+     * Logs the broker-wide memory percentage and the destination's usage value.
+     */
+    private void logBrokerMemoryUsage(Class<? extends Destination> destType) throws Exception {
+        LOG.info("Memory usage: broker={}% destination={}", brokerService.getAdminView().getMemoryPercentUsage(), getCurrentMemoryUsage(destType));
+    }
+
+    /**
+     * Creates a JMX proxy for the QueueViewMBean of the named queue. The object
+     * name is built for a broker named "localhost".
+     *
+     * @param name the queue's destination name
+     */
+    protected QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException {
+        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="+name);
+        QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext()
+            .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+        return proxy;
+    }
+}