You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/05/08 21:05:24 UTC

[qpid-broker-j] branch 7.1.x updated (13bcee3 -> d0692ad)

This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a change to branch 7.1.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git.


    from 13bcee3  QPID-8307:[Broker-J][Documentation] Document how to specify virtualhost initial configuration
     new c2e9c33  QPID-8305: [Broker-J][JDBC Message Store] Performance regression when increasing the number of queues linked to a topic
     new d0692ad  QPID-8305: [Broker-J][JDBC Message Store] Create the pre-commit action in the constructor of JDBCTransaction

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../store/jdbc/AbstractJDBCMessageStore.java       | 73 ++++++++++++----------
 1 file changed, 39 insertions(+), 34 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-broker-j] 02/02: QPID-8305: [Broker-J][JDBC Message Store] Create the pre-commit action in the constructor of JDBCTransaction

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch 7.1.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit d0692ad993c6cf97f11ee3588ef9089026b9142d
Author: overmeulen <ov...@murex.com>
AuthorDate: Mon May 6 10:46:26 2019 +0200

    QPID-8305: [Broker-J][JDBC Message Store] Create the pre-commit action in the constructor of JDBCTransaction
    
    This closes #30
    
    (cherry picked from commit cadb26d58e8ace35dc544ab2402c911f6174b5fe)
---
 .../qpid/server/store/jdbc/AbstractJDBCMessageStore.java   | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index 7c65499..b2359b7 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -604,6 +604,10 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
 
     private void enqueueMessages(ConnectionWrapper connWrapper, Map<Long, List<TransactionLogResource>> queuesPerMessage) throws StoreException
     {
+        if (queuesPerMessage.isEmpty())
+        {
+            return;
+        }
         Connection conn = connWrapper.getConnection();
         String sql = String.format("INSERT INTO %s (queue_id, message_id) values (?,?)", getQueueEntryTableName());
 
@@ -1148,6 +1152,8 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
             {
                 throw new StoreException(e);
             }
+
+            _preCommitActions.add(() -> AbstractJDBCMessageStore.this.enqueueMessages(_connWrapper, _messagesToEnqueue));
         }
 
         @Override
@@ -1170,13 +1176,6 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
                     }
                 });
             }
-            if(_messagesToEnqueue.isEmpty())
-            {
-                _preCommitActions.add(() -> {
-                    AbstractJDBCMessageStore.this.enqueueMessages(_connWrapper, _messagesToEnqueue);
-                    _messagesToEnqueue.clear();
-                });
-            }
             List<TransactionLogResource> queues = _messagesToEnqueue.computeIfAbsent(message.getMessageNumber(), messageId -> new ArrayList<>());
             queues.add(queue);
             return new JDBCEnqueueRecord(queue.getId(), message.getMessageNumber());
@@ -1220,6 +1219,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
                 action.run();
             }
             _preCommitActions.clear();
+            _messagesToEnqueue.clear();
         }
 
         private void doPostCommitActions()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-broker-j] 01/02: QPID-8305: [Broker-J][JDBC Message Store] Performance regression when increasing the number of queues linked to a topic

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch 7.1.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit c2e9c3367be1bd0682f7ca44cff864ed93c5dda5
Author: overmeulen <ov...@murex.com>
AuthorDate: Mon Apr 29 13:59:43 2019 +0200

    QPID-8305: [Broker-J][JDBC Message Store] Performance regression when increasing the number of queues linked to a topic
    
    (cherry picked from commit fccae34c184dd45b2a057d7bf6658b152c687e11)
---
 .../store/jdbc/AbstractJDBCMessageStore.java       | 73 ++++++++++++----------
 1 file changed, 39 insertions(+), 34 deletions(-)

diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index de8fef2..7c65499 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -31,7 +31,9 @@ import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -600,35 +602,34 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
         return new JDBCTransaction();
     }
 
-    private void enqueueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws StoreException
+    private void enqueueMessages(ConnectionWrapper connWrapper, Map<Long, List<TransactionLogResource>> queuesPerMessage) throws StoreException
     {
         Connection conn = connWrapper.getConnection();
+        String sql = String.format("INSERT INTO %s (queue_id, message_id) values (?,?)", getQueueEntryTableName());
 
-
-        try
+        try (PreparedStatement stmt = conn.prepareStatement(sql))
         {
-            if (getLogger().isDebugEnabled())
+            for(Long messageId : queuesPerMessage.keySet())
             {
-                getLogger().debug("Enqueuing message {} on queue {} with id {} [Connection {}]",
-                                  messageId, queue.getName(), queue.getId(), conn);
-            }
-
-            try (PreparedStatement stmt = conn.prepareStatement("INSERT INTO " + getQueueEntryTableName()
-                                                                + " (queue_id, message_id) values (?,?)"))
-            {
-                stmt.setString(1, queue.getId().toString());
-                stmt.setLong(2, messageId);
-                stmt.executeUpdate();
+                for(TransactionLogResource queue : queuesPerMessage.get(messageId))
+                {
+                    if (getLogger().isDebugEnabled())
+                    {
+                        getLogger().debug("Enqueuing message {} on queue {} with id {} [Connection {}]",
+                                messageId, queue.getName(), queue.getId(), conn);
+                    }
+                    stmt.setString(1, queue.getId().toString());
+                    stmt.setLong(2, messageId);
+                    stmt.addBatch();
+                }
             }
-
+            stmt.executeBatch();
         }
         catch (SQLException e)
         {
-            getLogger().error("Failed to enqueue message {}", messageId, e);
-            throw new StoreException("Error writing enqueued message with id " + messageId + " for queue " + queue.getName() + " with id " + queue.getId()
-                                     + " to database", e);
+            getLogger().error("Failed to enqueue messages", e);
+            throw new StoreException("Error writing enqueued messages to database", e);
         }
-
     }
 
     private void dequeueMessage(ConnectionWrapper connWrapper, final UUID queueId,
@@ -1135,6 +1136,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
         private int _storeSizeIncrease;
         private final List<Runnable> _preCommitActions = new ArrayList<>();
         private final List<Runnable> _postCommitActions = new ArrayList<>();
+        private final Map<Long, List<TransactionLogResource>> _messagesToEnqueue = new HashMap<>();
 
         protected JDBCTransaction()
         {
@@ -1156,25 +1158,27 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
             final StoredMessage storedMessage = message.getStoredMessage();
             if(storedMessage instanceof StoredJDBCMessage)
             {
-                _preCommitActions.add(new Runnable()
-                {
-                    @Override
-                    public void run()
+                _preCommitActions.add(() -> {
+                    try
                     {
-                        try
-                        {
-                            ((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection());
-                            _storeSizeIncrease += storedMessage.getContentSize();
-                        }
-                        catch (SQLException e)
-                        {
-                            throw new StoreException("Exception on enqueuing message into message store" + _messageId,
-                                                     e);
-                        }
+                        ((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection());
+                        _storeSizeIncrease += storedMessage.getContentSize();
                     }
+                    catch (SQLException e)
+                    {
+                        throw new StoreException("Exception on enqueuing message into message store" + _messageId, e);
+                    }
+                });
+            }
+            if(_messagesToEnqueue.isEmpty())
+            {
+                _preCommitActions.add(() -> {
+                    AbstractJDBCMessageStore.this.enqueueMessages(_connWrapper, _messagesToEnqueue);
+                    _messagesToEnqueue.clear();
                 });
             }
-            AbstractJDBCMessageStore.this.enqueueMessage(_connWrapper, queue, message.getMessageNumber());
+            List<TransactionLogResource> queues = _messagesToEnqueue.computeIfAbsent(message.getMessageNumber(), messageId -> new ArrayList<>());
+            queues.add(queue);
             return new JDBCEnqueueRecord(queue.getId(), message.getMessageNumber());
         }
 
@@ -1236,6 +1240,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
         {
             checkMessageStoreOpen();
             _preCommitActions.clear();
+            _messagesToEnqueue.clear();
             AbstractJDBCMessageStore.this.abortTran(_connWrapper);
         }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org