You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/07/06 22:23:07 UTC

[1/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5340

Repository: activemq
Updated Branches:
  refs/heads/master a2697b844 -> 3ba28f622


https://issues.apache.org/jira/browse/AMQ-5340

A QueueBrowser no longer checks expiry on messages to prevent a
browser from hanging in between the hasMoreElements check and actually
getting a message.  This means that if messages were in the queue when
the browser started it will receive messages even if they are now
expired. Even though the browser will get the expired message, the
broker will still expire it to prevent future access to it.

Thanks to Henno Vermeulen for providing a test case.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/174dcbff
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/174dcbff
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/174dcbff

Branch: refs/heads/master
Commit: 174dcbff2a8af5e550793af6d413d2967409bc33
Parents: a2697b8
Author: Christopher L. Shannon <ch...@gmail.com>
Authored: Sun Jul 5 22:11:23 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Jul 6 16:13:52 2015 -0400

----------------------------------------------------------------------
 .../broker/region/PrefetchSubscription.java     |   5 +-
 .../activemq/ActiveMQMessageConsumer.java       |   3 +-
 .../activemq/JmsQueueBrowserExpirationTest.java | 153 +++++++++++++++++++
 3 files changed, 159 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/174dcbff/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index 53a00c9..7d91ba6 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -658,7 +658,10 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
                                     if (broker.isExpired(node)) {
                                         ((Destination)node.getRegionDestination()).messageExpired(context, this, node);
                                     }
-                                    continue;
+                                    //AMQ-5340
+                                    if (!isBrowser()) {
+                                        continue;
+                                    }
                                 }
                                 dispatch(node);
                                 count++;

http://git-wip-us.apache.org/repos/asf/activemq/blob/174dcbff/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
index 0808ead..d53f7b6 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
@@ -492,7 +492,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
                     }
                 } else if (md.getMessage() == null) {
                     return null;
-                } else if (isConsumerExpiryCheckEnabled() && md.getMessage().isExpired()) {
+                //AMQ-5340 - only check for expired if not a browser
+                } else if (!isBrowser() && isConsumerExpiryCheckEnabled() && md.getMessage().isExpired()) {
                     if (LOG.isDebugEnabled()) {
                         LOG.debug(getConsumerId() + " received expired message: " + md);
                     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/174dcbff/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserExpirationTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserExpirationTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserExpirationTest.java
new file mode 100644
index 0000000..801c80e
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserExpirationTest.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import static org.junit.Assert.assertEquals;
+
+import java.net.URI;
+import java.util.Enumeration;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This test shows that when all messages are expired the QueueBrowser will
+ * still finish properly and not hang indefinitely.  If a queue browser subscription
+ * detects an expired message, it will tell the broker to expire the message but still
+ * dispatch the message to the client as we want to get a snapshot in time.  This prevents
+ * the problem of the browser enumeration returning true for hasMoreElements and then
+ * hanging forever because all messages expired on dispatch.
+ *
+ * See: https://issues.apache.org/jira/browse/AMQ-5340
+ *
+ * <p>
+ * This test is based on a test case submitted by Henno Vermeulen for AMQ-5340
+ */
+public class JmsQueueBrowserExpirationTest {
+
+    private static final int MESSAGES_TO_SEND = 50;
+    // Message expires after 1 second
+    private static final long TTL = 1000;
+
+    private static final Logger LOG = LoggerFactory
+            .getLogger(JmsQueueBrowserExpirationTest.class);
+
+    private BrokerService broker;
+    private URI connectUri;
+    private ActiveMQConnectionFactory factory;
+    private final ActiveMQQueue queue = new ActiveMQQueue("TEST");
+
+    @Before
+    public void startBroker() throws Exception {
+        broker = new BrokerService();
+        broker.setPersistent(false);
+
+        TransportConnector connector = broker.addConnector("vm://localhost");
+        broker.deleteAllMessages();
+        broker.start();
+        broker.waitUntilStarted();
+        connectUri = connector.getConnectUri();
+        factory = new ActiveMQConnectionFactory(connectUri);
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+    //This should finish in under 3 seconds because the messages should be expired
+    @Test(timeout=3000)
+    public void testBrowsingExpiration() throws JMSException, InterruptedException {
+
+        sendTestMessages();
+
+        // Browse the queue.
+        Connection browserConnection = factory.createConnection();
+        browserConnection.start();
+
+        int browsed = browse(queue, browserConnection);
+
+        // The number of messages browsed should be equal to the number of
+        // messages sent.
+        assertEquals(MESSAGES_TO_SEND, browsed);
+
+        long begin = System.nanoTime();
+        while (browsed != 0) {
+            // Give JMS threads more opportunity to do their work.
+            Thread.sleep(100);
+            browsed = browse(queue, browserConnection);
+            String time =
+                    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin)
+                            + " ms";
+            System.out.println("[" + time + "] found " + browsed + " messages");
+        }
+        System.out.println("Finished");
+        browserConnection.close();
+    }
+
+    private int browse(ActiveMQQueue queue, Connection connection)
+            throws JMSException {
+        Session session =
+                connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        QueueBrowser browser = session.createBrowser(queue);
+        Enumeration<?> enumeration = browser.getEnumeration();
+        int browsed = 0;
+        while (enumeration.hasMoreElements()) {
+            TextMessage m = (TextMessage) enumeration.nextElement();
+            browsed++;
+            LOG.debug("B[{}]: {}", browsed, m.getText());
+        }
+        browser.close();
+        session.close();
+        return browsed;
+    }
+
+    protected void sendTestMessages() throws JMSException {
+        // Send the messages to the Queue.
+        Connection prodConnection = factory.createConnection();
+        prodConnection.start();
+        Session prodSession = prodConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = prodSession.createProducer(queue);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+        producer.setTimeToLive(TTL);
+
+        for (int i = 1; i <= MESSAGES_TO_SEND; i++) {
+            String msgStr = "Message: " + i;
+            producer.send(prodSession.createTextMessage(msgStr));
+            LOG.info("P&C: {}", msgStr);
+        }
+        prodSession.close();
+    }
+
+
+}
\ No newline at end of file


[2/2] activemq git commit: This closes #127

Posted by ta...@apache.org.
This closes #127


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/3ba28f62
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/3ba28f62
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/3ba28f62

Branch: refs/heads/master
Commit: 3ba28f6221d1a698a6faf34db2c94bfba9b8277b
Parents: a2697b8 174dcbf
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Jul 6 16:18:36 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Jul 6 16:18:36 2015 -0400

----------------------------------------------------------------------
 .../broker/region/PrefetchSubscription.java     |   5 +-
 .../activemq/ActiveMQMessageConsumer.java       |   3 +-
 .../activemq/JmsQueueBrowserExpirationTest.java | 153 +++++++++++++++++++
 3 files changed, 159 insertions(+), 2 deletions(-)
----------------------------------------------------------------------