You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2012/07/26 00:37:22 UTC

svn commit: r1365832 - in /qpid/trunk/qpid/java: bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ broker-plugins/management-jmx/src/test/java/org/apache/qpid/systest/management/jmx/ broker/src/main/java/org/apache/qpid/server/store/derby...

Author: kwall
Date: Wed Jul 25 22:37:21 2012
New Revision: 1365832

URL: http://svn.apache.org/viewvc?rev=1365832&view=rev
Log:
QPID-4164: Prevent the erroneous re-storing of recovered messages during move/copyMessage management functions.

Modified:
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
    qpid/trunk/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
    qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1365832&r1=1365831&r2=1365832&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java Wed Jul 25 22:37:21 2012
@@ -546,7 +546,7 @@ public abstract class AbstractBDBMessage
                 long messageId = LongBinding.entryToLong(key);
                 StorableMessageMetaData metaData = valueBinding.entryToObject(value);
 
-                StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, false);
+                StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
 
                 mrh.message(message);
 
@@ -1576,34 +1576,26 @@ public abstract class AbstractBDBMessage
     {
 
         private final long _messageId;
-        private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
+        private final boolean _isRecovered;
 
         private StorableMessageMetaData _metaData;
-        private volatile SoftReference<byte[]> _dataRef;
+        private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
+
         private byte[] _data;
+        private volatile SoftReference<byte[]> _dataRef;
 
         StoredBDBMessage(long messageId, StorableMessageMetaData metaData)
         {
-            this(messageId, metaData, true);
+            this(messageId, metaData, false);
         }
 
-
-        StoredBDBMessage(long messageId,
-                           StorableMessageMetaData metaData, boolean persist)
+        StoredBDBMessage(long messageId, StorableMessageMetaData metaData, boolean isRecovered)
         {
-            try
-            {
-                _messageId = messageId;
-                _metaData = metaData;
-
-                _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
-
-            }
-            catch (DatabaseException e)
-            {
-                throw new RuntimeException(e);
-            }
+            _messageId = messageId;
+            _isRecovered = isRecovered;
 
+            _metaData = metaData;
+            _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
         }
 
         public StorableMessageMetaData getMetaData()
@@ -1693,8 +1685,7 @@ public abstract class AbstractBDBMessage
 
         synchronized void store(com.sleepycat.je.Transaction txn)
         {
-
-            if(unstored())
+            if (!stored())
             {
                 try
                 {
@@ -1724,14 +1715,9 @@ public abstract class AbstractBDBMessage
             }
         }
 
-        private boolean unstored()
-        {
-            return _metaData != null;
-        }
-
         public synchronized StoreFuture flushToStore()
         {
-            if(unstored())
+            if(!stored())
             {
                 com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null);
                 store(txn);
@@ -1755,6 +1741,11 @@ public abstract class AbstractBDBMessage
                 throw new RuntimeException(e);
             }
         }
+
+        private boolean stored()
+        {
+            return _metaData == null || _isRecovered;
+        }
     }
 
     private class BDBTransaction implements org.apache.qpid.server.store.Transaction

Modified: qpid/trunk/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java?rev=1365832&r1=1365831&r2=1365832&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java Wed Jul 25 22:37:21 2012
@@ -41,10 +41,12 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.Queue;
 import javax.jms.Session;
+import javax.jms.TextMessage;
 import javax.management.Notification;
 import javax.management.NotificationListener;
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.TabularData;
+import javax.naming.NamingException;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -83,7 +85,6 @@ public class QueueManagementTest extends
     private ManagedQueue _managedSourceQueue;
     private ManagedQueue _managedDestinationQueue;
 
-
     public void setUp() throws Exception
     {
         _jmxUtils = new JMXTestUtils(this);
@@ -93,10 +94,8 @@ public class QueueManagementTest extends
         _sourceQueueName = getTestQueueName() + "_src";
         _destinationQueueName = getTestQueueName() + "_dest";
 
-        _connection = getConnection();
-        _connection.start();
+        createConnectionAndSession();
 
-        _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
         _sourceQueue = _session.createQueue(_sourceQueueName);
         _destinationQueue = _session.createQueue(_destinationQueueName);
         createQueueOnBroker(_sourceQueue);
@@ -104,8 +103,7 @@ public class QueueManagementTest extends
 
         _jmxUtils.open();
 
-        _managedSourceQueue = _jmxUtils.getManagedQueue(_sourceQueueName);
-        _managedDestinationQueue = _jmxUtils.getManagedQueue(_destinationQueueName);
+        createManagementInterfacesForQueues();
     }
 
     public void tearDown() throws Exception
@@ -498,6 +496,70 @@ public class QueueManagementTest extends
         assertEquals("Did not consume all messages from destination queue", numberOfMessagesToSend, totalConsumed.intValue());
     }
 
+    /**
+     * Tests {@link ManagedQueue#moveMessages(long, long, String)} interface.
+     */
+    public void testMoveMessageBetweenQueuesWithBrokerRestart() throws Exception
+    {
+        final int numberOfMessagesToSend = 1;
+
+        sendMessage(_session, _sourceQueue, numberOfMessagesToSend);
+        syncSession(_session);
+        assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue());
+
+        restartBroker();
+
+        createManagementInterfacesForQueues();
+        createConnectionAndSession();
+
+        List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend);
+
+        // Move messages to destination
+        long messageId = amqMessagesIds.get(0);
+        _managedSourceQueue.moveMessages(messageId, messageId, _destinationQueueName);
+
+        assertEquals("Unexpected queue depth on destination queue after move", 1, _managedDestinationQueue.getMessageCount().intValue());
+        assertEquals("Unexpected queue depth on source queue after move", 0, _managedSourceQueue.getMessageCount().intValue());
+
+        assertMessageIndicesOn(_destinationQueue, 0);
+    }
+
+    /**
+     * Tests {@link ManagedQueue#copyMessages(long, long, String)} interface.
+     */
+    public void testCopyMessageBetweenQueuesWithBrokerRestart() throws Exception
+    {
+        final int numberOfMessagesToSend = 1;
+
+        sendMessage(_session, _sourceQueue, numberOfMessagesToSend);
+        syncSession(_session);
+        assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue());
+
+        restartBroker();
+
+        createManagementInterfacesForQueues();
+        createConnectionAndSession();
+
+        List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend);
+
+        // Move messages to destination
+        long messageId = amqMessagesIds.get(0);
+        _managedSourceQueue.copyMessages(messageId, messageId, _destinationQueueName);
+
+        assertEquals("Unexpected queue depth on destination queue after copy", 1, _managedDestinationQueue.getMessageCount().intValue());
+        assertEquals("Unexpected queue depth on source queue after copy", 1, _managedSourceQueue.getMessageCount().intValue());
+
+        assertMessageIndicesOn(_destinationQueue, 0);
+    }
+
+    @Override
+    public Message createNextMessage(Session session, int messageNumber) throws JMSException
+    {
+        Message message = session.createTextMessage(getContentForMessageNumber(messageNumber));
+        message.setIntProperty(INDEX, messageNumber);
+        return message;
+    }
+
     private void startAsyncConsumerOn(Destination queue, Connection asyncConnection,
             final CountDownLatch requiredNumberOfMessagesRead, final AtomicInteger totalConsumed) throws Exception
     {
@@ -521,9 +583,10 @@ public class QueueManagementTest extends
 
         for (int i : expectedIndices)
         {
-            Message message = consumer.receive(1000);
+            TextMessage message = (TextMessage)consumer.receive(1000);
             assertNotNull("Expected message with index " + i, message);
             assertEquals("Expected message with index " + i, i, message.getIntProperty(INDEX));
+            assertEquals("Expected message content", getContentForMessageNumber(i), message.getText());
         }
 
         assertNull("Unexpected message encountered", consumer.receive(1000));
@@ -574,6 +637,25 @@ public class QueueManagementTest extends
         ((AMQSession<?,?>)session).sync();
     }
 
+    private void createConnectionAndSession() throws JMSException,
+            NamingException
+    {
+        _connection = getConnection();
+        _connection.start();
+        _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
+    }
+
+    private void createManagementInterfacesForQueues()
+    {
+        _managedSourceQueue = _jmxUtils.getManagedQueue(_sourceQueueName);
+        _managedDestinationQueue = _jmxUtils.getManagedQueue(_destinationQueueName);
+    }
+
+    private String getContentForMessageNumber(int msgCount)
+    {
+        return "Message count " + msgCount;
+    }
+
     private final class RecordingNotificationListener implements NotificationListener
     {
         private final CountDownLatch _notificationReceivedLatch;

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java?rev=1365832&r1=1365831&r2=1365832&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java Wed Jul 25 22:37:21 2012
@@ -1575,7 +1575,7 @@ public class DerbyMessageStore implement
                         buf = buf.slice();
                         MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]];
                         StorableMessageMetaData metaData = type.getFactory().createMetaData(buf);
-                        StoredDerbyMessage message = new StoredDerbyMessage(messageId, metaData, false);
+                        StoredDerbyMessage message = new StoredDerbyMessage(messageId, metaData, true);
                         messageHandler.message(message);
                     }
 
@@ -2037,6 +2037,8 @@ public class DerbyMessageStore implement
     {
 
         private final long _messageId;
+        private final boolean _isRecovered;
+
         private StorableMessageMetaData _metaData;
         private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
         private byte[] _data;
@@ -2045,21 +2047,18 @@ public class DerbyMessageStore implement
 
         StoredDerbyMessage(long messageId, StorableMessageMetaData metaData)
         {
-            this(messageId, metaData, true);
+            this(messageId, metaData, false);
         }
 
 
         StoredDerbyMessage(long messageId,
-                           StorableMessageMetaData metaData, boolean persist)
+                           StorableMessageMetaData metaData, boolean isRecovered)
         {
             _messageId = messageId;
-
+            _isRecovered = isRecovered;
 
             _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
-            if(persist)
-            {
-                _metaData = metaData;
-            }
+            _metaData = metaData;
         }
 
         @Override
@@ -2140,16 +2139,16 @@ public class DerbyMessageStore implement
         @Override
         public synchronized StoreFuture flushToStore()
         {
+            Connection conn = null;
             try
             {
-                if(_metaData != null)
+                if(!stored())
                 {
-                    Connection conn = newConnection();
+                    conn = newConnection();
 
                     store(conn);
 
                     conn.commit();
-                    conn.close();
                     storedSizeChange(getMetaData().getContentSize());
                 }
             }
@@ -2161,12 +2160,24 @@ public class DerbyMessageStore implement
                 }
                 throw new RuntimeException(e);
             }
+            finally
+            {
+                closeConnection(conn);
+            }
             return StoreFuture.IMMEDIATE_FUTURE;
         }
 
+        @Override
+        public void remove()
+        {
+            int delta = getMetaData().getContentSize();
+            DerbyMessageStore.this.removeMessage(_messageId);
+            storedSizeChange(-delta);
+        }
+
         private synchronized void store(final Connection conn) throws SQLException
         {
-            if(_metaData != null)
+            if (!stored())
             {
                 try
                 {
@@ -2179,20 +2190,17 @@ public class DerbyMessageStore implement
                     _metaData = null;
                     _data = null;
                 }
-            }
 
-            if(_logger.isDebugEnabled())
-            {
-                _logger.debug("Storing message " + _messageId + " to store");
+                if(_logger.isDebugEnabled())
+                {
+                    _logger.debug("Storing message " + _messageId + " to store");
+                }
             }
         }
 
-        @Override
-        public void remove()
+        private boolean stored()
         {
-            int delta = getMetaData().getContentSize();
-            DerbyMessageStore.this.removeMessage(_messageId);
-            storedSizeChange(-delta);
+            return _metaData == null || _isRecovered;
         }
     }
 

Modified: qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes?rev=1365832&r1=1365831&r2=1365832&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes (original)
+++ qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes Wed Jul 25 22:37:21 2012
@@ -45,3 +45,5 @@ org.apache.qpid.server.store.DurableConf
 
 org.apache.qpid.systest.management.jmx.QueueManagementTest#testAlternateExchangeSurvivesRestart
 org.apache.qpid.systest.management.jmx.QueueManagementTest#testQueueDescriptionSurvivesRestart
+org.apache.qpid.systest.management.jmx.QueueManagementTest#testMoveMessageBetweenQueuesWithBrokerRestart
+org.apache.qpid.systest.management.jmx.QueueManagementTest#testCopyMessageBetweenQueuesWithBrokerRestart



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org