You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2018/10/29 20:59:44 UTC
qpid-broker-j git commit: QPID-8242 : JDBC store should remove message content/metadata asynchronously
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 7a99cdc98 -> bb57b5159
QPID-8242 : JDBC store should remove message content/metadata asynchronously
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/bb57b515
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/bb57b515
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/bb57b515
Branch: refs/heads/master
Commit: bb57b51593128b31c796f80cba249f9a3e126177
Parents: 7a99cdc
Author: rgodfrey <rg...@apache.org>
Authored: Mon Oct 29 21:58:29 2018 +0100
Committer: rgodfrey <rg...@apache.org>
Committed: Mon Oct 29 21:58:29 2018 +0100
----------------------------------------------------------------------
.../store/jdbc/AbstractJDBCMessageStore.java | 124 +++++++++++++------
1 file changed, 87 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/bb57b515/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index 4ad0fe6..e92aa73 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -38,8 +38,11 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -83,6 +86,10 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
private final AtomicLong _messageId = new AtomicLong(0);
+ private static final List<Long> EMPTY_LIST = Collections.emptyList();
+ private final AtomicReference<List<Long>> _messagesToDelete = new AtomicReference<>(EMPTY_LIST);
+ private final AtomicBoolean _messageRemovalScheduled = new AtomicBoolean();
+
protected final EventManager _eventManager = new EventManager();
private ConfiguredObject<?> _parent;
@@ -409,57 +416,100 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
return _messageId.incrementAndGet();
}
- private void removeMessage(long messageId)
+ private void removeMessageAsync(long messageId)
+ {
+ List<Long> orig;
+ List<Long> updated;
+ do
+ {
+ orig = _messagesToDelete.get();
+ updated = new ArrayList<>(orig.size()+1);
+ updated.addAll(orig);
+ updated.add(messageId);
+ updated = Collections.unmodifiableList(updated);
+ } while (! _messagesToDelete.compareAndSet(orig, updated));
+ scheduleMessageRemoval();
+ }
+
+ private void scheduleMessageRemoval()
{
- try(Connection conn = newConnection())
+ if(_messageRemovalScheduled.compareAndSet(false, true))
{
- try
- {
- try(PreparedStatement stmt = conn.prepareStatement("DELETE FROM " + getMetaDataTableName()
- + " WHERE message_id = ?"))
+ _executor.submit(() -> {
+ List<Long> messageIds;
+ do
{
- stmt.setLong(1, messageId);
- int results = stmt.executeUpdate();
- stmt.close();
+ messageIds = _messagesToDelete.getAndSet(EMPTY_LIST);
+ removeMessages(messageIds);
+ } while(!messageIds.isEmpty());
+
+ _messageRemovalScheduled.set(false);
+ if(!_messagesToDelete.get().isEmpty())
+ {
+ scheduleMessageRemoval();
+ }
- if (results == 0)
+ });
+ }
+ }
+
+ boolean isMessageRemovalScheduled()
+ {
+ return _messageRemovalScheduled.get();
+ }
+
+ private void removeMessages(List<Long> messageIds)
+ {
+ if(messageIds != null && !messageIds.isEmpty())
+ {
+ String inpart = messageIds.stream().map(Object::toString).collect(Collectors.joining(", ", "(", ")"));
+ try(Connection conn = newConnection())
+ {
+ try
+ {
+ try(Statement stmt = conn.createStatement())
{
- getLogger().debug(
- "Message id {} not found (attempt to remove failed - probably application initiated rollback)",
+ int results = stmt.executeUpdate("DELETE FROM " + getMetaDataTableName()
+ + " WHERE message_id IN " + inpart);
+ stmt.close();
+
+ if (results != messageIds.size())
+ {
+ getLogger().debug(
+ "Some message ids in {} not found (attempt to remove failed - probably application initiated rollback)",
- messageId);
+ messageIds);
+ }
+
+ getLogger().debug("Deleted metadata for messages {}", messageIds);
}
- getLogger().debug("Deleted metadata for message {}", messageId);
+ try(Statement stmt = conn.createStatement())
+ {
+ int results = stmt.executeUpdate("DELETE FROM " + getMessageContentTableName()
+ + " WHERE message_id IN " + inpart);
+ }
+ conn.commit();
}
-
- try(PreparedStatement stmt = conn.prepareStatement("DELETE FROM " + getMessageContentTableName()
- + " WHERE message_id = ?"))
+ catch(SQLException e)
{
+ try
+ {
+ conn.rollback();
+ }
+ catch(SQLException t)
+ {
+ // ignore - we are re-throwing underlying exception
+ }
- stmt.setLong(1, messageId);
- int results = stmt.executeUpdate();
+ throw e;
}
- conn.commit();
}
- catch(SQLException e)
+ catch (SQLException e)
{
- try
- {
- conn.rollback();
- }
- catch(SQLException t)
- {
- // ignore - we are re-throwing underlying exception
- }
-
- throw e;
+ throw new StoreException("Error removing messages with ids " + messageIds + " from database: " + e.getMessage(), e);
}
}
- catch (SQLException e)
- {
- throw new StoreException("Error removing message with id " + messageId + " from database: " + e.getMessage(), e);
- }
}
@@ -1510,7 +1560,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
_messages.remove(this);
if(stored())
{
- AbstractJDBCMessageStore.this.removeMessage(_messageId);
+ AbstractJDBCMessageStore.this.removeMessageAsync(_messageId);
storedSizeChange(-getContentSize());
}
@@ -1652,7 +1702,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
public void visitMessages(MessageHandler handler) throws StoreException
{
checkMessageStoreOpen();
-
+ while(isMessageRemovalScheduled());
try(Connection conn = newAutoCommitConnection())
{
try (Statement stmt = conn.createStatement())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org