You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/05/03 21:59:37 UTC
[qpid-broker-j] branch master updated: QPID-8294: [Brojer-J][JDBC]
Configure 'qpid.jdbcstore.inClauseMaxSize' as context variable
This is an automated email from the ASF dual-hosted git repository.
orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/master by this push:
new f435696 QPID-8294: [Brojer-J][JDBC] Configure 'qpid.jdbcstore.inClauseMaxSize' as context variable
f435696 is described below
commit f4356963518ddc7adf302a9e56586a1d92b2d580
Author: Oleksandr Rudyy <ol...@jpmorgan.com>
AuthorDate: Wed May 1 13:56:35 2019 +0100
QPID-8294: [Brojer-J][JDBC] Configure 'qpid.jdbcstore.inClauseMaxSize' as context variable
This closes #32
---
.../store/jdbc/AbstractJDBCMessageStore.java | 21 +++++++++++--
.../server/store/jdbc/JDBCMessageStoreTest.java | 36 ++++++++++++++++++++--
2 files changed, 53 insertions(+), 4 deletions(-)
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index 2dc992f..bae485e 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -83,7 +83,8 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
private static final String XID_TABLE_NAME_SUFFIX = "QPID_XIDS";
private static final String XID_ACTIONS_TABLE_NAME_SUFFIX = "QPID_XID_ACTIONS";
- private static final int IN_CLAUSE_MAX_SIZE = Integer.getInteger("qpid.jdbcstore.inClauseMaxSize",1000);
+ private static final int IN_CLAUSE_MAX_SIZE_DEFAULT = 1000;
+ static final String IN_CLAUSE_MAX_SIZE = "qpid.jdbcstore.inClauseMaxSize";
private static final int DB_VERSION = 8;
@@ -107,6 +108,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
protected abstract void checkMessageStoreOpen();
private ScheduledThreadPoolExecutor _executor;
+ private volatile int _inClauseMaxSize;
public AbstractJDBCMessageStore()
{
@@ -230,6 +232,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
});
_executor.prestartAllCoreThreads();
+ _inClauseMaxSize = getContextValue(Integer.class, IN_CLAUSE_MAX_SIZE, IN_CLAUSE_MAX_SIZE_DEFAULT);
}
@Override
@@ -473,7 +476,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
{
try
{
- for (List<Long> boundMessageIds : Lists.partition(messageIds, IN_CLAUSE_MAX_SIZE))
+ for (List<Long> boundMessageIds : Lists.partition(messageIds, _inClauseMaxSize))
{
removeMessagesFromDatabase(conn, boundMessageIds);
}
@@ -1934,6 +1937,20 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
+ private <T> T getContextValue(final Class<T> variableClass,
+ final String name,
+ final T defaultValue)
+ {
+ if (_parent.getContextKeys(false).contains(name))
+ {
+ return _parent.getContextValue(variableClass, name);
+ }
+ else
+ {
+ return defaultValue;
+ }
+ }
+
private static class JDBCEnqueueRecord implements MessageEnqueueRecord
{
private final UUID _queueId;
diff --git a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
index c289b5e..a152337 100644
--- a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
+++ b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.store.jdbc;
+import static org.apache.qpid.server.store.jdbc.AbstractJDBCMessageStore.IN_CLAUSE_MAX_SIZE;
import static org.apache.qpid.server.store.jdbc.TestJdbcUtils.assertTablesExistence;
import static org.apache.qpid.server.store.jdbc.TestJdbcUtils.getTableNames;
import static org.junit.Assert.assertEquals;
@@ -53,6 +54,7 @@ import org.mockito.Mockito;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.MessageEnqueueRecord;
@@ -200,13 +202,14 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase
}
@Test
- public void testRemoveMessages1000()
+ public void testRemoveMessagesWhenNumberOfMessagesEqualsInClauseMaxSize()
{
final String queueName = getTestName();
final UUID transactionalLogId = UUID.randomUUID();
final TransactionLogResource resource = mockTransactionLogResource(transactionalLogId, queueName);
- final int numberOfMessages = 1000;
+ final int numberOfMessages = 10;
final GenericJDBCMessageStore store = (GenericJDBCMessageStore) getStore();
+ reOpenStoreWithInClauseMaxSize(store, numberOfMessages);
final List<MessageEnqueueRecord> records = enqueueMessages(store, resource, numberOfMessages);
assertEquals(numberOfMessages, records.size());
@@ -223,6 +226,35 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase
assertTrue(stored.isEmpty());
}
+ @Test
+ public void testInClauseMaxSize() throws Exception
+ {
+ final GenericJDBCMessageStore store = spy((GenericJDBCMessageStore) getStore());
+ reOpenStoreWithInClauseMaxSize(store, 10);
+
+ store.removeMessages(LongStream.rangeClosed(1, 21L).boxed().collect(Collectors.toList()));
+
+ verify(store).removeMessagesFromDatabase(any(Connection.class),
+ eq(LongStream.rangeClosed(1L, 10L)
+ .boxed()
+ .collect(Collectors.toList())));
+ verify(store).removeMessagesFromDatabase(any(Connection.class),
+ eq(LongStream.rangeClosed(11L, 20L)
+ .boxed()
+ .collect(Collectors.toList())));
+ verify(store).removeMessagesFromDatabase(any(Connection.class), eq(Collections.singletonList(21L)));
+ }
+
+ private void reOpenStoreWithInClauseMaxSize(final GenericJDBCMessageStore store, final int inClauseMaxSize)
+ {
+ final ConfiguredObject<?> parent = getVirtualHost();
+ when(parent.getContextValue(Integer.class, IN_CLAUSE_MAX_SIZE)).thenReturn(inClauseMaxSize);
+ when(parent.getContextKeys(false)).thenReturn(Collections.singleton(IN_CLAUSE_MAX_SIZE));
+
+ store.closeMessageStore();
+ store.openMessageStore(parent);
+ }
+
private List<MessageEnqueueRecord> enqueueMessages(final MessageStore store,
final TransactionLogResource resource,
final int numberOfMessages)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org