You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jackrabbit.apache.org by md...@apache.org on 2011/09/06 12:56:52 UTC

svn commit: r1165600 - /jackrabbit/sandbox/jackrabbit-mk/jackrabbit-jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceManager.java

Author: mduerig
Date: Tue Sep  6 10:56:51 2011
New Revision: 1165600

URL: http://svn.apache.org/viewvc?rev=1165600&view=rev
Log:
Microkernel based Jackrabbit prototype (WIP)
improve background polling for observation events

Modified:
    jackrabbit/sandbox/jackrabbit-mk/jackrabbit-jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceManager.java

Modified: jackrabbit/sandbox/jackrabbit-mk/jackrabbit-jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceManager.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/jackrabbit-mk/jackrabbit-jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceManager.java?rev=1165600&r1=1165599&r2=1165600&view=diff
==============================================================================
--- jackrabbit/sandbox/jackrabbit-mk/jackrabbit-jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceManager.java (original)
+++ jackrabbit/sandbox/jackrabbit-mk/jackrabbit-jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceManager.java Tue Sep  6 10:56:51 2011
@@ -102,6 +102,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
@@ -121,7 +123,6 @@ public class WorkspaceManager implements
     private final HierarchyManager hierarchyManager;
 
     private final CacheBehaviour cacheBehaviour;
-    private final ExecutorService changePollingExecutor;
     private final int pollTimeout;
 
     private final IdFactory idFactory;
@@ -139,10 +140,10 @@ public class WorkspaceManager implements
     /**
      * This is the event polling for changes. If <code>null</code>
      * then the underlying repository service does not support observation.
-     * It is also <code>null</code> if {@link CacheBehaviour#INVALIDATE} is
+     * It is also <code>null</code> if {@link org.apache.jackrabbit.jcr2spi.config.CacheBehaviour#INVALIDATE} is
      * configured and no event listeners have been registered.
      */
-    private Future<Object> changeFeed;  // No bottom type in Java. Use object and discard return value
+    private ChangePolling changeFeed;  
 
     /**
      * List of event listener that are set on this WorkspaceManager to get
@@ -153,7 +154,7 @@ public class WorkspaceManager implements
     /**
      * The current subscription for change events if there are listeners.
      */
-    private Subscription subscription;
+    private volatile Subscription subscription;
 
     /**
      * A cache for item infos as supplied by {@link RepositoryService#getItemInfoCache(SessionInfo)}
@@ -167,11 +168,10 @@ public class WorkspaceManager implements
         this.service = service;
         this.sessionInfo = sessionInfo;
         this.cacheBehaviour = cacheBehaviour;
-        this.pollTimeout = pollTimeout;
-        changePollingExecutor = observationSupported
-            ?  service.getChangePollingExecutor()
-            : null;
-
+        this.pollTimeout = observationSupported
+            ? pollTimeout
+            : -1;
+                
         nameFactory = service.getNameFactory();
         pathFactory = service.getPathFactory();
 
@@ -341,8 +341,8 @@ public class WorkspaceManager implements
      * @throws RepositoryException if the listener cannot be registered.
      */
     public void addEventListener(InternalEventListener listener) throws RepositoryException {
-        if (changeFeed == null && changePollingExecutor != null) {
-            changeFeed = changePollingExecutor.submit(new ChangePolling(pollTimeout));
+        if (changeFeed == null && pollTimeout >= 0) {
+            changeFeed = new ChangePolling(service.getChangePollingExecutor());
         }
 
         synchronized (listeners) {
@@ -599,7 +599,7 @@ public class WorkspaceManager implements
         }
         try {
             if (changeFeed != null) {
-                changeFeed.cancel(true);
+                changeFeed.cancel();
             }
             hierarchyManager.dispose();
             if (subscription != null) {
@@ -1056,51 +1056,59 @@ public class WorkspaceManager implements
      * Implements the polling for changes on the repository service.
      */
     private final class ChangePolling implements Callable<Object> {
+        private final ExecutorService changePollingExecutor;
+        private volatile Future<Object> result;
 
-        /**
-         * The polling timeout in milliseconds.
-         */
-        private final int pollTimeout;
-
-        /**
-         * Creates a new change polling with a given polling timeout.
-         *
-         * @param pollTimeout the timeout in milliseconds.
-         */
-        private ChangePolling(int pollTimeout) {
-            this.pollTimeout = pollTimeout;
+        public ChangePolling(ExecutorService changePollingExecutor) {
+            this.changePollingExecutor = changePollingExecutor;
+            result = changePollingExecutor.submit(this);
         }
 
         @Override
         public Object call() throws Exception {
             String wspName = sessionInfo.getWorkspaceName();            
             try {
-                InternalEventListener[] iel;
+                InternalEventListener[] internalEventListeners;
                 Subscription subscription;
                 synchronized (listeners) {
-                    while (WorkspaceManager.this.subscription == null) {
+                    subscription = WorkspaceManager.this.subscription;
+                    while (subscription == null) {
                         listeners.wait();
+                        subscription = WorkspaceManager.this.subscription;
                     }
-                    iel = listeners.toArray(new InternalEventListener[listeners.size()]);
-                    subscription = WorkspaceManager.this.subscription;
+                    internalEventListeners = listeners.toArray(new InternalEventListener[listeners.size()]);
                 }
 
-                log.debug("calling getEvents() (Workspace={})", wspName);
                 EventBundle[] bundles = service.getEvents(subscription, pollTimeout);
-                log.debug("returned from getEvents() (Workspace={})", wspName);
 
                 if (!Thread.interrupted() && bundles.length > 0) {
-                    onEventReceived(bundles, iel);
+                    onEventReceived(bundles, internalEventListeners);
+                }
+
+                synchronized (this) {
+                    if (result != null) {
+                        result = changePollingExecutor.submit(this);
+                    }
                 }
-                changePollingExecutor.submit(this);
             } catch (UnsupportedRepositoryOperationException e) {
                 log.error("SPI implementation does not support observation: " + e);
             } catch (RepositoryException e) {
                 log.info("Workspace=" + wspName + ": Exception while retrieving event bundles: " + e);
-                log.debug("Dump:", e);
+                log.debug(e.getMessage(), e);
             }
             return null;
         }
 
+        public void cancel() {
+            Future<Object> result;
+            synchronized (this) {
+                result = this.result;
+                this.result = null;
+            }
+
+            if (result != null) {
+                result.cancel(true);
+            }
+        }
     }
 }