You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/03/08 19:38:53 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-5785
Repository: activemq
Updated Branches:
refs/heads/master 078f39f58 -> 8b23e072e
https://issues.apache.org/jira/browse/AMQ-5785
Avoid holding the intrinsic lock on the cursor when expiring the
messages.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8b23e072
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8b23e072
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8b23e072
Branch: refs/heads/master
Commit: 8b23e072eeab2beebf62fd267bf8d9f88d05b5c2
Parents: 078f39f
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Mar 8 13:37:58 2016 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Mar 8 13:37:58 2016 -0500
----------------------------------------------------------------------
.../cursors/FilePendingMessageCursor.java | 40 +++++++++++++-------
1 file changed, 27 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/8b23e072/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
index 9c9a8e7..3c2bd5f 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -17,10 +17,13 @@
package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
+import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
@@ -31,25 +34,26 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.PList;
-import org.apache.activemq.store.PListStore;
import org.apache.activemq.store.PListEntry;
+import org.apache.activemq.store.PListStore;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener;
+import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.activemq.util.ByteSequence;
/**
* persist pending messages pending message (messages awaiting dispatch to a
* consumer) cursor
- *
- *
*/
public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener {
+
static final Logger LOG = LoggerFactory.getLogger(FilePendingMessageCursor.class);
+
private static final AtomicLong NAME_COUNT = new AtomicLong();
+
protected Broker broker;
private final PListStore store;
private final String name;
@@ -61,6 +65,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
private boolean flushRequired;
private final AtomicBoolean started = new AtomicBoolean();
private final WireFormat wireFormat = new OpenWireFormat();
+
/**
* @param broker
* @param name
@@ -374,9 +379,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
@Override
public synchronized boolean isFull() {
-
return super.isFull() || (!isDiskListEmpty() && systemUsage != null && systemUsage.getTempUsage().isFull());
-
}
@Override
@@ -392,11 +395,12 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
@Override
public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
+ List<MessageReference> expiredMessages = null;
synchronized (this) {
if (!flushRequired && size() != 0) {
flushRequired =true;
if (!iterating) {
- expireOldMessages();
+ expiredMessages = expireOldMessages();
if (!hasSpace()) {
flushToDisk();
flushRequired = false;
@@ -404,6 +408,12 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
}
}
}
+
+ if (expiredMessages != null) {
+ for (MessageReference node : expiredMessages) {
+ discardExpiredMessage(node);
+ }
+ }
}
}
@@ -412,26 +422,30 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
return true;
}
- protected synchronized void expireOldMessages() {
+ private synchronized List<MessageReference> expireOldMessages() {
+ List<MessageReference> expired = new ArrayList<MessageReference>();
if (!memoryList.isEmpty()) {
for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
MessageReference node = iterator.next();
if (node.isExpired()) {
node.decrementReferenceCount();
- discardExpiredMessage(node);
+ expired.add(node);
iterator.remove();
}
}
}
+
+ return expired;
}
protected synchronized void flushToDisk() {
if (!memoryList.isEmpty() && store != null) {
long start = 0;
- if (LOG.isTraceEnabled()) {
+ if (LOG.isTraceEnabled()) {
start = System.currentTimeMillis();
- LOG.trace("{}, flushToDisk() mem list size: {} {}", new Object[]{ name, memoryList.size(), (systemUsage != null ? systemUsage.getMemoryUsage() : "") });
- }
+ LOG.trace("{}, flushToDisk() mem list size: {} {}", new Object[] { name, memoryList.size(),
+ (systemUsage != null ? systemUsage.getMemoryUsage() : "") });
+ }
for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
MessageReference node = iterator.next();
node.decrementReferenceCount();