You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/06/09 10:52:50 UTC
qpid-broker-j git commit: QPID-7811: [Java Broker] Ensure that newly
enqueued message is not deleted by AsynchronousMessageStoreRecoverer
Repository: qpid-broker-j
Updated Branches:
refs/heads/master f630b9ef9 -> 06dfe565f
QPID-7811: [Java Broker] Ensure that newly enqueued message is not deleted by AsynchronousMessageStoreRecoverer
* Prevent deletion of a newly enqueued message
* Shut down the recoverer executor service after recovery is complete
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/06dfe565
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/06dfe565
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/06dfe565
Branch: refs/heads/master
Commit: 06dfe565ffe95aee8d0eee246d6c00c1fbf278d7
Parents: f630b9e
Author: Alex Rudyy <or...@apache.org>
Authored: Fri Jun 9 11:10:46 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Fri Jun 9 11:52:28 2017 +0100
----------------------------------------------------------------------
.../AsynchronousMessageStoreRecoverer.java | 12 +-
.../AsynchronousMessageStoreRecovererTest.java | 112 +++++++++++++++++++
2 files changed, 120 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06dfe565/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
index 7b749c3..31c20f6 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
@@ -199,13 +199,16 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
@Override
public boolean handle(final StoredMessage<?> storedMessage)
{
-
long messageNumber = storedMessage.getMessageNumber();
- if (!_recoveredMessages.containsKey(messageNumber))
+ if ( _continueRecovery.get() && messageNumber < _maxMessageId - 1)
{
- messagesToDelete.add(storedMessage);
+ if (!_recoveredMessages.containsKey(messageNumber))
+ {
+ messagesToDelete.add(storedMessage);
+ }
+ return true;
}
- return _continueRecovery.get() && messageNumber < _maxMessageId - 1;
+ return false;
}
});
for(StoredMessage<?> storedMessage : messagesToDelete)
@@ -222,6 +225,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
messagesToDelete.clear();
_recoveredMessages.clear();
_storeReader.close();
+ _queueRecoveryExecutor.shutdown();
}
private synchronized ServerMessage<?> getRecoveredMessage(final long messageId)
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06dfe565/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java b/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java
index e70fc67..c5a8a24 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java
@@ -24,17 +24,30 @@ import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
import java.util.concurrent.ExecutionException;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TestMessageMetaData;
+import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
import org.apache.qpid.server.store.handler.MessageInstanceHandler;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -89,4 +102,103 @@ public class AsynchronousMessageStoreRecovererTest extends QpidTestCase
ListenableFuture<Void> result = recoverer.recover(_virtualHost);
assertNull(result.get());
}
+
+ public void testRecoveryWhenLastRecoveryMessageIsConsumedBeforeRecoveryCompleted() throws Exception
+ {
+ Queue<?> queue = mock(Queue.class);
+ when(queue.getId()).thenReturn(UUID.randomUUID());
+ when(_virtualHost.getChildren(eq(Queue.class))).thenReturn(Collections.singleton(queue));
+ when(_store.getNextMessageId()).thenReturn(3L);
+ when(_store.newTransaction()).thenReturn(mock(Transaction.class));
+
+ final List<StoredMessage<?>> testMessages = new ArrayList<>();
+ testMessages.add(createTestMessage(1L));
+ StoredMessage newMessage = createTestMessage(4L);
+ testMessages.add(newMessage);
+
+ final MessageEnqueueRecord messageEnqueueRecord = mock(MessageEnqueueRecord.class);
+ UUID id = queue.getId();
+ when(messageEnqueueRecord.getQueueId()).thenReturn(id);
+ when(messageEnqueueRecord.getMessageNumber()).thenReturn(1L);
+
+ MockStoreReader storeReader = new MockStoreReader(Collections.singletonList(messageEnqueueRecord), testMessages);
+ when(_store.newMessageStoreReader()).thenReturn(storeReader);
+
+ AsynchronousMessageStoreRecoverer recoverer = new AsynchronousMessageStoreRecoverer();
+ ListenableFuture<Void> result = recoverer.recover(_virtualHost);
+ assertNull(result.get());
+
+ verify(newMessage, times(0)).remove();
+ }
+
+ private StoredMessage<?> createTestMessage(final long messageNumber)
+ {
+ final StorableMessageMetaData metaData = new TestMessageMetaData(messageNumber, 0);
+ final StoredMessage storedMessage = mock(StoredMessage.class);
+ when(storedMessage.getMessageNumber()).thenReturn(messageNumber);
+ when(storedMessage.getMetaData()).thenReturn(metaData);
+ return storedMessage;
+ }
+
+ private static class MockStoreReader implements MessageStore.MessageStoreReader
+ {
+ private final List<MessageEnqueueRecord> _messageEnqueueRecords;
+ private final List<StoredMessage<?>> _messages;
+
+ private MockStoreReader(final List<MessageEnqueueRecord> messageEnqueueRecords, List<StoredMessage<?>> messages)
+ {
+ _messageEnqueueRecords = messageEnqueueRecords;
+ _messages = messages;
+ }
+
+ @Override
+ public void visitMessages(final MessageHandler handler) throws StoreException
+ {
+ for (StoredMessage message: _messages)
+ {
+ handler.handle(message);
+ }
+ }
+
+ @Override
+ public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException
+ {
+ for(MessageEnqueueRecord record: _messageEnqueueRecords)
+ {
+ handler.handle(record);
+ }
+ }
+
+ @Override
+ public void visitMessageInstances(final TransactionLogResource queue, final MessageInstanceHandler handler)
+ throws StoreException
+ {
+ visitMessageInstances(handler);
+ }
+
+ @Override
+ public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException
+ {
+
+ }
+
+ @Override
+ public StoredMessage<?> getMessage(final long messageId)
+ {
+ for(StoredMessage<?> message: _messages)
+ {
+ if (message.getMessageNumber() == messageId)
+ {
+ return message;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public void close()
+ {
+
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org