You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/06/09 10:52:50 UTC

qpid-broker-j git commit: QPID-7811: [Java Broker] Ensure that newly enqueued message is not deleted by AsynchronousMessageStoreRecoverer

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master f630b9ef9 -> 06dfe565f


QPID-7811: [Java Broker] Ensure that newly enqueued message is not deleted by AsynchronousMessageStoreRecoverer

* prevent deletion of newly enqueued message
* shutdown recoverer executor service after recovery is complete


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/06dfe565
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/06dfe565
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/06dfe565

Branch: refs/heads/master
Commit: 06dfe565ffe95aee8d0eee246d6c00c1fbf278d7
Parents: f630b9e
Author: Alex Rudyy <or...@apache.org>
Authored: Fri Jun 9 11:10:46 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Fri Jun 9 11:52:28 2017 +0100

----------------------------------------------------------------------
 .../AsynchronousMessageStoreRecoverer.java      |  12 +-
 .../AsynchronousMessageStoreRecovererTest.java  | 112 +++++++++++++++++++
 2 files changed, 120 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06dfe565/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
index 7b749c3..31c20f6 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
@@ -199,13 +199,16 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
                 @Override
                 public boolean handle(final StoredMessage<?> storedMessage)
                 {
-
                     long messageNumber = storedMessage.getMessageNumber();
-                    if (!_recoveredMessages.containsKey(messageNumber))
+                    if ( _continueRecovery.get() && messageNumber < _maxMessageId - 1)
                     {
-                        messagesToDelete.add(storedMessage);
+                        if (!_recoveredMessages.containsKey(messageNumber))
+                        {
+                            messagesToDelete.add(storedMessage);
+                        }
+                        return true;
                     }
-                    return _continueRecovery.get() && messageNumber < _maxMessageId - 1;
+                    return false;
                 }
             });
             for(StoredMessage<?> storedMessage : messagesToDelete)
@@ -222,6 +225,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
             messagesToDelete.clear();
             _recoveredMessages.clear();
             _storeReader.close();
+            _queueRecoveryExecutor.shutdown();
         }
 
         private synchronized ServerMessage<?> getRecoveredMessage(final long messageId)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06dfe565/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java b/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java
index e70fc67..c5a8a24 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java
@@ -24,17 +24,30 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TestMessageMetaData;
+import org.apache.qpid.server.store.Transaction;
 import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
 import org.apache.qpid.server.store.handler.MessageInstanceHandler;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.test.utils.QpidTestCase;
@@ -89,4 +102,103 @@ public class AsynchronousMessageStoreRecovererTest extends QpidTestCase
         ListenableFuture<Void> result = recoverer.recover(_virtualHost);
         assertNull(result.get());
     }
+
+    public void testRecoveryWhenLastRecoveryMessageIsConsumedBeforeRecoveryCompleted() throws Exception
+    {
+        Queue<?> queue = mock(Queue.class);
+        when(queue.getId()).thenReturn(UUID.randomUUID());
+        when(_virtualHost.getChildren(eq(Queue.class))).thenReturn(Collections.singleton(queue));
+        when(_store.getNextMessageId()).thenReturn(3L);
+        when(_store.newTransaction()).thenReturn(mock(Transaction.class));
+
+        final List<StoredMessage<?>> testMessages = new ArrayList<>();
+        testMessages.add(createTestMessage(1L));
+        StoredMessage newMessage = createTestMessage(4L);
+        testMessages.add(newMessage);
+
+        final MessageEnqueueRecord messageEnqueueRecord = mock(MessageEnqueueRecord.class);
+        UUID id = queue.getId();
+        when(messageEnqueueRecord.getQueueId()).thenReturn(id);
+        when(messageEnqueueRecord.getMessageNumber()).thenReturn(1L);
+
+        MockStoreReader storeReader = new MockStoreReader(Collections.singletonList(messageEnqueueRecord), testMessages);
+        when(_store.newMessageStoreReader()).thenReturn(storeReader);
+
+        AsynchronousMessageStoreRecoverer recoverer = new AsynchronousMessageStoreRecoverer();
+        ListenableFuture<Void> result = recoverer.recover(_virtualHost);
+        assertNull(result.get());
+
+        verify(newMessage, times(0)).remove();
+    }
+
+    private StoredMessage<?> createTestMessage(final long messageNumber)
+    {
+        final StorableMessageMetaData metaData = new TestMessageMetaData(messageNumber, 0);
+        final StoredMessage storedMessage = mock(StoredMessage.class);
+        when(storedMessage.getMessageNumber()).thenReturn(messageNumber);
+        when(storedMessage.getMetaData()).thenReturn(metaData);
+        return storedMessage;
+    }
+
+    private static class MockStoreReader implements MessageStore.MessageStoreReader
+    {
+        private final List<MessageEnqueueRecord> _messageEnqueueRecords;
+        private final List<StoredMessage<?>> _messages;
+
+        private MockStoreReader(final List<MessageEnqueueRecord> messageEnqueueRecords, List<StoredMessage<?>> messages)
+        {
+            _messageEnqueueRecords = messageEnqueueRecords;
+            _messages = messages;
+        }
+
+        @Override
+        public void visitMessages(final MessageHandler handler) throws StoreException
+        {
+            for (StoredMessage message: _messages)
+            {
+                handler.handle(message);
+            }
+        }
+
+        @Override
+        public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException
+        {
+            for(MessageEnqueueRecord record: _messageEnqueueRecords)
+            {
+                handler.handle(record);
+            }
+        }
+
+        @Override
+        public void visitMessageInstances(final TransactionLogResource queue, final MessageInstanceHandler handler)
+                    throws StoreException
+        {
+            visitMessageInstances(handler);
+        }
+
+        @Override
+        public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException
+        {
+
+        }
+
+        @Override
+        public StoredMessage<?> getMessage(final long messageId)
+        {
+            for(StoredMessage<?> message: _messages)
+            {
+                if (message.getMessageNumber() == messageId)
+                {
+                    return message;
+                }
+            }
+            return null;
+        }
+
+        @Override
+        public void close()
+        {
+
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org