You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jackrabbit.apache.org by ju...@apache.org on 2010/09/09 15:04:17 UTC

svn commit: r995411 - /jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state/DefaultISMLocking.java

Author: jukka
Date: Thu Sep  9 13:04:16 2010
New Revision: 995411

URL: http://svn.apache.org/viewvc?rev=995411&view=rev
Log:
JCR-2089: Use java.util.concurrent

Drop the use of concurrent.jar classes in DefaultISMLocking. Instead of using java.util.concurrent, I've simply implemented the required logic using just normal Java synchronization.

This commit notably streamlines the DefaultISMLocking code paths to reduce lock contention issues with heavily concurrent workloads.

Modified:
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state/DefaultISMLocking.java

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state/DefaultISMLocking.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state/DefaultISMLocking.java?rev=995411&r1=995410&r2=995411&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state/DefaultISMLocking.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/state/DefaultISMLocking.java Thu Sep  9 13:04:16 2010
@@ -23,203 +23,157 @@ import javax.transaction.xa.Xid;
 import org.apache.jackrabbit.core.TransactionContext;
 import org.apache.jackrabbit.core.id.ItemId;
 
-import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
-import EDU.oswego.cs.dl.util.concurrent.ReentrantWriterPreferenceReadWriteLock;
-import EDU.oswego.cs.dl.util.concurrent.Sync;
-
 /**
- * <code>DefaultISMLocking</code> implements the default locking strategy using
- * coarse grained locking on an ItemStateManager wide read-write lock. E.g.
- * while a write lock is held, no read lock can be acquired.
+ * Default item state locking strategy. The default strategy is simply to use
+ * a single coarse-grained read-write lock over the entire workspace.
  */
 public class DefaultISMLocking implements ISMLocking {
 
     /**
-     * The internal read-write lock.
-	 * Thread concerning ReentrantWriterPreferenceReadWriteLock
+     * The read lock instance used by readers to release the acquired lock.
      */
-    private final ReadWriteLock rwLock = new ReentrantWriterPreferenceReadWriteLock();
-    
+    private final ReadLock readLock = new ReadLock() {
+        public void release() {
+            releaseReadLock();
+        }
+    };
+
     /**
-     * The internal Xid aware read-write lock.
+     * The write lock instance used by writers to release or downgrade the
+     * acquired lock.
      */
-    private final ReadWriteLock xidRwLock = new XidRWLock();
-    
+    private final WriteLock writeLock = new WriteLock() {
+        public void release() {
+            releaseWriteLock(false);
+        }
+        public ReadLock downgrade() {
+            releaseWriteLock(true);
+            return readLock;
+        }
+    };
+
     /**
-     * {@inheritDoc}
+     * Number of writer threads waiting. While greater than zero, no new
+     * readers are allowed to proceed.
      */
-    public ReadLock acquireReadLock(ItemId id) throws InterruptedException {
-    	if (TransactionContext.getCurrentXid() == null) {
-            return new ReadLockImpl(rwLock.readLock());
-    	} else {
-            return new ReadLockImpl(xidRwLock.readLock());
-    	}
-    }
+    private int writersWaiting = 0;
 
     /**
-     * {@inheritDoc}
+     * The thread identifier of the current writer, or <code>null</code> if
+     * no write is in progress. A thread with the same identifier (i.e. the
+     * same thread or another thread in the same transaction) can re-acquire
+     * read or write locks without limitation, while all other readers and
+     * writers remain blocked.
      */
-    public WriteLock acquireWriteLock(ChangeLog changeLog) throws InterruptedException {
-    	if (TransactionContext.getCurrentXid() == null) {
-    		return new WriteLockImpl(rwLock);
-    	} else {
-    		return new WriteLockImpl(xidRwLock);
-    	}
-    }
+    private Object writerId = null;
 
-    private static final class WriteLockImpl implements WriteLock {
-    	
-    	private ReadWriteLock readWriteLock;
-    	
-    	private WriteLockImpl(ReadWriteLock readWriteLock) throws InterruptedException {
-    		this.readWriteLock = readWriteLock;
-    		this.readWriteLock.writeLock().acquire();
-		}
+    /**
+     * Number of acquired write locks. All the concurrent write locks are
+     * guaranteed to share the same thread identifier (see {@link #writerId}).
+     */
+    private int writerCount = 0;
 
-		/**
-		 * {@inheritDoc}
-		 */
-		public void release() {
-		    this.readWriteLock.writeLock().release();
-		}
+    /**
+     * Number of acquired read locks.
+     */
+    private int readerCount = 0;
 
-		/**
-		 * {@inheritDoc}
-		 */
-		public ReadLock downgrade() throws InterruptedException {
-		    ReadLock rLock = new ReadLockImpl(this.readWriteLock.readLock());
-		    release();
-		    return rLock;
-		}
-	}
+    /**
+     * Increments the reader count and returns the acquired read lock once
+     * there are no more writers or the current writer shares the thread id
+     * with this reader.
+     */
+    public synchronized ReadLock acquireReadLock(ItemId id)
+            throws InterruptedException {
+        while (writerId != null
+                ? !isSameId(writerId, getCurrentId()) : writersWaiting > 0) {
+            wait();
+        }
 
-	private static final class ReadLockImpl implements ReadLock {
+        readerCount++;
+        return readLock;
+    }
 
-        private final Sync readLock;
+    /**
+     * Decrements the reader count and notifies all pending threads if the
+     * lock is now available. Used by the {@link #readLock} instance.
+     */
+    private synchronized void releaseReadLock() {
+        readerCount--;
+        if (readerCount == 0 && writerId == null) {
+            notifyAll();
+        }
+    }
 
-        private ReadLockImpl(Sync readLock) throws InterruptedException {
-            this.readLock = readLock;
-            this.readLock.acquire();
+    /**
+     * Increments the writer count, sets the writer identifier and returns
+     * the acquired write lock once there are no other active readers or
+     * writers or the current writer shares the thread id with this writer.
+     */
+    public synchronized WriteLock acquireWriteLock(ChangeLog changeLog)
+            throws InterruptedException {
+        Object currentId = getCurrentId();
+
+        writersWaiting++;
+        try {
+            while (writerId != null
+                    ? !isSameId(writerId, currentId) : readerCount > 0) {
+                wait();
+            }
+        } finally {
+            writersWaiting--;
         }
 
-        /**
-         * {@inheritDoc}
-         */
-        public void release() {
-            readLock.release();
+        if (writerCount++ == 0) {
+            writerId = currentId;
         }
+        return writeLock;
     }
 
-	/**
-	 * Xid concerning ReentrantWriterPreferenceReadWriteLock
-	 */
-    private static final class XidRWLock extends ReentrantWriterPreferenceReadWriteLock {
-
-        private Xid activeXid;
-
-        /**
-         * Check if the given Xid comes from the same globalTX
-         * @param otherXid
-         * @return true if same globalTX otherwise false
-         */
-        boolean isSameGlobalTx(Xid otherXid) {
-    	    return (activeXid == otherXid) || Arrays.equals(activeXid.getGlobalTransactionId(), otherXid.getGlobalTransactionId());
-    	}
-
-        /**
-         * Allow reader when there is no active Xid, or current Xid owns
-         * the write lock (reentrant).
-         */
-        protected boolean allowReader() {
-        	Xid currentXid = TransactionContext.getCurrentXid();
-        	return (activeXid == null && waitingWriters_ == 0) || isSameGlobalTx(currentXid);
-        }
-
-        /**
-         * {@inheritDoc}
-         */  
-        protected synchronized boolean startWrite() {
-        	Xid currentXid = TransactionContext.getCurrentXid();
-            if (activeXid != null && isSameGlobalTx(currentXid)) { // already held; re-acquire
-            	++writeHolds_;
-                return true;
-            } else if (writeHolds_ == 0) {
-            	if (activeReaders_ == 0 || (readers_.size() == 1 && readers_.get(currentXid) != null)) {
-            		activeXid = currentXid;
-            		writeHolds_ = 1;
-            		return true;
-            	} else {
-            		return false;
-            	}
-            } else {
-            	return false;
-            }
+    /**
+     * Decrements the writer count (and possibly clears the writer identifier)
+     * and notifies all pending threads if the lock is now available. If the
+     * downgrade argument is true, then the reader count is incremented before
+     * notifying any pending threads. Used by the {@link #writeLock} instance.
+     */
+    private synchronized void releaseWriteLock(boolean downgrade) {
+        writerCount--;
+        if (downgrade) {
+            readerCount++;
+        }
+        if (writerCount == 0) {
+            writerId = null;
+            notifyAll();
         }
+    }
 
-        /**
-         * {@inheritDoc}
-         */
-        protected synchronized Signaller endWrite() {
-            --writeHolds_;
-            if (writeHolds_ > 0) {  // still being held
-            	return null;
-            } else {
-            	activeXid = null;
-                if (waitingReaders_ > 0 && allowReader()) {
-                    return readerLock_;
-                } else if (waitingWriters_ > 0) {
-                    return writerLock_;
-                } else {
-                    return null;
-                }
-            }
+    /**
+     * Returns the current thread identifier. The identifier is either the
+     * current thread instance or the global transaction identifier when
+     * running under a transaction.
+     *
+     * @return current thread identifier
+     */
+    private Object getCurrentId() {
+        Xid xid = TransactionContext.getCurrentXid();
+        if (xid != null) {
+            return xid.getGlobalTransactionId();
+        } else {
+            return Thread.currentThread();
         }
+    }
 
-        /**
-         * {@inheritDoc}
-         */
-		protected synchronized boolean startRead() {
-			Xid currentXid = TransactionContext.getCurrentXid();
-		    Object c = readers_.get(currentXid);
-		    if (c != null) { // already held -- just increment hold count
-		    	readers_.put(currentXid, new Integer(((Integer)(c)).intValue()+1));
-		    	++activeReaders_;
-		    	return true;
-		    } else if (allowReader()) {
-		    	readers_.put(currentXid, IONE);
-		    	++activeReaders_;
-		    	return true;
-		    } else {
-		    	return false;
-		    }
-		}
-
-        /**
-         * {@inheritDoc}
-         */
-		protected synchronized Signaller endRead() {
-			Xid currentXid = TransactionContext.getCurrentXid();
-		    Object c = readers_.get(currentXid);
-		    if (c == null) {
-		    	throw new IllegalStateException();
-		    }
-		    --activeReaders_;
-		    if (c != IONE) { // more than one hold; decrement count
-		    	int h = ((Integer)(c)).intValue()-1;
-		    	Integer ih = (h == 1)? IONE : new Integer(h);
-		    	readers_.put(currentXid, ih);
-		    	return null;
-		    } else {
-		    	readers_.remove(currentXid);
-		    
-		    	if (writeHolds_ > 0) { // a write lock is still held
-		    		return null;
-		    	} else if (activeReaders_ == 0 && waitingWriters_ > 0) {
-		    		return writerLock_;
-		    	} else  {
-		    		return null;
-		    	}
-		    }
-		}
+    /**
+     * Compares the given thread identifiers for equality.
+     */
+    private boolean isSameId(Object a, Object b) {
+        if (a == b) {
+            return true;
+        } else if (a instanceof byte[] && b instanceof byte[]) {
+            return Arrays.equals((byte[]) a, (byte[]) b);
+        } else {
+            return false;
+        }
     }
+
 }