You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/05/01 01:17:17 UTC

[qpid-broker-j] branch 7.1.x updated (d4286ef -> 575388c)

This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a change to branch 7.1.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git.


    from d4286ef  QPID-8299: [Broker-J][JDBC Config Store] Possibility to customize the connection provider for the config of the Broker
     new c1cbe33  QPID-8303: [Broker-J][JDBC Message Store] Add test
     new 9214901  QPID-8303: [Broker-J][JDBC Message Store] Batch delete fails when deleting exactly 1000 messages
     new 575388c  QPID-8303: [Broker-J][JDBC Message Store] Fix test

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../store/jdbc/AbstractJDBCMessageStore.java       |  8 +--
 .../server/store/jdbc/JDBCMessageStoreTest.java    | 65 +++++++++++++++++++++-
 2 files changed, 66 insertions(+), 7 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-broker-j] 03/03: QPID-8303: [Broker-J][JDBC Message Store] Fix test

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch 7.1.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit 575388cf1f39d733c0e5ba0b3311638b112ff806
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Wed May 1 01:58:12 2019 +0100

    QPID-8303: [Broker-J][JDBC Message Store] Fix test
    
    (cherry picked from commit afd1dc83251254c82340d89b389b69bc17edd373)
---
 .../org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java    | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
index d19c034..c289b5e 100644
--- a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
+++ b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
@@ -58,6 +58,7 @@ import org.apache.qpid.server.store.MessageDurability;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.MessageStoreTestCase;
+import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.store.Transaction;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.virtualhost.jdbc.JDBCVirtualHost;
@@ -213,9 +214,9 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase
 
         store.removeMessages(records.stream().map(MessageEnqueueRecord::getMessageNumber).collect(Collectors.toList()));
 
-        final List<MessageEnqueueRecord> stored = new ArrayList<>();
-        store.newMessageStoreReader().visitMessageInstances(resource, (r) -> {
-            stored.add(r);
+        final List<StoredMessage> stored = new ArrayList<>();
+        store.newMessageStoreReader().visitMessages(m-> {
+            stored.add(m);
             return true;
         });
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-broker-j] 02/03: QPID-8303: [Broker-J][JDBC Message Store] Batch delete fails when deleting exactly 1000 messages

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch 7.1.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit 9214901fe41d8ff8138477031b7811d7d3bb4963
Author: overmeulen <ov...@murex.com>
AuthorDate: Mon Apr 29 14:41:17 2019 +0200

    QPID-8303: [Broker-J][JDBC Message Store] Batch delete fails when deleting exactly 1000 messages
    
    This closes #28
    
    (cherry picked from commit 3d5bacb6698e501c86083870edc2f9d86caa427b)
---
 .../apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java   | 8 ++------
 .../org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java   | 7 +++++++
 2 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index d7a391b..2dc992f 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -44,6 +44,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
@@ -472,13 +473,8 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
             {
                 try
                 {
-                    for (int i = 0; i <= messageIds.size() / IN_CLAUSE_MAX_SIZE; i++)
+                    for (List<Long> boundMessageIds : Lists.partition(messageIds, IN_CLAUSE_MAX_SIZE))
                     {
-                        List<Long> boundMessageIds = messageIds.stream()
-                                                               .skip(i * IN_CLAUSE_MAX_SIZE)
-                                                               .limit(IN_CLAUSE_MAX_SIZE)
-                                                               .collect(Collectors.toList());
-
                         removeMessagesFromDatabase(conn, boundMessageIds);
                     }
                 }
diff --git a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
index f8c016d..d19c034 100644
--- a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
+++ b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
@@ -37,6 +37,7 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -184,6 +185,12 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase
         GenericJDBCMessageStore store = spy((GenericJDBCMessageStore) getStore());
         when(store.newConnection()).thenReturn(mock(Connection.class, Mockito.RETURNS_MOCKS));
 
+        store.removeMessages(LongStream.rangeClosed(1,1000).boxed().collect(Collectors.toList()));
+
+        verify(store).removeMessagesFromDatabase(any(Connection.class), any(List.class));
+
+        Mockito.reset(store);
+
         store.removeMessages(LongStream.rangeClosed(1,2001).boxed().collect(Collectors.toList()));
 
         verify(store).removeMessagesFromDatabase(any(Connection.class), eq(LongStream.rangeClosed(1,1000).boxed().collect(Collectors.toList())));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-broker-j] 01/03: QPID-8303: [Broker-J][JDBC Message Store] Add test

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch 7.1.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit c1cbe333157675208c807dd1bad885fa8e7bcce5
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Wed May 1 01:36:56 2019 +0100

    QPID-8303: [Broker-J][JDBC Message Store] Add test
    
    (cherry picked from commit 6726fe7adff3fb0064c42c03cf84b79bd22117bd)
---
 .../server/store/jdbc/JDBCMessageStoreTest.java    | 57 +++++++++++++++++++++-
 1 file changed, 56 insertions(+), 1 deletion(-)

diff --git a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
index 84b93a2..f8c016d 100644
--- a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
+++ b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
@@ -34,6 +34,7 @@ import static org.mockito.Mockito.when;
 
 import java.sql.Connection;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.UUID;
@@ -46,6 +47,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.AMQMessageHeader;
@@ -58,7 +60,6 @@ import org.apache.qpid.server.store.MessageStoreTestCase;
 import org.apache.qpid.server.store.Transaction;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.virtualhost.jdbc.JDBCVirtualHost;
-import org.mockito.Mockito;
 
 public class JDBCMessageStoreTest extends MessageStoreTestCase
 {
@@ -190,6 +191,60 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase
         verify(store).removeMessagesFromDatabase(any(Connection.class), eq(Collections.singletonList(2001L)));
     }
 
+    @Test
+    public void testRemoveMessages1000()
+    {
+        final String queueName = getTestName();
+        final UUID transactionalLogId = UUID.randomUUID();
+        final TransactionLogResource resource = mockTransactionLogResource(transactionalLogId, queueName);
+        final int numberOfMessages = 1000;
+        final GenericJDBCMessageStore store = (GenericJDBCMessageStore) getStore();
+
+        final List<MessageEnqueueRecord> records = enqueueMessages(store, resource, numberOfMessages);
+        assertEquals(numberOfMessages, records.size());
+        assertRecords(store, resource, records);
+
+        store.removeMessages(records.stream().map(MessageEnqueueRecord::getMessageNumber).collect(Collectors.toList()));
+
+        final List<MessageEnqueueRecord> stored = new ArrayList<>();
+        store.newMessageStoreReader().visitMessageInstances(resource, (r) -> {
+            stored.add(r);
+            return true;
+        });
+
+        assertTrue(stored.isEmpty());
+    }
+
+    private List<MessageEnqueueRecord> enqueueMessages(final MessageStore store,
+                                                       final TransactionLogResource resource,
+                                                       final int numberOfMessages)
+    {
+        final Transaction transaction = store.newTransaction();
+        final String name = resource.getName();
+        final List<MessageEnqueueRecord> records = LongStream.rangeClosed(1, numberOfMessages)
+                                                             .boxed()
+                                                             .map(i -> {
+                                                                 final InternalMessage m =
+                                                                         addTestMessage(store, name, i + "");
+                                                                 return transaction.enqueueMessage(resource, m);
+                                                             }).collect(Collectors.toList());
+        transaction.commitTran();
+        return records;
+    }
+
+    private void assertRecords(final MessageStore store,
+                               final TransactionLogResource resource,
+                               final List<MessageEnqueueRecord> records)
+    {
+        final List<MessageEnqueueRecord> visited = new ArrayList<>();
+        store.newMessageStoreReader().visitMessageInstances(resource, (r) -> {
+            visited.add(r);
+            return true;
+        });
+        assertEquals(records.stream().map(MessageEnqueueRecord::getMessageNumber).collect(Collectors.toSet()),
+                     visited.stream().map(MessageEnqueueRecord::getMessageNumber).collect(Collectors.toSet()));
+    }
+
     private InternalMessage addTestMessage(final MessageStore store,
                                            final String transactionalLogName,
                                            final String messageContent)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org