You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/05/08 21:04:02 UTC

[qpid-broker-j] 01/02: QPID-8305: [Broker-J][JDBC Message Store] Performance regression when increasing the number of queues linked to a topic

This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit fccae34c184dd45b2a057d7bf6658b152c687e11
Author: overmeulen <ov...@murex.com>
AuthorDate: Mon Apr 29 13:59:43 2019 +0200

    QPID-8305: [Broker-J][JDBC Message Store] Performance regression when increasing the number of queues linked to a topic
---
 .../store/jdbc/AbstractJDBCMessageStore.java       | 73 ++++++++++++----------
 1 file changed, 39 insertions(+), 34 deletions(-)

diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index de8fef2..7c65499 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -31,7 +31,9 @@ import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -600,35 +602,34 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
         return new JDBCTransaction();
     }
 
-    private void enqueueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws StoreException
+    private void enqueueMessages(ConnectionWrapper connWrapper, Map<Long, List<TransactionLogResource>> queuesPerMessage) throws StoreException
     {
         Connection conn = connWrapper.getConnection();
+        String sql = String.format("INSERT INTO %s (queue_id, message_id) values (?,?)", getQueueEntryTableName());
 
-
-        try
+        try (PreparedStatement stmt = conn.prepareStatement(sql))
         {
-            if (getLogger().isDebugEnabled())
+            for(Long messageId : queuesPerMessage.keySet())
             {
-                getLogger().debug("Enqueuing message {} on queue {} with id {} [Connection {}]",
-                                  messageId, queue.getName(), queue.getId(), conn);
-            }
-
-            try (PreparedStatement stmt = conn.prepareStatement("INSERT INTO " + getQueueEntryTableName()
-                                                                + " (queue_id, message_id) values (?,?)"))
-            {
-                stmt.setString(1, queue.getId().toString());
-                stmt.setLong(2, messageId);
-                stmt.executeUpdate();
+                for(TransactionLogResource queue : queuesPerMessage.get(messageId))
+                {
+                    if (getLogger().isDebugEnabled())
+                    {
+                        getLogger().debug("Enqueuing message {} on queue {} with id {} [Connection {}]",
+                                messageId, queue.getName(), queue.getId(), conn);
+                    }
+                    stmt.setString(1, queue.getId().toString());
+                    stmt.setLong(2, messageId);
+                    stmt.addBatch();
+                }
             }
-
+            stmt.executeBatch();
         }
         catch (SQLException e)
         {
-            getLogger().error("Failed to enqueue message {}", messageId, e);
-            throw new StoreException("Error writing enqueued message with id " + messageId + " for queue " + queue.getName() + " with id " + queue.getId()
-                                     + " to database", e);
+            getLogger().error("Failed to enqueue messages", e);
+            throw new StoreException("Error writing enqueued messages to database", e);
         }
-
     }
 
     private void dequeueMessage(ConnectionWrapper connWrapper, final UUID queueId,
@@ -1135,6 +1136,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
         private int _storeSizeIncrease;
         private final List<Runnable> _preCommitActions = new ArrayList<>();
         private final List<Runnable> _postCommitActions = new ArrayList<>();
+        private final Map<Long, List<TransactionLogResource>> _messagesToEnqueue = new HashMap<>();
 
         protected JDBCTransaction()
         {
@@ -1156,25 +1158,27 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
             final StoredMessage storedMessage = message.getStoredMessage();
             if(storedMessage instanceof StoredJDBCMessage)
             {
-                _preCommitActions.add(new Runnable()
-                {
-                    @Override
-                    public void run()
+                _preCommitActions.add(() -> {
+                    try
                     {
-                        try
-                        {
-                            ((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection());
-                            _storeSizeIncrease += storedMessage.getContentSize();
-                        }
-                        catch (SQLException e)
-                        {
-                            throw new StoreException("Exception on enqueuing message into message store" + _messageId,
-                                                     e);
-                        }
+                        ((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection());
+                        _storeSizeIncrease += storedMessage.getContentSize();
                     }
+                    catch (SQLException e)
+                    {
+                        throw new StoreException("Exception on enqueuing message into message store" + _messageId, e);
+                    }
+                });
+            }
+            if(_messagesToEnqueue.isEmpty())
+            {
+                _preCommitActions.add(() -> {
+                    AbstractJDBCMessageStore.this.enqueueMessages(_connWrapper, _messagesToEnqueue);
+                    _messagesToEnqueue.clear();
                 });
             }
-            AbstractJDBCMessageStore.this.enqueueMessage(_connWrapper, queue, message.getMessageNumber());
+            List<TransactionLogResource> queues = _messagesToEnqueue.computeIfAbsent(message.getMessageNumber(), messageId -> new ArrayList<>());
+            queues.add(queue);
             return new JDBCEnqueueRecord(queue.getId(), message.getMessageNumber());
         }
 
@@ -1236,6 +1240,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
         {
             checkMessageStoreOpen();
             _preCommitActions.clear();
+            _messagesToEnqueue.clear();
             AbstractJDBCMessageStore.this.abortTran(_connWrapper);
         }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org