You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2014/10/31 17:41:25 UTC

git commit: https://issues.apache.org/jira/browse/AMQ-5347 - variant of patch and additional tests applied, prefetch=0 test without restart exposed ordering bug with fix for https://issues.apache.org/jira/browse/AMQ-2719

Repository: activemq
Updated Branches:
  refs/heads/trunk 0b148ade3 -> 52e1a05aa


https://issues.apache.org/jira/browse/AMQ-5347 - variant of patch and additional tests applied, prefetch=0 test without restart exposed ordering bug with fix for https://issues.apache.org/jira/browse/AMQ-2719


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/52e1a05a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/52e1a05a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/52e1a05a

Branch: refs/heads/trunk
Commit: 52e1a05aaadda40eb2ce63f610937981f96506c6
Parents: 0b148ad
Author: gtully <ga...@gmail.com>
Authored: Fri Oct 31 16:33:56 2014 +0000
Committer: gtully <ga...@gmail.com>
Committed: Fri Oct 31 16:40:41 2014 +0000

----------------------------------------------------------------------
 .../activemq/broker/TransportConnection.java    |   6 +-
 .../broker/region/PrefetchSubscription.java     |   3 +
 .../apache/activemq/broker/region/Queue.java    |  33 +-
 .../activemq/broker/region/RegionBroker.java    |   4 +-
 .../RedeliveryRestartWithExceptionTest.java     | 419 +++++++++++++++++++
 5 files changed, 447 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/52e1a05a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index 5bb7ec1..a9e4c86 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -929,7 +929,11 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
         try {
             if (!stopping.get()) {
                 if (messageDispatch != null) {
-                    broker.preProcessDispatch(messageDispatch);
+                    try {
+                        broker.preProcessDispatch(messageDispatch);
+                    } catch (RuntimeException convertToIO) {
+                        throw new IOException(convertToIO);
+                    }
                 }
                 dispatch(command);
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/52e1a05a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index 5ba3b53..8b8a788 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -731,6 +731,9 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
                             LOG.trace("{} failed to dispatch: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), dispatchCounter, dispatched.size() });
                         }
                     }
+                    if (node instanceof QueueMessageReference) {
+                        ((QueueMessageReference) node).unlock();
+                    }
                 }
             });
             context.getConnection().dispatchAsync(md);

http://git-wip-us.apache.org/repos/asf/activemq/blob/52e1a05a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index a1b8b07..5c7a988 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1968,24 +1968,25 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
                 // proper order
                 redeliveredWaitingDispatch = doActualDispatch(redeliveredWaitingDispatch);
             }
-            if (!pagedInPendingDispatch.isEmpty()) {
-                // Next dispatch anything that had not been
-                // dispatched before.
-                pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
-            }
-            // and now see if we can dispatch the new stuff.. and append to
-            // the pending
-            // list anything that does not actually get dispatched.
-            if (list != null && !list.isEmpty()) {
-                if (pagedInPendingDispatch.isEmpty()) {
-                    pagedInPendingDispatch.addAll(doActualDispatch(list));
-                } else {
-                    for (MessageReference qmr : list) {
-                        if (!pagedInPendingDispatch.contains(qmr)) {
-                            pagedInPendingDispatch.addMessageLast(qmr);
+            if (redeliveredWaitingDispatch.isEmpty()) {
+                if (!pagedInPendingDispatch.isEmpty()) {
+                    // Next dispatch anything that had not been
+                    // dispatched before.
+                    pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
+                }
+                // and now see if we can dispatch the new stuff.. and append to the pending
+                // list anything that does not actually get dispatched.
+                if (list != null && !list.isEmpty()) {
+                    if (pagedInPendingDispatch.isEmpty()) {
+                        pagedInPendingDispatch.addAll(doActualDispatch(list));
+                    } else {
+                        for (MessageReference qmr : list) {
+                            if (!pagedInPendingDispatch.contains(qmr)) {
+                                pagedInPendingDispatch.addMessageLast(qmr);
+                            }
                         }
+                        doWakeUp = true;
                     }
-                    doWakeUp = true;
                 }
             }
         } finally {

http://git-wip-us.apache.org/repos/asf/activemq/blob/52e1a05a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
index cb79c84..658bc7c 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
@@ -628,7 +628,9 @@ public class RegionBroker extends EmptyBroker {
                 try {
                     ((BaseDestination) message.getRegionDestination()).getMessageStore().updateMessage(message);
                 } catch (IOException error) {
-                    LOG.error("Failed to persist JMSRedeliveryFlag on {} in {}", message.getMessageId(), message.getDestination(), error);
+                    RuntimeException runtimeException = new RuntimeException("Failed to persist JMSRedeliveryFlag on " + message.getMessageId() + " in " + message.getDestination(), error);
+                    LOG.warn(runtimeException.getLocalizedMessage(), runtimeException);
+                    throw runtimeException;
                 } finally {
                     message.setRedeliveryCounter(originalValue);
                 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/52e1a05a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java
new file mode 100644
index 0000000..4126f06
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java
@@ -0,0 +1,419 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.ProxyMessageStore;
+import org.apache.activemq.store.ProxyTopicMessageStore;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.usage.SystemUsage;
+import org.junit.After;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RedeliveryRestartWithExceptionTest extends TestSupport {
+
+    private static final transient Logger LOG = LoggerFactory.getLogger(RedeliveryRestartWithExceptionTest.class);
+    ActiveMQConnection connection;
+    BrokerService broker = null;
+    String queueName = "redeliveryRestartQ";
+
+    @Override
+    @Before
+    public void setUp() throws Exception { // builds and starts a fresh broker for the test
+        super.setUp();
+        broker = new BrokerService();
+        configureBroker(broker, true); // true => the proxied message stores simulate an updateMessage failure after 4 successful updates
+        broker.setDeleteAllMessagesOnStartup(true); // presumably wipes any messages left in the store by an earlier run
+        broker.start();
+    }
+
+    @Override
+    @After
+    public void tearDown() throws Exception { // closes the client connection (if any), then stops the broker
+        if (connection != null) {
+            connection.close();
+        }
+        broker.stop(); // NOTE(review): dereferenced without a null check, unlike connection - confirm setUp can never leave broker null
+        super.tearDown();
+    }
+
+    protected void configureBroker(BrokerService broker, boolean throwExceptionOnUpdate) throws Exception { // shared broker setup; the flag arms store fault injection
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry policy = new PolicyEntry();
+        policy.setPersistJMSRedelivered(true); // the feature under test; presumably makes the broker write the redelivery flag to the store via updateMessage
+        policyMap.setDefaultEntry(policy); // installed as the default policy entry
+        broker.setDestinationPolicy(policyMap);
+        broker.setPersistenceAdapter(new KahaDBWithUpdateExceptionPersistenceAdapter(throwExceptionOnUpdate));
+        broker.addConnector("tcp://0.0.0.0:0"); // port 0: presumably an ephemeral port; the tests read the actual URI back from the connector
+    }
+
+    @org.junit.Test
+    public void testValidateRedeliveryFlagAfterRestart() throws Exception {
+        // Scenario: a store update fails while consuming, the broker is restarted, then the redelivery state of all 10 messages is verified.
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString()
+            + "?jms.prefetchPolicy.all=0"); // prefetch of 0 requested through the connection URI
+        connection = (ActiveMQConnection) connectionFactory.createConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Destination destination = session.createQueue(queueName);
+        populateDestination(10, destination, connection);
+        TextMessage msg = null;
+        MessageConsumer consumer = session.createConsumer(destination);
+        Exception expectedException = null;
+        try {
+            for (int i = 0; i < 5; i++) {
+                msg = (TextMessage) consumer.receive(5000);
+                LOG.info("not redelivered? got: " + msg);
+                assertNotNull("got the message", msg);
+                assertTrue("Should not receive the 5th message", i < 4); // a failed assertion is an Error, so the catch (Exception) below does not swallow it
+                // The first 4 messages will be ok but the 5th one should hit an exception in updateMessage and should not be delivered
+            }
+        } catch (Exception e) {
+            // Expecting an exception and disconnect on the 5th message
+            LOG.info("Got expected:", e);
+            expectedException = e;
+        }
+        assertNotNull("Expecting an exception when updateMessage fails", expectedException);                
+        // None of the received messages was acknowledged above, so they remain on the queue for the second phase.
+        consumer.close();
+        connection.close();
+        
+        restartBroker(); // the replacement broker is configured with throwExceptionOnUpdate=false
+        
+        connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString()
+            + "?jms.prefetchPolicy.all=0");
+        connection = (ActiveMQConnection) connectionFactory.createConnection();
+        connection.start();
+
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        destination = session.createQueue(queueName);
+        consumer = session.createConsumer(destination);
+        
+        
+        // consume the messages that were previously delivered
+        for (int i = 0; i < 4; i++) {
+            msg = (TextMessage) consumer.receive(4000);
+            LOG.info("redelivered? got: " + msg);
+            assertNotNull("got the message again", msg);
+            assertEquals("re delivery flag", true, msg.getJMSRedelivered());
+            assertTrue("redelivery count survives restart", msg.getLongProperty("JMSXDeliveryCount") > 1);
+            msg.acknowledge();
+        }
+        
+
+        // consume the rest that were not redeliveries (includes the message whose store update failed)
+        for (int i = 0; i < 6; i++) {
+            msg = (TextMessage) consumer.receive(4000);
+            LOG.info("not redelivered? got: " + msg);
+            assertNotNull("got the message", msg);
+            assertEquals("not a redelivery", false, msg.getJMSRedelivered());
+            assertEquals("first delivery", 1, msg.getLongProperty("JMSXDeliveryCount"));
+            msg.acknowledge();
+        }
+        connection.close();
+    }
+
+
+    @org.junit.Test
+    public void testValidateRedeliveryFlagAfterTransientFailureConnectionDrop() throws Exception {
+        // Scenario: same as the restart test, but the broker keeps running; only the client reconnects after the simulated store failure.
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString()
+            + "?jms.prefetchPolicy.all=0"); // prefetch of 0 requested through the connection URI
+        connection = (ActiveMQConnection) connectionFactory.createConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Destination destination = session.createQueue(queueName);
+        populateDestination(10, destination, connection);
+        TextMessage msg = null;
+        MessageConsumer consumer = session.createConsumer(destination);
+        Exception expectedException = null;
+        try {
+            for (int i = 0; i < 5; i++) {
+                msg = (TextMessage) consumer.receive(5000);
+                LOG.info("not redelivered? got: " + msg);
+                assertNotNull("got the message", msg);
+                assertTrue("Should not receive the 5th message", i < 4); // a failed assertion is an Error, so the catch (Exception) below does not swallow it
+                // The first 4 messages will be ok but the 5th one should hit an exception in updateMessage and should not be delivered
+            }
+        } catch (Exception e) {
+            // Expecting an exception and disconnect on the 5th message
+            LOG.info("Got expected:", e);
+            expectedException = e;
+        }
+        assertNotNull("Expecting an exception when updateMessage fails", expectedException);
+
+        consumer.close();
+        connection.close();
+        // Reconnect to the same broker with the same factory; the queue store proxy disarms itself after its first simulated failure.
+        connection = (ActiveMQConnection) connectionFactory.createConnection();
+        connection.start();
+
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        destination = session.createQueue(queueName);
+        consumer = session.createConsumer(destination);
+
+
+        // consume the messages that were previously delivered
+        for (int i = 0; i < 4; i++) {
+            msg = (TextMessage) consumer.receive(4000);
+            LOG.info("redelivered? got: " + msg);
+            assertNotNull("got the message again", msg);
+            assertEquals("re delivery flag", true, msg.getJMSRedelivered());
+            assertTrue("redelivery count survives reconnect", msg.getLongProperty("JMSXDeliveryCount") > 1);
+            msg.acknowledge();
+        }
+
+
+        // consume the rest that were not redeliveries (includes the message whose store update failed)
+        for (int i = 0; i < 6; i++) {
+            msg = (TextMessage) consumer.receive(4000);
+            LOG.info("not redelivered? got: " + msg);
+            assertNotNull("got the message", msg);
+            assertEquals("not a redelivery", false, msg.getJMSRedelivered());
+            assertEquals("first delivery", 1, msg.getLongProperty("JMSXDeliveryCount"));
+            msg.acknowledge();
+        }
+        connection.close();
+    }
+
+    private void restartBroker() throws Exception { // stops the current broker and starts a replacement that does not inject store failures
+        broker.stop();
+        broker.waitUntilStopped();
+        broker = createRestartedBroker();
+        broker.start();
+    }
+
+    private BrokerService createRestartedBroker() throws Exception { // builds, but does not start, the replacement broker
+        broker = new BrokerService(); // NOTE(review): assigns the field directly; the caller then re-assigns the same instance from the return value
+        configureBroker(broker, false); // false => no simulated updateMessage failure; deleteAllMessagesOnStartup is not set on this instance
+        return broker;
+    }
+
+    private void populateDestination(final int nbMessages, final Destination destination, javax.jms.Connection connection) throws JMSException { // sends nbMessages text messages to destination
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // dedicated non-transacted session, closed before returning
+        MessageProducer producer = session.createProducer(destination);
+        for (int i = 1; i <= nbMessages; i++) {
+            producer.send(session.createTextMessage("<hello id='" + i + "'/>")); // ids in the payload are 1-based
+        }
+        producer.close();
+        session.close();
+    }
+    
+    private class KahaDBWithUpdateExceptionPersistenceAdapter implements PersistenceAdapter {
+        // Test-only adapter: forwards every call to an embedded KahaDB adapter and wraps only the message stores it creates.
+        private KahaDBPersistenceAdapter kahaDB = new KahaDBPersistenceAdapter();
+        private boolean throwExceptionOnUpdate; // handed to each store proxy this adapter creates
+        
+        public KahaDBWithUpdateExceptionPersistenceAdapter(boolean throwExceptionOnUpdate) {
+            this.throwExceptionOnUpdate = throwExceptionOnUpdate;
+        }
+        
+        @Override
+        public void start() throws Exception {
+            kahaDB.start();
+        }
+
+        @Override
+        public void stop() throws Exception {
+            kahaDB.stop();
+        }
+
+        @Override
+        public Set<ActiveMQDestination> getDestinations() {
+            return kahaDB.getDestinations();
+        }
+        // Queue stores are wrapped in a proxy whose updateMessage can be made to fail.
+        @Override
+        public MessageStore createQueueMessageStore(ActiveMQQueue destination) 
+                throws IOException {
+            MessageStore proxyMessageStoreWithException = new ProxyMessageStoreWithUpdateException(
+                    kahaDB.createQueueMessageStore(destination), throwExceptionOnUpdate);
+            return proxyMessageStoreWithException;
+        }
+        // Topic stores are wrapped the same way.
+        @Override
+        public TopicMessageStore createTopicMessageStore(
+                ActiveMQTopic destination) throws IOException {
+            TopicMessageStore proxyMessageStoreWithException = new ProxyTopicMessageStoreWithUpdateException(
+                    kahaDB.createTopicMessageStore(destination), throwExceptionOnUpdate);
+            return proxyMessageStoreWithException;
+        }
+        // Everything below is plain delegation to the embedded KahaDB adapter.
+        @Override
+        public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
+            return kahaDB.createJobSchedulerStore();
+        }
+
+        @Override
+        public void removeQueueMessageStore(ActiveMQQueue destination) {
+            kahaDB.removeQueueMessageStore(destination);
+        }
+
+        @Override
+        public void removeTopicMessageStore(ActiveMQTopic destination) {
+            kahaDB.removeTopicMessageStore(destination);
+        }
+
+        @Override
+        public TransactionStore createTransactionStore() throws IOException {
+            return kahaDB.createTransactionStore();
+        }
+
+        @Override
+        public void beginTransaction(ConnectionContext context)
+                throws IOException {
+            kahaDB.beginTransaction(context);
+        }
+
+        @Override
+        public void commitTransaction(ConnectionContext context)
+                throws IOException {
+            kahaDB.commitTransaction(context);
+        }
+
+        @Override
+        public void rollbackTransaction(ConnectionContext context)
+                throws IOException {
+            kahaDB.rollbackTransaction(context);
+        }
+
+        @Override
+        public long getLastMessageBrokerSequenceId() throws IOException {
+            return kahaDB.getLastMessageBrokerSequenceId();
+        }
+
+        @Override
+        public void deleteAllMessages() throws IOException {
+            kahaDB.deleteAllMessages();
+        }
+
+        @Override
+        public void setUsageManager(SystemUsage usageManager) {
+            kahaDB.setUsageManager(usageManager);
+        }
+
+        @Override
+        public void setBrokerName(String brokerName) {
+            kahaDB.setBrokerName(brokerName);
+        }
+
+        @Override
+        public void setDirectory(File dir) {
+            kahaDB.setDirectory(dir);
+        }
+
+        @Override
+        public File getDirectory() {
+            return kahaDB.getDirectory();
+        }
+
+        @Override
+        public void checkpoint(boolean sync) throws IOException {
+            kahaDB.checkpoint(sync);
+        }
+
+        @Override
+        public long size() {
+            return kahaDB.size();
+        }
+
+        @Override
+        public long getLastProducerSequenceId(ProducerId id) throws IOException {
+            return kahaDB.getLastProducerSequenceId(id);
+        }
+        
+    }
+    
+    private class ProxyMessageStoreWithUpdateException extends ProxyMessageStore {
+        private boolean throwExceptionOnUpdate; // fault injection armed? cleared when the simulated failure fires
+        private int numBeforeException = 4; // updates still passed through to the delegate before the simulated failure
+        public ProxyMessageStoreWithUpdateException(MessageStore delegate, boolean throwExceptionOnUpdate) {
+            super(delegate);
+            this.throwExceptionOnUpdate = throwExceptionOnUpdate;
+        }
+        // When armed: passes the first 4 updates through, fails the next one once, then delegates normally again.
+        @Override
+        public void updateMessage(Message message) throws IOException {
+            if(throwExceptionOnUpdate) {
+                if(numBeforeException > 0) {
+                    numBeforeException--;
+                    super.updateMessage(message);
+                } else {
+                    // let's only do it once so we can validate transient store failure
+                    throwExceptionOnUpdate = false;
+
+                    // A message that has never been delivered will hit this exception
+                    throw new IOException("Hit our simulated exception writing the update to disk");
+                }
+            } else {
+                super.updateMessage(message);
+            }
+        }
+    }
+    
+    private class ProxyTopicMessageStoreWithUpdateException extends ProxyTopicMessageStore {
+        private boolean throwExceptionOnUpdate; // fault injection armed? never cleared by this class
+        private int numBeforeException = 4; // updates still passed through to the delegate before failures begin
+        public ProxyTopicMessageStoreWithUpdateException(TopicMessageStore delegate, boolean throwExceptionOnUpdate) {
+            super(delegate);
+            this.throwExceptionOnUpdate = throwExceptionOnUpdate;
+        }
+        // When armed: passes the first 4 updates through, then fails every subsequent update.
+        @Override
+        public void updateMessage(Message message) throws IOException {
+            if(throwExceptionOnUpdate) {
+                if(numBeforeException > 0) {
+                    numBeforeException--;
+                    super.updateMessage(message);
+                } else { // NOTE(review): the flag is not reset here (ProxyMessageStoreWithUpdateException resets it), so the failure is permanent - confirm intended
+                    // A message that has never been delivered will hit this exception
+                    throw new IOException("Hit our simulated exception writing the update to disk");
+                }
+            } else {
+                super.updateMessage(message);
+            }
+        }
+    }
+}
\ No newline at end of file