You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jackrabbit.apache.org by ju...@apache.org on 2010/09/24 18:45:02 UTC

svn commit: r1000950 - in /jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core: SessionImpl.java WorkspaceManager.java observation/ObservationDispatcher.java session/SessionContext.java session/SessionState.java

Author: jukka
Date: Fri Sep 24 16:45:01 2010
New Revision: 1000950

URL: http://svn.apache.org/viewvc?rev=1000950&view=rev
Log:
JCR-2746: Sleep in possibly endless loop in ObservationDispatcher

Move the sleep to the end of SessionState.perform(), after all the relevant locks have been released.

Modified:
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/SessionImpl.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/WorkspaceManager.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/ObservationDispatcher.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/session/SessionContext.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/session/SessionState.java

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/SessionImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/SessionImpl.java?rev=1000950&r1=1000949&r2=1000950&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/SessionImpl.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/SessionImpl.java Fri Sep 24 16:45:01 2010
@@ -1238,14 +1238,4 @@ public class SessionImpl extends Abstrac
         }
     }
 
-    /**
-     * Delay the next operation, except if it is run using the given thread.
-     *
-     * @param exceptInThread the thread that shouldn't be delayed
-     * @param ms the delay in milliseconds
-     */
-    public void delayNextOperation(Thread exceptInThread, long ms) {
-        context.delayNextOperation(exceptInThread, ms);
-    }
-
 }

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/WorkspaceManager.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/WorkspaceManager.java?rev=1000950&r1=1000949&r2=1000950&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/WorkspaceManager.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/WorkspaceManager.java Fri Sep 24 16:45:01 2010
@@ -16,11 +16,11 @@
  */
 package org.apache.jackrabbit.core;
 
-import javax.jcr.AccessDeniedException;
 import javax.jcr.NoSuchWorkspaceException;
 import javax.jcr.RepositoryException;
 import javax.security.auth.Subject;
 
+import org.apache.jackrabbit.core.observation.ObservationDispatcher;
 import org.apache.jackrabbit.core.state.SharedItemStateManager;
 import org.xml.sax.InputSource;
 
@@ -87,6 +87,12 @@ public class WorkspaceManager {
         return repository.getWorkspaceStateManager(workspaceName);
     }
 
+    // FIXME: This method is too low-level. Refactor...
+    public ObservationDispatcher getObservationDispatcher(String workspaceName)
+            throws NoSuchWorkspaceException, RepositoryException {
+        return repository.getObservationDispatcher(workspaceName);
+    }
+
     // FIXME: There should be a better place for this. Refactor...
     public SessionImpl createSession(Subject subject, String workspaceName)
         throws RepositoryException {

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/ObservationDispatcher.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/ObservationDispatcher.java?rev=1000950&r1=1000949&r2=1000950&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/ObservationDispatcher.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/ObservationDispatcher.java Fri Sep 24 16:45:01 2010
@@ -218,8 +218,21 @@ public final class ObservationDispatcher
             }
         }
         eventQueue.add(new DispatchAction(events, getAsynchronousConsumers()));
-        int size = eventQueueSize.addAndGet(events.size());
-        if (size > MAX_QUEUED_EVENTS) {
+        eventQueueSize.addAndGet(events.size());
+    }
+
+    /**
+     * Checks if the observation event queue contains more than the
+     * configured {@link #MAX_QUEUED_EVENTS maximum number of events},
+     * and delays the current thread in such cases. No delay is added
+     * if the current thread is the observation thread, for example if
+     * an observation listener writes to the repository.
+     * <p>
+     * This method should only be called outside the scope of internal
+     * repository access locks.
+     */
+    public void delayIfEventQueueOverloaded() {
+        if (eventQueueSize.get() > MAX_QUEUED_EVENTS) {
             boolean logWarning = false;
             long now = System.currentTimeMillis();
             // log a warning at most every 5 seconds (to avoid filling the log file)
@@ -236,11 +249,10 @@ public final class ObservationDispatcher
                 if (logWarning) {
                     log.warn("Waiting");
                 }
-                if (eventQueueSize.get() > MAX_QUEUED_EVENTS) {
-                    // slow down the current session
-                    // but not here, because locks are held
-                    // (that may block an observation listener, which is not what we want)
-                    events.getSession().delayNextOperation(notificationThread, 100);
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                    log.warn("Interrupted while rate-limiting writes", e);
                 }
             }
         }

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/session/SessionContext.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/session/SessionContext.java?rev=1000950&r1=1000949&r2=1000950&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/session/SessionContext.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/session/SessionContext.java Fri Sep 24 16:45:01 2010
@@ -58,19 +58,6 @@ public class SessionContext implements N
     private static AtomicLong counter = new AtomicLong();
 
     /**
-     * The thread in which not to delay operations.
-     * If null, no operations are delayed.
-     * If not null, the next operation is delayed,
-     * except operations that are executed from this thread.
-     */
-    private Thread delayExceptInThread;
-
-    /**
-     * The delay in milliseconds.
-     */
-    private long delayMs;
-
-    /**
      * Creates a unique internal session name for a session with the
      * given user.
      *
@@ -374,33 +361,4 @@ public class SessionContext implements N
         return sessionName;
     }
 
-    /**
-     * Delay the next operation, except if it is run using the given thread.
-     *
-     * @param exceptInThread the thread that shouldn't be delayed
-     * @param ms the delay in milliseconds
-     */
-    public void delayNextOperation(Thread exceptInThread, long ms) {
-        this.delayExceptInThread = exceptInThread;
-        this.delayMs = ms;
-    }
-
-    /**
-     * Delay a session operation if it is necessary.
-     */
-    public void delayIfNecessary() {
-        if (delayExceptInThread != null) {
-            Thread currentThread = Thread.currentThread();
-            if (currentThread != delayExceptInThread) {
-                try {
-                    Thread.sleep(delayMs);
-                } catch (InterruptedException e) {
-                    // ignore
-                }
-                // don't delay the next operation
-                delayExceptInThread = null;
-            }
-        }
-    }
-
 }

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/session/SessionState.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/session/SessionState.java?rev=1000950&r1=1000949&r2=1000950&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/session/SessionState.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/session/SessionState.java Fri Sep 24 16:45:01 2010
@@ -22,6 +22,8 @@ import java.util.concurrent.locks.Reentr
 import javax.jcr.RepositoryException;
 import javax.jcr.Session;
 
+import org.apache.jackrabbit.core.WorkspaceManager;
+import org.apache.jackrabbit.core.observation.ObservationDispatcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -161,6 +163,7 @@ public class SessionState {
             lock.lock();
         }
 
+        boolean isOutermostWriteOperation = false;
         try {
             // Check that the session is still alive
             checkAlive();
@@ -171,7 +174,9 @@ public class SessionState {
             if (!wasWriteOperation
                     && operation instanceof SessionWriteOperation) {
                 isWriteOperation = true;
+                isOutermostWriteOperation = true;
             }
+
             try {
                 // Perform the actual operation, optionally with debug logs
                 if (log.isDebugEnabled()) {
@@ -190,7 +195,6 @@ public class SessionState {
                         }
                     }
                 } else {
-                    context.delayIfNecessary();
                     return operation.perform(context);
                 }
             } finally {
@@ -198,6 +202,18 @@ public class SessionState {
             }
         } finally {
             lock.unlock();
+
+            // Delay return from a write operation if the observation queue
+            // is being overloaded. This needs to be done after releasing
+            // the (outermost) write locks to prevent potential deadlocks.
+            // See https://issues.apache.org/jira/browse/JCR-2746
+            if (isOutermostWriteOperation) {
+                WorkspaceManager manager =
+                    context.getRepositoryContext().getWorkspaceManager();
+                ObservationDispatcher dispatcher =
+                    manager.getObservationDispatcher(context.getWorkspace().getName());
+                dispatcher.delayIfEventQueueOverloaded();
+            }
         }
     }