You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/05/03 21:59:37 UTC

[qpid-broker-j] branch master updated: QPID-8294: [Brojer-J][JDBC] Configure 'qpid.jdbcstore.inClauseMaxSize' as context variable

This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git


The following commit(s) were added to refs/heads/master by this push:
     new f435696  QPID-8294: [Brojer-J][JDBC] Configure 'qpid.jdbcstore.inClauseMaxSize' as context variable
f435696 is described below

commit f4356963518ddc7adf302a9e56586a1d92b2d580
Author: Oleksandr Rudyy <ol...@jpmorgan.com>
AuthorDate: Wed May 1 13:56:35 2019 +0100

    QPID-8294: [Brojer-J][JDBC] Configure 'qpid.jdbcstore.inClauseMaxSize' as context variable
    
    This closes #32
---
 .../store/jdbc/AbstractJDBCMessageStore.java       | 21 +++++++++++--
 .../server/store/jdbc/JDBCMessageStoreTest.java    | 36 ++++++++++++++++++++--
 2 files changed, 53 insertions(+), 4 deletions(-)

diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index 2dc992f..bae485e 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -83,7 +83,8 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
     private static final String XID_TABLE_NAME_SUFFIX = "QPID_XIDS";
     private static final String XID_ACTIONS_TABLE_NAME_SUFFIX = "QPID_XID_ACTIONS";
 
-    private static final int IN_CLAUSE_MAX_SIZE = Integer.getInteger("qpid.jdbcstore.inClauseMaxSize",1000);
+    private static final int IN_CLAUSE_MAX_SIZE_DEFAULT = 1000;
+    static final String IN_CLAUSE_MAX_SIZE = "qpid.jdbcstore.inClauseMaxSize";
 
     private static final int DB_VERSION = 8;
 
@@ -107,6 +108,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
 
     protected abstract void checkMessageStoreOpen();
     private ScheduledThreadPoolExecutor _executor;
+    private volatile int _inClauseMaxSize;
 
     public AbstractJDBCMessageStore()
     {
@@ -230,6 +232,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
         });
         _executor.prestartAllCoreThreads();
 
+        _inClauseMaxSize = getContextValue(Integer.class, IN_CLAUSE_MAX_SIZE, IN_CLAUSE_MAX_SIZE_DEFAULT);
     }
 
     @Override
@@ -473,7 +476,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
             {
                 try
                 {
-                    for (List<Long> boundMessageIds : Lists.partition(messageIds, IN_CLAUSE_MAX_SIZE))
+                    for (List<Long> boundMessageIds : Lists.partition(messageIds, _inClauseMaxSize))
                     {
                         removeMessagesFromDatabase(conn, boundMessageIds);
                     }
@@ -1934,6 +1937,20 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
     }
 
 
+    private <T> T getContextValue(final Class<T> variableClass,
+                                  final String name,
+                                  final T defaultValue)
+    {
+        if (_parent.getContextKeys(false).contains(name))
+        {
+            return _parent.getContextValue(variableClass, name);
+        }
+        else
+        {
+            return defaultValue;
+        }
+    }
+
     private static class JDBCEnqueueRecord implements MessageEnqueueRecord
     {
         private final UUID _queueId;
diff --git a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
index c289b5e..a152337 100644
--- a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
+++ b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.store.jdbc;
 
+import static org.apache.qpid.server.store.jdbc.AbstractJDBCMessageStore.IN_CLAUSE_MAX_SIZE;
 import static org.apache.qpid.server.store.jdbc.TestJdbcUtils.assertTablesExistence;
 import static org.apache.qpid.server.store.jdbc.TestJdbcUtils.getTableNames;
 import static org.junit.Assert.assertEquals;
@@ -53,6 +54,7 @@ import org.mockito.Mockito;
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.store.MessageDurability;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
@@ -200,13 +202,14 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase
     }
 
     @Test
-    public void testRemoveMessages1000()
+    public void testRemoveMessagesWhenNumberOfMessagesEqualsInClauseMaxSize()
     {
         final String queueName = getTestName();
         final UUID transactionalLogId = UUID.randomUUID();
         final TransactionLogResource resource = mockTransactionLogResource(transactionalLogId, queueName);
-        final int numberOfMessages = 1000;
+        final int numberOfMessages = 10;
         final GenericJDBCMessageStore store = (GenericJDBCMessageStore) getStore();
+        reOpenStoreWithInClauseMaxSize(store, numberOfMessages);
 
         final List<MessageEnqueueRecord> records = enqueueMessages(store, resource, numberOfMessages);
         assertEquals(numberOfMessages, records.size());
@@ -223,6 +226,35 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase
         assertTrue(stored.isEmpty());
     }
 
+    @Test
+    public void testInClauseMaxSize() throws Exception
+    {
+        final GenericJDBCMessageStore store = spy((GenericJDBCMessageStore) getStore());
+        reOpenStoreWithInClauseMaxSize(store, 10);
+
+        store.removeMessages(LongStream.rangeClosed(1, 21L).boxed().collect(Collectors.toList()));
+
+        verify(store).removeMessagesFromDatabase(any(Connection.class),
+                                                 eq(LongStream.rangeClosed(1L, 10L)
+                                                              .boxed()
+                                                              .collect(Collectors.toList())));
+        verify(store).removeMessagesFromDatabase(any(Connection.class),
+                                                 eq(LongStream.rangeClosed(11L, 20L)
+                                                              .boxed()
+                                                              .collect(Collectors.toList())));
+        verify(store).removeMessagesFromDatabase(any(Connection.class), eq(Collections.singletonList(21L)));
+    }
+
+    private void reOpenStoreWithInClauseMaxSize(final GenericJDBCMessageStore store, final int inClauseMaxSize)
+    {
+        final ConfiguredObject<?> parent = getVirtualHost();
+        when(parent.getContextValue(Integer.class, IN_CLAUSE_MAX_SIZE)).thenReturn(inClauseMaxSize);
+        when(parent.getContextKeys(false)).thenReturn(Collections.singleton(IN_CLAUSE_MAX_SIZE));
+
+        store.closeMessageStore();
+        store.openMessageStore(parent);
+    }
+
     private List<MessageEnqueueRecord> enqueueMessages(final MessageStore store,
                                                        final TransactionLogResource resource,
                                                        final int numberOfMessages)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org