You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jackrabbit.apache.org by ju...@apache.org on 2010/11/24 14:10:32 UTC

svn commit: r1038594 - in /jackrabbit/branches/2.1: ./ jackrabbit-core/src/main/java/org/apache/jackrabbit/core/ jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state/ jackrabbit-core/src/main/java/org/apache/jackrabbit/core/version/ jackrabbi...

Author: jukka
Date: Wed Nov 24 13:10:32 2010
New Revision: 1038594

URL: http://svn.apache.org/viewvc?rev=1038594&view=rev
Log:
2.1: Merged revisions 995411, 995412, 999298, 999299, 999965, 1000947, 1036336 and 1036337 (JCR-2089 and JCR-2753)

Added:
    jackrabbit/branches/2.1/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/state/DefaultISMLockingDeadlockTest.java
      - copied, changed from r999965, jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/state/DefaultISMLockingDeadlockTest.java
Modified:
    jackrabbit/branches/2.1/   (props changed)
    jackrabbit/branches/2.1/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/TransactionContext.java
    jackrabbit/branches/2.1/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state/DefaultISMLocking.java
    jackrabbit/branches/2.1/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state/ISMLocking.java
    jackrabbit/branches/2.1/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state/SharedItemStateManager.java
    jackrabbit/branches/2.1/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/version/InternalVersionManagerImpl.java
    jackrabbit/branches/2.1/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/state/TestAll.java

Propchange: jackrabbit/branches/2.1/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov 24 13:10:32 2010
@@ -2,4 +2,4 @@
 /jackrabbit/sandbox/JCR-1456:774917-886178
 /jackrabbit/sandbox/JCR-2170:812417-816332
 /jackrabbit/sandbox/tripod-JCR-2209:795441-795863
-/jackrabbit/trunk:931121,931479,931483-931484,931504,931609,931613,931838,931919,932318-932319,933144,933197,933203,933213,933216,933554,933646,933694,934405,934412,934849,935557,936668,938099,945528,950440,950680,955222,955229,955307,955852,961487,961626,964362,965539,986682,986686,986715,991144,996810,1001707,1002065-1002066,1002084,1002101-1002102,1002168,1002170,1002589,1002608,1002657,1002729,1003423,1003470,1003542,1003773,1004182,1004184,1004223-1004224,1004652,1005057,1005112
+/jackrabbit/trunk:931121,931479,931483-931484,931504,931609,931613,931838,931919,932318-932319,933144,933197,933203,933213,933216,933554,933646,933694,934405,934412,934849,935557,936668,938099,945528,950440,950680,955222,955229,955307,955852,961487,961626,964362,965539,986682,986686,986715,991144,995411-995412,996810,999298-999299,999965,1000947,1001707,1002065-1002066,1002084,1002101-1002102,1002168,1002170,1002589,1002608,1002657,1002729,1003423,1003470,1003542,1003773,1004182,1004184,1004223-1004224,1004652,1005057,1005112,1036336-1036337

Modified: jackrabbit/branches/2.1/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/TransactionContext.java
URL: http://svn.apache.org/viewvc/jackrabbit/branches/2.1/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/TransactionContext.java?rev=1038594&r1=1038593&r2=1038594&view=diff
==============================================================================
--- jackrabbit/branches/2.1/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/TransactionContext.java (original)
+++ jackrabbit/branches/2.1/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/TransactionContext.java Wed Nov 24 13:10:32 2010
@@ -23,6 +23,7 @@ import org.apache.jackrabbit.util.Timer;
 import javax.transaction.xa.XAException;
 import javax.transaction.xa.Xid;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -338,4 +339,36 @@ public class TransactionContext extends 
     public static Xid getCurrentXid() {
         return CURRENT_XID.get();
     }
+
+    /**
+     * Returns the current thread identifier. The identifier is either the
+     * current thread instance or the global transaction identifier when
+     * running under a transaction.
+     *
+     * @return current thread identifier
+     */
+    public static Object getCurrentThreadId() {
+        Xid xid = TransactionContext.getCurrentXid();
+        if (xid != null) {
+            return xid.getGlobalTransactionId();
+        } else {
+            return Thread.currentThread();
+        }
+    }
+
+    /**
+     * Compares the given thread identifiers for equality.
+     *
+     * @see #getCurrentThreadId()
+     */
+    public static boolean isSameThreadId(Object a, Object b) {
+        if (a == b) {
+            return true;
+        } else if (a instanceof byte[] && b instanceof byte[]) {
+            return Arrays.equals((byte[]) a, (byte[]) b);
+        } else {
+            return false;
+        }
+    }
+
 }

Modified: jackrabbit/branches/2.1/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state/DefaultISMLocking.java
URL: http://svn.apache.org/viewvc/jackrabbit/branches/2.1/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state/DefaultISMLocking.java?rev=1038594&r1=1038593&r2=1038594&view=diff
==============================================================================
--- jackrabbit/branches/2.1/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state/DefaultISMLocking.java (original)
+++ jackrabbit/branches/2.1/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state/DefaultISMLocking.java Wed Nov 24 13:10:32 2010
@@ -16,210 +16,140 @@
  */
 package org.apache.jackrabbit.core.state;
 
-import java.util.Arrays;
+import static org.apache.jackrabbit.core.TransactionContext.getCurrentThreadId;
+import static org.apache.jackrabbit.core.TransactionContext.isSameThreadId;
 
-import javax.transaction.xa.Xid;
-
-import org.apache.jackrabbit.core.TransactionContext;
 import org.apache.jackrabbit.core.id.ItemId;
 
-import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
-import EDU.oswego.cs.dl.util.concurrent.ReentrantWriterPreferenceReadWriteLock;
-import EDU.oswego.cs.dl.util.concurrent.Sync;
-
 /**
- * <code>DefaultISMLocking</code> implements the default locking strategy using
- * coarse grained locking on an ItemStateManager wide read-write lock. E.g.
- * while a write lock is held, no read lock can be acquired.
+ * Default item state locking strategy. The default strategy is simply to use
+ * a single coarse-grained read-write lock over the entire workspace.
  */
 public class DefaultISMLocking implements ISMLocking {
 
     /**
-     * The internal read-write lock.
-	 * Thread concerning ReentrantWriterPreferenceReadWriteLock
+     * The read lock instance used by readers to release the acquired lock.
      */
-    private final ReadWriteLock rwLock = new ReentrantWriterPreferenceReadWriteLock();
-    
+    private final ReadLock readLock = new ReadLock() {
+        public void release() {
+            releaseReadLock();
+        }
+    };
+
     /**
-     * The internal Xid aware read-write lock.
+     * The write lock instance used by writers to release or downgrade the
+     * acquired lock.
      */
-    private final ReadWriteLock xidRwLock = new XidRWLock();
-    
+    private final WriteLock writeLock = new WriteLock() {
+        public void release() {
+            releaseWriteLock(false);
+        }
+        public ReadLock downgrade() {
+            releaseWriteLock(true);
+            return readLock;
+        }
+    };
+
     /**
-     * {@inheritDoc}
+     * Number of writer threads waiting. While greater than zero, no new
+     * (unrelated) readers are allowed to proceed.
      */
-    public ReadLock acquireReadLock(ItemId id) throws InterruptedException {
-    	if (TransactionContext.getCurrentXid() == null) {
-            return new ReadLockImpl(rwLock.readLock());
-    	} else {
-            return new ReadLockImpl(xidRwLock.readLock());
-    	}
-    }
+    private int writersWaiting = 0;
+
+    /**
+     * The thread identifier of the current writer, or <code>null</code> if
+     * no write is in progress. A thread with the same identifier (i.e. the
+     * same thread or another thread in the same transaction) can re-acquire
+     * read or write locks without limitation, while all other readers and
+     * writers remain blocked. Note that a downgraded write lock still retains
+     * the writer thread identifier, which allows related threads to reacquire
+     * read or write locks even when there are concurrent writers waiting.
+     */
+    private Object writerId = null;
+
+    /**
+     * Number of acquired write locks. All the concurrent write locks are
+     * guaranteed to share the same thread identifier (see {@link #writerId}).
+     */
+    private int writerCount = 0;
+
+    /**
+     * Number of acquired read locks.
+     */
+    private int readerCount = 0;
 
     /**
-     * {@inheritDoc}
+     * Increments the reader count and returns the acquired read lock once
+     * there are no more writers or the current writer shares the thread id
+     * with this reader.
      */
-    public WriteLock acquireWriteLock(ChangeLog changeLog) throws InterruptedException {
-    	if (TransactionContext.getCurrentXid() == null) {
-    		return new WriteLockImpl(rwLock);
-    	} else {
-    		return new WriteLockImpl(xidRwLock);
-    	}
+    public synchronized ReadLock acquireReadLock(ItemId id)
+            throws InterruptedException {
+        Object currentId = getCurrentThreadId();
+        while (writerId != null
+                ? (writerCount > 0 && !isSameThreadId(writerId, currentId))
+                : writersWaiting > 0) {
+            wait();
+        }
+
+        readerCount++;
+        return readLock;
     }
 
-    private static final class WriteLockImpl implements WriteLock {
-    	
-    	private ReadWriteLock readWriteLock;
-    	
-    	private WriteLockImpl(ReadWriteLock readWriteLock) throws InterruptedException {
-    		this.readWriteLock = readWriteLock;
-    		this.readWriteLock.writeLock().acquire();
-		}
-
-		/**
-		 * {@inheritDoc}
-		 */
-		public void release() {
-		    this.readWriteLock.writeLock().release();
-		}
-
-		/**
-		 * {@inheritDoc}
-		 */
-		public ReadLock downgrade() throws InterruptedException {
-		    ReadLock rLock = new ReadLockImpl(this.readWriteLock.readLock());
-		    release();
-		    return rLock;
-		}
-	}
-
-	private static final class ReadLockImpl implements ReadLock {
-
-        private final Sync readLock;
-
-        private ReadLockImpl(Sync readLock) throws InterruptedException {
-            this.readLock = readLock;
-            this.readLock.acquire();
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        public void release() {
-            readLock.release();
+    /**
+     * Decrements the reader count and notifies all pending threads if the
+     * lock is now available. Used by the {@link #readLock} instance.
+     */
+    private synchronized void releaseReadLock() {
+        readerCount--;
+        if (readerCount == 0 && writerCount == 0) {
+            writerId = null;
+            notifyAll();
         }
     }
 
-	/**
-	 * Xid concerning ReentrantWriterPreferenceReadWriteLock
-	 */
-    private static final class XidRWLock extends ReentrantWriterPreferenceReadWriteLock {
-
-        private Xid activeXid;
-
-        /**
-         * Check if the given Xid comes from the same globalTX
-         * @param otherXid
-         * @return true if same globalTX otherwise false
-         */
-        boolean isSameGlobalTx(Xid otherXid) {
-    	    return (activeXid == otherXid) || Arrays.equals(activeXid.getGlobalTransactionId(), otherXid.getGlobalTransactionId());
-    	}
-
-        /**
-         * Allow reader when there is no active Xid, or current Xid owns
-         * the write lock (reentrant).
-         */
-        protected boolean allowReader() {
-        	Xid currentXid = TransactionContext.getCurrentXid();
-        	return (activeXid == null && waitingWriters_ == 0) || isSameGlobalTx(currentXid);
-        }
-
-        /**
-         * {@inheritDoc}
-         */  
-        protected synchronized boolean startWrite() {
-        	Xid currentXid = TransactionContext.getCurrentXid();
-            if (activeXid != null && isSameGlobalTx(currentXid)) { // already held; re-acquire
-            	++writeHolds_;
-                return true;
-            } else if (writeHolds_ == 0) {
-            	if (activeReaders_ == 0 || (readers_.size() == 1 && readers_.get(currentXid) != null)) {
-            		activeXid = currentXid;
-            		writeHolds_ = 1;
-            		return true;
-            	} else {
-            		return false;
-            	}
-            } else {
-            	return false;
+    /**
+     * Increments the writer count, sets the writer identifier and returns
+     * the acquired read lock once there are no other active readers or
+     * writers or the current writer shares the thread id with this writer.
+     */
+    public synchronized WriteLock acquireWriteLock(ChangeLog changeLog)
+            throws InterruptedException {
+        Object currentId = getCurrentThreadId();
+
+        writersWaiting++;
+        try {
+            while (writerId != null
+                    ? !isSameThreadId(writerId, currentId) : readerCount > 0) {
+                wait();
             }
+        } finally {
+            writersWaiting--;
         }
 
-        /**
-         * {@inheritDoc}
-         */
-        protected synchronized Signaller endWrite() {
-            --writeHolds_;
-            if (writeHolds_ > 0) {  // still being held
-            	return null;
-            } else {
-            	activeXid = null;
-                if (waitingReaders_ > 0 && allowReader()) {
-                    return readerLock_;
-                } else if (waitingWriters_ > 0) {
-                    return writerLock_;
-                } else {
-                    return null;
-                }
-            }
+        if (writerCount++ == 0) {
+            writerId = currentId;
         }
+        return writeLock;
+    }
 
-        /**
-         * {@inheritDoc}
-         */
-		protected synchronized boolean startRead() {
-			Xid currentXid = TransactionContext.getCurrentXid();
-		    Object c = readers_.get(currentXid);
-		    if (c != null) { // already held -- just increment hold count
-		    	readers_.put(currentXid, new Integer(((Integer)(c)).intValue()+1));
-		    	++activeReaders_;
-		    	return true;
-		    } else if (allowReader()) {
-		    	readers_.put(currentXid, IONE);
-		    	++activeReaders_;
-		    	return true;
-		    } else {
-		    	return false;
-		    }
-		}
-
-        /**
-         * {@inheritDoc}
-         */
-		protected synchronized Signaller endRead() {
-			Xid currentXid = TransactionContext.getCurrentXid();
-		    Object c = readers_.get(currentXid);
-		    if (c == null) {
-		    	throw new IllegalStateException();
-		    }
-		    --activeReaders_;
-		    if (c != IONE) { // more than one hold; decrement count
-		    	int h = ((Integer)(c)).intValue()-1;
-		    	Integer ih = (h == 1)? IONE : new Integer(h);
-		    	readers_.put(currentXid, ih);
-		    	return null;
-		    } else {
-		    	readers_.remove(currentXid);
-		    
-		    	if (writeHolds_ > 0) { // a write lock is still held
-		    		return null;
-		    	} else if (activeReaders_ == 0 && waitingWriters_ > 0) {
-		    		return writerLock_;
-		    	} else  {
-		    		return null;
-		    	}
-		    }
-		}
+    /**
+     * Decrements the writer count (and possibly clears the writer identifier)
+     * and notifies all pending threads if the lock is now available. If the
+     * downgrade argument is true, then the reader count is incremented before
+     * notifying any pending threads. Used by the {@link #writeLock} instance.
+     */
+    private synchronized void releaseWriteLock(boolean downgrade) {
+        writerCount--;
+        if (downgrade) {
+            readerCount++;
+        }
+        if (writerCount == 0) {
+            if (readerCount == 0) {
+                writerId = null;
+            }
+            notifyAll();
+        }
     }
+
 }

Modified: jackrabbit/branches/2.1/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state/ISMLocking.java
URL: http://svn.apache.org/viewvc/jackrabbit/branches/2.1/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state/ISMLocking.java?rev=1038594&r1=1038593&r2=1038594&view=diff
==============================================================================
--- jackrabbit/branches/2.1/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state/ISMLocking.java (original)
+++ jackrabbit/branches/2.1/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state/ISMLocking.java Wed Nov 24 13:10:32 2010
@@ -39,6 +39,12 @@ import org.apache.jackrabbit.core.id.Ite
  * is free to block requests entirely for additional write lock while a write
  * lock is active. It is not a requirement to support concurrent write locks.
  * </li>
+ * <li>While a write lock is held for a change log <code>C</code>, the holder
+ * of the write lock (and any related threads) needs to be able to acquire
+ * a read lock even if other writers are waiting for the lock. This behaviour
+ * must continue also when the write lock has been downgraded. Note that it
+ * is not necessary for a holder of a read lock to be able to upgrade to a
+ * write lock.</li>
  * </ul>
  */
 public interface ISMLocking {
@@ -81,10 +87,8 @@ public interface ISMLocking {
          * used to further release the read lock.
          *
          * @return the read lock downgraded from this write lock.
-         * @throws InterruptedException if the current thread is interrupted
-         *                              while downgrading the write lock.
          */
-        ReadLock downgrade() throws InterruptedException;
+        ReadLock downgrade();
 
     }
 

Modified: jackrabbit/branches/2.1/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state/SharedItemStateManager.java
URL: http://svn.apache.org/viewvc/jackrabbit/branches/2.1/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state/SharedItemStateManager.java?rev=1038594&r1=1038593&r2=1038594&view=diff
==============================================================================
--- jackrabbit/branches/2.1/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state/SharedItemStateManager.java (original)
+++ jackrabbit/branches/2.1/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state/SharedItemStateManager.java Wed Nov 24 13:10:32 2010
@@ -790,9 +790,6 @@ public class SharedItemStateManager
                     String path = events.getSession().getUserID() + "@" + events.getCommonPath();
                     eventChannel.updateCommitted(this, path);
                 }
-
-            } catch (InterruptedException e) {
-                throw new ItemStateException("Interrupted while downgrading to read lock");
             } finally {
                 if (writeLock != null) {
                     // exception occurred before downgrading lock
@@ -1515,9 +1512,6 @@ public class SharedItemStateManager
                 holdingWriteLock = false;
                 events.dispatch();
             }
-        } catch (InterruptedException e) {
-            String msg = "Unable to downgrade to read lock.";
-            log.error(msg);
         } finally {
             if (holdingWriteLock) {
                 if (wLock != null) {

Modified: jackrabbit/branches/2.1/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/version/InternalVersionManagerImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/branches/2.1/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/version/InternalVersionManagerImpl.java?rev=1038594&r1=1038593&r2=1038594&view=diff
==============================================================================
--- jackrabbit/branches/2.1/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/version/InternalVersionManagerImpl.java (original)
+++ jackrabbit/branches/2.1/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/version/InternalVersionManagerImpl.java Wed Nov 24 13:10:32 2010
@@ -30,7 +30,6 @@ import javax.jcr.version.VersionExceptio
 
 import org.apache.commons.collections.map.ReferenceMap;
 import org.apache.jackrabbit.core.SessionImpl;
-import org.apache.jackrabbit.core.RepositoryImpl;
 import org.apache.jackrabbit.core.cluster.UpdateEventChannel;
 import org.apache.jackrabbit.core.cluster.UpdateEventListener;
 import org.apache.jackrabbit.core.fs.FileSystem;
@@ -59,9 +58,9 @@ import org.apache.jackrabbit.core.value.
 import org.apache.jackrabbit.core.virtual.VirtualItemStateProvider;
 import org.apache.jackrabbit.spi.Name;
 import org.apache.jackrabbit.spi.Path;
-import org.apache.jackrabbit.spi.commons.conversion.MalformedPathException;
+import org.apache.jackrabbit.spi.PathFactory;
 import org.apache.jackrabbit.spi.commons.name.NameConstants;
-import org.apache.jackrabbit.spi.commons.name.PathBuilder;
+import org.apache.jackrabbit.spi.commons.name.PathFactoryImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -81,38 +80,13 @@ public class InternalVersionManagerImpl 
      */
     private static final Path SYSTEM_PATH;
 
-    /**
-     * The path to the version storage: /jcr:system/jcr:versionStorage
-     */
-    private static final Path HISTORIES_PATH;
-
-    /**
-     * The path to the version storage: /jcr:system/jcr:versionStorage/jcr:activities
-     */
-    private static final Path ACTIVITIES_PATH;
-
     static {
         try {
-            PathBuilder builder = new PathBuilder();
-            builder.addRoot();
-            builder.addLast(NameConstants.JCR_SYSTEM);
-            SYSTEM_PATH = builder.getPath();
-
-            builder = new PathBuilder();
-            builder.addRoot();
-            builder.addLast(NameConstants.JCR_SYSTEM);
-            builder.addLast(NameConstants.JCR_VERSIONSTORAGE);
-            HISTORIES_PATH = builder.getPath();
-
-            builder = new PathBuilder();
-            builder.addRoot();
-            builder.addLast(NameConstants.JCR_SYSTEM);
-            builder.addLast(NameConstants.JCR_ACTIVITIES);
-            ACTIVITIES_PATH = builder.getPath();
-
-        } catch (MalformedPathException e) {
-            // will not happen. path is always valid
-            throw new InternalError("Cannot initialize path");
+            PathFactory factory = PathFactoryImpl.getInstance();
+            SYSTEM_PATH = factory.create(
+                    factory.getRootPath(), NameConstants.JCR_SYSTEM, true);
+        } catch (RepositoryException e) {
+            throw new RuntimeException(e);
         }
     }
 
@@ -186,28 +160,6 @@ public class InternalVersionManagerImpl 
             this.fs = fs;
             this.escFactory = new DynamicESCFactory(obsMgr);
 
-            // need to recreate the jcr:system node in this pm, too. so that
-            // it can act as parent for the histories and activities.
-            if (false && !pMgr.exists(systemId)) {
-                NodeState root = pMgr.createNew(systemId);
-                root.setParentId(RepositoryImpl.ROOT_NODE_ID);
-                root.setNodeTypeName(NameConstants.REP_SYSTEM);
-                PropertyState pt = pMgr.createNew(new PropertyId(systemId, NameConstants.JCR_PRIMARYTYPE));
-                pt.setMultiValued(false);
-                pt.setType(PropertyType.NAME);
-                pt.setValues(new InternalValue[]{InternalValue.create(NameConstants.REP_SYSTEM)});
-                root.addPropertyName(pt.getName());
-
-                // add version storage and activities as child node entries
-                root.addChildNodeEntry(NameConstants.JCR_VERSIONSTORAGE, historiesId);
-                root.addChildNodeEntry(NameConstants.JCR_ACTIVITIES, activitiesId);
-
-                ChangeLog cl = new ChangeLog();
-                cl.added(root);
-                cl.added(pt);
-                pMgr.store(cl);
-            }
-
             // need to store the version storage root directly into the persistence manager
             if (!pMgr.exists(historiesId)) {
                 NodeState root = pMgr.createNew(historiesId);
@@ -711,13 +663,13 @@ public class InternalVersionManagerImpl 
         /**
          * the observation manager
          */
-        private DelegatingObservationDispatcher obsMgr;
+        private final DelegatingObservationDispatcher obsMgr;
 
         /**
-         * the current event source
+         * The event source of the current thread.
          */
-        private SessionImpl source;
-
+        private final ThreadLocal<SessionImpl> source =
+            new ThreadLocal<SessionImpl>();
 
         /**
          * Creates a new event state collection factory
@@ -735,12 +687,14 @@ public class InternalVersionManagerImpl 
          * association between update operation and session who actually invoked
          * the update, an internal event source is used.
          */
-        public synchronized EventStateCollection createEventStateCollection()
+        public EventStateCollection createEventStateCollection()
                 throws RepositoryException {
-            if (source == null) {
+            SessionImpl session = source.get();
+            if (session != null) {
+                return createEventStateCollection(session);
+            } else {
                 throw new RepositoryException("Unknown event source.");
             }
-            return createEventStateCollection(source);
         }
 
         /**
@@ -763,15 +717,16 @@ public class InternalVersionManagerImpl 
          * @return the return value of the executed runnable
          * @throws RepositoryException if an error occurs
          */
-        public synchronized Object doSourced(SessionImpl eventSource, SourcedTarget runnable)
+        public Object doSourced(SessionImpl eventSource, SourcedTarget runnable)
                 throws RepositoryException {
-            this.source = eventSource;
+            source.set(eventSource);
             try {
                 return runnable.run();
             } finally {
-                this.source = null;
+                source.remove();
             }
         }
+
     }
 
     private abstract class SourcedTarget {

Copied: jackrabbit/branches/2.1/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/state/DefaultISMLockingDeadlockTest.java (from r999965, jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/state/DefaultISMLockingDeadlockTest.java)
URL: http://svn.apache.org/viewvc/jackrabbit/branches/2.1/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/state/DefaultISMLockingDeadlockTest.java?p2=jackrabbit/branches/2.1/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/state/DefaultISMLockingDeadlockTest.java&p1=jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/state/DefaultISMLockingDeadlockTest.java&r1=999965&r2=1038594&rev=1038594&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/state/DefaultISMLockingDeadlockTest.java (original)
+++ jackrabbit/branches/2.1/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/state/DefaultISMLockingDeadlockTest.java Wed Nov 24 13:10:32 2010
@@ -19,6 +19,7 @@ package org.apache.jackrabbit.core.state
 import org.apache.jackrabbit.core.state.DefaultISMLocking;
 import org.apache.jackrabbit.core.state.ISMLocking;
 import org.apache.jackrabbit.core.state.ISMLocking.ReadLock;
+import org.apache.jackrabbit.core.state.ISMLocking.WriteLock;
 import org.apache.jackrabbit.test.JUnitTest;
 
 /**
@@ -28,7 +29,8 @@ public class DefaultISMLockingDeadlockTe
 
     public void test() throws InterruptedException {
         final ISMLocking lock = new DefaultISMLocking();
-        ReadLock r1 = lock.acquireReadLock(null);
+        WriteLock w1 = lock.acquireWriteLock(null);
+        ReadLock r1 = w1.downgrade();
         final InterruptedException[] ex = new InterruptedException[1];
         Thread thread = new Thread() {
             public void run() {

Modified: jackrabbit/branches/2.1/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/state/TestAll.java
URL: http://svn.apache.org/viewvc/jackrabbit/branches/2.1/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/state/TestAll.java?rev=1038594&r1=1038593&r2=1038594&view=diff
==============================================================================
--- jackrabbit/branches/2.1/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/state/TestAll.java (original)
+++ jackrabbit/branches/2.1/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/state/TestAll.java Wed Nov 24 13:10:32 2010
@@ -37,6 +37,7 @@ public class TestAll extends TestCase {
 
         suite.addTestSuite(ChangeLogTest.class);
         suite.addTestSuite(DefaultISMLockingTest.class);
+        suite.addTestSuite(DefaultISMLockingDeadlockTest.class);
         suite.addTestSuite(FineGrainedISMLockingTest.class);
         suite.addTestSuite(NameSetTest.class);