You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/05/08 21:04:02 UTC
[qpid-broker-j] 01/02: QPID-8305: [Broker-J][JDBC Message Store]
Performance regression when increasing the number of queues linked to a
topic
This is an automated email from the ASF dual-hosted git repository.
orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
commit fccae34c184dd45b2a057d7bf6658b152c687e11
Author: overmeulen <ov...@murex.com>
AuthorDate: Mon Apr 29 13:59:43 2019 +0200
QPID-8305: [Broker-J][JDBC Message Store] Performance regression when increasing the number of queues linked to a topic
---
.../store/jdbc/AbstractJDBCMessageStore.java | 73 ++++++++++++----------
1 file changed, 39 insertions(+), 34 deletions(-)
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index de8fef2..7c65499 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -31,7 +31,9 @@ import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@@ -600,35 +602,34 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
return new JDBCTransaction();
}
- private void enqueueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws StoreException
+ private void enqueueMessages(ConnectionWrapper connWrapper, Map<Long, List<TransactionLogResource>> queuesPerMessage) throws StoreException
{
Connection conn = connWrapper.getConnection();
+ String sql = String.format("INSERT INTO %s (queue_id, message_id) values (?,?)", getQueueEntryTableName());
-
- try
+ try (PreparedStatement stmt = conn.prepareStatement(sql))
{
- if (getLogger().isDebugEnabled())
+ for(Long messageId : queuesPerMessage.keySet())
{
- getLogger().debug("Enqueuing message {} on queue {} with id {} [Connection {}]",
- messageId, queue.getName(), queue.getId(), conn);
- }
-
- try (PreparedStatement stmt = conn.prepareStatement("INSERT INTO " + getQueueEntryTableName()
- + " (queue_id, message_id) values (?,?)"))
- {
- stmt.setString(1, queue.getId().toString());
- stmt.setLong(2, messageId);
- stmt.executeUpdate();
+ for(TransactionLogResource queue : queuesPerMessage.get(messageId))
+ {
+ if (getLogger().isDebugEnabled())
+ {
+ getLogger().debug("Enqueuing message {} on queue {} with id {} [Connection {}]",
+ messageId, queue.getName(), queue.getId(), conn);
+ }
+ stmt.setString(1, queue.getId().toString());
+ stmt.setLong(2, messageId);
+ stmt.addBatch();
+ }
}
-
+ stmt.executeBatch();
}
catch (SQLException e)
{
- getLogger().error("Failed to enqueue message {}", messageId, e);
- throw new StoreException("Error writing enqueued message with id " + messageId + " for queue " + queue.getName() + " with id " + queue.getId()
- + " to database", e);
+ getLogger().error("Failed to enqueue messages", e);
+ throw new StoreException("Error writing enqueued messages to database", e);
}
-
}
private void dequeueMessage(ConnectionWrapper connWrapper, final UUID queueId,
@@ -1135,6 +1136,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
private int _storeSizeIncrease;
private final List<Runnable> _preCommitActions = new ArrayList<>();
private final List<Runnable> _postCommitActions = new ArrayList<>();
+ private final Map<Long, List<TransactionLogResource>> _messagesToEnqueue = new HashMap<>();
protected JDBCTransaction()
{
@@ -1156,25 +1158,27 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
final StoredMessage storedMessage = message.getStoredMessage();
if(storedMessage instanceof StoredJDBCMessage)
{
- _preCommitActions.add(new Runnable()
- {
- @Override
- public void run()
+ _preCommitActions.add(() -> {
+ try
{
- try
- {
- ((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection());
- _storeSizeIncrease += storedMessage.getContentSize();
- }
- catch (SQLException e)
- {
- throw new StoreException("Exception on enqueuing message into message store" + _messageId,
- e);
- }
+ ((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection());
+ _storeSizeIncrease += storedMessage.getContentSize();
}
+ catch (SQLException e)
+ {
+ throw new StoreException("Exception on enqueuing message into message store" + _messageId, e);
+ }
+ });
+ }
+ if(_messagesToEnqueue.isEmpty())
+ {
+ _preCommitActions.add(() -> {
+ AbstractJDBCMessageStore.this.enqueueMessages(_connWrapper, _messagesToEnqueue);
+ _messagesToEnqueue.clear();
});
}
- AbstractJDBCMessageStore.this.enqueueMessage(_connWrapper, queue, message.getMessageNumber());
+ List<TransactionLogResource> queues = _messagesToEnqueue.computeIfAbsent(message.getMessageNumber(), messageId -> new ArrayList<>());
+ queues.add(queue);
return new JDBCEnqueueRecord(queue.getId(), message.getMessageNumber());
}
@@ -1236,6 +1240,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
{
checkMessageStoreOpen();
_preCommitActions.clear();
+ _messagesToEnqueue.clear();
AbstractJDBCMessageStore.this.abortTran(_connWrapper);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org