You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2008/12/03 18:44:40 UTC

svn commit: r722983 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/store/kahadaptor/ test/java/org/apache/activemq/advisory/ test/j...

Author: gtully
Date: Wed Dec  3 09:44:39 2008
New Revision: 722983

URL: http://svn.apache.org/viewvc?rev=722983&view=rev
Log:
resolve AMQ-2020; we may want to push setBatch into the MessageStore interface, see its use by the cursors when the cache is exhausted

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=722983&r1=722982&r2=722983&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Wed Dec  3 09:44:39 2008
@@ -863,6 +863,9 @@
         QueueMessageReference r = createMessageReference(m);
         BrokerSupport.resend(context, m, dest);
         removeMessage(context, r);
+        synchronized (messages) {
+            messages.rollback(r.getMessageId());
+        }
         return true;
     }
 
@@ -909,18 +912,12 @@
                 IndirectMessageReference r = (IndirectMessageReference) ref;
                 if (filter.evaluate(context, r)) {
                     // We should only move messages that can be locked.
-                    Message m = r.getMessage();
-                    BrokerSupport.resend(context, m, dest);
-                    removeMessage(context, r);
+                    moveMessageTo(context, ref.getMessage(), dest);
                     set.remove(r);
                     if (++movedCounter >= maximumMessages
                             && maximumMessages > 0) {
                         return movedCounter;
                     }
-                } else {
-                    synchronized (messages) {
-                        messages.rollback(r.getMessageId());
-                    }
                 }
             }
         } while (set.size() < this.destinationStatistics.getMessages().getCount()
@@ -1088,6 +1085,12 @@
                 });
             }
         }
+        if (ack.isPoisonAck()) {
+            // message gone to DLQ, is ok to allow redelivery
+            synchronized(messages) {
+                messages.rollback(reference.getMessageId());
+            }
+        }
 
     }
     
@@ -1097,9 +1100,6 @@
         synchronized(pagedInMessages) {
             pagedInMessages.remove(reference.getMessageId());
         }
-        synchronized(messages) {
-            messages.rollback(reference.getMessageId());
-        }
     }
     
     public void messageExpired(ConnectionContext context,MessageReference reference) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=722983&r1=722982&r2=722983&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Wed Dec  3 09:44:39 2008
@@ -43,6 +43,7 @@
     protected boolean batchResetNeeded = true;
     protected boolean storeHasMessages = false;
     protected int size;
+    private MessageId lastCachedId;
     
     protected AbstractStoreCursor(Destination destination) {
         this.regionDestination=destination;
@@ -154,12 +155,20 @@
     public final synchronized void addMessageLast(MessageReference node) throws Exception {
         if (cacheEnabled && hasSpace()) {
             recoverMessage(node.getMessage(),true);
-        }else {
+            lastCachedId = node.getMessageId();
+        } else {
+            if (cacheEnabled) {
+                // sync with store on disabling the cache
+                setBatch(lastCachedId);
+            }
             cacheEnabled=false;
         }
         size++;
     }
 
+    protected void setBatch(MessageId messageId) {
+    }
+
     public final synchronized void addMessageFirst(MessageReference node) throws Exception {
         cacheEnabled=false;
         size++;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?rev=722983&r1=722982&r2=722983&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java Wed Dec  3 09:44:39 2008
@@ -17,11 +17,14 @@
 package org.apache.activemq.broker.region.cursors;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 
 import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.amq.AMQMessageStore;
+import org.apache.activemq.store.kahadaptor.KahaReferenceStore;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -71,6 +74,20 @@
         this.store.resetBatching();
     }
     
+    protected void setBatch(MessageId messageId) {
+        AMQMessageStore amqStore = (AMQMessageStore) store;
+        try {
+            amqStore.flush();
+        } catch (InterruptedIOException e) {
+            LOG.debug("flush on setBatch resulted in exception", e);        
+        }
+        KahaReferenceStore kahaStore = 
+            (KahaReferenceStore) amqStore.getReferenceStore();
+        kahaStore.setBatch(messageId);
+        batchResetNeeded = false;
+    }
+
+    
     protected void doFillBatch() throws Exception {
         this.store.recoverNextMessages(this.maxBatchSize, this);
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?rev=722983&r1=722982&r2=722983&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Wed Dec  3 09:44:39 2008
@@ -17,9 +17,7 @@
 package org.apache.activemq.broker.region.cursors;
 
 import java.io.IOException;
-import java.util.LinkedHashMap;
 
-import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.command.Message;
@@ -39,7 +37,6 @@
 class TopicStorePrefetch extends AbstractStoreCursor {
     private static final Log LOG = LogFactory.getLog(TopicStorePrefetch.class);
     private TopicMessageStore store;
-    private final LinkedHashMap<MessageId,Message> batchList = new LinkedHashMap<MessageId,Message> ();
     private String clientId;
     private String subscriberName;
     private Subscription subscription;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?rev=722983&r1=722982&r2=722983&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java Wed Dec  3 09:44:39 2008
@@ -193,7 +193,7 @@
     public void removeAllMessages(ConnectionContext context) throws IOException {
         lock.lock();
         try {
-            Set<MessageId> tmpSet = new HashSet(messageContainer.keySet());
+            Set<MessageId> tmpSet = new HashSet<MessageId>(messageContainer.keySet());
             for (MessageId id:tmpSet) {
                 removeMessage(id);
             }
@@ -255,5 +255,11 @@
      * @see org.apache.activemq.store.ReferenceStore#setBatch(org.apache.activemq.command.MessageId)
      */
     public void setBatch(MessageId startAfter) {
+        lock.lock();
+        try {
+            batchEntry = messageContainer.getEntry(startAfter);
+        } finally {
+            lock.unlock();
+        }
     }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java?rev=722983&r1=722982&r2=722983&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java Wed Dec  3 09:44:39 2008
@@ -29,6 +29,7 @@
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.command.ActiveMQDestination;
 
 
 public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest {
@@ -89,15 +90,13 @@
     @Override
     public void testLoadRequestReply() throws Exception {
         super.testLoadRequestReply();
+
+        Thread.sleep(2000);
         
         // some checks on the slave
         AdvisoryBroker ab = (AdvisoryBroker) slave.getBroker().getAdaptor(
                 AdvisoryBroker.class);
         
-        if (!deleteTempQueue || serverTransactional) {
-            // give temp destination removes a chance to perculate on connection.close
-            Thread.sleep(2000);
-        } 
         assertEquals("the temp queues should not be visible as they are removed", 1, ab.getAdvisoryDestinations().size());
                        
         RegionBroker rb = (RegionBroker) slave.getBroker().getAdaptor(

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java?rev=722983&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java Wed Dec  3 09:44:39 2008
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.region;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Vector;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.InvalidSelectorException;
+import javax.management.ObjectName;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.state.ProducerState;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.amq.AMQPersistenceAdapter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @author gtully
+ * @see https://issues.apache.org/activemq/browse/AMQ-2020
+ **/
+public class QueueDuplicatesFromStoreTest extends TestCase {
+    private static final Log LOG = LogFactory
+            .getLog(QueueDuplicatesFromStoreTest.class);
+
+    ActiveMQQueue destination = new ActiveMQQueue("queue-"
+            + QueueDuplicatesFromStoreTest.class.getSimpleName());
+    BrokerService brokerService;
+
+    final static String mesageIdRoot = "11111:22222:";
+    final int messageBytesSize = 256;
+    final String text = new String(new byte[messageBytesSize]);
+
+    final int ackStartIndex = 100;
+    final int ackWindow = 50;
+    final int ackBatchSize = 50;
+    final int fullWindow = 200;
+    final int count = 20000;
+
+    public void setUp() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setUseJmx(false);
+        brokerService.deleteAllMessages();
+        brokerService.start();        
+    }
+
+    public void tearDown() throws Exception {
+        brokerService.stop();
+    }
+
+    public void testNoDuplicateAfterCacheFullAndAckedWithLargeAuditDepth() throws Exception {
+        doTestNoDuplicateAfterCacheFullAndAcked(1024*10);
+    }
+
+    public void testNoDuplicateAfterCacheFullAndAckedWithSmallAuditDepth() throws Exception {
+        doTestNoDuplicateAfterCacheFullAndAcked(512);
+    }
+
+    public void doTestNoDuplicateAfterCacheFullAndAcked(final int auditDepth) throws Exception {
+        final AMQPersistenceAdapter persistenceAdapter = 
+            (AMQPersistenceAdapter) brokerService.getPersistenceAdapter();
+        final MessageStore queueMessageStore = 
+            persistenceAdapter.createQueueMessageStore(destination);
+        final ConnectionContext contextNotInTx = new ConnectionContext();
+        final ConsumerInfo consumerInfo = new ConsumerInfo();
+        final DestinationStatistics destinationStatistics = new DestinationStatistics();
+        consumerInfo.setExclusive(true);
+        final Queue queue = new Queue(brokerService, destination,
+                queueMessageStore, destinationStatistics, null);
+
+        // a workaround for this issue
+        // queue.setUseCache(false);
+        queue.systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 10);
+        queue.setMaxAuditDepth(auditDepth);
+        queue.initialize();
+        queue.start();
+       
+
+        ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
+        ProducerInfo producerInfo = new ProducerInfo();
+        ProducerState producerState = new ProducerState(producerInfo);
+        producerExchange.setProducerState(producerState);
+        producerExchange.setConnectionContext(contextNotInTx);
+
+        final CountDownLatch receivedLatch = new CountDownLatch(count);
+        final AtomicLong ackedCount = new AtomicLong(0);
+        final AtomicLong enqueueCounter = new AtomicLong(0);
+        final Vector<String> errors = new Vector<String>();
+                
+        // populate the queue store, exceed memory limit so that cache is disabled
+        for (int i = 0; i < count; i++) {
+            Message message = getMessage(i);
+            queue.send(producerExchange, message);
+        }
+        
+        assertEquals("store count is correct", count, queueMessageStore
+                .getMessageCount());
+
+        // pull from store in small windows
+        Subscription subscription = new Subscription() {
+
+            public void add(MessageReference node) throws Exception {
+                if (enqueueCounter.get() != node.getMessageId().getProducerSequenceId()) {
+                    errors.add("Not in sequence at: " + enqueueCounter.get() + ", received: "
+                            + node.getMessageId().getProducerSequenceId());
+                }
+                assertEquals("is in order", enqueueCounter.get(), node
+                        .getMessageId().getProducerSequenceId());
+                receivedLatch.countDown();
+                enqueueCounter.incrementAndGet();
+                node.decrementReferenceCount();
+            }
+
+            public void add(ConnectionContext context, Destination destination)
+                    throws Exception {
+            }
+
+            public int countBeforeFull() {
+                if (isFull()) {
+                    return 0;
+                } else {
+                    return fullWindow - (int) (enqueueCounter.get() - ackedCount.get());
+                }
+            }
+
+            public void destroy() {
+            };
+
+            public void gc() {
+            }
+
+            public ConsumerInfo getConsumerInfo() {
+                return consumerInfo;
+            }
+
+            public ConnectionContext getContext() {
+                return null;
+            }
+
+            public long getDequeueCounter() {
+                return 0;
+            }
+
+            public long getDispatchedCounter() {
+                return 0;
+            }
+
+            public int getDispatchedQueueSize() {
+                return 0;
+            }
+
+            public long getEnqueueCounter() {
+                return 0;
+            }
+
+            public int getInFlightSize() {
+                return 0;
+            }
+
+            public int getInFlightUsage() {
+                return 0;
+            }
+
+            public ObjectName getObjectName() {
+                return null;
+            }
+
+            public int getPendingQueueSize() {
+                return 0;
+            }
+
+            public int getPrefetchSize() {
+                return 0;
+            }
+
+            public String getSelector() {
+                return null;
+            }
+
+            public boolean isBrowser() {
+                return false;
+            }
+
+            public boolean isFull() {
+                return (enqueueCounter.get() - ackedCount.get()) >= fullWindow;
+            }
+
+            public boolean isHighWaterMark() {
+                return false;
+            }
+
+            public boolean isLowWaterMark() {
+                return false;
+            }
+
+            public boolean isRecoveryRequired() {
+                return false;
+            }
+
+            public boolean isSlave() {
+                return false;
+            }
+
+            public boolean matches(MessageReference node,
+                    MessageEvaluationContext context) throws IOException {
+                return true;
+            }
+
+            public boolean matches(ActiveMQDestination destination) {
+                return true;
+            }
+
+            public void processMessageDispatchNotification(
+                    MessageDispatchNotification mdn) throws Exception {
+            }
+
+            public Response pullMessage(ConnectionContext context,
+                    MessagePull pull) throws Exception {
+                return null;
+            }
+
+            public List<MessageReference> remove(ConnectionContext context,
+                    Destination destination) throws Exception {
+                return null;
+            }
+
+            public void setObjectName(ObjectName objectName) {
+            }
+
+            public void setSelector(String selector)
+                    throws InvalidSelectorException,
+                    UnsupportedOperationException {
+            }
+
+            public void updateConsumerPrefetch(int newPrefetch) {
+            }
+
+            public boolean addRecoveredMessage(ConnectionContext context,
+                    MessageReference message) throws Exception {
+                return false;
+            }
+
+            public ActiveMQDestination getActiveMQDestination() {
+                return destination;
+            }
+
+            public void acknowledge(ConnectionContext context, MessageAck ack)
+                    throws Exception {
+            }
+        };
+
+        queue.addSubscription(contextNotInTx, subscription);
+        int removeIndex = 0;
+        do {
+            // Simulate periodic acks in small but recent windows
+            long receivedCount = enqueueCounter.get();
+            if (receivedCount > ackStartIndex) {
+                if (receivedCount >= removeIndex + ackWindow) {
+                    for (int j = 0; j < ackBatchSize; j++, removeIndex++) {
+                        ackedCount.incrementAndGet();
+                        MessageAck ack = new MessageAck();
+                        ack.setLastMessageId(new MessageId(mesageIdRoot
+                                + removeIndex));
+                        ack.setMessageCount(1);
+                        queue.removeMessage(contextNotInTx, subscription,
+                                new IndirectMessageReference(
+                                        getMessage(removeIndex)), ack);
+
+                    }
+                    if (removeIndex % 1000 == 0) {
+                        LOG.info("acked: " + removeIndex);
+                        persistenceAdapter.checkpoint(true);
+                        persistenceAdapter.cleanup();
+                    }
+                }
+            }
+
+        } while (!receivedLatch.await(0, TimeUnit.MILLISECONDS) && errors.isEmpty());
+
+        assertTrue("There are no errors: " + errors, errors.isEmpty());
+        assertEquals(count, enqueueCounter.get());
+        assertEquals("store count is correct", count - removeIndex,
+                queueMessageStore.getMessageCount());
+    }
+
+    private Message getMessage(int i) throws Exception {
+        ActiveMQTextMessage message = new ActiveMQTextMessage();
+        message.setMessageId(new MessageId(mesageIdRoot + i));
+        message.setDestination(destination);
+        message.setPersistent(true);
+        message.setResponseRequired(true);
+        message.setText("Msg:" + i + " " + text);
+        assertEquals(message.getMessageId().getProducerSequenceId(), i);
+        return message;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date