You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jackrabbit.apache.org by md...@apache.org on 2011/09/06 12:56:52 UTC
svn commit: r1165600 -
/jackrabbit/sandbox/jackrabbit-mk/jackrabbit-jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceManager.java
Author: mduerig
Date: Tue Sep 6 10:56:51 2011
New Revision: 1165600
URL: http://svn.apache.org/viewvc?rev=1165600&view=rev
Log:
Microkernel based Jackrabbit prototype (WIP)
improve background polling for observation events
Modified:
jackrabbit/sandbox/jackrabbit-mk/jackrabbit-jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceManager.java
Modified: jackrabbit/sandbox/jackrabbit-mk/jackrabbit-jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceManager.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/jackrabbit-mk/jackrabbit-jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceManager.java?rev=1165600&r1=1165599&r2=1165600&view=diff
==============================================================================
--- jackrabbit/sandbox/jackrabbit-mk/jackrabbit-jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceManager.java (original)
+++ jackrabbit/sandbox/jackrabbit-mk/jackrabbit-jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceManager.java Tue Sep 6 10:56:51 2011
@@ -102,6 +102,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
@@ -121,7 +123,6 @@ public class WorkspaceManager implements
private final HierarchyManager hierarchyManager;
private final CacheBehaviour cacheBehaviour;
- private final ExecutorService changePollingExecutor;
private final int pollTimeout;
private final IdFactory idFactory;
@@ -139,10 +140,10 @@ public class WorkspaceManager implements
/**
* This is the event polling for changes. If <code>null</code>
* then the underlying repository service does not support observation.
- * It is also <code>null</code> if {@link CacheBehaviour#INVALIDATE} is
+ * It is also <code>null</code> if {@link org.apache.jackrabbit.jcr2spi.config.CacheBehaviour#INVALIDATE} is
* configured and no event listeners have been registered.
*/
- private Future<Object> changeFeed; // No bottom type in Java. Use object and discard return value
+ private ChangePolling changeFeed;
/**
* List of event listener that are set on this WorkspaceManager to get
@@ -153,7 +154,7 @@ public class WorkspaceManager implements
/**
* The current subscription for change events if there are listeners.
*/
- private Subscription subscription;
+ private volatile Subscription subscription;
/**
* A cache for item infos as supplied by {@link RepositoryService#getItemInfoCache(SessionInfo)}
@@ -167,11 +168,10 @@ public class WorkspaceManager implements
this.service = service;
this.sessionInfo = sessionInfo;
this.cacheBehaviour = cacheBehaviour;
- this.pollTimeout = pollTimeout;
- changePollingExecutor = observationSupported
- ? service.getChangePollingExecutor()
- : null;
-
+ this.pollTimeout = observationSupported
+ ? pollTimeout
+ : -1;
+
nameFactory = service.getNameFactory();
pathFactory = service.getPathFactory();
@@ -341,8 +341,8 @@ public class WorkspaceManager implements
* @throws RepositoryException if the listener cannot be registered.
*/
public void addEventListener(InternalEventListener listener) throws RepositoryException {
- if (changeFeed == null && changePollingExecutor != null) {
- changeFeed = changePollingExecutor.submit(new ChangePolling(pollTimeout));
+ if (changeFeed == null && pollTimeout >= 0) {
+ changeFeed = new ChangePolling(service.getChangePollingExecutor());
}
synchronized (listeners) {
@@ -599,7 +599,7 @@ public class WorkspaceManager implements
}
try {
if (changeFeed != null) {
- changeFeed.cancel(true);
+ changeFeed.cancel();
}
hierarchyManager.dispose();
if (subscription != null) {
@@ -1056,51 +1056,59 @@ public class WorkspaceManager implements
* Implements the polling for changes on the repository service.
*/
private final class ChangePolling implements Callable<Object> {
+ private final ExecutorService changePollingExecutor;
+ private volatile Future<Object> result;
- /**
- * The polling timeout in milliseconds.
- */
- private final int pollTimeout;
-
- /**
- * Creates a new change polling with a given polling timeout.
- *
- * @param pollTimeout the timeout in milliseconds.
- */
- private ChangePolling(int pollTimeout) {
- this.pollTimeout = pollTimeout;
+ public ChangePolling(ExecutorService changePollingExecutor) {
+ this.changePollingExecutor = changePollingExecutor;
+ result = changePollingExecutor.submit(this);
}
@Override
public Object call() throws Exception {
String wspName = sessionInfo.getWorkspaceName();
try {
- InternalEventListener[] iel;
+ InternalEventListener[] internalEventListeners;
Subscription subscription;
synchronized (listeners) {
- while (WorkspaceManager.this.subscription == null) {
+ subscription = WorkspaceManager.this.subscription;
+ while (subscription == null) {
listeners.wait();
+ subscription = WorkspaceManager.this.subscription;
}
- iel = listeners.toArray(new InternalEventListener[listeners.size()]);
- subscription = WorkspaceManager.this.subscription;
+ internalEventListeners = listeners.toArray(new InternalEventListener[listeners.size()]);
}
- log.debug("calling getEvents() (Workspace={})", wspName);
EventBundle[] bundles = service.getEvents(subscription, pollTimeout);
- log.debug("returned from getEvents() (Workspace={})", wspName);
if (!Thread.interrupted() && bundles.length > 0) {
- onEventReceived(bundles, iel);
+ onEventReceived(bundles, internalEventListeners);
+ }
+
+ synchronized (this) {
+ if (result != null) {
+ result = changePollingExecutor.submit(this);
+ }
}
- changePollingExecutor.submit(this);
} catch (UnsupportedRepositoryOperationException e) {
log.error("SPI implementation does not support observation: " + e);
} catch (RepositoryException e) {
log.info("Workspace=" + wspName + ": Exception while retrieving event bundles: " + e);
- log.debug("Dump:", e);
+ log.debug(e.getMessage(), e);
}
return null;
}
+ public void cancel() {
+ Future<Object> result;
+ synchronized (this) {
+ result = this.result;
+ this.result = null;
+ }
+
+ if (result != null) {
+ result.cancel(true);
+ }
+ }
}
}