You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2008/12/03 18:44:40 UTC
svn commit: r722983 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/broker/region/cursors/
main/java/org/apache/activemq/store/kahadaptor/
test/java/org/apache/activemq/advisory/ test/j...
Author: gtully
Date: Wed Dec 3 09:44:39 2008
New Revision: 722983
URL: http://svn.apache.org/viewvc?rev=722983&view=rev
Log:
resolve AMQ-2020, we may want to push setBatch into the MessageStore inteface, see the use by the cursors when the cache is exhausted
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=722983&r1=722982&r2=722983&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Wed Dec 3 09:44:39 2008
@@ -863,6 +863,9 @@
QueueMessageReference r = createMessageReference(m);
BrokerSupport.resend(context, m, dest);
removeMessage(context, r);
+ synchronized (messages) {
+ messages.rollback(r.getMessageId());
+ }
return true;
}
@@ -909,18 +912,12 @@
IndirectMessageReference r = (IndirectMessageReference) ref;
if (filter.evaluate(context, r)) {
// We should only move messages that can be locked.
- Message m = r.getMessage();
- BrokerSupport.resend(context, m, dest);
- removeMessage(context, r);
+ moveMessageTo(context, ref.getMessage(), dest);
set.remove(r);
if (++movedCounter >= maximumMessages
&& maximumMessages > 0) {
return movedCounter;
}
- } else {
- synchronized (messages) {
- messages.rollback(r.getMessageId());
- }
}
}
} while (set.size() < this.destinationStatistics.getMessages().getCount()
@@ -1088,6 +1085,12 @@
});
}
}
+ if (ack.isPoisonAck()) {
+ // message gone to DLQ, is ok to allow redelivery
+ synchronized(messages) {
+ messages.rollback(reference.getMessageId());
+ }
+ }
}
@@ -1097,9 +1100,6 @@
synchronized(pagedInMessages) {
pagedInMessages.remove(reference.getMessageId());
}
- synchronized(messages) {
- messages.rollback(reference.getMessageId());
- }
}
public void messageExpired(ConnectionContext context,MessageReference reference) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=722983&r1=722982&r2=722983&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Wed Dec 3 09:44:39 2008
@@ -43,6 +43,7 @@
protected boolean batchResetNeeded = true;
protected boolean storeHasMessages = false;
protected int size;
+ private MessageId lastCachedId;
protected AbstractStoreCursor(Destination destination) {
this.regionDestination=destination;
@@ -154,12 +155,20 @@
public final synchronized void addMessageLast(MessageReference node) throws Exception {
if (cacheEnabled && hasSpace()) {
recoverMessage(node.getMessage(),true);
- }else {
+ lastCachedId = node.getMessageId();
+ } else {
+ if (cacheEnabled) {
+ // sync with store on disabling the cache
+ setBatch(lastCachedId);
+ }
cacheEnabled=false;
}
size++;
}
+ protected void setBatch(MessageId messageId) {
+ }
+
public final synchronized void addMessageFirst(MessageReference node) throws Exception {
cacheEnabled=false;
size++;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?rev=722983&r1=722982&r2=722983&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java Wed Dec 3 09:44:39 2008
@@ -17,11 +17,14 @@
package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
+import java.io.InterruptedIOException;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.amq.AMQMessageStore;
+import org.apache.activemq.store.kahadaptor.KahaReferenceStore;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -71,6 +74,20 @@
this.store.resetBatching();
}
+ protected void setBatch(MessageId messageId) {
+ AMQMessageStore amqStore = (AMQMessageStore) store;
+ try {
+ amqStore.flush();
+ } catch (InterruptedIOException e) {
+ LOG.debug("flush on setBatch resulted in exception", e);
+ }
+ KahaReferenceStore kahaStore =
+ (KahaReferenceStore) amqStore.getReferenceStore();
+ kahaStore.setBatch(messageId);
+ batchResetNeeded = false;
+ }
+
+
protected void doFillBatch() throws Exception {
this.store.recoverNextMessages(this.maxBatchSize, this);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?rev=722983&r1=722982&r2=722983&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Wed Dec 3 09:44:39 2008
@@ -17,9 +17,7 @@
package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
-import java.util.LinkedHashMap;
-import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.Message;
@@ -39,7 +37,6 @@
class TopicStorePrefetch extends AbstractStoreCursor {
private static final Log LOG = LogFactory.getLog(TopicStorePrefetch.class);
private TopicMessageStore store;
- private final LinkedHashMap<MessageId,Message> batchList = new LinkedHashMap<MessageId,Message> ();
private String clientId;
private String subscriberName;
private Subscription subscription;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?rev=722983&r1=722982&r2=722983&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java Wed Dec 3 09:44:39 2008
@@ -193,7 +193,7 @@
public void removeAllMessages(ConnectionContext context) throws IOException {
lock.lock();
try {
- Set<MessageId> tmpSet = new HashSet(messageContainer.keySet());
+ Set<MessageId> tmpSet = new HashSet<MessageId>(messageContainer.keySet());
for (MessageId id:tmpSet) {
removeMessage(id);
}
@@ -255,5 +255,11 @@
* @see org.apache.activemq.store.ReferenceStore#setBatch(org.apache.activemq.command.MessageId)
*/
public void setBatch(MessageId startAfter) {
+ lock.lock();
+ try {
+ batchEntry = messageContainer.getEntry(startAfter);
+ } finally {
+ lock.unlock();
+ }
}
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java?rev=722983&r1=722982&r2=722983&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java Wed Dec 3 09:44:39 2008
@@ -29,6 +29,7 @@
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.command.ActiveMQDestination;
public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest {
@@ -89,15 +90,13 @@
@Override
public void testLoadRequestReply() throws Exception {
super.testLoadRequestReply();
+
+ Thread.sleep(2000);
// some checks on the slave
AdvisoryBroker ab = (AdvisoryBroker) slave.getBroker().getAdaptor(
AdvisoryBroker.class);
- if (!deleteTempQueue || serverTransactional) {
- // give temp destination removes a chance to perculate on connection.close
- Thread.sleep(2000);
- }
assertEquals("the temp queues should not be visible as they are removed", 1, ab.getAdvisoryDestinations().size());
RegionBroker rb = (RegionBroker) slave.getBroker().getAdaptor(
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java?rev=722983&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java Wed Dec 3 09:44:39 2008
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.region;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Vector;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.InvalidSelectorException;
+import javax.management.ObjectName;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.state.ProducerState;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.amq.AMQPersistenceAdapter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @author gtully
+ * @see https://issues.apache.org/activemq/browse/AMQ-2020
+ **/
+public class QueueDuplicatesFromStoreTest extends TestCase {
+ private static final Log LOG = LogFactory
+ .getLog(QueueDuplicatesFromStoreTest.class);
+
+ ActiveMQQueue destination = new ActiveMQQueue("queue-"
+ + QueueDuplicatesFromStoreTest.class.getSimpleName());
+ BrokerService brokerService;
+
+ final static String mesageIdRoot = "11111:22222:";
+ final int messageBytesSize = 256;
+ final String text = new String(new byte[messageBytesSize]);
+
+ final int ackStartIndex = 100;
+ final int ackWindow = 50;
+ final int ackBatchSize = 50;
+ final int fullWindow = 200;
+ final int count = 20000;
+
+ public void setUp() throws Exception {
+ brokerService = new BrokerService();
+ brokerService.setUseJmx(false);
+ brokerService.deleteAllMessages();
+ brokerService.start();
+ }
+
+ public void tearDown() throws Exception {
+ brokerService.stop();
+ }
+
+ public void testNoDuplicateAfterCacheFullAndAckedWithLargeAuditDepth() throws Exception {
+ doTestNoDuplicateAfterCacheFullAndAcked(1024*10);
+ }
+
+ public void testNoDuplicateAfterCacheFullAndAckedWithSmallAuditDepth() throws Exception {
+ doTestNoDuplicateAfterCacheFullAndAcked(512);
+ }
+
+ public void doTestNoDuplicateAfterCacheFullAndAcked(final int auditDepth) throws Exception {
+ final AMQPersistenceAdapter persistenceAdapter =
+ (AMQPersistenceAdapter) brokerService.getPersistenceAdapter();
+ final MessageStore queueMessageStore =
+ persistenceAdapter.createQueueMessageStore(destination);
+ final ConnectionContext contextNotInTx = new ConnectionContext();
+ final ConsumerInfo consumerInfo = new ConsumerInfo();
+ final DestinationStatistics destinationStatistics = new DestinationStatistics();
+ consumerInfo.setExclusive(true);
+ final Queue queue = new Queue(brokerService, destination,
+ queueMessageStore, destinationStatistics, null);
+
+ // a workaround for this issue
+ // queue.setUseCache(false);
+ queue.systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 10);
+ queue.setMaxAuditDepth(auditDepth);
+ queue.initialize();
+ queue.start();
+
+
+ ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
+ ProducerInfo producerInfo = new ProducerInfo();
+ ProducerState producerState = new ProducerState(producerInfo);
+ producerExchange.setProducerState(producerState);
+ producerExchange.setConnectionContext(contextNotInTx);
+
+ final CountDownLatch receivedLatch = new CountDownLatch(count);
+ final AtomicLong ackedCount = new AtomicLong(0);
+ final AtomicLong enqueueCounter = new AtomicLong(0);
+ final Vector<String> errors = new Vector<String>();
+
+ // populate the queue store, exceed memory limit so that cache is disabled
+ for (int i = 0; i < count; i++) {
+ Message message = getMessage(i);
+ queue.send(producerExchange, message);
+ }
+
+ assertEquals("store count is correct", count, queueMessageStore
+ .getMessageCount());
+
+ // pull from store in small windows
+ Subscription subscription = new Subscription() {
+
+ public void add(MessageReference node) throws Exception {
+ if (enqueueCounter.get() != node.getMessageId().getProducerSequenceId()) {
+ errors.add("Not in sequence at: " + enqueueCounter.get() + ", received: "
+ + node.getMessageId().getProducerSequenceId());
+ }
+ assertEquals("is in order", enqueueCounter.get(), node
+ .getMessageId().getProducerSequenceId());
+ receivedLatch.countDown();
+ enqueueCounter.incrementAndGet();
+ node.decrementReferenceCount();
+ }
+
+ public void add(ConnectionContext context, Destination destination)
+ throws Exception {
+ }
+
+ public int countBeforeFull() {
+ if (isFull()) {
+ return 0;
+ } else {
+ return fullWindow - (int) (enqueueCounter.get() - ackedCount.get());
+ }
+ }
+
+ public void destroy() {
+ };
+
+ public void gc() {
+ }
+
+ public ConsumerInfo getConsumerInfo() {
+ return consumerInfo;
+ }
+
+ public ConnectionContext getContext() {
+ return null;
+ }
+
+ public long getDequeueCounter() {
+ return 0;
+ }
+
+ public long getDispatchedCounter() {
+ return 0;
+ }
+
+ public int getDispatchedQueueSize() {
+ return 0;
+ }
+
+ public long getEnqueueCounter() {
+ return 0;
+ }
+
+ public int getInFlightSize() {
+ return 0;
+ }
+
+ public int getInFlightUsage() {
+ return 0;
+ }
+
+ public ObjectName getObjectName() {
+ return null;
+ }
+
+ public int getPendingQueueSize() {
+ return 0;
+ }
+
+ public int getPrefetchSize() {
+ return 0;
+ }
+
+ public String getSelector() {
+ return null;
+ }
+
+ public boolean isBrowser() {
+ return false;
+ }
+
+ public boolean isFull() {
+ return (enqueueCounter.get() - ackedCount.get()) >= fullWindow;
+ }
+
+ public boolean isHighWaterMark() {
+ return false;
+ }
+
+ public boolean isLowWaterMark() {
+ return false;
+ }
+
+ public boolean isRecoveryRequired() {
+ return false;
+ }
+
+ public boolean isSlave() {
+ return false;
+ }
+
+ public boolean matches(MessageReference node,
+ MessageEvaluationContext context) throws IOException {
+ return true;
+ }
+
+ public boolean matches(ActiveMQDestination destination) {
+ return true;
+ }
+
+ public void processMessageDispatchNotification(
+ MessageDispatchNotification mdn) throws Exception {
+ }
+
+ public Response pullMessage(ConnectionContext context,
+ MessagePull pull) throws Exception {
+ return null;
+ }
+
+ public List<MessageReference> remove(ConnectionContext context,
+ Destination destination) throws Exception {
+ return null;
+ }
+
+ public void setObjectName(ObjectName objectName) {
+ }
+
+ public void setSelector(String selector)
+ throws InvalidSelectorException,
+ UnsupportedOperationException {
+ }
+
+ public void updateConsumerPrefetch(int newPrefetch) {
+ }
+
+ public boolean addRecoveredMessage(ConnectionContext context,
+ MessageReference message) throws Exception {
+ return false;
+ }
+
+ public ActiveMQDestination getActiveMQDestination() {
+ return destination;
+ }
+
+ public void acknowledge(ConnectionContext context, MessageAck ack)
+ throws Exception {
+ }
+ };
+
+ queue.addSubscription(contextNotInTx, subscription);
+ int removeIndex = 0;
+ do {
+ // Simulate periodic acks in small but recent windows
+ long receivedCount = enqueueCounter.get();
+ if (receivedCount > ackStartIndex) {
+ if (receivedCount >= removeIndex + ackWindow) {
+ for (int j = 0; j < ackBatchSize; j++, removeIndex++) {
+ ackedCount.incrementAndGet();
+ MessageAck ack = new MessageAck();
+ ack.setLastMessageId(new MessageId(mesageIdRoot
+ + removeIndex));
+ ack.setMessageCount(1);
+ queue.removeMessage(contextNotInTx, subscription,
+ new IndirectMessageReference(
+ getMessage(removeIndex)), ack);
+
+ }
+ if (removeIndex % 1000 == 0) {
+ LOG.info("acked: " + removeIndex);
+ persistenceAdapter.checkpoint(true);
+ persistenceAdapter.cleanup();
+ }
+ }
+ }
+
+ } while (!receivedLatch.await(0, TimeUnit.MILLISECONDS) && errors.isEmpty());
+
+ assertTrue("There are no errors: " + errors, errors.isEmpty());
+ assertEquals(count, enqueueCounter.get());
+ assertEquals("store count is correct", count - removeIndex,
+ queueMessageStore.getMessageCount());
+ }
+
+ private Message getMessage(int i) throws Exception {
+ ActiveMQTextMessage message = new ActiveMQTextMessage();
+ message.setMessageId(new MessageId(mesageIdRoot + i));
+ message.setDestination(destination);
+ message.setPersistent(true);
+ message.setResponseRequired(true);
+ message.setText("Msg:" + i + " " + text);
+ assertEquals(message.getMessageId().getProducerSequenceId(), i);
+ return message;
+ }
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date