You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/06/09 15:58:49 UTC

qpid-broker-j git commit: QPID-7811: Ensure that last stored message is recovered by AsynchronousMessageStoreRecoverer

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 32733fc22 -> cf1af38c6


QPID-7811: Ensure that last stored message is recovered by AsynchronousMessageStoreRecoverer


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/cf1af38c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/cf1af38c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/cf1af38c

Branch: refs/heads/master
Commit: cf1af38c61727f6608c0505134137cbcd067ca6e
Parents: 32733fc
Author: Alex Rudyy <or...@apache.org>
Authored: Fri Jun 9 16:58:23 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Fri Jun 9 16:58:40 2017 +0100

----------------------------------------------------------------------
 .../AsynchronousMessageStoreRecoverer.java      |  2 +-
 .../AsynchronousMessageStoreRecovererTest.java  | 20 +++++++++++++++++++-
 2 files changed, 20 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/cf1af38c/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
index 31c20f6..777674b 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
@@ -200,7 +200,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
                 public boolean handle(final StoredMessage<?> storedMessage)
                 {
                     long messageNumber = storedMessage.getMessageNumber();
-                    if ( _continueRecovery.get() && messageNumber < _maxMessageId - 1)
+                    if ( _continueRecovery.get() && messageNumber < _maxMessageId)
                     {
                         if (!_recoveredMessages.containsKey(messageNumber))
                         {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/cf1af38c/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java b/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java
index c5a8a24..f8a0589 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java
@@ -21,7 +21,9 @@
 package org.apache.qpid.server.virtualhost;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -35,8 +37,10 @@ import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import org.mockito.ArgumentMatcher;
 
 import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.MessageStore;
@@ -112,7 +116,8 @@ public class AsynchronousMessageStoreRecovererTest extends QpidTestCase
         when(_store.newTransaction()).thenReturn(mock(Transaction.class));
 
         final List<StoredMessage<?>> testMessages = new ArrayList<>();
-        testMessages.add(createTestMessage(1L));
+        StoredMessage<?> storedMessage = createTestMessage(1L);
+        testMessages.add(storedMessage);
         StoredMessage newMessage = createTestMessage(4L);
         testMessages.add(newMessage);
 
@@ -129,6 +134,19 @@ public class AsynchronousMessageStoreRecovererTest extends QpidTestCase
         assertNull(result.get());
 
         verify(newMessage, times(0)).remove();
+        verify(queue).recover(argThat(new ArgumentMatcher<ServerMessage>()
+        {
+            @Override
+            public boolean matches(final Object argument)
+            {
+                if (argument instanceof ServerMessage)
+                {
+                    ServerMessage serverMessage = (ServerMessage)argument;
+                    return serverMessage.getMessageNumber() == storedMessage.getMessageNumber();
+                }
+                return false;
+            }
+        }), same(messageEnqueueRecord));
     }
 
     private StoredMessage<?> createTestMessage(final long messageNumber)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org