You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/05/14 16:21:44 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6293

Repository: activemq
Updated Branches:
  refs/heads/master ab434ee77 -> 1241e4120


https://issues.apache.org/jira/browse/AMQ-6293

Fixing Queue destination statistics in dropMessage by adding sync in
between the check for dropped and actually dropping the message plus
fixing dequeue stats so messages aren't counted twice


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/1241e412
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/1241e412
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/1241e412

Branch: refs/heads/master
Commit: 1241e4120a5f4af58be2952a6ec985dcd8ac11c3
Parents: ab434ee
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Sat May 14 15:30:28 2016 +0000
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Sat May 14 16:20:41 2016 +0000

----------------------------------------------------------------------
 .../broker/region/IndirectMessageReference.java |  47 ++++-
 .../broker/region/NullMessageReference.java     |  29 +++
 .../apache/activemq/broker/region/Queue.java    |   9 +-
 .../broker/region/QueueMessageReference.java    |  19 +-
 .../org/apache/activemq/bugs/AMQ6293Test.java   | 178 +++++++++++++++++++
 5 files changed, 266 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/1241e412/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
index 3e7cab7..c1b5f3c 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
@@ -24,8 +24,8 @@ import org.apache.activemq.command.MessageId;
  * Keeps track of a message that is flowing through the Broker. This object may
  * hold a hard reference to the message or only hold the id of the message if
  * the message has been persisted on in a MessageStore.
- * 
- * 
+ *
+ *
  */
 public class IndirectMessageReference implements QueueMessageReference {
 
@@ -38,7 +38,7 @@ public class IndirectMessageReference implements QueueMessageReference {
     /** Direct reference to the message */
     private final Message message;
     private final MessageId messageId;
-    
+
     /**
      * @param message
      */
@@ -50,44 +50,70 @@ public class IndirectMessageReference implements QueueMessageReference {
         message.getGroupSequence();
     }
 
+    @Override
     public Message getMessageHardRef() {
         return message;
     }
 
+    @Override
     public int getReferenceCount() {
         return message.getReferenceCount();
     }
 
+    @Override
     public int incrementReferenceCount() {
         return message.incrementReferenceCount();
     }
 
+    @Override
     public int decrementReferenceCount() {
         return message.decrementReferenceCount();
     }
 
+    @Override
     public Message getMessage() {
         return message;
     }
 
+    @Override
     public String toString() {
         return "Message " + message.getMessageId() + " dropped=" + dropped + " acked=" + acked + " locked=" + (lockOwner != null);
     }
 
+    @Override
     public void incrementRedeliveryCounter() {
         message.incrementRedeliveryCounter();
     }
 
+    @Override
     public synchronized boolean isDropped() {
         return dropped;
     }
 
+    @Override
     public synchronized void drop() {
         dropped = true;
         lockOwner = null;
         message.decrementReferenceCount();
     }
 
+    /**
+     * Drop the message unless it has already been dropped.
+     * Return true if this call dropped it, false if it was already dropped.
+     * This method exists so that the check and the drop happen atomically
+     * under the intrinsic lock.
+     */
+    @Override
+    public synchronized boolean dropIfLive() {
+        if (isDropped()) {
+            return false;
+        } else {
+            drop();
+            return true;
+        }
+    }
+
+    @Override
     public boolean lock(LockOwner subscription) {
         synchronized (this) {
             if (dropped || lockOwner != null) {
@@ -98,28 +124,34 @@ public class IndirectMessageReference implements QueueMessageReference {
         }
     }
 
+    @Override
     public synchronized boolean unlock() {
         boolean result = lockOwner != null;
         lockOwner = null;
         return result;
     }
 
+    @Override
     public synchronized LockOwner getLockOwner() {
         return lockOwner;
     }
 
+    @Override
     public int getRedeliveryCounter() {
         return message.getRedeliveryCounter();
     }
 
+    @Override
     public MessageId getMessageId() {
         return messageId;
     }
 
+    @Override
     public Message.MessageDestination getRegionDestination() {
         return message.getRegionDestination();
     }
 
+    @Override
     public boolean isPersistent() {
         return message.isPersistent();
     }
@@ -128,38 +160,47 @@ public class IndirectMessageReference implements QueueMessageReference {
         return lockOwner != null;
     }
 
+    @Override
     public synchronized boolean isAcked() {
         return acked;
     }
 
+    @Override
     public synchronized void setAcked(boolean b) {
         acked = b;
     }
 
+    @Override
     public String getGroupID() {
         return message.getGroupID();
     }
 
+    @Override
     public int getGroupSequence() {
         return message.getGroupSequence();
     }
 
+    @Override
     public ConsumerId getTargetConsumerId() {
         return message.getTargetConsumerId();
     }
 
+    @Override
     public long getExpiration() {
         return message.getExpiration();
     }
 
+    @Override
     public boolean isExpired() {
         return message.isExpired();
     }
 
+    @Override
     public synchronized int getSize() {
        return message.getSize();
     }
 
+    @Override
     public boolean isAdvisory() {
        return message.isAdvisory();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/1241e412/activemq-broker/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java
index 9510972..bef9b23 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java
@@ -29,98 +29,127 @@ public final class NullMessageReference implements QueueMessageReference {
     private final ActiveMQMessage message = new ActiveMQMessage();
     private volatile int references;
 
+    @Override
     public void drop() {
         throw new RuntimeException("not implemented");
     }
 
+    @Override
+    public synchronized boolean dropIfLive() {
+        throw new RuntimeException("not implemented");
+    }
+
+    @Override
     public LockOwner getLockOwner() {
         throw new RuntimeException("not implemented");
     }
 
+    @Override
     public boolean isAcked() {
         return false;
     }
 
+    @Override
     public boolean isDropped() {
         return false;
     }
 
+    @Override
     public boolean lock(LockOwner subscription) {
         return true;
     }
 
+    @Override
     public void setAcked(boolean b) {
         throw new RuntimeException("not implemented");
     }
 
+    @Override
     public boolean unlock() {
         return true;
     }
 
+    @Override
     public int decrementReferenceCount() {
         return --references;
     }
 
+    @Override
     public long getExpiration() {
         throw new RuntimeException("not implemented");
     }
 
+    @Override
     public String getGroupID() {
         return null;
     }
 
+    @Override
     public int getGroupSequence() {
         return 0;
     }
 
+    @Override
     public Message getMessage()  {
         return message;
     }
 
+    @Override
     public Message getMessageHardRef() {
         throw new RuntimeException("not implemented");
     }
 
+    @Override
     public MessageId getMessageId() {
         return message.getMessageId();
     }
 
+    @Override
     public int getRedeliveryCounter() {
         throw new RuntimeException("not implemented");
     }
 
+    @Override
     public int getReferenceCount() {
         return references;
     }
 
+    @Override
     public Destination getRegionDestination() {
         return null;
     }
 
+    @Override
     public int getSize() {
         throw new RuntimeException("not implemented");
     }
 
+    @Override
     public ConsumerId getTargetConsumerId() {
         throw new RuntimeException("not implemented");
     }
 
+    @Override
     public void incrementRedeliveryCounter() {
         throw new RuntimeException("not implemented");
     }
 
+    @Override
     public int incrementReferenceCount() {
         return ++references;
     }
 
+    @Override
     public boolean isExpired() {
         return false;
     }
 
+    @Override
     public boolean isPersistent() {
         throw new RuntimeException("not implemented");
     }
 
+    @Override
     public boolean isAdvisory() {
         return false;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/1241e412/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 0d06022..f025998 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1748,7 +1748,6 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
         // This sends the ack the the journal..
         if (!ack.isInTransaction()) {
             acknowledge(context, sub, ack, reference);
-            getDestinationStatistics().getDequeues().increment();
             dropMessage(reference);
         } else {
             try {
@@ -1758,7 +1757,6 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
 
                     @Override
                     public void afterCommit() throws Exception {
-                        getDestinationStatistics().getDequeues().increment();
                         dropMessage(reference);
                         wakeup();
                     }
@@ -1788,9 +1786,10 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
     }
 
     private void dropMessage(QueueMessageReference reference) {
-        if (!reference.isDropped()) {
-            reference.drop();
-            destinationStatistics.getMessages().decrement();
+        //use dropIfLive so we only process the statistics at most one time
+        if (reference.dropIfLive()) {
+            getDestinationStatistics().getDequeues().increment();
+            getDestinationStatistics().getMessages().decrement();
             pagedInMessagesLock.writeLock().lock();
             try {
                 pagedInMessages.remove(reference);

http://git-wip-us.apache.org/repos/asf/activemq/blob/1241e412/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java
index 21d43e4..89bcd6a 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java
@@ -18,25 +18,28 @@ package org.apache.activemq.broker.region;
 
 /**
  * Queue specific MessageReference.
- * 
+ *
  * @author fateev@amazon.com
- * 
+ *
  */
 public interface QueueMessageReference extends MessageReference {
 
     QueueMessageReference NULL_MESSAGE = new NullMessageReference();
 
     boolean isAcked();
-    
+
     void setAcked(boolean b);
-    
+
     void drop();
- 
+
+    boolean dropIfLive();
+
+    @Override
     boolean isDropped();
-        
+
     boolean lock(LockOwner subscription);
-    
+
     boolean unlock();
-    
+
     LockOwner getLockOwner();
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/1241e412/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6293Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6293Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6293Test.java
new file mode 100644
index 0000000..07a87aa
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6293Test.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class AMQ6293Test {
+
+    static final Logger LOG = LoggerFactory.getLogger(AMQ6293Test.class);
+
+    private BrokerService brokerService;
+    private String connectionUri;
+    private ExecutorService service = Executors.newFixedThreadPool(6);
+    private final ActiveMQQueue queue = new ActiveMQQueue("test");
+    private final int numMessages = 10000;
+    private Connection connection;
+    private Session session;
+    private final AtomicBoolean isException = new AtomicBoolean();
+
+    @Rule
+    public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
+
+    @Before
+    public void before() throws Exception {
+        brokerService = new BrokerService();
+        TransportConnector connector = brokerService.addConnector("tcp://localhost:0");
+        connectionUri = connector.getPublishableConnectString();
+        brokerService.setPersistent(true);
+        brokerService.setDataDirectoryFile(dataFileDir.getRoot());
+
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry entry = new PolicyEntry();
+        policyMap.setDefaultEntry(entry);
+        brokerService.setDestinationPolicy(policyMap);
+        entry.setQueuePrefetch(100);
+
+        brokerService.start();
+        brokerService.waitUntilStarted();
+
+        final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
+        // Assign the field, not a shadowing local, so after() can stop this connection.
+        connection = factory.createConnection();
+        connection.start();
+
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    @After
+    public void after() throws Exception {
+        if (connection != null) {
+            connection.stop();
+        }
+        if (brokerService != null) {
+            brokerService.stop();
+            brokerService.waitUntilStopped();
+        }
+    }
+
+    @Test(timeout=90000)
+    public void testDestinationStatisticsOnPurge() throws Exception {
+        //send messages to the store
+        sendTestMessages(numMessages);
+
+        //Start up 5 consumers
+        final Queue regionQueue = (Queue) brokerService.getRegionBroker().getDestinationMap().get(queue);
+        for (int i = 0; i < 5; i++) {
+            service.submit(new TestConsumer(session.createConsumer(queue)));
+        }
+
+        //Start a purge task at the same time as the consumers
+        for (int i = 0; i < 1; i++) {
+            service.submit(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        regionQueue.purge();
+                    } catch (Exception e) {
+                        isException.set(true);
+                        LOG.warn(e.getMessage(), e);
+                        throw new RuntimeException(e);
+                    }
+                }
+            });
+        }
+
+        service.shutdown();
+        assertTrue("Took too long to shutdown service", service.awaitTermination(1, TimeUnit.MINUTES));
+        assertFalse("Exception encountered", isException.get());
+
+        //Verify dequeue and message counts
+        assertEquals(0, regionQueue.getDestinationStatistics().getMessages().getCount());
+        assertEquals(numMessages, regionQueue.getDestinationStatistics().getDequeues().getCount());
+    }
+
+    private void sendTestMessages(int numMessages) throws JMSException {
+        MessageProducer producer = session.createProducer(queue);
+
+        final TextMessage textMessage = session.createTextMessage();
+        textMessage.setText("Message");
+        for (int i = 1; i <= numMessages; i++) {
+            producer.send(textMessage);
+            if (i % 1000 == 0) {
+                LOG.info("Sent {} messages", i);
+            }
+        }
+    }
+
+    private class TestConsumer implements Runnable {
+        private final MessageConsumer consumer;
+
+        public TestConsumer(final MessageConsumer consumer) throws JMSException {
+            this.consumer = consumer;
+        }
+
+        @Override
+        public void run() {
+            try {
+                int i = 0;
+                while (consumer.receive(1000) != null) {
+                    i++;
+                    if (i % 1000 == 0) {
+                        LOG.info("Received {} messages", i);
+                    }
+                }
+            } catch (Exception e) {
+                isException.set(true);
+                LOG.warn(e.getMessage(), e);
+                throw new RuntimeException(e);
+            }
+        }
+    };
+}
\ No newline at end of file