You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/06/09 15:58:49 UTC
qpid-broker-j git commit: QPID-7811: Ensure that last stored message
is recovered by AsynchronousMessageStoreRecoverer
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 32733fc22 -> cf1af38c6
QPID-7811: Ensure that last stored message is recovered by AsynchronousMessageStoreRecoverer
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/cf1af38c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/cf1af38c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/cf1af38c
Branch: refs/heads/master
Commit: cf1af38c61727f6608c0505134137cbcd067ca6e
Parents: 32733fc
Author: Alex Rudyy <or...@apache.org>
Authored: Fri Jun 9 16:58:23 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Fri Jun 9 16:58:40 2017 +0100
----------------------------------------------------------------------
.../AsynchronousMessageStoreRecoverer.java | 2 +-
.../AsynchronousMessageStoreRecovererTest.java | 20 +++++++++++++++++++-
2 files changed, 20 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/cf1af38c/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
index 31c20f6..777674b 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
@@ -200,7 +200,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
public boolean handle(final StoredMessage<?> storedMessage)
{
long messageNumber = storedMessage.getMessageNumber();
- if ( _continueRecovery.get() && messageNumber < _maxMessageId - 1)
+ if ( _continueRecovery.get() && messageNumber < _maxMessageId)
{
if (!_recoveredMessages.containsKey(messageNumber))
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/cf1af38c/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java b/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java
index c5a8a24..f8a0589 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java
@@ -21,7 +21,9 @@
package org.apache.qpid.server.virtualhost;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -35,8 +37,10 @@ import java.util.UUID;
import java.util.concurrent.ExecutionException;
import com.google.common.util.concurrent.ListenableFuture;
+import org.mockito.ArgumentMatcher;
import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.MessageStore;
@@ -112,7 +116,8 @@ public class AsynchronousMessageStoreRecovererTest extends QpidTestCase
when(_store.newTransaction()).thenReturn(mock(Transaction.class));
final List<StoredMessage<?>> testMessages = new ArrayList<>();
- testMessages.add(createTestMessage(1L));
+ StoredMessage<?> storedMessage = createTestMessage(1L);
+ testMessages.add(storedMessage);
StoredMessage newMessage = createTestMessage(4L);
testMessages.add(newMessage);
@@ -129,6 +134,19 @@ public class AsynchronousMessageStoreRecovererTest extends QpidTestCase
assertNull(result.get());
verify(newMessage, times(0)).remove();
+ verify(queue).recover(argThat(new ArgumentMatcher<ServerMessage>()
+ {
+ @Override
+ public boolean matches(final Object argument)
+ {
+ if (argument instanceof ServerMessage)
+ {
+ ServerMessage serverMessage = (ServerMessage)argument;
+ return serverMessage.getMessageNumber() == storedMessage.getMessageNumber();
+ }
+ return false;
+ }
+ }), same(messageEnqueueRecord));
}
private StoredMessage<?> createTestMessage(final long messageNumber)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org