You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ga...@apache.org on 2008/06/24 10:19:38 UTC

svn commit: r671058 - in /webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2: msgprocessors/MakeConnectionProcessor.java storage/inmemory/InMemoryTransaction.java workers/WorkerLock.java

Author: gatfora
Date: Tue Jun 24 01:19:37 2008
New Revision: 671058

URL: http://svn.apache.org/viewvc?rev=671058&view=rev
Log:
Applying patches from SANDESHA2-161 and SANDESHA2-162, thanks David, Sara

Modified:
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java?rev=671058&r1=671057&r2=671058&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java Tue Jun 24 01:19:37 2008
@@ -291,24 +291,19 @@
 
 			SandeshaThread sender = storageManager.getSender();
 			WorkerLock lock = sender.getWorkerLock();
-			
+
 			String workId = matchingMessage.getMessageID();
-			SenderWorker worker = null;
-			synchronized(lock){
-				while (lock.isWorkPresent(workId)) {
-					try {
-						//wait on the lock.
-						lock.wait();
-					} catch (InterruptedException e) {
-							e.printStackTrace();
-					}
+			SenderWorker worker = new SenderWorker(pollMessage.getConfigurationContext(), matchingMessage, pollMessage.getRMSpecVersion());
+			worker.setLock(lock);
+			worker.setWorkId(workId);
+			while (!lock.addWork(workId, worker)) {
+				try {
+					// wait on the lock.
+					lock.awaitRemoval(workId);
+				} catch (InterruptedException e) {
+					e.printStackTrace();
 				}
-				
-				worker = new SenderWorker (pollMessage.getConfigurationContext(), matchingMessage, pollMessage.getRMSpecVersion());
-				worker.setLock(lock);
-				worker.setWorkId(workId);
-				
-				lock.addWork(workId, worker);
+
 			}
 			
 			setTransportProperties (returnMessage, pollMessage);

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java?rev=671058&r1=671057&r2=671058&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java Tue Jun 24 01:19:37 2008
@@ -20,13 +20,12 @@
 package org.apache.sandesha2.storage.inmemory;
 
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.sandesha2.i18n.SandeshaMessageHelper;
-import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.storage.SandeshaStorageException;
 import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beans.RMBean;
@@ -44,7 +43,6 @@
 	private InMemoryStorageManager manager;
 	private String threadName;
 	private ArrayList enlistedBeans = new ArrayList();
-	private InMemoryTransaction waitingForTran = null;
 	private boolean sentMessages = false;
 	private boolean active = true;
 	private Thread thread;
@@ -73,61 +71,54 @@
 	public boolean isActive () {
 		return active;
 	}
+	
+	private class DummyTransaction extends ReentrantLock implements Transaction {
+
+		public void commit() throws SandeshaStorageException {
+			throw new SandeshaStorageException("Not supported");
+		}
+
+		public boolean isActive() {
+			// TODO Auto-generated method stub
+			return false;
+		}
+
+		public void rollback() throws SandeshaStorageException {
+			throw new SandeshaStorageException("Not supported");
+		}
 
+	}
+	
 	public void enlist(RMBean bean) throws SandeshaStorageException {
 		if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Entry: InMemoryTransaction::enlist, " + bean);
-		if(bean != null) {
+		if (bean != null) {
+			DummyTransaction tran = null;
 			synchronized (bean) {
-				InMemoryTransaction other = (InMemoryTransaction) bean.getTransaction();
-				while(other != null && other != this) {
-					// Put ourselves into the list of waiters
-					waitingForTran = other;
-
-					// Look to see if there is a loop in the chain of waiters
-					if(!enlistedBeans.isEmpty()) {
-						HashSet set = new HashSet();
-						set.add(this);
-						while(other != null) {
-							if(set.contains(other)) {
-								String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.deadlock, this.toString(), bean.toString());
-								SandeshaStorageException e = new SandeshaStorageException(message);
-								
-								// Do our best to get out of the way of the other work in the system
-								waitingForTran = null;
-								releaseLocks();
-								
-								if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug(message, e);
-								throw e;
-							}
-							set.add(other);
-							other = other.waitingForTran;
-						}
-					}
-					
-					boolean warn = false;
+				tran = (DummyTransaction) bean.getTransaction();
+				if (tran == null) {
+					tran = new DummyTransaction();
+					bean.setTransaction(tran);
+				}
+			}
+
+			boolean locked = false;
+			while (!locked) {
+				locked = tran.tryLock();
+				if (!locked) {
+
 					try {
-						if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("This " + this + " waiting for " + waitingForTran);
-						long pre = System.currentTimeMillis();
-						bean.wait(5000); 
-						long post = System.currentTimeMillis();
-						if ((post - pre) > 50000)
-							warn = true;
-					} catch(InterruptedException e) {
-						// Do nothing
-					}
-					other = (InMemoryTransaction) bean.getTransaction();
-					if (other != null && warn) {
-						//we have been waiting for a long time - this might imply a three way deadlock so error condition
-						if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("possible deadlock :" + this.toString() + " : " + bean.toString());
+						locked = tran.tryLock(5, TimeUnit.SECONDS);
+						if (!locked) {
+							if (log.isDebugEnabled())
+								log.debug("Waiting for bean lock 5 seconds");
+						}
+					} catch (InterruptedException e) {
+						e.printStackTrace();
 					}
 				}
-				
-				waitingForTran = null;
-				if(other == null) {
-					if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug(this + " locking bean");
-					bean.setTransaction(this);
-					enlistedBeans.add(bean);
-				}
+
+				enlistedBeans.add(bean);
+
 			}
 		}
 		
@@ -141,10 +132,8 @@
 		Iterator beans = enlistedBeans.iterator();
 		while(beans.hasNext()) {
 			RMBean bean = (RMBean) beans.next();
-			synchronized (bean) {
-				bean.setTransaction(null);
-				bean.notifyAll();
-			}
+			DummyTransaction tran = (DummyTransaction) bean.getTransaction();
+			tran.unlock();
 		}
 		enlistedBeans.clear();
 		
@@ -179,3 +168,4 @@
 
 
 
+

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java?rev=671058&r1=671057&r2=671058&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java Tue Jun 24 01:19:37 2008
@@ -19,7 +19,8 @@
 
 package org.apache.sandesha2.workers;
 
-import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -28,44 +29,81 @@
 public class WorkerLock {
 
   static final Log log = LogFactory.getLog(WorkerLock.class);
-  private HashMap locks = new HashMap();
-	
+  private ConcurrentHashMap locks = new ConcurrentHashMap();
+  
   public WorkerLock () {
   }
-	
-  public synchronized boolean addWork (String work, Object owner) {
-	if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Enter: WorkerLock::addWork " + work + ", " + owner);
-    if(locks.containsKey(work)){
-    	if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Exit: WorkerLock::addWork " + false);
-    	return false;
-    }
-    locks.put(work, owner);
-    if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Exit: WorkerLock::addWork " + true);
-	return true;
-  }
-	
-	public synchronized void removeWork (String work) {
-		if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Enter: WorkerLock::removeWork " + work);
-		locks.remove(work);
-		
-		//wake up some thread that is waiting on this lock.
-		this.notify();
-		
-		if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Exit: WorkerLock::removeWork");
-	}
-	
-	public synchronized boolean isWorkPresent (String work) {
-	  if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Enter: WorkerLock::isWorkPresent " + work);
-	  boolean value = locks.containsKey(work);
-	  if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Exit: WorkerLock::isWorkPresent " + value);
-	  return value;
-	}
-	
-	 public synchronized boolean ownsLock(String work, Object owner) {
-		if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Enter: WorkerLock::ownsLock " + work + " ," + owner);
-	    Object realOwner = locks.get(work);
-	    if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Exit: WorkerLock::ownsLock " + Boolean.valueOf(realOwner == owner));
-	    return realOwner == owner;
-	  }
 
-}
\ No newline at end of file
+  
+  	private static class Holder {
+		CountDownLatch latch = new CountDownLatch(1);
+
+		Object value;
+
+		public Holder(Object newValue) {
+			value = newValue;
+		}
+
+		public void awaitRelease() throws InterruptedException {
+			latch.await();
+		}
+
+		public void release() {
+			latch.countDown();
+		}
+
+		public Object getValue() {
+			return value;
+		}
+	}
+
+	public boolean addWork(String work, Object owner) {
+		if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Enter: WorkerLock::addWork " + work + ", " + owner);
+    	Holder h = new Holder(owner);
+		Object prev = locks.putIfAbsent(work, h);
+		boolean result = (prev == null);
+		if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+			log.debug("Exit: WorkerLock::addWork " + result);
+		return result;
+	}
+
+	public void awaitRemoval(String work) throws InterruptedException {
+		if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+			log.debug("Enter: WorkerLock::awaitRemoval " + work);
+		Holder h = (Holder) locks.get(work);
+		if (h != null) {
+			h.awaitRelease();
+		}
+		if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+			log.debug("Exit: WorkerLock::awaitRemoval");
+	}
+
+	public void removeWork(String work) {
+		if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+			log.debug("Enter: WorkerLock::removeWork " + work);
+		Holder h = (Holder) locks.remove(work);
+		h.release();
+		if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+			log.debug("Exit: WorkerLock::removeWork");
+	}
+
+	public boolean isWorkPresent(String work) {
+
+		if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+			log.debug("Enter: WorkerLock::isWorkPresent " + work);
+		boolean value = locks.containsKey(work);
+		if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+			log.debug("Exit: WorkerLock::isWorkPresent " + value);
+		return value;
+	}
+
+	public boolean ownsLock(String work, Object owner) {
+		if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+			log.debug("Enter: WorkerLock::ownsLock " + work + " ," + owner);
+		Holder h = (Holder) locks.get(work);
+		Object realOwner = (h != null ? h.getValue() : null);
+		if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+			log.debug("Exit: WorkerLock::ownsLock " + Boolean.valueOf(realOwner == owner));
+		return realOwner == owner;
+	}
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org