You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/05/01 00:39:14 UTC

[qpid-broker-j] 02/02: QPID-8303: [Broker-J][JDBC Message Store] Add test

This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit 6726fe7adff3fb0064c42c03cf84b79bd22117bd
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Wed May 1 01:36:56 2019 +0100

    QPID-8303: [Broker-J][JDBC Message Store] Add test
---
 .../server/store/jdbc/JDBCMessageStoreTest.java    | 57 +++++++++++++++++++++-
 1 file changed, 56 insertions(+), 1 deletion(-)

diff --git a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
index 4ec6e1b..d19c034 100644
--- a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
+++ b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
@@ -34,6 +34,7 @@ import static org.mockito.Mockito.when;
 
 import java.sql.Connection;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -47,6 +48,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.AMQMessageHeader;
@@ -59,7 +61,6 @@ import org.apache.qpid.server.store.MessageStoreTestCase;
 import org.apache.qpid.server.store.Transaction;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.virtualhost.jdbc.JDBCVirtualHost;
-import org.mockito.Mockito;
 
 public class JDBCMessageStoreTest extends MessageStoreTestCase
 {
@@ -197,6 +198,60 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase
         verify(store).removeMessagesFromDatabase(any(Connection.class), eq(Collections.singletonList(2001L)));
     }
 
+    @Test
+    public void testRemoveMessages1000()
+    {
+        final String queueName = getTestName();
+        final UUID transactionalLogId = UUID.randomUUID();
+        final TransactionLogResource resource = mockTransactionLogResource(transactionalLogId, queueName);
+        final int numberOfMessages = 1000;
+        final GenericJDBCMessageStore store = (GenericJDBCMessageStore) getStore();
+
+        final List<MessageEnqueueRecord> records = enqueueMessages(store, resource, numberOfMessages);
+        assertEquals(numberOfMessages, records.size());
+        assertRecords(store, resource, records);
+
+        store.removeMessages(records.stream().map(MessageEnqueueRecord::getMessageNumber).collect(Collectors.toList()));
+
+        final List<MessageEnqueueRecord> stored = new ArrayList<>();
+        store.newMessageStoreReader().visitMessageInstances(resource, (r) -> {
+            stored.add(r);
+            return true;
+        });
+
+        assertTrue(stored.isEmpty());
+    }
+
+    private List<MessageEnqueueRecord> enqueueMessages(final MessageStore store,
+                                                       final TransactionLogResource resource,
+                                                       final int numberOfMessages)
+    {
+        final Transaction transaction = store.newTransaction();
+        final String name = resource.getName();
+        final List<MessageEnqueueRecord> records = LongStream.rangeClosed(1, numberOfMessages)
+                                                             .boxed()
+                                                             .map(i -> {
+                                                                 final InternalMessage m =
+                                                                         addTestMessage(store, name, i + "");
+                                                                 return transaction.enqueueMessage(resource, m);
+                                                             }).collect(Collectors.toList());
+        transaction.commitTran();
+        return records;
+    }
+
+    private void assertRecords(final MessageStore store,
+                               final TransactionLogResource resource,
+                               final List<MessageEnqueueRecord> records)
+    {
+        final List<MessageEnqueueRecord> visited = new ArrayList<>();
+        store.newMessageStoreReader().visitMessageInstances(resource, (r) -> {
+            visited.add(r);
+            return true;
+        });
+        assertEquals(records.stream().map(MessageEnqueueRecord::getMessageNumber).collect(Collectors.toSet()),
+                     visited.stream().map(MessageEnqueueRecord::getMessageNumber).collect(Collectors.toSet()));
+    }
+
     private InternalMessage addTestMessage(final MessageStore store,
                                            final String transactionalLogName,
                                            final String messageContent)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org