You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/06/29 17:33:04 UTC

[1/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6286

Repository: activemq
Updated Branches:
  refs/heads/activemq-5.13.x 6271a7e35 -> 6fda5e326


https://issues.apache.org/jira/browse/AMQ-6286

Create a follow on test showing AMQP respects order.
(cherry picked from commit e02c1a17f83b21cbf42fa9e6e78a891af3edd8bb)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8ae7c8f3
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8ae7c8f3
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8ae7c8f3

Branch: refs/heads/activemq-5.13.x
Commit: 8ae7c8f3a7b82e8f3ab4ef4331fbda5ca0fd0d17
Parents: 6271a7e
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Jun 24 09:39:20 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Jun 29 13:27:17 2016 -0400

----------------------------------------------------------------------
 .../amqp/JmsTransactedMessageOrderTest.java     | 139 +++++++++++++++++++
 1 file changed, 139 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/8ae7c8f3/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsTransactedMessageOrderTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsTransactedMessageOrderTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsTransactedMessageOrderTest.java
new file mode 100644
index 0000000..c286497
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsTransactedMessageOrderTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(Parameterized.class)
+public class JmsTransactedMessageOrderTest extends JMSClientTestSupport {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(JmsTransactedMessageOrderTest.class);
+
+    private final int prefetch;
+
+    public JmsTransactedMessageOrderTest(int prefetch) {
+        this.prefetch = prefetch;
+    }
+
+    @Parameters(name="Prefetch->{0}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] { {0}, {1}, {100} });
+    }
+
+    @Override
+    protected void performAdditionalConfiguration(BrokerService brokerService) throws Exception {
+        final PolicyMap policyMap = new PolicyMap();
+        final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();
+        final PolicyEntry policyEntry = new PolicyEntry();
+
+        policyEntry.setQueue(">");
+        policyEntry.setStrictOrderDispatch(true);
+        policyEntry.setProducerFlowControl(true);
+        policyEntry.setMemoryLimit(1024 * 1024);
+
+        policyEntries.add(policyEntry);
+
+        policyMap.setPolicyEntries(policyEntries);
+        policyMap.setDefaultEntry(policyEntry);
+
+        brokerService.setDestinationPolicy(policyMap);
+    }
+
+    @Test
+    public void testMessageOrderAfterRollback() throws Exception {
+        sendMessages(5);
+
+        int counter = 0;
+        while (counter++ < 10) {
+            LOG.info("Creating connection using prefetch of: {}", prefetch);
+
+            JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI("jms.prefetchPolicy.all=" + prefetch));
+
+            connection = cf.createConnection();
+            connection.start();
+
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue(getDestinationName());
+            MessageConsumer consumer = session.createConsumer(queue);
+
+            Message message = consumer.receive(5000);
+            assertNotNull(message);
+            assertTrue(message instanceof TextMessage);
+
+            int sequenceID = message.getIntProperty("sequenceID");
+            assertEquals(0, sequenceID);
+
+            LOG.info("Read message = {}", ((TextMessage) message).getText());
+            session.rollback();
+            session.close();
+            connection.close();
+        }
+    }
+
+    public void sendMessages(int messageCount) throws JMSException {
+        Connection connection = null;
+        try {
+            connection = createConnection();
+            connection.start();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue(getDestinationName());
+
+            for (int i = 0; i < messageCount; ++i) {
+                MessageProducer messageProducer = session.createProducer(queue);
+                TextMessage message = session.createTextMessage("(" + i + ")");
+                message.setIntProperty("sequenceID", i);
+                messageProducer.send(message);
+                LOG.info("Sent message = {}", message.getText());
+            }
+
+        } catch (Exception exp) {
+            exp.printStackTrace(System.out);
+        } finally {
+            if (connection != null) {
+                connection.close();
+            }
+        }
+    }
+}


[2/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6340

Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQ-6340

combine the lists in the correct order for later redispatch.
(cherry picked from commit 4e23adfcc981d3e13e7f1b1182b89a954160a26a)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6fda5e32
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6fda5e32
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6fda5e32

Branch: refs/heads/activemq-5.13.x
Commit: 6fda5e3262b311de655d6f0e0d1c4035317ab4c1
Parents: 8ae7c8f
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Jun 29 12:57:30 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Jun 29 13:27:37 2016 -0400

----------------------------------------------------------------------
 .../amqp/JmsTransactedMessageOrderTest.java       |  6 ++----
 .../broker/region/PrefetchSubscription.java       | 18 ++++++++++--------
 2 files changed, 12 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/6fda5e32/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsTransactedMessageOrderTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsTransactedMessageOrderTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsTransactedMessageOrderTest.java
index c286497..2134759 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsTransactedMessageOrderTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsTransactedMessageOrderTest.java
@@ -69,8 +69,6 @@ public class JmsTransactedMessageOrderTest extends JMSClientTestSupport {
 
         policyEntry.setQueue(">");
         policyEntry.setStrictOrderDispatch(true);
-        policyEntry.setProducerFlowControl(true);
-        policyEntry.setMemoryLimit(1024 * 1024);
 
         policyEntries.add(policyEntry);
 
@@ -85,7 +83,7 @@ public class JmsTransactedMessageOrderTest extends JMSClientTestSupport {
         sendMessages(5);
 
         int counter = 0;
-        while (counter++ < 10) {
+        while (counter++ < 20) {
             LOG.info("Creating connection using prefetch of: {}", prefetch);
 
             JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI("jms.prefetchPolicy.all=" + prefetch));
@@ -100,11 +98,11 @@ public class JmsTransactedMessageOrderTest extends JMSClientTestSupport {
             Message message = consumer.receive(5000);
             assertNotNull(message);
             assertTrue(message instanceof TextMessage);
+            LOG.info("Read message = {}", ((TextMessage) message).getText());
 
             int sequenceID = message.getIntProperty("sequenceID");
             assertEquals(0, sequenceID);
 
-            LOG.info("Read message = {}", ((TextMessage) message).getText());
             session.rollback();
             session.close();
             connection.close();

http://git-wip-us.apache.org/repos/asf/activemq/blob/6fda5e32/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index 8332d25..b8184b1 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -19,6 +19,7 @@ package org.apache.activemq.broker.region;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -639,31 +640,32 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
     }
 
     public List<MessageReference> remove(ConnectionContext context, Destination destination, List<MessageReference> dispatched) throws Exception {
-        List<MessageReference> rc = new ArrayList<MessageReference>();
+        LinkedList<MessageReference> redispatch = new LinkedList<MessageReference>();
         synchronized(pendingLock) {
             super.remove(context, destination);
             // Here is a potential problem concerning Inflight stat:
             // Messages not already committed or rolled back may not be removed from dispatched list at the moment
             // Except if each commit or rollback callback action comes before remove of subscriber.
-            rc.addAll(pending.remove(context, destination));
+            redispatch.addAll(pending.remove(context, destination));
 
             if (dispatched == null) {
-                return rc;
+                return redispatch;
             }
 
             // Synchronized to DispatchLock if necessary
             if (dispatched == this.dispatched) {
                 synchronized(dispatchLock) {
-                    updateDestinationStats(rc, destination, dispatched);
+                    addReferencesAndUpdateRedispatch(redispatch, destination, dispatched);
                 }
             } else {
-                updateDestinationStats(rc, destination, dispatched);
+                addReferencesAndUpdateRedispatch(redispatch, destination, dispatched);
             }
         }
-        return rc;
+
+        return redispatch;
     }
 
-    private void updateDestinationStats(List<MessageReference> rc, Destination destination, List<MessageReference> dispatched) {
+    private void addReferencesAndUpdateRedispatch(LinkedList<MessageReference> redispatch, Destination destination, List<MessageReference> dispatched) {
         ArrayList<MessageReference> references = new ArrayList<MessageReference>();
         for (MessageReference r : dispatched) {
             if (r.getRegionDestination() == destination) {
@@ -671,7 +673,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
                 getSubscriptionStatistics().getInflightMessageSize().addSize(-r.getSize());
             }
         }
-        rc.addAll(references);
+        redispatch.addAll(0, references);
         destination.getDestinationStatistics().getInflight().subtract(references.size());
         dispatched.removeAll(references);
     }