You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2014/07/07 14:48:40 UTC

svn commit: r1608436 - in /jackrabbit/oak/branches/1.0: ./ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java

Author: mduerig
Date: Mon Jul  7 12:48:40 2014
New Revision: 1608436

URL: http://svn.apache.org/r1608436
Log:
OAK-1703: Improve warning logged on concurrent Session access
merged revision 1595457

Modified:
    jackrabbit/oak/branches/1.0/   (props changed)
    jackrabbit/oak/branches/1.0/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java

Propchange: jackrabbit/oak/branches/1.0/
------------------------------------------------------------------------------
  Merged /jackrabbit/oak/trunk:r1595457

Modified: jackrabbit/oak/branches/1.0/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java?rev=1608436&r1=1608435&r2=1608436&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java (original)
+++ jackrabbit/oak/branches/1.0/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java Mon Jul  7 12:48:40 2014
@@ -19,7 +19,9 @@ package org.apache.jackrabbit.oak.jcr.de
 import java.io.IOException;
 import java.util.Date;
 import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import javax.annotation.CheckForNull;
@@ -127,7 +129,7 @@ public class SessionDelegate {
      * synchronization in order to be able to log attempts to concurrently
      * use a session.
      */
-    private final Lock lock = new ReentrantLock();
+    private final WarningLock lock = new WarningLock(new ReentrantLock());
 
     /**
      * Create a new session delegate for a {@code ContentSession}. The refresh behaviour of the
@@ -242,7 +244,7 @@ public class SessionDelegate {
      * @return  synchronized iterator
      */
     public <T> Iterator<T> sync(Iterator<T> iterator) {
-        return new SynchronizedIterator<T>(iterator);
+        return new SynchronizedIterator<T>(iterator, lock);
     }
 
     /**
@@ -264,25 +266,7 @@ public class SessionDelegate {
         // Acquire the exclusive lock for accessing session internals.
         // No other session should be holding the lock, so we log a
         // message to let the user know of such cases.
-        if (!lock.tryLock()) {
-            if (sessionOperation.isUpdate()) {
-                Exception trace = new Exception(
-                        "Stack trace of concurrent access to " + contentSession);
-                log.warn("Attempt to perform " + sessionOperation + " while another thread is " +
-                        "concurrently writing to " + contentSession + ". Blocking until the " +
-                        "other thread is finished using this session. Please review your code " +
-                        "to avoid concurrent use of a session.", trace);
-            } else if (log.isDebugEnabled()) {
-                Exception trace = new Exception(
-                        "Stack trace of concurrent access to " + contentSession);
-                log.warn("Attempt to perform " + sessionOperation + " while another thread is " +
-                        "concurrently reading from " + contentSession + ". Blocking until the " +
-                        "other thread is finished using this session. Please review your code " +
-                        "to avoid concurrent use of a session.", trace);
-            }
-            lock.lock();
-        }
-
+        lock.lock(sessionOperation);
         try {
             if (sessionOpCount == 0) {
                 // Refresh and precondition checks only for non re-entrant
@@ -683,21 +667,21 @@ public class SessionDelegate {
 
     /**
      * This iterator delegates to a backing iterator and synchronises
-     * all calls to the backing iterator on this {@code SessionDelegate}
-     * instance.
-     *
+     * all calls wrt. the lock passed to its constructor.
      * @param <T>
      */
-    private final class SynchronizedIterator<T> implements Iterator<T> {
+    private static final class SynchronizedIterator<T> implements Iterator<T> {
         private final Iterator<T> iterator;
+        private final WarningLock lock;
 
-        SynchronizedIterator(Iterator<T> iterator) {
+        SynchronizedIterator(Iterator<T> iterator, WarningLock lock) {
             this.iterator = iterator;
+            this.lock = lock;
         }
 
         @Override
         public boolean hasNext() {
-            lock.lock();
+            lock.lock(false, "hasNext()");
             try {
                 return iterator.hasNext();
             } finally {
@@ -707,7 +691,7 @@ public class SessionDelegate {
 
         @Override
         public T next() {
-            lock.lock();
+            lock.lock(false, "next()");
             try {
                 return iterator.next();
             } finally {
@@ -717,7 +701,7 @@ public class SessionDelegate {
 
         @Override
         public void remove() {
-            lock.lock();
+            lock.lock(true, "remove()");
             try {
                 iterator.remove();
             } finally {
@@ -726,4 +710,97 @@ public class SessionDelegate {
         }
     }
 
+    /**
+     * A {@link Lock} implementation that has additional methods
+     * for acquiring the lock, which log a warning if the lock is
+     * already held by another thread and was also acquired through
+     * such a method.
+     */
+    private static final class WarningLock implements Lock {
+        private final Lock lock;
+
+        // All access to members only *after* the lock has been acquired
+        private boolean isUpdate;
+        private Exception holderTrace;
+        private String holderThread;
+
+        private WarningLock(Lock lock) {
+            this.lock = lock;
+        }
+
+        public void lock(boolean isUpdate, String opName) {
+            if (!lock.tryLock()) {
+                // Acquire the lock before logging the warnings. As otherwise race conditions
+                // on the involved fields might lead to wrong warnings.
+                lock.lock();
+                if (holderThread != null) {
+                    if (this.isUpdate) {
+                        log.warn("Attempted to perform " + opName + " while thread " + holderThread +
+                                " was concurrently writing to this session. Blocked until the " +
+                                "other thread finished using this session. Please review your code " +
+                                "to avoid concurrent use of a session.", holderTrace);
+                    } else if (log.isDebugEnabled()) {
+                        log.debug("Attempted to perform " + opName + " while thread " + holderThread +
+                                " was concurrently reading from this session. Blocked until the " +
+                                "other thread finished using this session. Please review your code " +
+                                "to avoid concurrent use of a session.", holderTrace);
+                    }
+                }
+            }
+            this.isUpdate = isUpdate;
+            holderTrace = new Exception("Stack trace of concurrent access to session");
+            holderThread = Thread.currentThread().getName();
+        }
+
+        public void lock(SessionOperation<?> sessionOperation) {
+            lock(sessionOperation.isUpdate(), sessionOperation.toString());
+        }
+
+        @Override
+        public void lock() {
+            lock.lock();
+            holderTrace = null;
+            holderThread = null;
+        }
+
+        @Override
+        public void lockInterruptibly() throws InterruptedException {
+            lock.lockInterruptibly();
+            holderTrace = null;
+            holderThread = null;
+        }
+
+        @Override
+        public boolean tryLock() {
+            if (lock.tryLock()) {
+                holderTrace = null;
+                holderThread = null;
+                return true;
+            } else {
+                return false;
+            }
+        }
+
+        @Override
+        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
+            if (lock.tryLock(time, unit)) {
+                holderTrace = null;
+                holderThread = null;
+                return true;
+            } else {
+                return false;
+            }
+        }
+
+        @Override
+        public void unlock() {
+            lock.unlock();
+        }
+
+        @Override
+        public Condition newCondition() {
+            return lock.newCondition();
+        }
+
+    }
 }