You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jackrabbit.apache.org by th...@apache.org on 2010/09/21 12:12:03 UTC

svn commit: r999298 - in /jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state: DefaultISMLocking.java ISMLocking.java SharedItemStateManager.java

Author: thomasm
Date: Tue Sep 21 10:12:02 2010
New Revision: 999298

URL: http://svn.apache.org/viewvc?rev=999298&view=rev
Log:
JCR-2753 Deadlock in DefaultISMLocking - reverting revision 995411 and 995412

Modified:
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state/DefaultISMLocking.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state/ISMLocking.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state/SharedItemStateManager.java

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state/DefaultISMLocking.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state/DefaultISMLocking.java?rev=999298&r1=999297&r2=999298&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state/DefaultISMLocking.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state/DefaultISMLocking.java Tue Sep 21 10:12:02 2010
@@ -23,157 +23,203 @@ import javax.transaction.xa.Xid;
 import org.apache.jackrabbit.core.TransactionContext;
 import org.apache.jackrabbit.core.id.ItemId;
 
+import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
+import EDU.oswego.cs.dl.util.concurrent.ReentrantWriterPreferenceReadWriteLock;
+import EDU.oswego.cs.dl.util.concurrent.Sync;
+
 /**
- * Default item state locking strategy. The default strategy is simply to use
- * a single coarse-grained read-write lock over the entire workspace.
+ * <code>DefaultISMLocking</code> implements the default locking strategy using
+ * coarse-grained locking on an ItemStateManager-wide read-write lock. I.e.,
+ * while a write lock is held, no read lock can be acquired.
  */
 public class DefaultISMLocking implements ISMLocking {
 
     /**
-     * The read lock instance used by readers to release the acquired lock.
+     * The internal read-write lock.
+	 * Thread-aware ReentrantWriterPreferenceReadWriteLock (used when no Xid is active).
      */
-    private final ReadLock readLock = new ReadLock() {
-        public void release() {
-            releaseReadLock();
-        }
-    };
-
+    private final ReadWriteLock rwLock = new ReentrantWriterPreferenceReadWriteLock();
+    
     /**
-     * The write lock instance used by writers to release or downgrade the
-     * acquired lock.
+     * The internal Xid aware read-write lock.
      */
-    private final WriteLock writeLock = new WriteLock() {
-        public void release() {
-            releaseWriteLock(false);
-        }
-        public ReadLock downgrade() {
-            releaseWriteLock(true);
-            return readLock;
-        }
-    };
-
+    private final ReadWriteLock xidRwLock = new XidRWLock();
+    
     /**
-     * Number of writer threads waiting. While greater than zero, no new
-     * readers are allowed to proceed.
+     * {@inheritDoc}
      */
-    private int writersWaiting = 0;
+    public ReadLock acquireReadLock(ItemId id) throws InterruptedException {
+    	if (TransactionContext.getCurrentXid() == null) {
+            return new ReadLockImpl(rwLock.readLock());
+    	} else {
+            return new ReadLockImpl(xidRwLock.readLock());
+    	}
+    }
 
     /**
-     * The thread identifier of the current writer, or <code>null</code> if
-     * no write is in progress. A thread with the same identifier (i.e. the
-     * same thread or another thread in the same transaction) can re-acquire
-     * read or write locks without limitation, while all other readers and
-     * writers remain blocked.
+     * {@inheritDoc}
      */
-    private Object writerId = null;
+    public WriteLock acquireWriteLock(ChangeLog changeLog) throws InterruptedException {
+    	if (TransactionContext.getCurrentXid() == null) {
+    		return new WriteLockImpl(rwLock);
+    	} else {
+    		return new WriteLockImpl(xidRwLock);
+    	}
+    }
 
-    /**
-     * Number of acquired write locks. All the concurrent write locks are
-     * guaranteed to share the same thread identifier (see {@link #writerId}).
-     */
-    private int writerCount = 0;
+    private static final class WriteLockImpl implements WriteLock {
+    	
+    	private ReadWriteLock readWriteLock;
+    	
+    	private WriteLockImpl(ReadWriteLock readWriteLock) throws InterruptedException {
+    		this.readWriteLock = readWriteLock;
+    		this.readWriteLock.writeLock().acquire();
+		}
 
-    /**
-     * Number of acquired read locks.
-     */
-    private int readerCount = 0;
+		/**
+		 * {@inheritDoc}
+		 */
+		public void release() {
+		    this.readWriteLock.writeLock().release();
+		}
 
-    /**
-     * Increments the reader count and returns the acquired read lock once
-     * there are no more writers or the current writer shares the thread id
-     * with this reader.
-     */
-    public synchronized ReadLock acquireReadLock(ItemId id)
-            throws InterruptedException {
-        while (writerId != null
-                ? !isSameId(writerId, getCurrentId()) : writersWaiting > 0) {
-            wait();
-        }
+		/**
+		 * {@inheritDoc}
+		 */
+		public ReadLock downgrade() throws InterruptedException {
+		    ReadLock rLock = new ReadLockImpl(this.readWriteLock.readLock());
+		    release();
+		    return rLock;
+		}
+	}
 
-        readerCount++;
-        return readLock;
-    }
+	private static final class ReadLockImpl implements ReadLock {
 
-    /**
-     * Decrements the reader count and notifies all pending threads if the
-     * lock is now available. Used by the {@link #readLock} instance.
-     */
-    private synchronized void releaseReadLock() {
-        readerCount--;
-        if (readerCount == 0 && writerId == null) {
-            notifyAll();
-        }
-    }
+        private final Sync readLock;
 
-    /**
-     * Increments the writer count, sets the writer identifier and returns
-     * the acquired read lock once there are no other active readers or
-     * writers or the current writer shares the thread id with this writer.
-     */
-    public synchronized WriteLock acquireWriteLock(ChangeLog changeLog)
-            throws InterruptedException {
-        Object currentId = getCurrentId();
-
-        writersWaiting++;
-        try {
-            while (writerId != null
-                    ? !isSameId(writerId, currentId) : readerCount > 0) {
-                wait();
-            }
-        } finally {
-            writersWaiting--;
+        private ReadLockImpl(Sync readLock) throws InterruptedException {
+            this.readLock = readLock;
+            this.readLock.acquire();
         }
 
-        if (writerCount++ == 0) {
-            writerId = currentId;
+        /**
+         * {@inheritDoc}
+         */
+        public void release() {
+            readLock.release();
         }
-        return writeLock;
     }
 
-    /**
-     * Decrements the writer count (and possibly clears the writer identifier)
-     * and notifies all pending threads if the lock is now available. If the
-     * downgrade argument is true, then the reader count is incremented before
-     * notifying any pending threads. Used by the {@link #writeLock} instance.
-     */
-    private synchronized void releaseWriteLock(boolean downgrade) {
-        writerCount--;
-        if (downgrade) {
-            readerCount++;
-        }
-        if (writerCount == 0) {
-            writerId = null;
-            notifyAll();
+	/**
+	 * Xid-aware ReentrantWriterPreferenceReadWriteLock: lock holders are tracked by transaction Xid.
+	 */
+    private static final class XidRWLock extends ReentrantWriterPreferenceReadWriteLock {
+
+        private Xid activeXid;
+
+        /**
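+         * Checks whether the given Xid belongs to the same global transaction as the active Xid.
+         * @param otherXid the Xid to compare with the active Xid
+         * @return true if both share the same global transaction id, otherwise false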
+         */
+        boolean isSameGlobalTx(Xid otherXid) {
+    	    return (activeXid == otherXid) || Arrays.equals(activeXid.getGlobalTransactionId(), otherXid.getGlobalTransactionId());
+    	}
+
+        /**
+         * Allow reader when there is no active Xid and no writer is waiting,
+         * or when the current Xid owns the write lock (reentrant).
+         */
+        protected boolean allowReader() {
+        	Xid currentXid = TransactionContext.getCurrentXid();
+        	return (activeXid == null && waitingWriters_ == 0) || isSameGlobalTx(currentXid);
+        }
+
+        /**
+         * {@inheritDoc}
+         */  
+        protected synchronized boolean startWrite() {
+        	Xid currentXid = TransactionContext.getCurrentXid();
+            if (activeXid != null && isSameGlobalTx(currentXid)) { // already held; re-acquire
+            	++writeHolds_;
+                return true;
+            } else if (writeHolds_ == 0) {
+            	if (activeReaders_ == 0 || (readers_.size() == 1 && readers_.get(currentXid) != null)) {
+            		activeXid = currentXid;
+            		writeHolds_ = 1;
+            		return true;
+            	} else {
+            		return false;
+            	}
+            } else {
+            	return false;
+            }
         }
-    }
 
-    /**
-     * Returns the current thread identifier. The identifier is either the
-     * current thread instance or the global transaction identifier when
-     * running under a transaction.
-     *
-     * @return current thread identifier
-     */
-    private Object getCurrentId() {
-        Xid xid = TransactionContext.getCurrentXid();
-        if (xid != null) {
-            return xid.getGlobalTransactionId();
-        } else {
-            return Thread.currentThread();
+        /**
+         * {@inheritDoc}
+         */
+        protected synchronized Signaller endWrite() {
+            --writeHolds_;
+            if (writeHolds_ > 0) {  // still being held
+            	return null;
+            } else {
+            	activeXid = null;
+                if (waitingReaders_ > 0 && allowReader()) {
+                    return readerLock_;
+                } else if (waitingWriters_ > 0) {
+                    return writerLock_;
+                } else {
+                    return null;
+                }
+            }
         }
-    }
 
-    /**
-     * Compares the given thread identifiers for equality.
-     */
-    private boolean isSameId(Object a, Object b) {
-        if (a == b) {
-            return true;
-        } else if (a instanceof byte[] && b instanceof byte[]) {
-            return Arrays.equals((byte[]) a, (byte[]) b);
-        } else {
-            return false;
-        }
+        /**
+         * {@inheritDoc}
+         */
+		protected synchronized boolean startRead() {
+			Xid currentXid = TransactionContext.getCurrentXid();
+		    Object c = readers_.get(currentXid);
+		    if (c != null) { // already held -- just increment hold count
+		    	readers_.put(currentXid, new Integer(((Integer)(c)).intValue()+1));
+		    	++activeReaders_;
+		    	return true;
+		    } else if (allowReader()) {
+		    	readers_.put(currentXid, IONE);
+		    	++activeReaders_;
+		    	return true;
+		    } else {
+		    	return false;
+		    }
+		}
+
+        /**
+         * {@inheritDoc}
+         */
+		protected synchronized Signaller endRead() {
+			Xid currentXid = TransactionContext.getCurrentXid();
+		    Object c = readers_.get(currentXid);
+		    if (c == null) {
+		    	throw new IllegalStateException();
+		    }
+		    --activeReaders_;
+		    if (c != IONE) { // more than one hold; decrement count
+		    	int h = ((Integer)(c)).intValue()-1;
+		    	Integer ih = (h == 1)? IONE : new Integer(h);
+		    	readers_.put(currentXid, ih);
+		    	return null;
+		    } else {
+		    	readers_.remove(currentXid);
+		    
+		    	if (writeHolds_ > 0) { // a write lock is still held
+		    		return null;
+		    	} else if (activeReaders_ == 0 && waitingWriters_ > 0) {
+		    		return writerLock_;
+		    	} else  {
+		    		return null;
+		    	}
+		    }
+		}
     }
-
 }

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state/ISMLocking.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state/ISMLocking.java?rev=999298&r1=999297&r2=999298&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state/ISMLocking.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state/ISMLocking.java Tue Sep 21 10:12:02 2010
@@ -81,8 +81,10 @@ public interface ISMLocking {
          * used to further release the read lock.
          *
          * @return the read lock downgraded from this write lock.
+         * @throws InterruptedException if the current thread is interrupted
+         *                              while downgrading the write lock.
          */
-        ReadLock downgrade();
+        ReadLock downgrade() throws InterruptedException;
 
     }
 

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state/SharedItemStateManager.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state/SharedItemStateManager.java?rev=999298&r1=999297&r2=999298&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state/SharedItemStateManager.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state/SharedItemStateManager.java Tue Sep 21 10:12:02 2010
@@ -787,6 +787,9 @@ public class SharedItemStateManager
                     String path = events.getSession().getUserID() + "@" + events.getCommonPath();
                     eventChannel.updateCommitted(this, path);
                 }
+
+            } catch (InterruptedException e) {
+                throw new ItemStateException("Interrupted while downgrading to read lock");
             } finally {
                 if (writeLock != null) {
                     // exception occurred before downgrading lock
@@ -1509,6 +1512,9 @@ public class SharedItemStateManager
                 holdingWriteLock = false;
                 events.dispatch();
             }
+        } catch (InterruptedException e) {
+            String msg = "Unable to downgrade to read lock.";
+            log.error(msg);
         } finally {
             if (holdingWriteLock) {
                 if (wLock != null) {