You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2014/01/09 13:17:09 UTC

git commit: https://issues.apache.org/jira/browse/AMQ-4952 deal with jdbc store, cursor audit=false case, also relevant when cursor window is exceeded. Treating duplicate into pagedInMessages as an error, silently ignoring leads to trapped messages or deferred dispatch after a restart

Updated Branches:
  refs/heads/trunk efc51fa44 -> cc47ab6d0


https://issues.apache.org/jira/browse/AMQ-4952 - deal with the jdbc store, cursor audit=false case; this is also relevant when the cursor window is exceeded. A duplicate paged into pagedInMessages is now treated as an error, because silently ignoring it leads to trapped messages or deferred dispatch after a restart. This reverts the change from https://issues.apache.org/jira/browse/AMQ-1957


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/cc47ab6d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/cc47ab6d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/cc47ab6d

Branch: refs/heads/trunk
Commit: cc47ab6d002a0810f44b1cd8c40ed97c822b7e08
Parents: efc51fa
Author: gtully <ga...@gmail.com>
Authored: Thu Jan 9 12:16:08 2014 +0000
Committer: gtully <ga...@gmail.com>
Committed: Thu Jan 9 12:16:37 2014 +0000

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java    |   9 +
 .../region/SubscriptionAddRemoveQueueTest.java  | 368 +++++++++++++++++++
 .../org/apache/activemq/bugs/AMQ4952Test.java   |  28 +-
 3 files changed, 399 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/cc47ab6d/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 18a4a69..10b463e 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1964,6 +1964,15 @@ public class Queue extends BaseDestination implements Task, UsageListener {
                         resultList.addMessageLast(ref);
                     } else {
                         ref.decrementReferenceCount();
+                        // store should have trapped duplicate in its index, also cursor audit
+                        // we need to remove the duplicate from the store in the knowledge that the original message may be inflight
+                        // note: jdbc store will not trap unacked messages as a duplicate b/c it gives each message a unique sequence id
+                        LOG.warn("{}, duplicate message {} paged in, is cursor audit disabled? Removing from store and redirecting to dlq", this, ref.getMessage());
+                        if (store != null) {
+                            ConnectionContext connectionContext = createConnectionContext();
+                            store.removeMessage(connectionContext, new MessageAck(ref.getMessage(), MessageAck.POSION_ACK_TYPE, 1));
+                            broker.getRoot().sendToDeadLetterQueue(connectionContext, ref.getMessage(), null, new Throwable("duplicate paged in from store for " + destination));
+                        }
                     }
                 }
             } finally {

http://git-wip-us.apache.org/repos/asf/activemq/blob/cc47ab6d/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
new file mode 100644
index 0000000..2fa6fa7
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java
@@ -0,0 +1,368 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.InvalidSelectorException;
+import javax.management.ObjectName;
+import junit.framework.TestCase;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.state.ProducerState;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.thread.TaskRunnerFactory;
+
+public class SubscriptionAddRemoveQueueTest extends TestCase {
+
+    Queue queue;
+
+    ConsumerInfo info = new ConsumerInfo();
+    List<SimpleImmediateDispatchSubscription> subs = new ArrayList<SimpleImmediateDispatchSubscription>();
+    ConnectionContext context = new ConnectionContext();
+    ProducerBrokerExchange producerBrokerExchange = new ProducerBrokerExchange();
+    ProducerInfo producerInfo = new ProducerInfo();
+    ProducerState producerState = new ProducerState(producerInfo);
+    ActiveMQDestination destination = new ActiveMQQueue("TEST");
+    int numSubscriptions = 1000;
+    boolean working = true;
+    int senders = 20;
+
+
+    @Override
+    public void setUp() throws Exception {
+        BrokerService brokerService = new BrokerService();
+        brokerService.start();
+        DestinationStatistics parentStats = new DestinationStatistics();
+        parentStats.setEnabled(true);
+
+        TaskRunnerFactory taskFactory = new TaskRunnerFactory();
+        MessageStore store = null;
+
+        info.setDestination(destination);
+        info.setPrefetchSize(100);
+
+        producerBrokerExchange.setProducerState(producerState);
+        producerBrokerExchange.setConnectionContext(context);
+
+        queue = new Queue(brokerService, destination, store, parentStats, taskFactory);
+        queue.initialize();
+    }
+
+    public void testNoDispatchToRemovedConsumers() throws Exception {
+        final AtomicInteger producerId = new AtomicInteger();
+        Runnable sender = new Runnable() {
+            public void run() {
+                AtomicInteger id = new AtomicInteger();
+                int producerIdAndIncrement = producerId.getAndIncrement();
+                while (working) {
+                    try {
+                        Message msg = new ActiveMQMessage();
+                        msg.setDestination(destination);
+                        msg.setMessageId(new MessageId(producerIdAndIncrement + ":0:" + id.getAndIncrement()));
+                        queue.send(producerBrokerExchange, msg);
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                        fail("unexpected exception in sendMessage, ex:" + e);
+                    }
+                }
+            }
+        };
+
+        Runnable subRemover = new Runnable() {
+            public void run() {
+                for (Subscription sub : subs) {
+                    try {
+                        queue.removeSubscription(context, sub, 0);
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                        fail("unexpected exception in removeSubscription, ex:" + e);
+                    }
+                }
+            }
+        };
+
+        for (int i=0;i<numSubscriptions; i++) {
+            SimpleImmediateDispatchSubscription sub = new SimpleImmediateDispatchSubscription();
+            subs.add(sub);
+            queue.addSubscription(context, sub);
+        }
+        assertEquals("there are X subscriptions", numSubscriptions, queue.getDestinationStatistics().getConsumers().getCount());
+        ExecutorService executor = Executors.newCachedThreadPool();
+        for (int i=0; i<senders ; i++) {
+            executor.submit(sender);
+        }
+
+        Thread.sleep(1000);
+        for (SimpleImmediateDispatchSubscription sub : subs) {
+            assertTrue("There are some locked messages in the subscription", hasSomeLocks(sub.dispatched));
+        }
+
+        Future<?> result = executor.submit(subRemover);
+        result.get();
+        working = false;
+        assertEquals("there are no subscriptions", 0, queue.getDestinationStatistics().getConsumers().getCount());
+
+        for (SimpleImmediateDispatchSubscription sub : subs) {
+            assertTrue("There are no locked messages in any removed subscriptions", !hasSomeLocks(sub.dispatched));
+        }
+
+    }
+
+    private boolean hasSomeLocks(List<MessageReference> dispatched) {
+        boolean hasLock = false;
+        for (MessageReference mr: dispatched) {
+            QueueMessageReference qmr = (QueueMessageReference) mr;
+            if (qmr.getLockOwner() != null) {
+                hasLock = true;
+                break;
+            }
+        }
+        return hasLock;
+    }
+
+    public class SimpleImmediateDispatchSubscription implements Subscription, LockOwner {
+
+        List<MessageReference> dispatched =
+            Collections.synchronizedList(new ArrayList<MessageReference>());
+
+        public void acknowledge(ConnectionContext context, MessageAck ack)
+                throws Exception {
+        }
+
+        public void add(MessageReference node) throws Exception {
+            // immediate dispatch
+            QueueMessageReference  qmr = (QueueMessageReference)node;
+            qmr.lock(this);
+            dispatched.add(qmr);
+        }
+
+        public ConnectionContext getContext() {
+            return null;
+        }
+
+        @Override
+        public int getCursorMemoryHighWaterMark() {
+            return 0;
+        }
+
+        @Override
+        public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
+        }
+
+        @Override
+        public boolean isSlowConsumer() {
+            return false;
+        }
+
+        @Override
+        public void unmatched(MessageReference node) throws IOException {
+        }
+
+        @Override
+        public long getTimeOfLastMessageAck() {
+            return 0;
+        }
+
+        @Override
+        public long getConsumedCount() {
+            return 0;
+        }
+
+        @Override
+        public void incrementConsumedCount() {
+        }
+
+        @Override
+        public void resetConsumedCount() {
+        }
+
+        public void add(ConnectionContext context, Destination destination)
+                throws Exception {
+        }
+
+        public void destroy() {
+        }
+
+        public void gc() {
+        }
+
+        public ConsumerInfo getConsumerInfo() {
+            return info;
+        }
+
+        public long getDequeueCounter() {
+            return 0;
+        }
+
+        public long getDispatchedCounter() {
+            return 0;
+        }
+
+        public int getDispatchedQueueSize() {
+            return 0;
+        }
+
+        public long getEnqueueCounter() {
+            return 0;
+        }
+
+        public int getInFlightSize() {
+            return 0;
+        }
+
+        public int getInFlightUsage() {
+            return 0;
+        }
+
+        public ObjectName getObjectName() {
+            return null;
+        }
+
+        public int getPendingQueueSize() {
+            return 0;
+        }
+
+        public int getPrefetchSize() {
+            return 0;
+        }
+
+        public String getSelector() {
+            return null;
+        }
+
+        public boolean isBrowser() {
+            return false;
+        }
+
+        public boolean isFull() {
+            return false;
+        }
+
+        public boolean isHighWaterMark() {
+            return false;
+        }
+
+        public boolean isLowWaterMark() {
+            return false;
+        }
+
+        public boolean isRecoveryRequired() {
+            return false;
+        }
+
+        public boolean isSlave() {
+            return false;
+        }
+
+        public boolean matches(MessageReference node,
+                MessageEvaluationContext context) throws IOException {
+            return true;
+        }
+
+        public boolean matches(ActiveMQDestination destination) {
+            return false;
+        }
+
+        public void processMessageDispatchNotification(
+                MessageDispatchNotification mdn) throws Exception {
+        }
+
+        public Response pullMessage(ConnectionContext context, MessagePull pull)
+                throws Exception {
+            return null;
+        }
+
+        public List<MessageReference> remove(ConnectionContext context,
+                Destination destination) throws Exception {
+            return new ArrayList<MessageReference>(dispatched);
+        }
+
+        public void setObjectName(ObjectName objectName) {
+        }
+
+        public void setSelector(String selector)
+                throws InvalidSelectorException, UnsupportedOperationException {
+        }
+
+        public void updateConsumerPrefetch(int newPrefetch) {
+        }
+
+        public boolean addRecoveredMessage(ConnectionContext context,
+                MessageReference message) throws Exception {
+            return false;
+        }
+
+        public ActiveMQDestination getActiveMQDestination() {
+            return null;
+        }
+
+        public int getLockPriority() {
+            return 0;
+        }
+
+        public boolean isLockExclusive() {
+            return false;
+        }
+
+        public void addDestination(Destination destination) {
+        }
+
+        public void removeDestination(Destination destination) {
+        }
+
+        public int countBeforeFull() {
+            return 10;
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/cc47ab6d/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java
index 06f60e2..18cf12e 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java
@@ -13,6 +13,11 @@ import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.util.Wait;
 import org.apache.derby.jdbc.EmbeddedDataSource;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,6 +60,7 @@ import java.util.concurrent.*;
  * the message after shutdown.
  */
 
+@RunWith(value = Parameterized.class)
 public class AMQ4952Test extends TestCase {
 
     private static final Logger LOG = LoggerFactory.getLogger(AMQ4952Test.class);
@@ -72,7 +78,15 @@ public class AMQ4952Test extends TestCase {
 
     private EmbeddedDataSource localDataSource;
 
+    @Parameterized.Parameter(0)
+    public boolean enableCursorAudit;
 
+    @Parameterized.Parameters(name="enableAudit={0}")
+    public static Iterable<Object[]> getTestParameters() {
+        return Arrays.asList(new Object[][]{{Boolean.TRUE},{Boolean.FALSE}});
+    }
+
+    @Test
     public void testConsumerBrokerRestart() throws Exception {
 
         Callable consumeMessageTask = new Callable() {
@@ -209,13 +223,15 @@ public class AMQ4952Test extends TestCase {
     }
 
     @Override
-    protected void setUp() throws Exception {
+    @Before
+    public void setUp() throws Exception {
         super.setUp();
-        doSetUp(true);
+        doSetUp();
     }
 
     @Override
-    protected void tearDown() throws Exception {
+    @After
+    public void tearDown() throws Exception {
         doTearDown();
         super.tearDown();
     }
@@ -223,7 +239,7 @@ public class AMQ4952Test extends TestCase {
     protected void doTearDown() throws Exception {
 
         try {
-            consumerBroker.stop();
+            producerBroker.stop();
         } catch (Exception ex) {
         }
         try {
@@ -232,7 +248,7 @@ public class AMQ4952Test extends TestCase {
         }
     }
 
-    protected void doSetUp(boolean deleteAllMessages) throws Exception {
+    protected void doSetUp() throws Exception {
         producerBroker = createProducerBroker();
         consumerBroker = createConsumerBroker(true);
     }
@@ -347,7 +363,7 @@ public class AMQ4952Test extends TestCase {
         PolicyEntry policy = new PolicyEntry();
 
         policy.setQueue(">");
-        policy.setUseCache(false);
+        policy.setEnableAudit(enableCursorAudit);
         policy.setExpireMessagesPeriod(0);
 
         // set replay with no consumers