You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/05/01 00:39:12 UTC

[qpid-broker-j] branch master updated (f521464 -> 6726fe7)

This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git.


    from f521464  QPID-8299: [Broker-J][JDBC Config Store] Possibility to customize the connection provider for the config of the Broker
     new 3d5bacb  QPID-8303: [Broker-J][JDBC Message Store] Batch delete fails when deleting exactly 1000 messages
     new 6726fe7  QPID-8303: [Broker-J][JDBC Message Store] Add test

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../store/jdbc/AbstractJDBCMessageStore.java       |  8 +--
 .../server/store/jdbc/JDBCMessageStoreTest.java    | 64 +++++++++++++++++++++-
 2 files changed, 65 insertions(+), 7 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-broker-j] 02/02: QPID-8303: [Broker-J][JDBC Message Store] Add test

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit 6726fe7adff3fb0064c42c03cf84b79bd22117bd
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Wed May 1 01:36:56 2019 +0100

    QPID-8303: [Broker-J][JDBC Message Store] Add test
---
 .../server/store/jdbc/JDBCMessageStoreTest.java    | 57 +++++++++++++++++++++-
 1 file changed, 56 insertions(+), 1 deletion(-)

diff --git a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
index 4ec6e1b..d19c034 100644
--- a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
+++ b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
@@ -34,6 +34,7 @@ import static org.mockito.Mockito.when;
 
 import java.sql.Connection;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -47,6 +48,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.AMQMessageHeader;
@@ -59,7 +61,6 @@ import org.apache.qpid.server.store.MessageStoreTestCase;
 import org.apache.qpid.server.store.Transaction;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.virtualhost.jdbc.JDBCVirtualHost;
-import org.mockito.Mockito;
 
 public class JDBCMessageStoreTest extends MessageStoreTestCase
 {
@@ -197,6 +198,60 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase
         verify(store).removeMessagesFromDatabase(any(Connection.class), eq(Collections.singletonList(2001L)));
     }
 
+    @Test
+    public void testRemoveMessages1000()
+    {
+        final String queueName = getTestName();
+        final UUID transactionalLogId = UUID.randomUUID();
+        final TransactionLogResource resource = mockTransactionLogResource(transactionalLogId, queueName);
+        final int numberOfMessages = 1000;
+        final GenericJDBCMessageStore store = (GenericJDBCMessageStore) getStore();
+
+        final List<MessageEnqueueRecord> records = enqueueMessages(store, resource, numberOfMessages);
+        assertEquals(numberOfMessages, records.size());
+        assertRecords(store, resource, records);
+
+        store.removeMessages(records.stream().map(MessageEnqueueRecord::getMessageNumber).collect(Collectors.toList()));
+
+        final List<MessageEnqueueRecord> stored = new ArrayList<>();
+        store.newMessageStoreReader().visitMessageInstances(resource, (r) -> {
+            stored.add(r);
+            return true;
+        });
+
+        assertTrue(stored.isEmpty());
+    }
+
+    private List<MessageEnqueueRecord> enqueueMessages(final MessageStore store,
+                                                       final TransactionLogResource resource,
+                                                       final int numberOfMessages)
+    {
+        final Transaction transaction = store.newTransaction();
+        final String name = resource.getName();
+        final List<MessageEnqueueRecord> records = LongStream.rangeClosed(1, numberOfMessages)
+                                                             .boxed()
+                                                             .map(i -> {
+                                                                 final InternalMessage m =
+                                                                         addTestMessage(store, name, i + "");
+                                                                 return transaction.enqueueMessage(resource, m);
+                                                             }).collect(Collectors.toList());
+        transaction.commitTran();
+        return records;
+    }
+
+    private void assertRecords(final MessageStore store,
+                               final TransactionLogResource resource,
+                               final List<MessageEnqueueRecord> records)
+    {
+        final List<MessageEnqueueRecord> visited = new ArrayList<>();
+        store.newMessageStoreReader().visitMessageInstances(resource, (r) -> {
+            visited.add(r);
+            return true;
+        });
+        assertEquals(records.stream().map(MessageEnqueueRecord::getMessageNumber).collect(Collectors.toSet()),
+                     visited.stream().map(MessageEnqueueRecord::getMessageNumber).collect(Collectors.toSet()));
+    }
+
     private InternalMessage addTestMessage(final MessageStore store,
                                            final String transactionalLogName,
                                            final String messageContent)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-broker-j] 01/02: QPID-8303: [Broker-J][JDBC Message Store] Batch delete fails when deleting exactly 1000 messages

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit 3d5bacb6698e501c86083870edc2f9d86caa427b
Author: overmeulen <ov...@murex.com>
AuthorDate: Mon Apr 29 14:41:17 2019 +0200

    QPID-8303: [Broker-J][JDBC Message Store] Batch delete fails when deleting exactly 1000 messages
    
    This closes #28
---
 .../apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java   | 8 ++------
 .../org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java   | 7 +++++++
 2 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index d7a391b..2dc992f 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -44,6 +44,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
@@ -472,13 +473,8 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
             {
                 try
                 {
-                    for (int i = 0; i <= messageIds.size() / IN_CLAUSE_MAX_SIZE; i++)
+                    for (List<Long> boundMessageIds : Lists.partition(messageIds, IN_CLAUSE_MAX_SIZE))
                     {
-                        List<Long> boundMessageIds = messageIds.stream()
-                                                               .skip(i * IN_CLAUSE_MAX_SIZE)
-                                                               .limit(IN_CLAUSE_MAX_SIZE)
-                                                               .collect(Collectors.toList());
-
                         removeMessagesFromDatabase(conn, boundMessageIds);
                     }
                 }
diff --git a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
index 84b93a2..4ec6e1b 100644
--- a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
+++ b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
@@ -36,6 +36,7 @@ import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -183,6 +184,12 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase
         GenericJDBCMessageStore store = spy((GenericJDBCMessageStore) getStore());
         when(store.newConnection()).thenReturn(mock(Connection.class, Mockito.RETURNS_MOCKS));
 
+        store.removeMessages(LongStream.rangeClosed(1,1000).boxed().collect(Collectors.toList()));
+
+        verify(store).removeMessagesFromDatabase(any(Connection.class), any(List.class));
+
+        Mockito.reset(store);
+
         store.removeMessages(LongStream.rangeClosed(1,2001).boxed().collect(Collectors.toList()));
 
         verify(store).removeMessagesFromDatabase(any(Connection.class), eq(LongStream.rangeClosed(1,1000).boxed().collect(Collectors.toList())));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org