You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2014/07/07 14:48:40 UTC
svn commit: r1608436 - in /jackrabbit/oak/branches/1.0: ./
oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java
Author: mduerig
Date: Mon Jul 7 12:48:40 2014
New Revision: 1608436
URL: http://svn.apache.org/r1608436
Log:
OAK-1703: Improve warning logged on concurrent Session access
merged revision 1595457
Modified:
jackrabbit/oak/branches/1.0/ (props changed)
jackrabbit/oak/branches/1.0/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java
Propchange: jackrabbit/oak/branches/1.0/
------------------------------------------------------------------------------
Merged /jackrabbit/oak/trunk:r1595457
Modified: jackrabbit/oak/branches/1.0/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java?rev=1608436&r1=1608435&r2=1608436&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java (original)
+++ jackrabbit/oak/branches/1.0/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java Mon Jul 7 12:48:40 2014
@@ -19,7 +19,9 @@ package org.apache.jackrabbit.oak.jcr.de
import java.io.IOException;
import java.util.Date;
import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.CheckForNull;
@@ -127,7 +129,7 @@ public class SessionDelegate {
* synchronization in order to be able to log attempts to concurrently
* use a session.
*/
- private final Lock lock = new ReentrantLock();
+ private final WarningLock lock = new WarningLock(new ReentrantLock());
/**
* Create a new session delegate for a {@code ContentSession}. The refresh behaviour of the
@@ -242,7 +244,7 @@ public class SessionDelegate {
* @return synchronized iterator
*/
public <T> Iterator<T> sync(Iterator<T> iterator) {
- return new SynchronizedIterator<T>(iterator);
+ return new SynchronizedIterator<T>(iterator, lock);
}
/**
@@ -264,25 +266,7 @@ public class SessionDelegate {
// Acquire the exclusive lock for accessing session internals.
// No other session should be holding the lock, so we log a
// message to let the user know of such cases.
- if (!lock.tryLock()) {
- if (sessionOperation.isUpdate()) {
- Exception trace = new Exception(
- "Stack trace of concurrent access to " + contentSession);
- log.warn("Attempt to perform " + sessionOperation + " while another thread is " +
- "concurrently writing to " + contentSession + ". Blocking until the " +
- "other thread is finished using this session. Please review your code " +
- "to avoid concurrent use of a session.", trace);
- } else if (log.isDebugEnabled()) {
- Exception trace = new Exception(
- "Stack trace of concurrent access to " + contentSession);
- log.warn("Attempt to perform " + sessionOperation + " while another thread is " +
- "concurrently reading from " + contentSession + ". Blocking until the " +
- "other thread is finished using this session. Please review your code " +
- "to avoid concurrent use of a session.", trace);
- }
- lock.lock();
- }
-
+ lock.lock(sessionOperation);
try {
if (sessionOpCount == 0) {
// Refresh and precondition checks only for non re-entrant
@@ -683,21 +667,21 @@ public class SessionDelegate {
/**
* This iterator delegates to a backing iterator and synchronises
- * all calls to the backing iterator on this {@code SessionDelegate}
- * instance.
- *
+ * all calls wrt. the lock passed to its constructor.
* @param <T>
*/
- private final class SynchronizedIterator<T> implements Iterator<T> {
+ private static final class SynchronizedIterator<T> implements Iterator<T> {
private final Iterator<T> iterator;
+ private final WarningLock lock;
- SynchronizedIterator(Iterator<T> iterator) {
+ SynchronizedIterator(Iterator<T> iterator, WarningLock lock) {
this.iterator = iterator;
+ this.lock = lock;
}
@Override
public boolean hasNext() {
- lock.lock();
+ lock.lock(false, "hasNext()");
try {
return iterator.hasNext();
} finally {
@@ -707,7 +691,7 @@ public class SessionDelegate {
@Override
public T next() {
- lock.lock();
+ lock.lock(false, "next()");
try {
return iterator.next();
} finally {
@@ -717,7 +701,7 @@ public class SessionDelegate {
@Override
public void remove() {
- lock.lock();
+ lock.lock(true, "remove()");
try {
iterator.remove();
} finally {
@@ -726,4 +710,97 @@ public class SessionDelegate {
}
}
+ /**
+ * A {@link Lock} implementation that has additional methods
+ * for acquiring the lock, which log a warning if the lock is
+ * already held by another thread and was also acquired through
+ * such a method.
+ */
+ private static final class WarningLock implements Lock {
+ private final Lock lock;
+
+ // All access to members only *after* the lock has been acquired
+ private boolean isUpdate;
+ private Exception holderTrace;
+ private String holderThread;
+
+ private WarningLock(Lock lock) {
+ this.lock = lock;
+ }
+
+ public void lock(boolean isUpdate, String opName) {
+ if (!lock.tryLock()) {
+ // Acquire the lock before logging the warnings. As otherwise race conditions
+ // on the involved fields might lead to wrong warnings.
+ lock.lock();
+ if (holderThread != null) {
+ if (this.isUpdate) {
+ log.warn("Attempted to perform " + opName + " while thread " + holderThread +
+ " was concurrently writing to this session. Blocked until the " +
+ "other thread finished using this session. Please review your code " +
+ "to avoid concurrent use of a session.", holderTrace);
+ } else if (log.isDebugEnabled()) {
+ log.debug("Attempted to perform " + opName + " while thread " + holderThread +
+ " was concurrently reading from this session. Blocked until the " +
+ "other thread finished using this session. Please review your code " +
+ "to avoid concurrent use of a session.", holderTrace);
+ }
+ }
+ }
+ this.isUpdate = isUpdate;
+ holderTrace = new Exception("Stack trace of concurrent access to session");
+ holderThread = Thread.currentThread().getName();
+ }
+
+ public void lock(SessionOperation<?> sessionOperation) {
+ lock(sessionOperation.isUpdate(), sessionOperation.toString());
+ }
+
+ @Override
+ public void lock() {
+ lock.lock();
+ holderTrace = null;
+ holderThread = null;
+ }
+
+ @Override
+ public void lockInterruptibly() throws InterruptedException {
+ lock.lockInterruptibly();
+ holderTrace = null;
+ holderThread = null;
+ }
+
+ @Override
+ public boolean tryLock() {
+ if (lock.tryLock()) {
+ holderTrace = null;
+ holderThread = null;
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
+ if (lock.tryLock(time, unit)) {
+ holderTrace = null;
+ holderThread = null;
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public void unlock() {
+ lock.unlock();
+ }
+
+ @Override
+ public Condition newCondition() {
+ return lock.newCondition();
+ }
+
+ }
}