You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/11/25 18:58:42 UTC
svn commit: r479156 - in
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region:
./ cursors/
Author: rajdavies
Date: Sat Nov 25 09:58:41 2006
New Revision: 479156
URL: http://svn.apache.org/viewvc?view=rev&rev=479156
Log:
Tidied up locking around cursor iterators
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?view=diff&rev=479156&r1=479155&r2=479156
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Sat Nov 25 09:58:41 2006
@@ -118,15 +118,19 @@
}
}
- if( !keepDurableSubsActive ) {
- synchronized(pending) {
- pending.reset();
- while(pending.hasNext()) {
- MessageReference node = pending.next();
- node.decrementReferenceCount();
- pending.remove();
- }
- }
+ if(!keepDurableSubsActive){
+ synchronized(pending){
+ try{
+ pending.reset();
+ while(pending.hasNext()){
+ MessageReference node=pending.next();
+ node.decrementReferenceCount();
+ pending.remove();
+ }
+ }finally{
+ pending.release();
+ }
+ }
}
prefetchExtension=0;
}
@@ -195,22 +199,24 @@
/**
* Release any references that we are holding.
*/
- public void destroy() {
- synchronized(pending) {
- pending.reset();
- while(pending.hasNext()) {
- MessageReference node = pending.next();
- node.decrementReferenceCount();
- }
- pending.clear();
- }
-
- for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
- MessageReference node = (MessageReference) iter.next();
+ public void destroy(){
+ try{
+ synchronized(pending){
+ pending.reset();
+ while(pending.hasNext()){
+ MessageReference node=pending.next();
+ node.decrementReferenceCount();
+ }
+ }
+ }finally{
+ pending.release();
+ pending.clear();
+ }
+ for(Iterator iter=dispatched.iterator();iter.hasNext();){
+ MessageReference node=(MessageReference)iter.next();
node.decrementReferenceCount();
}
dispatched.clear();
-
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=479156&r1=479155&r2=479156
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Sat Nov 25 09:58:41 2006
@@ -123,6 +123,7 @@
}
public void add(MessageReference node) throws Exception{
+ try {
boolean pendingEmpty = false;
synchronized(pending){
pendingEmpty=pending.isEmpty();
@@ -139,21 +140,30 @@
pending.addMessageLast(node);
}
}
+ }catch(Throwable e) {
+ e.printStackTrace();
+
+ }
}
- public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
+ public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception{
synchronized(pending){
- pending.reset();
- while(pending.hasNext()){
- MessageReference node=pending.next();
- if(node.getMessageId().equals(mdn.getMessageId())){
- pending.remove();
- createMessageDispatch(node,node.getMessage());
- dispatched.addLast(node);
- return;
+ try{
+ pending.reset();
+ while(pending.hasNext()){
+ MessageReference node=pending.next();
+ if(node.getMessageId().equals(mdn.getMessageId())){
+ pending.remove();
+ createMessageDispatch(node,node.getMessage());
+ dispatched.addLast(node);
+ return;
+ }
}
+ }finally{
+ pending.release();
}
- throw new JMSException("Slave broker out of sync with master: Dispatched message ("+mdn.getMessageId()+") was not in the pending list: "+pending);
+ throw new JMSException("Slave broker out of sync with master: Dispatched message ("+mdn.getMessageId()
+ +") was not in the pending list: "+pending);
}
}
@@ -387,6 +397,7 @@
dispatch(node);
}
}finally{
+ pending.release();
dispatching=false;
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=479156&r1=479155&r2=479156
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Sat Nov 25 09:58:41 2006
@@ -545,54 +545,58 @@
}
}
}
- synchronized (messages) {
- messages.reset();
- while(messages.hasNext()) {
- try {
- MessageReference r = messages.next();
- r.incrementReferenceCount();
- try {
- Message m = r.getMessage();
- if (m != null) {
- l.add(m);
+ synchronized(messages){
+ try{
+ messages.reset();
+ while(messages.hasNext()){
+ try{
+ MessageReference r=messages.next();
+ r.incrementReferenceCount();
+ try{
+ Message m=r.getMessage();
+ if(m!=null){
+ l.add(m);
+ }
+ }finally{
+ r.decrementReferenceCount();
}
+ }catch(IOException e){
+ log.error("caught an exception brwsing "+this,e);
}
- finally {
- r.decrementReferenceCount();
- }
- }
- catch (IOException e) {
- log.error("caught an exception brwsing " + this,e);
}
+ }finally{
+ messages.release();
}
}
return (Message[]) l.toArray(new Message[l.size()]);
}
- public Message getMessage(String messageId) {
- synchronized (messages) {
- messages.reset();
- while(messages.hasNext()) {
- try {
- MessageReference r = messages.next();
- if (messageId.equals(r.getMessageId().toString())) {
- r.incrementReferenceCount();
- try {
- Message m = r.getMessage();
- if (m != null) {
- return m;
+ public Message getMessage(String messageId){
+ synchronized(messages){
+ try{
+ messages.reset();
+ while(messages.hasNext()){
+ try{
+ MessageReference r=messages.next();
+ if(messageId.equals(r.getMessageId().toString())){
+ r.incrementReferenceCount();
+ try{
+ Message m=r.getMessage();
+ if(m!=null){
+ return m;
+ }
+ }finally{
+ r.decrementReferenceCount();
}
+ break;
}
- finally {
- r.decrementReferenceCount();
- }
- break;
+ }catch(IOException e){
+ log.error("got an exception retrieving message "+messageId);
}
}
- catch (IOException e) {
- log.error("got an exception retrieving message " + messageId);
- }
+ }finally{
+ messages.release();
}
}
return null;
@@ -868,13 +872,17 @@
int count=0;
result=new ArrayList(toPageIn);
synchronized(messages){
- messages.reset();
- while(messages.hasNext()&&count<toPageIn){
- MessageReference node=messages.next();
- messages.remove();
- node=createMessageReference(node.getMessage());
- result.add(node);
- count++;
+ try{
+ messages.reset();
+ while(messages.hasNext()&&count<toPageIn){
+ MessageReference node=messages.next();
+ messages.remove();
+ node=createMessageReference(node.getMessage());
+ result.add(node);
+ count++;
+ }
+ }finally{
+ messages.release();
}
}
synchronized(pagedInMessages){
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?view=diff&rev=479156&r1=479155&r2=479156
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Sat Nov 25 09:58:41 2006
@@ -135,36 +135,42 @@
* Discard any expired messages from the matched list. Called from a synchronized block.
* @throws IOException
*/
- protected void removeExpiredMessages() throws IOException {
- matched.reset();
- while(matched.hasNext()) {
- MessageReference node=matched.next();
- if (node.isExpired()) {
- matched.remove();
- dispatched.incrementAndGet();
- node.decrementReferenceCount();
- break;
- }
- }
- matched.release();
- }
-
- public void processMessageDispatchNotification(MessageDispatchNotification mdn){
- synchronized(matchedListMutex){
+ protected void removeExpiredMessages() throws IOException{
+ try{
matched.reset();
- while(matched.hasNext()) {
+ while(matched.hasNext()){
MessageReference node=matched.next();
- if(node.getMessageId().equals(mdn.getMessageId())){
+ if(node.isExpired()){
matched.remove();
dispatched.incrementAndGet();
node.decrementReferenceCount();
break;
}
}
+ }finally{
matched.release();
}
}
+ public void processMessageDispatchNotification(MessageDispatchNotification mdn){
+ synchronized(matchedListMutex){
+ try{
+ matched.reset();
+ while(matched.hasNext()){
+ MessageReference node=matched.next();
+ if(node.getMessageId().equals(mdn.getMessageId())){
+ matched.remove();
+ dispatched.incrementAndGet();
+ node.decrementReferenceCount();
+ break;
+ }
+ }
+ }finally{
+ matched.release();
+ }
+ }
+ }
+
synchronized public void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception{
// Handle the standard acknowledgment case.
@@ -335,21 +341,22 @@
private void dispatchMatched() throws IOException{
synchronized(matchedListMutex){
- matched.reset();
- while(matched.hasNext()) {
- MessageReference message=(MessageReference) matched.next();
- matched.remove();
-
- // Message may have been sitting in the matched list a while
- // waiting for the consumer to ak the message.
- if( message.isExpired() ) {
- message.decrementReferenceCount();
- continue; // just drop it.
- }
-
- dispatch(message);
+ try{
+ matched.reset();
+ while(matched.hasNext()){
+ MessageReference message=(MessageReference)matched.next();
+ matched.remove();
+ // Message may have been sitting in the matched list a while
+ // waiting for the consumer to ak the message.
+ if(message.isExpired()){
+ message.decrementReferenceCount();
+ continue; // just drop it.
+ }
+ dispatch(message);
+ }
+ }finally{
+ matched.release();
}
- matched.release();
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?view=diff&rev=479156&r1=479155&r2=479156
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Sat Nov 25 09:58:41 2006
@@ -110,4 +110,8 @@
public boolean isFull() {
return usageManager != null ? usageManager.isFull() : false;
}
+
+
+ public void release(){
+ }
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?view=diff&rev=479156&r1=479155&r2=479156
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Sat Nov 25 09:58:41 2006
@@ -18,7 +18,6 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
@@ -38,6 +37,7 @@
* @version $Revision$
*/
public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener{
+
static private final Log log=LogFactory.getLog(FilePendingMessageCursor.class);
private Store store;
private String name;
@@ -45,8 +45,7 @@
private ListContainer diskList;
private Iterator iter=null;
private Destination regionDestination;
- private Lock iterLock=new ReentrantLock();
- private Object mutex=new Object();
+ private ReentrantLock iterLock=new ReentrantLock();
/**
* @param name
@@ -60,10 +59,8 @@
/**
* @return true if there are no pending messages
*/
- public boolean isEmpty(){
- synchronized(mutex){
- return memoryList.isEmpty()&&isDiskListEmpty();
- }
+ public synchronized boolean isEmpty(){
+ return memoryList.isEmpty()&&isDiskListEmpty();
}
/**
@@ -71,9 +68,11 @@
*
*/
public void reset(){
- iterLock.lock();
- synchronized(mutex){
- iter=isSpaceInMemoryList()?memoryList.iterator():diskList.listIterator();
+ try{
+ iterLock.lockInterruptibly();
+ iter=isDiskListEmpty()?memoryList.iterator():getDiskList().listIterator();
+ }catch(InterruptedException e){
+ log.warn("Failed to get lock ",e);
}
}
@@ -81,7 +80,7 @@
iterLock.unlock();
}
- public void destroy(){
+ public synchronized void destroy(){
for(Iterator i=memoryList.iterator();i.hasNext();){
Message node=(Message)i.next();
node.decrementReferenceCount();
@@ -92,23 +91,21 @@
}
}
- public LinkedList pageInList(int maxItems){
+ public synchronized LinkedList pageInList(int maxItems){
LinkedList result=new LinkedList();
- synchronized(mutex){
- int count=0;
- for(Iterator i=memoryList.iterator();i.hasNext()&&count<maxItems;){
- result.add(i.next());
+ int count=0;
+ for(Iterator i=memoryList.iterator();i.hasNext()&&count<maxItems;){
+ result.add(i.next());
+ count++;
+ }
+ if(count<maxItems&&!isDiskListEmpty()){
+ for(Iterator i=getDiskList().iterator();i.hasNext()&&count<maxItems;){
+ Message message=(Message)i.next();
+ message.setRegionDestination(regionDestination);
+ message.incrementReferenceCount();
+ result.add(message);
count++;
}
- if(count<maxItems&&!isDiskListEmpty()){
- for(Iterator i=getDiskList().iterator();i.hasNext()&&count<maxItems;){
- Message message=(Message)i.next();
- message.setRegionDestination(regionDestination);
- message.incrementReferenceCount();
- result.add(message);
- count++;
- }
- }
}
return result;
}
@@ -118,20 +115,18 @@
*
* @param node
*/
- public void addMessageLast(MessageReference node){
- synchronized(mutex){
- try{
- regionDestination=node.getMessage().getRegionDestination();
- if(isSpaceInMemoryList()){
- memoryList.add(node);
- }else{
- flushToDisk();
- node.decrementReferenceCount();
- getDiskList().addLast(node);
- }
- }catch(IOException e){
- throw new RuntimeException(e);
+ public synchronized void addMessageLast(MessageReference node){
+ try{
+ regionDestination=node.getMessage().getRegionDestination();
+ if(isSpaceInMemoryList()){
+ memoryList.add(node);
+ }else{
+ flushToDisk();
+ node.decrementReferenceCount();
+ getDiskList().addLast(node);
}
+ }catch(IOException e){
+ throw new RuntimeException(e);
}
}
@@ -140,93 +135,79 @@
*
* @param node
*/
- public void addMessageFirst(MessageReference node){
- synchronized(mutex){
- try{
- regionDestination=node.getMessage().getRegionDestination();
- if(isSpaceInMemoryList()){
- memoryList.addFirst(node);
- }else{
- flushToDisk();
- node.decrementReferenceCount();
- getDiskList().addFirst(node);
- }
- }catch(IOException e){
- throw new RuntimeException(e);
+ public synchronized void addMessageFirst(MessageReference node){
+ try{
+ regionDestination=node.getMessage().getRegionDestination();
+ if(isSpaceInMemoryList()){
+ memoryList.addFirst(node);
+ }else{
+ flushToDisk();
+ node.decrementReferenceCount();
+ getDiskList().addFirst(node);
}
+ }catch(IOException e){
+ throw new RuntimeException(e);
}
}
/**
* @return true if there pending messages to dispatch
*/
- public boolean hasNext(){
- synchronized(mutex){
- return iter.hasNext();
- }
+ public synchronized boolean hasNext(){
+ return iter.hasNext();
}
/**
* @return the next pending message
*/
- public MessageReference next(){
- synchronized(mutex){
- Message message=(Message)iter.next();
- if(!isDiskListEmpty()){
- // got from disk
- message.setRegionDestination(regionDestination);
- message.incrementReferenceCount();
- }
- return message;
+ public synchronized MessageReference next(){
+ Message message=(Message)iter.next();
+ if(!isDiskListEmpty()){
+ // got from disk
+ message.setRegionDestination(regionDestination);
+ message.incrementReferenceCount();
}
+ return message;
}
/**
* remove the message at the cursor position
*
*/
- public void remove(){
- synchronized(mutex){
- iter.remove();
- }
+ public synchronized void remove(){
+ iter.remove();
}
/**
* @param node
* @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference)
*/
- public void remove(MessageReference node){
- synchronized(mutex){
- memoryList.remove(node);
- if(!isDiskListEmpty()){
- getDiskList().remove(node);
- }
+ public synchronized void remove(MessageReference node){
+ memoryList.remove(node);
+ if(!isDiskListEmpty()){
+ getDiskList().remove(node);
}
}
/**
* @return the number of pending messages
*/
- public int size(){
- synchronized(mutex){
- return memoryList.size()+(isDiskListEmpty()?0:getDiskList().size());
- }
+ public synchronized int size(){
+ return memoryList.size()+(isDiskListEmpty()?0:getDiskList().size());
}
/**
* clear all pending messages
*
*/
- public void clear(){
- synchronized(mutex){
- memoryList.clear();
- if(!isDiskListEmpty()){
- getDiskList().clear();
- }
+ public synchronized void clear(){
+ memoryList.clear();
+ if(!isDiskListEmpty()){
+ getDiskList().clear();
}
}
- public boolean isFull(){
+ public synchronized boolean isFull(){
// we always have space - as we can persist to disk
return false;
}
@@ -253,15 +234,13 @@
return hasSpace()&&isDiskListEmpty();
}
- protected void flushToDisk(){
- synchronized(mutex){
- for(Iterator i=memoryList.iterator();i.hasNext();){
- MessageReference node=(MessageReference)i.next();
- node.decrementReferenceCount();
- getDiskList().addLast(node);
- }
- memoryList.clear();
+ protected synchronized void flushToDisk(){
+ for(Iterator i=memoryList.iterator();i.hasNext();){
+ MessageReference node=(MessageReference)i.next();
+ node.decrementReferenceCount();
+ getDiskList().addLast(node);
}
+ memoryList.clear();
}
protected boolean isDiskListEmpty(){
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?view=diff&rev=479156&r1=479155&r2=479156
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java Sat Nov 25 09:58:41 2006
@@ -54,6 +54,13 @@
*
*/
public void reset();
+
+ /**
+ * hint to the cursor to release any locks it might have
+ * grabbed after a reset
+ *
+ */
+ public void release();
/**
* add message to await dispatch
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?view=diff&rev=479156&r1=479155&r2=479156
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Sat Nov 25 09:58:41 2006
@@ -185,10 +185,16 @@
}
public synchronized void reset(){
- nonPersistent.reset();
for(Iterator i=storePrefetches.iterator();i.hasNext();){
AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next();
tsp.reset();
+ }
+ }
+
+ public synchronized void release(){
+ for(Iterator i=storePrefetches.iterator();i.hasNext();){
+ AbstractPendingMessageCursor tsp=(AbstractPendingMessageCursor)i.next();
+ tsp.release();
}
}