You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ga...@apache.org on 2008/06/24 10:19:38 UTC
svn commit: r671058 - in
/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2:
msgprocessors/MakeConnectionProcessor.java
storage/inmemory/InMemoryTransaction.java workers/WorkerLock.java
Author: gatfora
Date: Tue Jun 24 01:19:37 2008
New Revision: 671058
URL: http://svn.apache.org/viewvc?rev=671058&view=rev
Log:
Applying patches from SANDESHA2-161 and SANDESHA2-162, thanks David, Sara
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java?rev=671058&r1=671057&r2=671058&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java Tue Jun 24 01:19:37 2008
@@ -291,24 +291,19 @@
SandeshaThread sender = storageManager.getSender();
WorkerLock lock = sender.getWorkerLock();
-
+
String workId = matchingMessage.getMessageID();
- SenderWorker worker = null;
- synchronized(lock){
- while (lock.isWorkPresent(workId)) {
- try {
- //wait on the lock.
- lock.wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
+ SenderWorker worker = new SenderWorker(pollMessage.getConfigurationContext(), matchingMessage, pollMessage.getRMSpecVersion());
+ worker.setLock(lock);
+ worker.setWorkId(workId);
+ while (!lock.addWork(workId, worker)) {
+ try {
+ // wait on the lock.
+ lock.awaitRemoval(workId);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
}
-
- worker = new SenderWorker (pollMessage.getConfigurationContext(), matchingMessage, pollMessage.getRMSpecVersion());
- worker.setLock(lock);
- worker.setWorkId(workId);
-
- lock.addWork(workId, worker);
+
}
setTransportProperties (returnMessage, pollMessage);
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java?rev=671058&r1=671057&r2=671058&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java Tue Jun 24 01:19:37 2008
@@ -20,13 +20,12 @@
package org.apache.sandesha2.storage.inmemory;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.sandesha2.i18n.SandeshaMessageHelper;
-import org.apache.sandesha2.i18n.SandeshaMessageKeys;
import org.apache.sandesha2.storage.SandeshaStorageException;
import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beans.RMBean;
@@ -44,7 +43,6 @@
private InMemoryStorageManager manager;
private String threadName;
private ArrayList enlistedBeans = new ArrayList();
- private InMemoryTransaction waitingForTran = null;
private boolean sentMessages = false;
private boolean active = true;
private Thread thread;
@@ -73,61 +71,54 @@
public boolean isActive () {
return active;
}
+
+ /**
+  * Lock object that enlist() stores in a bean's transaction slot. It is a
+  * ReentrantLock that implements Transaction only so that it can be handed to
+  * RMBean.setTransaction(); it never acts as a real transaction.
+  *
+  * Declared static because it uses no state of the enclosing
+  * InMemoryTransaction. As a non-static inner class it would carry a hidden
+  * reference to the transaction that created it, keeping that transaction
+  * reachable for as long as the bean holds the lock object.
+  */
+ private static class DummyTransaction extends ReentrantLock implements Transaction {
+
+     /** Not a real transaction: committing is always rejected. */
+     public void commit() throws SandeshaStorageException {
+         throw new SandeshaStorageException("Not supported");
+     }
+
+     /** Always false: this object is only a lock, never an active transaction. */
+     public boolean isActive() {
+         return false;
+     }
+
+     /** Not a real transaction: rolling back is always rejected. */
+     public void rollback() throws SandeshaStorageException {
+         throw new SandeshaStorageException("Not supported");
+     }
+ }
+
public void enlist(RMBean bean) throws SandeshaStorageException {
if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Entry: InMemoryTransaction::enlist, " + bean);
- if(bean != null) {
+ if (bean != null) {
+ DummyTransaction tran = null;
synchronized (bean) {
- InMemoryTransaction other = (InMemoryTransaction) bean.getTransaction();
- while(other != null && other != this) {
- // Put ourselves into the list of waiters
- waitingForTran = other;
-
- // Look to see if there is a loop in the chain of waiters
- if(!enlistedBeans.isEmpty()) {
- HashSet set = new HashSet();
- set.add(this);
- while(other != null) {
- if(set.contains(other)) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.deadlock, this.toString(), bean.toString());
- SandeshaStorageException e = new SandeshaStorageException(message);
-
- // Do our best to get out of the way of the other work in the system
- waitingForTran = null;
- releaseLocks();
-
- if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug(message, e);
- throw e;
- }
- set.add(other);
- other = other.waitingForTran;
- }
- }
-
- boolean warn = false;
+ tran = (DummyTransaction) bean.getTransaction();
+ if (tran == null) {
+ tran = new DummyTransaction();
+ bean.setTransaction(tran);
+ }
+ }
+
+ boolean locked = false;
+ while (!locked) {
+ locked = tran.tryLock();
+ if (!locked) {
+
try {
- if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("This " + this + " waiting for " + waitingForTran);
- long pre = System.currentTimeMillis();
- bean.wait(5000);
- long post = System.currentTimeMillis();
- if ((post - pre) > 50000)
- warn = true;
- } catch(InterruptedException e) {
- // Do nothing
- }
- other = (InMemoryTransaction) bean.getTransaction();
- if (other != null && warn) {
- //we have been waiting for a long time - this might imply a three way deadlock so error condition
- if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("possible deadlock :" + this.toString() + " : " + bean.toString());
+ locked = tran.tryLock(5, TimeUnit.SECONDS);
+ if (!locked) {
+ if (log.isDebugEnabled())
+ log.debug("Waiting for bean lock 5 seconds");
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
}
}
-
- waitingForTran = null;
- if(other == null) {
- if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug(this + " locking bean");
- bean.setTransaction(this);
- enlistedBeans.add(bean);
- }
+
+ enlistedBeans.add(bean);
+
}
}
@@ -141,10 +132,8 @@
Iterator beans = enlistedBeans.iterator();
while(beans.hasNext()) {
RMBean bean = (RMBean) beans.next();
- synchronized (bean) {
- bean.setTransaction(null);
- bean.notifyAll();
- }
+ DummyTransaction tran = (DummyTransaction) bean.getTransaction();
+ tran.unlock();
}
enlistedBeans.clear();
@@ -179,3 +168,4 @@
+
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java?rev=671058&r1=671057&r2=671058&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java Tue Jun 24 01:19:37 2008
@@ -19,7 +19,8 @@
package org.apache.sandesha2.workers;
-import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -28,44 +29,81 @@
public class WorkerLock {
static final Log log = LogFactory.getLog(WorkerLock.class);
- private HashMap locks = new HashMap();
-
+ private ConcurrentHashMap locks = new ConcurrentHashMap();
+
public WorkerLock () {
}
-
- public synchronized boolean addWork (String work, Object owner) {
- if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Enter: WorkerLock::addWork " + work + ", " + owner);
- if(locks.containsKey(work)){
- if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Exit: WorkerLock::addWork " + false);
- return false;
- }
- locks.put(work, owner);
- if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Exit: WorkerLock::addWork " + true);
- return true;
- }
-
- public synchronized void removeWork (String work) {
- if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Enter: WorkerLock::removeWork " + work);
- locks.remove(work);
-
- //wake up some thread that is waiting on this lock.
- this.notify();
-
- if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Exit: WorkerLock::removeWork");
- }
-
- public synchronized boolean isWorkPresent (String work) {
- if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Enter: WorkerLock::isWorkPresent " + work);
- boolean value = locks.containsKey(work);
- if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Exit: WorkerLock::isWorkPresent " + value);
- return value;
- }
-
- public synchronized boolean ownsLock(String work, Object owner) {
- if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Enter: WorkerLock::ownsLock " + work + " ," + owner);
- Object realOwner = locks.get(work);
- if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Exit: WorkerLock::ownsLock " + Boolean.valueOf(realOwner == owner));
- return realOwner == owner;
- }
-}
\ No newline at end of file
+
+ /**
+  * Map value recorded for one piece of work: the owner that claimed it plus a
+  * one-shot latch that is opened when the work is removed.
+  */
+ private static class Holder {
+     // Count of 1: a single release() frees every thread blocked in awaitRelease().
+     private final CountDownLatch latch = new CountDownLatch(1);
+
+     // Owner supplied at construction; never reassigned, hence final.
+     private final Object value;
+
+     public Holder(Object newValue) {
+         value = newValue;
+     }
+
+     /**
+      * Blocks until release() has been called; returns immediately if it
+      * already has been.
+      *
+      * @throws InterruptedException if the waiting thread is interrupted
+      */
+     public void awaitRelease() throws InterruptedException {
+         latch.await();
+     }
+
+     /** Opens the latch; further calls have no additional effect. */
+     public void release() {
+         latch.countDown();
+     }
+
+     /** @return the owner object given to the constructor */
+     public Object getValue() {
+         return value;
+     }
+ }
+
+ /**
+  * Tries to claim a piece of work for the given owner.
+  *
+  * @param work  identifier of the work item
+  * @param owner object recorded as the holder of the claim
+  * @return true if no entry existed for the work and the claim was recorded,
+  *         false if an entry was already present (the map is left unchanged)
+  */
+ public boolean addWork(String work, Object owner) {
+     if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
+         log.debug("Enter: WorkerLock::addWork " + work + ", " + owner);
+     }
+     // putIfAbsent returns the existing value, or null when ours was stored.
+     boolean claimed = locks.putIfAbsent(work, new Holder(owner)) == null;
+     if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
+         log.debug("Exit: WorkerLock::addWork " + claimed);
+     }
+     return claimed;
+ }
+
+ /**
+  * Waits until the holder currently registered for the work is released.
+  * Returns immediately when no entry exists for the work at the time of the
+  * lookup.
+  *
+  * @param work identifier of the work item to wait on
+  * @throws InterruptedException if the thread is interrupted while waiting;
+  *         the exit trace line is not written in that case
+  */
+ public void awaitRemoval(String work) throws InterruptedException {
+     if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
+         log.debug("Enter: WorkerLock::awaitRemoval " + work);
+     }
+     Object entry = locks.get(work);
+     if (entry != null) {
+         ((Holder) entry).awaitRelease();
+     }
+     if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
+         log.debug("Exit: WorkerLock::awaitRemoval");
+     }
+ }
+
+ /**
+  * Drops the entry for a piece of work and opens its holder's latch, which
+  * frees every thread blocked in awaitRemoval() on that holder.
+  *
+  * Removing work that has no entry (never added, or already removed) is a
+  * no-op: ConcurrentHashMap.remove() returns null in that case, so the holder
+  * is released only when one was actually removed.
+  *
+  * @param work identifier of the work item to release
+  */
+ public void removeWork(String work) {
+     if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
+         log.debug("Enter: WorkerLock::removeWork " + work);
+     }
+     Holder h = (Holder) locks.remove(work);
+     if (h != null) {
+         h.release();
+     }
+     if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
+         log.debug("Exit: WorkerLock::removeWork");
+     }
+ }
+
+ /**
+  * Reports whether an entry is currently registered for the work.
+  *
+  * @param work identifier of the work item
+  * @return true if the map contains the work identifier as a key
+  */
+ public boolean isWorkPresent(String work) {
+     if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
+         log.debug("Enter: WorkerLock::isWorkPresent " + work);
+     }
+     final boolean present = locks.containsKey(work);
+     if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
+         log.debug("Exit: WorkerLock::isWorkPresent " + present);
+     }
+     return present;
+ }
+
+ /**
+  * Checks whether the given object is the recorded owner of the work.
+  * Ownership is decided by reference identity (==), not equals(). When no
+  * entry exists the recorded owner is treated as null, so a null owner
+  * argument compares as owning absent work.
+  *
+  * @param work  identifier of the work item
+  * @param owner candidate owner to compare against the recorded one
+  * @return true if the recorded owner is the same reference as owner
+  */
+ public boolean ownsLock(String work, Object owner) {
+     if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
+         log.debug("Enter: WorkerLock::ownsLock " + work + " ," + owner);
+     }
+     Object realOwner = null;
+     Holder entry = (Holder) locks.get(work);
+     if (entry != null) {
+         realOwner = entry.getValue();
+     }
+     if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
+         log.debug("Exit: WorkerLock::ownsLock " + Boolean.valueOf(realOwner == owner));
+     }
+     return realOwner == owner;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org