You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2018/10/29 20:59:44 UTC

qpid-broker-j git commit: QPID-8242 : JDBC store should remove message content/metadata asynchronously

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 7a99cdc98 -> bb57b5159


QPID-8242 : JDBC store should remove message content/metadata asynchronously


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/bb57b515
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/bb57b515
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/bb57b515

Branch: refs/heads/master
Commit: bb57b51593128b31c796f80cba249f9a3e126177
Parents: 7a99cdc
Author: rgodfrey <rg...@apache.org>
Authored: Mon Oct 29 21:58:29 2018 +0100
Committer: rgodfrey <rg...@apache.org>
Committed: Mon Oct 29 21:58:29 2018 +0100

----------------------------------------------------------------------
 .../store/jdbc/AbstractJDBCMessageStore.java    | 124 +++++++++++++------
 1 file changed, 87 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/bb57b515/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index 4ad0fe6..e92aa73 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -38,8 +38,11 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -83,6 +86,10 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
 
     private final AtomicLong _messageId = new AtomicLong(0);
 
+    private static final List<Long> EMPTY_LIST = Collections.emptyList();
+    private final AtomicReference<List<Long>> _messagesToDelete = new AtomicReference<>(EMPTY_LIST);
+    private final AtomicBoolean _messageRemovalScheduled = new AtomicBoolean();
+
 
     protected final EventManager _eventManager = new EventManager();
     private ConfiguredObject<?> _parent;
@@ -409,57 +416,100 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
         return _messageId.incrementAndGet();
     }
 
-    private void removeMessage(long messageId)
+    private void removeMessageAsync(long messageId)
+    {
+        List<Long> orig;
+        List<Long> updated;
+        do
+        {
+            orig = _messagesToDelete.get();
+            updated = new ArrayList<>(orig.size()+1);
+            updated.addAll(orig);
+            updated.add(messageId);
+            updated = Collections.unmodifiableList(updated);
+        } while (! _messagesToDelete.compareAndSet(orig, updated));
+        scheduleMessageRemoval();
+    }
+
+    private void scheduleMessageRemoval()
     {
-        try(Connection conn = newConnection())
+        if(_messageRemovalScheduled.compareAndSet(false, true))
         {
-            try
-            {
-                try(PreparedStatement stmt = conn.prepareStatement("DELETE FROM " + getMetaDataTableName()
-                                                                   + " WHERE message_id = ?"))
+            _executor.submit(() -> {
+                List<Long> messageIds;
+                do
                 {
-                    stmt.setLong(1, messageId);
-                    int results = stmt.executeUpdate();
-                    stmt.close();
+                    messageIds = _messagesToDelete.getAndSet(EMPTY_LIST);
+                    removeMessages(messageIds);
+                } while(!messageIds.isEmpty());
+
+                _messageRemovalScheduled.set(false);
+                if(!_messagesToDelete.get().isEmpty())
+                {
+                    scheduleMessageRemoval();
+                }
 
-                    if (results == 0)
+            });
+        }
+    }
+
+    boolean isMessageRemovalScheduled()
+    {
+        return _messageRemovalScheduled.get();
+    }
+
+    private void removeMessages(List<Long> messageIds)
+    {
+        if(messageIds != null && !messageIds.isEmpty())
+        {
+            String inpart = messageIds.stream().map(Object::toString).collect(Collectors.joining(", ", "(", ")"));
+            try(Connection conn = newConnection())
+            {
+                try
+                {
+                    try(Statement stmt = conn.createStatement())
                     {
-                        getLogger().debug(
-                                "Message id {} not found (attempt to remove failed - probably application initiated rollback)",
+                        int results = stmt.executeUpdate("DELETE FROM " + getMetaDataTableName()
+                                     + " WHERE message_id IN " + inpart);
+                        stmt.close();
+
+                        if (results != messageIds.size())
+                        {
+                            getLogger().debug(
+                                    "Some message ids in {} not found (attempt to remove failed - probably application initiated rollback)",
 
-                                messageId);
+                                    messageIds);
+                        }
+
+                        getLogger().debug("Deleted metadata for messages {}", messageIds);
                     }
 
-                    getLogger().debug("Deleted metadata for message {}", messageId);
+                    try(Statement stmt = conn.createStatement())
+                    {
+                        int results = stmt.executeUpdate("DELETE FROM " + getMessageContentTableName()
+                                                         + " WHERE message_id IN " + inpart);
+                    }
+                    conn.commit();
                 }
-
-                try(PreparedStatement stmt = conn.prepareStatement("DELETE FROM " + getMessageContentTableName()
-                + " WHERE message_id = ?"))
+                catch(SQLException e)
                 {
+                    try
+                    {
+                        conn.rollback();
+                    }
+                    catch(SQLException t)
+                    {
+                        // ignore - we are re-throwing underlying exception
+                    }
 
-                    stmt.setLong(1, messageId);
-                    int results = stmt.executeUpdate();
+                    throw e;
                 }
-                conn.commit();
             }
-            catch(SQLException e)
+            catch (SQLException e)
             {
-                try
-                {
-                    conn.rollback();
-                }
-                catch(SQLException t)
-                {
-                    // ignore - we are re-throwing underlying exception
-                }
-
-                throw e;
+                throw new StoreException("Error removing messages with ids " + messageIds + " from database: " + e.getMessage(), e);
             }
         }
-        catch (SQLException e)
-        {
-            throw new StoreException("Error removing message with id " + messageId + " from database: " + e.getMessage(), e);
-        }
 
     }
 
@@ -1510,7 +1560,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
             _messages.remove(this);
             if(stored())
             {
-                AbstractJDBCMessageStore.this.removeMessage(_messageId);
+                AbstractJDBCMessageStore.this.removeMessageAsync(_messageId);
                 storedSizeChange(-getContentSize());
             }
 
@@ -1652,7 +1702,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
         public void visitMessages(MessageHandler handler) throws StoreException
         {
             checkMessageStoreOpen();
-
+            while(isMessageRemovalScheduled());
             try(Connection conn = newAutoCommitConnection())
             {
                 try (Statement stmt = conn.createStatement())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org