You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/03/08 19:38:53 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-5785

Repository: activemq
Updated Branches:
  refs/heads/master 078f39f58 -> 8b23e072e


https://issues.apache.org/jira/browse/AMQ-5785

Avoid holding the intrinsic lock on the cursor when expiring the
messages.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8b23e072
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8b23e072
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8b23e072

Branch: refs/heads/master
Commit: 8b23e072eeab2beebf62fd267bf8d9f88d05b5c2
Parents: 078f39f
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Mar 8 13:37:58 2016 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Mar 8 13:37:58 2016 -0500

----------------------------------------------------------------------
 .../cursors/FilePendingMessageCursor.java       | 40 +++++++++++++-------
 1 file changed, 27 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/8b23e072/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
index 9c9a8e7..3c2bd5f 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -17,10 +17,13 @@
 package org.apache.activemq.broker.region.cursors;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
@@ -31,25 +34,26 @@ import org.apache.activemq.command.Message;
 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.store.PList;
-import org.apache.activemq.store.PListStore;
 import org.apache.activemq.store.PListEntry;
+import org.apache.activemq.store.PListStore;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.usage.Usage;
 import org.apache.activemq.usage.UsageListener;
+import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.wireformat.WireFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.activemq.util.ByteSequence;
 
 /**
  * persist pending messages pending message (messages awaiting dispatch to a
  * consumer) cursor
- *
- *
  */
 public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener {
+
     static final Logger LOG = LoggerFactory.getLogger(FilePendingMessageCursor.class);
+
     private static final AtomicLong NAME_COUNT = new AtomicLong();
+
     protected Broker broker;
     private final PListStore store;
     private final String name;
@@ -61,6 +65,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
     private boolean flushRequired;
     private final AtomicBoolean started = new AtomicBoolean();
     private final WireFormat wireFormat = new OpenWireFormat();
+
     /**
      * @param broker
      * @param name
@@ -374,9 +379,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
 
     @Override
     public synchronized boolean isFull() {
-
         return super.isFull() || (!isDiskListEmpty() && systemUsage != null && systemUsage.getTempUsage().isFull());
-
     }
 
     @Override
@@ -392,11 +395,12 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
     @Override
     public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
         if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
+            List<MessageReference> expiredMessages = null;
             synchronized (this) {
                 if (!flushRequired && size() != 0) {
                     flushRequired =true;
                     if (!iterating) {
-                        expireOldMessages();
+                        expiredMessages = expireOldMessages();
                         if (!hasSpace()) {
                             flushToDisk();
                             flushRequired = false;
@@ -404,6 +408,12 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
                     }
                 }
             }
+
+            if (expiredMessages != null) {
+                for (MessageReference node : expiredMessages) {
+                    discardExpiredMessage(node);
+                }
+            }
         }
     }
 
@@ -412,26 +422,30 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
         return true;
     }
 
-    protected synchronized void expireOldMessages() {
+    private synchronized List<MessageReference> expireOldMessages() {
+        List<MessageReference> expired = new ArrayList<MessageReference>();
         if (!memoryList.isEmpty()) {
             for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
                 MessageReference node = iterator.next();
                 if (node.isExpired()) {
                     node.decrementReferenceCount();
-                    discardExpiredMessage(node);
+                    expired.add(node);
                     iterator.remove();
                 }
             }
         }
+
+        return expired;
     }
 
     protected synchronized void flushToDisk() {
         if (!memoryList.isEmpty() && store != null) {
             long start = 0;
-             if (LOG.isTraceEnabled()) {
+            if (LOG.isTraceEnabled()) {
                 start = System.currentTimeMillis();
-                LOG.trace("{}, flushToDisk() mem list size: {} {}", new Object[]{ name, memoryList.size(), (systemUsage != null ? systemUsage.getMemoryUsage() : "") });
-             }
+                LOG.trace("{}, flushToDisk() mem list size: {} {}", new Object[] { name, memoryList.size(),
+                    (systemUsage != null ? systemUsage.getMemoryUsage() : "") });
+            }
             for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
                 MessageReference node = iterator.next();
                 node.decrementReferenceCount();