You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ml...@apache.org on 2006/11/23 11:16:57 UTC

svn commit: r478523 - in /webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage: beans/ inmemory/

Author: mlovett
Date: Thu Nov 23 02:16:56 2006
New Revision: 478523

URL: http://svn.apache.org/viewvc?view=rev&rev=478523
Log:
Add locking to the in-memory storage manager, see SANDESHA2-49

Added:
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryBeanMgr.java   (with props)
Modified:
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMBean.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryCreateSeqBeanMgr.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryNextMsgBeanMgr.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySequencePropertyBeanMgr.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMBean.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMBean.java?view=diff&rev=478523&r1=478522&r2=478523
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMBean.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMBean.java Thu Nov 23 02:16:56 2006
@@ -19,6 +19,8 @@
 
 import java.io.Serializable;
 
+import org.apache.sandesha2.storage.Transaction;
+
 /**
  * Defines a data bean used and managed by Sandesha2.
  */
@@ -26,7 +28,7 @@
 public abstract class RMBean implements Serializable {
 		
 		private long id;
-		
+		private transient volatile Transaction transaction; // NOTE(review): transient so the bean's serialized form never drags the Transaction along
 		
 		public long getId() {
 			return id;
@@ -35,4 +37,11 @@
 		protected void setId(long id) {
 			this.id = id;
 		}
-}
\ No newline at end of file
+		public Transaction getTransaction() {
+			return transaction;
+		}
+
+		public void setTransaction(Transaction transaction) {
+			this.transaction = transaction;
+		}
+}

Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryBeanMgr.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryBeanMgr.java?view=auto&rev=478523
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryBeanMgr.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryBeanMgr.java Thu Nov 23 02:16:56 2006
@@ -0,0 +1,202 @@
+package org.apache.sandesha2.storage.inmemory;
+
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.axis2.context.AbstractContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.storage.beans.RMBean;
+
+/**
+ * Common base class for the in-memory bean managers. Beans are kept in a
+ * Hashtable stored as a property of the supplied context, and each operation
+ * passes the beans it touches to InMemoryStorageManager.enlistBean before
+ * handing them back. Subclasses choose the table key for each bean and
+ * supply the matching rule used by the find methods.
+ */
+abstract class InMemoryBeanMgr {
+
+	private static final Log log = LogFactory.getLog(InMemoryBeanMgr.class);
+	private Hashtable table;
+	private InMemoryStorageManager mgr;
+
+	/**
+	 * Binds this manager to the table held in the context under the given
+	 * property key, creating and registering an empty table if there is none.
+	 */
+	protected InMemoryBeanMgr(InMemoryStorageManager mgr, AbstractContext context, String key) {
+		this.mgr = mgr;
+		Object obj = context.getProperty(key);
+		if (obj != null) {
+			table = (Hashtable) obj;
+		} else {
+			table = new Hashtable();
+			context.setProperty(key, table);
+		}
+	}
+
+	/**
+	 * Enlists the bean and stores it under the key, replacing any bean that
+	 * was already stored there. Always returns true.
+	 */
+	protected boolean insert(Object key, RMBean bean) {
+		mgr.enlistBean(bean);
+		synchronized (table) {
+			table.put(key, bean);
+		}
+		return true;
+	}
+
+	/**
+	 * Removes the bean stored under the key.
+	 *
+	 * @return true if a bean was removed, false if the key was not present
+	 */
+	protected boolean delete(Object key) {
+		RMBean bean = null;
+		synchronized (table) {
+			bean = (RMBean) table.get(key);
+		}
+		if(bean != null) {
+			// Enlist outside the table monitor, then remove whatever is stored
+			// under the key now, as the table may have changed in the meantime
+			mgr.enlistBean(bean);
+			synchronized (table) {
+				bean = (RMBean) table.remove(key);
+			}
+		}
+		return bean != null;
+	}
+
+	/**
+	 * Looks up the bean stored under the key and enlists it.
+	 *
+	 * @return the bean currently stored under the key, or null if there is none
+	 */
+	protected RMBean retrieve(Object key) {
+		RMBean bean = null;
+		synchronized (table) {
+			bean = (RMBean) table.get(key);
+		}
+		if(bean != null) {
+			// Enlist outside the table monitor, then re-read the entry in case
+			// it was replaced or removed in the meantime
+			mgr.enlistBean(bean);
+			synchronized (table) {
+				bean = (RMBean) table.get(key);
+			}
+		}
+		return bean;
+	}
+
+	/**
+	 * Replaces the bean stored under the key with the given bean, enlisting
+	 * both the new bean and the one it replaces.
+	 *
+	 * @return true if an existing bean was replaced; false if nothing was
+	 *         stored under the key, in which case the table is left unchanged
+	 */
+	protected boolean update(Object key, RMBean bean) {
+		mgr.enlistBean(bean);
+		RMBean oldBean = null;
+		synchronized (table) {
+			oldBean = (RMBean) table.get(key);
+			// An update must never act as an insert: only overwrite an entry
+			// that already exists
+			if(oldBean != null) table.put(key, bean);
+		}
+		if(oldBean == null) return false;
+
+		mgr.enlistBean(oldBean);
+		return true;
+	}
+
+	/**
+	 * Returns the beans accepted by match(matchInfo, candidate), or every bean
+	 * in the table when matchInfo is null. Each returned bean has been enlisted
+	 * and was still in the table after enlisting.
+	 */
+	protected List find(RMBean matchInfo) {
+		ArrayList beans = new ArrayList();
+		synchronized (table) {
+			if(matchInfo == null) {
+				beans.addAll(table.values());
+			} else {
+				Iterator i = table.values().iterator();
+				while(i.hasNext()) {
+					RMBean candidate = (RMBean)i.next();
+					if(match(matchInfo, candidate)) {
+						beans.add(candidate);
+					}
+				}
+			}
+		}
+
+		// Now we have a point-in-time view of the beans, lock them all
+		Iterator i = beans.iterator();
+		while(i.hasNext()) mgr.enlistBean((RMBean) i.next());
+
+		// Finally remove any beans that are no longer in the table
+		synchronized (table) {
+			i = beans.iterator();
+			while(i.hasNext()) {
+				RMBean bean = (RMBean) i.next();
+				if(!table.containsValue(bean)) {
+					i.remove();
+				}
+			}
+		}
+
+		return beans;
+	}
+
+	/**
+	 * Returns the single bean accepted by match(matchInfo, candidate), enlisted,
+	 * or null if no bean matches or the match was no longer in the table once
+	 * it had been enlisted.
+	 *
+	 * @throws SandeshaException if more than one bean matches
+	 */
+	protected RMBean findUnique (RMBean matchInfo) throws SandeshaException {
+		RMBean result = null;
+		synchronized (table) {
+			Iterator i = table.values().iterator();
+			while(i.hasNext()) {
+				RMBean candidate = (RMBean)i.next();
+				if(match(matchInfo, candidate)) {
+					if(result == null) {
+						result = candidate;
+					} else {
+						String message = SandeshaMessageHelper.getMessage(
+								SandeshaMessageKeys.nonUniqueResult);
+						log.error(message);
+						throw new SandeshaException (message);
+					}
+				}
+			}
+		}
+
+		// Now we have a point-in-time view of the bean, lock it, and double
+		// check that it is still in the table
+		if(result != null) {
+			mgr.enlistBean(result);
+			synchronized (table) {
+				if(!table.containsValue(result)) result = null;
+			}
+		}
+
+		return result;
+	}
+
+	/**
+	 * Decides whether the candidate bean from the table satisfies the criteria
+	 * carried by matchInfo. Called while the table monitor is held.
+	 */
+	protected abstract boolean match(RMBean matchInfo, RMBean bean);
+}

Propchange: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryBeanMgr.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryCreateSeqBeanMgr.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryCreateSeqBeanMgr.java?view=diff&rev=478523&r1=478522&r2=478523
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryCreateSeqBeanMgr.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryCreateSeqBeanMgr.java Thu Nov 23 02:16:56 2006
@@ -18,10 +18,6 @@
 package org.apache.sandesha2.storage.inmemory;
 
 import java.sql.ResultSet;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Hashtable;
-import java.util.Iterator;
 import java.util.List;
 
 import org.apache.axis2.context.AbstractContext;
@@ -33,96 +29,61 @@
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
 import org.apache.sandesha2.storage.beans.CreateSeqBean;
+import org.apache.sandesha2.storage.beans.RMBean;
 
-public class InMemoryCreateSeqBeanMgr implements CreateSeqBeanMgr {
+public class InMemoryCreateSeqBeanMgr extends InMemoryBeanMgr implements CreateSeqBeanMgr {
 
 	private static final Log log = LogFactory.getLog(InMemoryCreateSeqBeanMgr.class);
-	private Hashtable table = null;
-	
 
-	public InMemoryCreateSeqBeanMgr(AbstractContext context) {
-		Object obj = context.getProperty(Sandesha2Constants.BeanMAPs.CREATE_SEQUECE);
-		if (obj != null) {
-			table = (Hashtable) obj;
-		} else {
-			table = new Hashtable();
-			context.setProperty(Sandesha2Constants.BeanMAPs.CREATE_SEQUECE, table);
-		}
+	public InMemoryCreateSeqBeanMgr(InMemoryStorageManager mgr, AbstractContext context) {
+		super(mgr, context, Sandesha2Constants.BeanMAPs.CREATE_SEQUECE);
 	}
 
-	public synchronized boolean insert(CreateSeqBean bean) {
-		table.put(bean.getCreateSeqMsgID(), bean);
-		return true;
+	public boolean insert(CreateSeqBean bean) {
+		return super.insert(bean.getCreateSeqMsgID(), bean);
 	}
 
-	public synchronized boolean delete(String msgId) {
-		return table.remove(msgId) != null;
+	public boolean delete(String msgId) {
+		return super.delete(msgId);
 	}
 
-	public synchronized CreateSeqBean retrieve(String msgId) {
-		return (CreateSeqBean) table.get(msgId);
+	public CreateSeqBean retrieve(String msgId) {
+		return (CreateSeqBean) super.retrieve(msgId);
 	}
 
-	public synchronized boolean update(CreateSeqBean bean) {
-		if (table.get(bean.getCreateSeqMsgID())==null)
-			return false;
-
-		return table.put(bean.getCreateSeqMsgID(), bean) != null;
+	public boolean update(CreateSeqBean bean) {
+		return super.update(bean.getCreateSeqMsgID(), bean);
 	}
 
-	public synchronized List find(CreateSeqBean bean) {
-		ArrayList beans = new ArrayList();
-		if (bean == null)
-			return beans;
-
-		Iterator iterator = table.values().iterator();
-
-		CreateSeqBean temp;
-		while (iterator.hasNext()) {
-			temp = (CreateSeqBean) iterator.next();
-
-			boolean equal = true;
-
-			if (bean.getCreateSeqMsgID() != null
-					&& !bean.getCreateSeqMsgID().equals(
-							temp.getCreateSeqMsgID()))
-				equal = false;
+	public List find(CreateSeqBean bean) {
+		return super.find(bean);
+	}
+	
+	protected boolean match(RMBean matchInfo, RMBean candidate) {
+		boolean equal = true;
+		
+		CreateSeqBean bean = (CreateSeqBean) matchInfo;
+		CreateSeqBean temp = (CreateSeqBean) candidate;
 
-			if (bean.getSequenceID() != null
-					&& !bean.getSequenceID().equals(temp.getSequenceID()))
-				equal = false;
+		if (bean.getCreateSeqMsgID() != null
+				&& !bean.getCreateSeqMsgID().equals(
+						temp.getCreateSeqMsgID()))
+			equal = false;
 
-			if (bean.getInternalSequenceID() != null
-					&& !bean.getInternalSequenceID().equals(
-							temp.getInternalSequenceID()))
-				equal = false;
+		if (bean.getSequenceID() != null
+				&& !bean.getSequenceID().equals(temp.getSequenceID()))
+			equal = false;
 
-			if (equal)
-				beans.add(temp);
+		if (bean.getInternalSequenceID() != null
+				&& !bean.getInternalSequenceID().equals(
+						temp.getInternalSequenceID()))
+			equal = false;
 
-		}
-		return beans;
+		return equal;
 	}
 
-	public synchronized ResultSet find(String query) {
-		throw new UnsupportedOperationException(SandeshaMessageHelper.getMessage(
-				SandeshaMessageKeys.selectRSNotSupported));
-	}
-	
-	public synchronized CreateSeqBean findUnique (CreateSeqBean bean) throws SandeshaException {
-		Collection coll = find(bean);
-		if (coll.size()>1) {
-			String message = SandeshaMessageHelper.getMessage(
-					SandeshaMessageKeys.nonUniqueResult);
-			log.error(message);
-			throw new SandeshaException (message);
-		}
-		
-		Iterator iter = coll.iterator();
-		if (iter.hasNext())
-			return (CreateSeqBean) iter.next();
-		else 
-			return null;
+	public CreateSeqBean findUnique (CreateSeqBean bean) throws SandeshaException {
+		return (CreateSeqBean) super.findUnique(bean);
 	}
 
 }

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java?view=diff&rev=478523&r1=478522&r2=478523
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java Thu Nov 23 02:16:56 2006
@@ -33,89 +33,60 @@
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
 import org.apache.sandesha2.storage.beans.InvokerBean;
+import org.apache.sandesha2.storage.beans.RMBean;
 
-public class InMemoryInvokerBeanMgr implements InvokerBeanMgr {
+public class InMemoryInvokerBeanMgr extends InMemoryBeanMgr implements InvokerBeanMgr {
 	
 	private static final Log log = LogFactory.getLog(InMemoryInvokerBeanMgr.class);
-	private Hashtable table = null;
 
-	public InMemoryInvokerBeanMgr(AbstractContext context) {
-		Object obj = context.getProperty(Sandesha2Constants.BeanMAPs.STORAGE_MAP);
-		if (obj != null) {
-			table = (Hashtable) obj;
-		} else {
-			table = new Hashtable();
-			context.setProperty(Sandesha2Constants.BeanMAPs.STORAGE_MAP, table);
-		}
+	public InMemoryInvokerBeanMgr(InMemoryStorageManager mgr, AbstractContext context) {
+		super(mgr, context, Sandesha2Constants.BeanMAPs.STORAGE_MAP);
 	}
 
-	public synchronized boolean insert(InvokerBean bean) {
-		table.put(bean.getMessageContextRefKey(), bean);
-		return true;
+	public boolean insert(InvokerBean bean) {
+		return super.insert(bean.getMessageContextRefKey(), bean);
 	}
 
-	public synchronized boolean delete(String key) {
-		return table.remove(key) != null;
+	public boolean delete(String key) {
+		return super.delete(key);
 	}
 
-	public synchronized InvokerBean retrieve(String key) {
-		return (InvokerBean) table.get(key);
+	public InvokerBean retrieve(String key) {
+		return (InvokerBean) super.retrieve(key);
 	}
 
-	public synchronized ResultSet find(String query) {
-		throw new UnsupportedOperationException(SandeshaMessageHelper.getMessage(
-				SandeshaMessageKeys.selectRSNotSupported));
+	public List find(InvokerBean bean) {
+		return super.find(bean);
 	}
+	
+	protected boolean match(RMBean matchInfo, RMBean candidate) {
+		InvokerBean bean = (InvokerBean) matchInfo;
+		InvokerBean temp = (InvokerBean) candidate;
 
-	public synchronized List find(InvokerBean bean) {
-		ArrayList beans = new ArrayList();
-		Iterator iterator = table.values().iterator();
-
-		InvokerBean temp = new InvokerBean();
-		while (iterator.hasNext()) {
-			temp = (InvokerBean) iterator.next();
-			boolean select = true;
-
-			if (bean.getMessageContextRefKey() != null && !bean.getMessageContextRefKey().equals(temp.getMessageContextRefKey()))
-				select = false;
+		boolean select = true;
 
-			if (bean.getMsgNo() != 0 && bean.getMsgNo() != temp.getMsgNo())
-				select = false;
+		if (bean.getMessageContextRefKey() != null && !bean.getMessageContextRefKey().equals(temp.getMessageContextRefKey()))
+			select = false;
 
-			if (bean.getSequenceID() != null
-					&& !bean.getSequenceID().equals(temp.getSequenceID()))
-				select = false;
-			
-			if (bean.isInvoked()!=temp.isInvoked())
-				select = false;
+		if (bean.getMsgNo() != 0 && bean.getMsgNo() != temp.getMsgNo())
+			select = false;
 
-			if (select)
-				beans.add(temp);
-		}
-		return beans;
+		if (bean.getSequenceID() != null
+				&& !bean.getSequenceID().equals(temp.getSequenceID()))
+			select = false;
+		
+		if (bean.isInvoked()!=temp.isInvoked())
+			select = false;
+		
+		return select;
 	}
 
-	public synchronized boolean update(InvokerBean bean) {
-		if (table.get(bean.getMessageContextRefKey())==null)
-			return false;
-
-		return table.put(bean.getMessageContextRefKey(), bean) != null;
+	public boolean update(InvokerBean bean) {
+		return super.update(bean.getMessageContextRefKey(), bean);
 	}
 	
-	public synchronized InvokerBean findUnique (InvokerBean bean) throws SandeshaException {
-		Collection coll = find(bean);
-		if (coll.size()>1) {
-			String message = SandeshaMessageHelper.getMessage(
-					SandeshaMessageKeys.nonUniqueResult);
-			log.error(message);
-			throw new SandeshaException (message);
-		}
-		
-		Iterator iter = coll.iterator();
-		if (iter.hasNext())
-			return (InvokerBean) iter.next();
-		else 
-			return null;
+	public InvokerBean findUnique(InvokerBean bean) throws SandeshaException {
+		return (InvokerBean) super.findUnique(bean);
 	}
 
 }

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryNextMsgBeanMgr.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryNextMsgBeanMgr.java?view=diff&rev=478523&r1=478522&r2=478523
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryNextMsgBeanMgr.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryNextMsgBeanMgr.java Thu Nov 23 02:16:56 2006
@@ -33,94 +33,59 @@
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
 import org.apache.sandesha2.storage.beans.NextMsgBean;
+import org.apache.sandesha2.storage.beans.RMBean;
 
-public class InMemoryNextMsgBeanMgr implements NextMsgBeanMgr {
+public class InMemoryNextMsgBeanMgr extends InMemoryBeanMgr implements NextMsgBeanMgr {
 
 	private static final Log log = LogFactory.getLog(InMemoryNextMsgBeanMgr.class);
-	private Hashtable table = null;
 
-	public InMemoryNextMsgBeanMgr(AbstractContext context) {
-		Object obj = context.getProperty(Sandesha2Constants.BeanMAPs.NEXT_MESSAGE);
-
-		if (obj != null) {
-			table = (Hashtable) obj;
-		} else {
-			table = new Hashtable();
-			context.setProperty(Sandesha2Constants.BeanMAPs.NEXT_MESSAGE, table);
-		}
+	public InMemoryNextMsgBeanMgr(InMemoryStorageManager mgr, AbstractContext context) {
+		super(mgr, context, Sandesha2Constants.BeanMAPs.NEXT_MESSAGE);
 	}
 
-	public synchronized boolean delete(String sequenceId) {
-		return table.remove(sequenceId) != null;
+	public boolean delete(String sequenceId) {
+		return super.delete(sequenceId);
 	}
 
-	public synchronized NextMsgBean retrieve(String sequenceId) {
-		return (NextMsgBean) table.get(sequenceId);
+	public NextMsgBean retrieve(String sequenceId) {
+		return (NextMsgBean) super.retrieve(sequenceId);
 	}
 
-	public synchronized boolean insert(NextMsgBean bean) {
-		table.put(bean.getSequenceID(), bean);
-		return true;
+	public boolean insert(NextMsgBean bean) {
+		return super.insert(bean.getSequenceID(), bean);
 	}
 
-	public synchronized ResultSet find(String query) {
-		throw new UnsupportedOperationException(SandeshaMessageHelper.getMessage(
-				SandeshaMessageKeys.selectRSNotSupported));
+	public List find(NextMsgBean bean) {
+		return super.find(bean);
 	}
+	
+	protected boolean match(RMBean matchInfo, RMBean candidate) {
+		NextMsgBean bean = (NextMsgBean) matchInfo;
+		NextMsgBean temp = (NextMsgBean) candidate;
 
-	public synchronized List find(NextMsgBean bean) {
-		ArrayList beans = new ArrayList();
-		Iterator iterator = table.values().iterator();
-
-		if (bean == null)
-			return beans;
-
-		NextMsgBean temp;
-		while (iterator.hasNext()) {
-			temp = (NextMsgBean) iterator.next();
-
-			boolean equal = true;
-
-			if (bean.getNextMsgNoToProcess() > 0
-					&& bean.getNextMsgNoToProcess() != temp
-							.getNextMsgNoToProcess())
-				equal = false;
+		boolean equal = true;
 
-			if (bean.getSequenceID() != null
-					&& !bean.getSequenceID().equals(temp.getSequenceID()))
-				equal = false;
+		if (bean.getNextMsgNoToProcess() > 0
+				&& bean.getNextMsgNoToProcess() != temp
+						.getNextMsgNoToProcess())
+			equal = false;
 
-			if (equal)
-				beans.add(temp);
+		if (bean.getSequenceID() != null
+				&& !bean.getSequenceID().equals(temp.getSequenceID()))
+			equal = false;
 
-		}
-		return beans;
+		return equal;
 	}
 
-	public synchronized boolean update(NextMsgBean bean) {
-		if (table.get(bean.getSequenceID())==null)
-			return false;
-
-		return table.put(bean.getSequenceID(), bean) != null;
+	public boolean update(NextMsgBean bean) {
+		return super.update(bean.getSequenceID(), bean);
 	}
 
-	public synchronized Collection retrieveAll() {
-		return table.values();
+	public Collection retrieveAll() {
+		return super.find(null);
 	}
 	
-	public synchronized NextMsgBean findUnique(NextMsgBean bean) throws SandeshaException {
-		Collection coll = find(bean);
-		if (coll.size()>1) {
-			String message = SandeshaMessageHelper.getMessage(
-					SandeshaMessageKeys.nonUniqueResult);
-			log.error(message);
-			throw new SandeshaException (message);
-		}
-		
-		Iterator iter = coll.iterator();
-		if (iter.hasNext())
-			return (NextMsgBean) iter.next();
-		else 
-			return null;
+	public NextMsgBean findUnique(NextMsgBean bean) throws SandeshaException {
+		return (NextMsgBean) super.findUnique(bean);
 	}
 }

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java?view=diff&rev=478523&r1=478522&r2=478523
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java Thu Nov 23 02:16:56 2006
@@ -17,9 +17,6 @@
 
 package org.apache.sandesha2.storage.inmemory;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.List;
 
@@ -31,218 +28,144 @@
 import org.apache.sandesha2.i18n.SandeshaMessageHelper;
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.storage.SandeshaStorageException;
-import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beans.RMBean;
 import org.apache.sandesha2.storage.beans.SenderBean;
-import org.apache.sandesha2.util.SandeshaUtil;
 
-public class InMemorySenderBeanMgr implements SenderBeanMgr {
+public class InMemorySenderBeanMgr extends InMemoryBeanMgr implements SenderBeanMgr {
 	
 	private static final Log log = LogFactory.getLog(InMemorySenderBeanMgr.class);
-	private Hashtable table = null;
 
-	public InMemorySenderBeanMgr(AbstractContext context) {
-		Object obj = context.getProperty(Sandesha2Constants.BeanMAPs.RETRANSMITTER);
-		if (obj != null) {
-			table = (Hashtable) obj;
-		} else {
-			table = new Hashtable();
-			context.setProperty(Sandesha2Constants.BeanMAPs.RETRANSMITTER, table);
-		}
+	public InMemorySenderBeanMgr(InMemoryStorageManager mgr, AbstractContext context) {
+		super(mgr, context, Sandesha2Constants.BeanMAPs.RETRANSMITTER);
 	}
 
-	public synchronized boolean delete(String MessageId) {
-		return table.remove(MessageId) != null;
+	public boolean delete(String MessageId) {
+		return super.delete(MessageId);
 	}
 
-	public synchronized SenderBean retrieve(String MessageId) {
-		return (SenderBean) table.get(MessageId);
+	public SenderBean retrieve(String MessageId) {
+		return (SenderBean) super.retrieve(MessageId);
 	}
 
-	public synchronized boolean insert(SenderBean bean) throws SandeshaStorageException {
+	public boolean insert(SenderBean bean) throws SandeshaStorageException {
 		if (bean.getMessageID() == null)
 			throw new SandeshaStorageException(SandeshaMessageHelper.getMessage(
 					SandeshaMessageKeys.nullMsgId));
-		table.put(bean.getMessageID(), bean);
-		return true;
+		return super.insert(bean.getMessageID(), bean);
 	}
 
-	public synchronized List find(String internalSequenceID) {
+	public List find(String internalSequenceID) {
+		SenderBean temp = new SenderBean();
+		temp.setInternalSequenceID(internalSequenceID);
+		return super.find(temp);
+	}
+	
+	protected boolean match(RMBean matchInfo, RMBean candidate) {
+		if(log.isDebugEnabled()) log.debug("Entry: InMemorySenderBeanMgr::match");
+		SenderBean bean = (SenderBean)matchInfo;
+		SenderBean temp = (SenderBean) candidate;
 		
-		ArrayList arrayList = new ArrayList ();
-		if (internalSequenceID==null || "".equals(internalSequenceID))
-			return arrayList;
+		boolean add = true;
+
+		if (bean.getMessageContextRefKey() != null && !bean.getMessageContextRefKey().equals(temp.getMessageContextRefKey())) {
+			log.debug("MessageContextRefKey didn't match");
+			add = false;
+		}
+		// Time is a bit special - we match all the beans that should be sent
+		// before the moment in time that the match criteria give us.
+		if (bean.getTimeToSend() > 0
+				&& bean.getTimeToSend() < temp.getTimeToSend()) {
+			log.debug("TimeToSend didn't match");
+			add = false;
+		}
 		
-		Iterator iterator = table.keySet().iterator();
+		if (bean.getMessageID() != null
+				&& !bean.getMessageID().equals(temp.getMessageID())) {
+			log.debug("MessageID didn't match");
+			add = false;
+		}
 		
-		while (iterator.hasNext()) {
-			SenderBean senderBean = (SenderBean) table.get(iterator.next());
-			if (internalSequenceID.equals(senderBean.getInternalSequenceID())) 
-					arrayList.add(senderBean);
+		if (bean.getInternalSequenceID() != null
+				&& !bean.getInternalSequenceID().equals("")
+				&& !bean.getInternalSequenceID().equals(
+						temp.getInternalSequenceID())) {
+			log.debug("InternalSequenceID didn't match");
+			add = false;
 		}
 		
-		return arrayList;
-	}
-
-	public synchronized List find(SenderBean bean) {
-		ArrayList beans = new ArrayList();
-		Iterator iterator = ((Hashtable) table).values().iterator();
-
-		SenderBean temp;
-		while (iterator.hasNext()) {
-
-			temp = (SenderBean) iterator.next();
-
-			
-			boolean add = true;
-
-			if (bean.getMessageContextRefKey() != null && !bean.getMessageContextRefKey().equals(temp.getMessageContextRefKey()))
-				add = false;
-
-			if (bean.getTimeToSend() > 0
-					&& bean.getTimeToSend() != temp.getTimeToSend())
-				add = false;
-
-			if (bean.getMessageID() != null
-					&& !bean.getMessageID().equals(temp.getMessageID()))
-				add = false;
-
-			if (bean.getInternalSequenceID() != null
-					&& !bean.getInternalSequenceID().equals(
-							temp.getInternalSequenceID()))
-				add = false;
-
-			if (bean.getMessageNumber() > 0
-					&& bean.getMessageNumber() != temp.getMessageNumber())
-				add = false;
-
-			if (bean.getMessageType() != Sandesha2Constants.MessageTypes.UNKNOWN
-					&& bean.getMessageType() != temp.getMessageType())
-				add = false;
-			
-			if (bean.isSend() != temp.isSend())
-				add = false;
+		if (bean.getMessageNumber() > 0
+				&& bean.getMessageNumber() != temp.getMessageNumber()) {
+			log.debug("MessageNumber didn't match");
+			add = false;
+		}
+		
+		if (bean.getMessageType() != Sandesha2Constants.MessageTypes.UNKNOWN
+				&& bean.getMessageType() != temp.getMessageType()) {
+			log.debug("MessageType didn't match");
+			add = false;
+		}
 
-			if (bean.isReSend() != temp.isReSend())
-				add = false;
-			
-			if (add)
-				beans.add(temp);
+		if (bean.isSend() != temp.isSend()) {
+			log.debug("isSend didn't match");
+			add = false;
 		}
 
-		return beans;
+		// Do not use the isReSend flag to match messages, as it can stop us from
+		// detecting RM messages during 'getNextMsgToSend'
+		//if (bean.isReSend() != temp.isReSend()) {
+		//	log.debug("isReSend didn't match");
+		//	add = false;
+		//}
+
+		if(log.isDebugEnabled()) log.debug("Exit: InMemorySenderBeanMgr::match, " + add);
+		return add;
 	}
 
-	public synchronized SenderBean getNextMsgToSend() {
-		
-		Iterator iterator = table.keySet().iterator();
+	public List find(SenderBean bean) {
+		return super.find(bean);
+	}
 
-		//TODO
-		//pick a random sequence out of the sequences to be sent.
-		
+	public SenderBean getNextMsgToSend() {
+		// Set up match criteria
+		SenderBean matcher = new SenderBean();
+		matcher.setSend(true);
+		matcher.setTimeToSend(System.currentTimeMillis());
 		
-		long lowestAppMsgNo = 0;
-		while (iterator.hasNext()) {
-			Object key = iterator.next();
-			SenderBean temp = (SenderBean) table.get(key);
-			if (temp.isSend()) {
-				long timeToSend = temp.getTimeToSend();
-				long timeNow = System.currentTimeMillis();
-				if ((timeNow >= timeToSend)) {
-					if (temp.getMessageType()==Sandesha2Constants.MessageTypes.APPLICATION) {
-						long msgNo = temp.getMessageNumber();
-						if (lowestAppMsgNo==0) {
-							lowestAppMsgNo=msgNo;
-						}else {
-							if (msgNo<lowestAppMsgNo)
-								lowestAppMsgNo = msgNo;
-						}
-					}
-				}
-			}
-		}
+		List matches = super.find(matcher);
 		
-		iterator = table.keySet().iterator();	
-		while (iterator.hasNext()) {
-			Object key = iterator.next();
-			SenderBean temp = (SenderBean) table.get(key);
-
-			if (temp.isSend()) {
-
-				long timeToSend = temp.getTimeToSend();
-				long timeNow = System.currentTimeMillis();
-				if ((timeNow >= timeToSend)) {
-					boolean valid = false;
-					if (temp.getMessageType()==Sandesha2Constants.MessageTypes.APPLICATION) {
-						if (temp.getMessageNumber()==lowestAppMsgNo)
-							valid = true;
-					}else {
-						valid = true;
-					}
-					
-					if (valid) {
-					    updateNextSendingTime (temp);
-					    return temp;
-					}
+		// We either return an application message or an RM message. If we find
+		// an application message first then we carry on through the list to be
+		// sure that we send the lowest app message available. If we hit an RM
+		// message first then we are done.
+		SenderBean result = null;
+		Iterator i = matches.iterator();
+		while(i.hasNext()) {
+			SenderBean bean = (SenderBean) i.next();
+			if(bean.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
+				long number = bean.getMessageNumber();
+				if(result == null || result.getMessageNumber() > number) {
+					result = bean;
 				}
+			} else if(result == null) {
+				result = bean;
+				break;
 			}
 		}
 		
-		return null;
+		return result;
 	}
-
-	private void updateNextSendingTime (SenderBean bean) {
-		
+	
+	public boolean update(SenderBean bean) {
+		return super.update(bean.getMessageID(), bean);
 	}
 	
-	private synchronized ArrayList findBeansWithMsgNo(ArrayList list, long msgNo) {
-		ArrayList beans = new ArrayList();
-
-		Iterator it = list.iterator();
-		while (it.hasNext()) {
-			SenderBean bean = (SenderBean) it.next();
-			if (bean.getMessageNumber() == msgNo)
-				beans.add(bean);
-		}
-
-		return beans;
+	public SenderBean findUnique(SenderBean bean) throws SandeshaException {
+		return (SenderBean) super.findUnique(bean);
 	}
 
-	public synchronized boolean update(SenderBean bean) {
-		if (table.get(bean.getMessageID())==null)
-			return false;
-
-		return true; //No need to update. Being a reference does the job.
-	}
-	
-	public synchronized SenderBean findUnique(SenderBean bean) throws SandeshaException {
-		Collection coll = find(bean);
-		if (coll.size()>1) {
-			String message = SandeshaMessageHelper.getMessage(
-					SandeshaMessageKeys.nonUniqueResult);
-			log.error(message);
-			throw new SandeshaException (message);
-		}
-		
-		Iterator iter = coll.iterator();
-		if (iter.hasNext())
-			return (SenderBean) iter.next();
-		else 
-			return null;
-	}
-
-	public synchronized SenderBean retrieveFromMessageRefKey(String messageContextRefKey) {
-		
-		Iterator iter = table.keySet().iterator();
-		while (iter.hasNext()) {
-			Object key = iter.next();
-			SenderBean bean = (SenderBean) table.get(key);
-			if (bean.getMessageContextRefKey().equals(messageContextRefKey)) {
-				return bean;
-			}
-		}
-		
-		return null;
+	public SenderBean retrieveFromMessageRefKey(String messageContextRefKey) {
+		throw new UnsupportedOperationException("Deprecated method");
 	}
 	
 	

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySequencePropertyBeanMgr.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySequencePropertyBeanMgr.java?view=diff&rev=478523&r1=478522&r2=478523
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySequencePropertyBeanMgr.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySequencePropertyBeanMgr.java Thu Nov 23 02:16:56 2006
@@ -17,11 +17,7 @@
 
 package org.apache.sandesha2.storage.inmemory;
 
-import java.sql.ResultSet;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Hashtable;
-import java.util.Iterator;
 import java.util.List;
 
 import org.apache.axis2.context.AbstractContext;
@@ -29,129 +25,76 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.i18n.SandeshaMessageHelper;
-import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.RMBean;
 import org.apache.sandesha2.storage.beans.SequencePropertyBean;
 
-public class InMemorySequencePropertyBeanMgr implements SequencePropertyBeanMgr {
+public class InMemorySequencePropertyBeanMgr extends InMemoryBeanMgr implements SequencePropertyBeanMgr {
 	
 	private static final Log log = LogFactory.getLog(InMemorySequencePropertyBeanMgr.class);
-	private Hashtable table = null;
 
-	public InMemorySequencePropertyBeanMgr(AbstractContext context) {
-		Object obj = context.getProperty(Sandesha2Constants.BeanMAPs.SEQUENCE_PROPERTY);
-		if (obj != null) {
-			table = (Hashtable) obj;
-		} else {
-			table = new Hashtable();
-			context.setProperty(Sandesha2Constants.BeanMAPs.SEQUENCE_PROPERTY, table);
-		}
+	public InMemorySequencePropertyBeanMgr(InMemoryStorageManager mgr, AbstractContext context) {
+		super(mgr, context, Sandesha2Constants.BeanMAPs.SEQUENCE_PROPERTY);
 	}
 
-	public synchronized boolean delete(String sequenceId, String name) {
-		
-		SequencePropertyBean bean = retrieve( sequenceId,name);
-				
-		return table.remove(sequenceId + ":" + name) != null;
+	public boolean delete(String sequenceId, String name) {
+		return super.delete(getId(sequenceId, name));
 	}
 
-	public synchronized SequencePropertyBean retrieve(String sequenceId, String name) {
-		return (SequencePropertyBean) table.get(sequenceId + ":" + name);
+	public SequencePropertyBean retrieve(String sequenceId, String name) {
+		return (SequencePropertyBean) super.retrieve(getId(sequenceId, name));
 	}
 
-	public synchronized boolean insert(SequencePropertyBean bean) {
-		table.put(bean.getSequencePropertyKey() + ":" + bean.getName(), bean);
-		return true;
+	public boolean insert(SequencePropertyBean bean) {
+		return super.insert(getId(bean), bean);
 	}
 
-	public synchronized ResultSet find(String query) {
-		throw new UnsupportedOperationException(SandeshaMessageHelper.getMessage(
-				SandeshaMessageKeys.selectRSNotSupported));
+	public List find(SequencePropertyBean bean) {
+		return super.find(bean);
 	}
+	
+	protected boolean match(RMBean matchInfo, RMBean candidate) {
+		SequencePropertyBean bean = (SequencePropertyBean) matchInfo;
+		SequencePropertyBean temp = (SequencePropertyBean) candidate;
 
-	public synchronized List find(SequencePropertyBean bean) {
-		ArrayList beans = new ArrayList();
-
-		if (bean == null)
-			return beans;
-
-		Iterator iterator = table.values().iterator();
-		SequencePropertyBean temp;
-
-		while (iterator.hasNext()) {
-			temp = (SequencePropertyBean) iterator.next();
-
-			boolean equal = true;
-
-			if (bean.getSequencePropertyKey() != null
-					&& !bean.getSequencePropertyKey().equals(temp.getSequencePropertyKey()))
-				equal = false;
+		boolean equal = true;
 
-			if (bean.getName() != null
-					&& !bean.getName().equals(temp.getName()))
-				equal = false;
+		if (bean.getSequencePropertyKey() != null
+				&& !bean.getSequencePropertyKey().equals(temp.getSequencePropertyKey()))
+			equal = false;
 
-			if (bean.getValue() != null
-					&& !bean.getValue().equals(temp.getValue()))
-				equal = false;
+		if (bean.getName() != null
+				&& !bean.getName().equals(temp.getName()))
+			equal = false;
 
-			if (equal)
-				beans.add(temp);
+		if (bean.getValue() != null
+				&& !bean.getValue().equals(temp.getValue()))
+			equal = false;
 
-		}
-		return beans;
+		return equal;
 	}
 
-	public synchronized boolean update(SequencePropertyBean bean) {	
-		
-		if (table.get(getId(bean))==null)
-			return false;
-
-		return table.put(getId(bean), bean) != null;
-
+	public boolean update(SequencePropertyBean bean) {	
+		return super.update(getId(bean), bean);
 	}
 	
-	public synchronized boolean updateOrInsert(SequencePropertyBean bean) {	
-		
-		if (table.get(getId(bean))==null)
-			table.put(getId(bean), bean);
-
-		return table.put(getId(bean), bean) != null;
-
+	public boolean updateOrInsert(SequencePropertyBean bean) {	
+		throw new UnsupportedOperationException("Deprecated method");
 	}
 
 	private String getId(SequencePropertyBean bean) {
 		return bean.getSequencePropertyKey() + ":" + bean.getName();
 	}
+	private String getId(String sequenceId, String name) {
+		return sequenceId + ":" + name;
+	}
 	
-	public synchronized SequencePropertyBean findUnique(SequencePropertyBean bean) throws SandeshaException {
-		Collection coll = find(bean);
-		if (coll.size()>1) {
-			String message = SandeshaMessageHelper.getMessage(
-					SandeshaMessageKeys.nonUniqueResult);
-			log.error(message);
-			throw new SandeshaException (message);
-		}
-		
-		Iterator iter = coll.iterator();
-		if (iter.hasNext())
-			return (SequencePropertyBean) iter.next();
-		else 
-			return null;
-	}
-
-	public synchronized Collection retrieveAll() {
-		Collection coll = new ArrayList();
-		
-		Iterator keys = table.keySet().iterator();
-		while (keys.hasNext()) {
-			Object key = keys.next();
-			coll.add(table.get(key));
-		}
-		
-		return coll;
+	public SequencePropertyBean findUnique(SequencePropertyBean bean) throws SandeshaException {
+		return (SequencePropertyBean) super.findUnique(bean);
 	}
 
+	public Collection retrieveAll() {
+		return super.find(null);
+	}
 	
 }

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java?view=diff&rev=478523&r1=478522&r2=478523
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java Thu Nov 23 02:16:56 2006
@@ -17,12 +17,15 @@
 
 package org.apache.sandesha2.storage.inmemory;
 
+import java.util.Collection;
 import java.util.HashMap;
 
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.description.AxisModule;
 import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.i18n.SandeshaMessageHelper;
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.storage.SandeshaStorageException;
@@ -33,10 +36,13 @@
 import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.RMBean;
 import org.apache.sandesha2.util.SandeshaUtil;
 
 public class InMemoryStorageManager extends StorageManager {
 
+	private static Log log = LogFactory.getLog(InMemoryStorageManager.class);
+
 	private static InMemoryStorageManager instance = null;
     private final String MESSAGE_MAP_KEY = "Sandesha2MessageMap";
     private CreateSeqBeanMgr  createSeqBeanMgr = null;
@@ -44,21 +50,44 @@
     private SequencePropertyBeanMgr sequencePropertyBeanMgr = null;
     private SenderBeanMgr senderBeanMgr = null;
     private InvokerBeanMgr invokerBeanMgr = null;
+    private HashMap transactions = new HashMap();
     
 	public InMemoryStorageManager(ConfigurationContext context) {
 		super(context);
 		
-		this.createSeqBeanMgr = new InMemoryCreateSeqBeanMgr (context);
-		this.nextMsgBeanMgr = new InMemoryNextMsgBeanMgr (context);
-		this.senderBeanMgr = new InMemorySenderBeanMgr (context);
-		this.invokerBeanMgr = new InMemoryInvokerBeanMgr (context);
-		this.sequencePropertyBeanMgr = new InMemorySequencePropertyBeanMgr (context);
+		this.createSeqBeanMgr = new InMemoryCreateSeqBeanMgr (this, context);
+		this.nextMsgBeanMgr = new InMemoryNextMsgBeanMgr (this, context);
+		this.senderBeanMgr = new InMemorySenderBeanMgr (this, context);
+		this.invokerBeanMgr = new InMemoryInvokerBeanMgr (this, context);
+		this.sequencePropertyBeanMgr = new InMemorySequencePropertyBeanMgr (this, context);
 	}
 
 	public Transaction getTransaction() {
-		return new InMemoryTransaction();
+		Transaction result = null;
+		synchronized (transactions) {
+			Long key = new Long(Thread.currentThread().getId());
+			String name = Thread.currentThread().getName();
+			result = (Transaction) transactions.get(key);
+			if(result == null) {
+				result = new InMemoryTransaction(this, key, name);
+				transactions.put(key, result);
+			}
+		}
+		return result;
 	}
-
+	
+	void removeTransaction(Transaction t) {
+		synchronized (transactions) {
+			Collection entries = transactions.values();
+			entries.remove(t);
+		}
+	}
+	
+	void enlistBean(RMBean bean) {
+		InMemoryTransaction t = (InMemoryTransaction) getTransaction();
+		t.enlist(bean);
+	}
+	
 	public CreateSeqBeanMgr getCreateSeqBeanMgr() {
 		return createSeqBeanMgr;
 	}

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java?view=diff&rev=478523&r1=478522&r2=478523
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java Thu Nov 23 02:16:56 2006
@@ -17,25 +17,107 @@
 
 package org.apache.sandesha2.storage.inmemory;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beans.RMBean;
 
 /**
- * This class does nothing special. Just to be consistent with the transaction
- * based behavious of other 'Persistant' StorageManager implementations.
+ * This class does not implement real transactions: commit and rollback both
+ * just release the bean locks it holds for the in-memory storage manager.
  */
 
 public class InMemoryTransaction implements Transaction {
 
-	public void commit() {
+	private static final Log log = LogFactory.getLog(InMemoryTransaction.class);
 
+	private InMemoryStorageManager manager;
+	private Long key;
+	private String threadName;
+	private ArrayList enlistedBeans = new ArrayList();
+	
+	InMemoryTransaction(InMemoryStorageManager manager, Long key, String threadName) {
+		if(log.isDebugEnabled()) log.debug("Entry: InMemoryTransaction::<init>");
+		this.manager = manager;
+		this.key = key;
+		this.threadName = threadName;
+		if(log.isDebugEnabled()) log.debug("Exit: InMemoryTransaction::<init>, " + this);
+	}
+	
+	public void commit() {
+		releaseLocks();
 	}
 
 	public void rollback() {
-
+		releaseLocks();
 	}
 	
 	public boolean isActive () {
-		return false;
+		return !enlistedBeans.isEmpty();
 	}
 
-}
\ No newline at end of file
+	public void enlist(RMBean bean) {
+		if(log.isDebugEnabled()) log.debug("Entry: InMemoryTransaction::enlist, " + bean);
+		if(bean != null) {
+			synchronized (bean) {
+				Transaction other = bean.getTransaction();
+				while(other != null && other != this) {
+
+					if(!enlistedBeans.isEmpty()) {
+						Exception e = new Exception("Possible deadlock");
+						if(log.isDebugEnabled()) {
+							log.debug("Possible deadlock", e);
+							log.debug(this + ", " + bean);
+						}
+					}
+
+					try {
+						if(log.isDebugEnabled()) log.debug("This " + this + " waiting for " + other);
+						bean.wait();
+					} catch(InterruptedException e) {
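+						// Interrupt is ignored (not re-asserted): the loop re-reads the bean's owner and waits again if it is still locked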
+					}
+					other = bean.getTransaction();
+				}
+				if(other == null) {
+					if(log.isDebugEnabled()) log.debug(this + " locking bean");
+					bean.setTransaction(this);
+					enlistedBeans.add(bean);
+				}
+			}
+		}
+		if(log.isDebugEnabled()) log.debug("Exit: InMemoryTransaction::enlist");
+	}
+	
+	private void releaseLocks() {
+		if(log.isDebugEnabled()) log.debug("Entry: InMemoryTransaction::releaseLocks, " + this);
+		manager.removeTransaction(this);
+
+		Iterator beans = enlistedBeans.iterator();
+		while(beans.hasNext()) {
+			RMBean bean = (RMBean) beans.next();
+			synchronized (bean) {
+				bean.setTransaction(null);
+				bean.notify();
+			}
+		}
+		enlistedBeans.clear();
+		
+		if(log.isDebugEnabled()) log.debug("Exit: InMemoryTransaction::releaseLocks");
+	}
+	
+	public String toString() {
+		StringBuffer result = new StringBuffer();
+		result.append("[InMemoryTransaction #");
+		result.append(key);
+		result.append(", name: ");
+		result.append(threadName);
+		result.append(", locks: ");
+		result.append(enlistedBeans.size());
+		result.append("]");
+		return result.toString();
+	}
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org