You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jackrabbit.apache.org by mr...@apache.org on 2007/02/19 11:49:40 UTC
svn commit: r509151 - in /jackrabbit/trunk/contrib/spi:
client/src/test/java/org/apache/jackrabbit/jcr2spi/
jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/
jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/config/
jcr2spi/src/main/java/org/apach...
Author: mreutegg
Date: Mon Feb 19 02:49:39 2007
New Revision: 509151
URL: http://svn.apache.org/viewvc?view=rev&rev=509151
Log:
- Remove the immediate event request after an operation is executed. Executing operations is no longer tied to observation, even when CacheBehaviour = Observation.
- Event polling in spi2dav now implements a timeout (blocking until an event or the timeout occurs)
Modified:
jackrabbit/trunk/contrib/spi/client/src/test/java/org/apache/jackrabbit/jcr2spi/JCR2SPIRepositoryStub.java
jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceImpl.java
jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceManager.java
jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/config/RepositoryConfig.java
jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/state/NodeState.java
jackrabbit/trunk/contrib/spi/spi2dav/src/main/java/org/apache/jackrabbit/spi2dav/RepositoryServiceImpl.java
Modified: jackrabbit/trunk/contrib/spi/client/src/test/java/org/apache/jackrabbit/jcr2spi/JCR2SPIRepositoryStub.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/contrib/spi/client/src/test/java/org/apache/jackrabbit/jcr2spi/JCR2SPIRepositoryStub.java?view=diff&rev=509151&r1=509150&r2=509151
==============================================================================
--- jackrabbit/trunk/contrib/spi/client/src/test/java/org/apache/jackrabbit/jcr2spi/JCR2SPIRepositoryStub.java (original)
+++ jackrabbit/trunk/contrib/spi/client/src/test/java/org/apache/jackrabbit/jcr2spi/JCR2SPIRepositoryStub.java Mon Feb 19 02:49:39 2007
@@ -88,10 +88,6 @@
public CacheBehaviour getCacheBehaviour() {
return CacheBehaviour.INVALIDATE;
}
-
- public int getPollingInterval() {
- return 3 * 100000000;
- }
};
repository = RepositoryImpl.create(config);
Modified: jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceImpl.java?view=diff&rev=509151&r1=509150&r2=509151
==============================================================================
--- jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceImpl.java (original)
+++ jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceImpl.java Mon Feb 19 02:49:39 2007
@@ -100,7 +100,11 @@
public WorkspaceImpl(String name, SessionImpl session, RepositoryConfig config, SessionInfo sessionInfo) throws RepositoryException {
this.name = name;
this.session = session;
- wspManager = createManager(config.getRepositoryService(), sessionInfo, session.getCacheBehaviour(), config.getPollingInterval());
+ wspManager = createManager(
+ config.getRepositoryService(),
+ sessionInfo,
+ session.getCacheBehaviour(),
+ 3 * 1000); // 3 seconds poll timeout
}
//----------------------------------------------------------< Workspace >---
@@ -440,8 +444,8 @@
protected WorkspaceManager createManager(RepositoryService service,
SessionInfo sessionInfo,
CacheBehaviour cacheBehaviour,
- int pollingInterval) throws RepositoryException {
- return new WorkspaceManager(service, sessionInfo, cacheBehaviour, pollingInterval);
+ int pollTimeout) throws RepositoryException {
+ return new WorkspaceManager(service, sessionInfo, cacheBehaviour, pollTimeout);
}
/**
Modified: jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceManager.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceManager.java?view=diff&rev=509151&r1=509150&r2=509151
==============================================================================
--- jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceManager.java (original)
+++ jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceManager.java Mon Feb 19 02:49:39 2007
@@ -78,6 +78,8 @@
import org.apache.jackrabbit.spi.IdIterator;
import org.apache.jackrabbit.spi.QNodeTypeDefinition;
import org.apache.jackrabbit.spi.QValue;
+import org.apache.jackrabbit.spi.EventIterator;
+import org.apache.jackrabbit.spi.Event;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
@@ -110,10 +112,8 @@
import java.util.Collections;
import java.io.InputStream;
-import EDU.oswego.cs.dl.util.concurrent.Channel;
import EDU.oswego.cs.dl.util.concurrent.Sync;
-import EDU.oswego.cs.dl.util.concurrent.Latch;
-import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
+import EDU.oswego.cs.dl.util.concurrent.Mutex;
/**
* <code>WorkspaceManager</code>...
@@ -133,18 +133,11 @@
private final NodeTypeRegistry ntRegistry;
/**
- * Monitor object to synchronize the feed thread with client
+ * Mutex to synchronize the feed thread with client
* threads that call {@link #execute(Operation)} or {@link
* #execute(ChangeLog)}.
*/
- private final Object updateMonitor = new Object();
-
- /**
- * A producer for this channel can request an immediate poll for events
- * by placing a Sync into the channel. The Sync is released when the event
- * poll finished.
- */
- private final Channel immediateEventRequests = new LinkedQueue();
+ private final Sync updateSync = new Mutex();
/**
* This is the event polling for changes. If <code>null</code>
@@ -159,7 +152,7 @@
private final Set listeners = Collections.synchronizedSet(new HashSet());
public WorkspaceManager(RepositoryService service, SessionInfo sessionInfo,
- CacheBehaviour cacheBehaviour, int pollingInterval)
+ CacheBehaviour cacheBehaviour, int pollTimeout)
throws RepositoryException {
this.service = service;
this.sessionInfo = sessionInfo;
@@ -168,7 +161,7 @@
Map repositoryDescriptors = service.getRepositoryDescriptors();
nsRegistry = createNamespaceRegistry(repositoryDescriptors);
ntRegistry = createNodeTypeRegistry(nsRegistry, repositoryDescriptors);
- changeFeed = createChangeFeed(pollingInterval);
+ changeFeed = createChangeFeed(pollTimeout);
TransientItemStateFactory stateFactory = createItemStateFactory(ntRegistry);
this.isf = stateFactory;
@@ -397,17 +390,17 @@
* Creates a background thread which polls for external changes on the
* RepositoryService.
*
- * @param pollingInterval the polling interval in milliseconds.
+ * @param pollTimeout the polling timeout in milliseconds.
* @return the background polling thread or <code>null</code> if the underlying
* <code>RepositoryService</code> does not support observation.
*/
- private Thread createChangeFeed(int pollingInterval) {
+ private Thread createChangeFeed(int pollTimeout) {
Thread t = null;
try {
String desc = (String) service.getRepositoryDescriptors().get(
Repository.OPTION_OBSERVATION_SUPPORTED);
if ("true".equals(desc)) {
- t = new Thread(new ChangePolling(pollingInterval));
+ t = new Thread(new ChangePolling(pollTimeout));
t.setName("Change Polling");
t.setDaemon(true);
t.start();
@@ -426,23 +419,20 @@
* @see UpdatableItemStateManager#execute(Operation)
*/
public void execute(Operation operation) throws RepositoryException {
- if (cacheBehaviour == CacheBehaviour.OBSERVATION) {
- Sync eventSignal;
- synchronized (updateMonitor) {
- new OperationVisitorImpl(sessionInfo).execute(operation);
- eventSignal = getEventPollingRequest();
- }
- try {
- eventSignal.acquire();
- } catch (InterruptedException e) {
- Thread.interrupted();
- log.warn("Interrupted while waiting for events from RepositoryService");
- }
- } else {
+ // block event delivery while changes are executed
+ try {
+ updateSync.acquire();
+ } catch (InterruptedException e) {
+ throw new RepositoryException(e);
+ }
+ try {
// execute operation and delegate invalidation of affected item
// states to the operation.
new OperationVisitorImpl(sessionInfo).execute(operation);
- operation.persisted(cacheBehaviour);
+ // TODO: remove parameter CacheBehaviour
+ operation.persisted(CacheBehaviour.INVALIDATE);
+ } finally {
+ updateSync.release();
}
}
@@ -453,27 +443,18 @@
* @throws RepositoryException
*/
public void execute(ChangeLog changes) throws RepositoryException {
- if (cacheBehaviour == CacheBehaviour.OBSERVATION) {
- // TODO: TOBEFIXED. processing events after changelog may lead to consistency problems (duplicate processing) (e.g. removal of SNSs).
- // TODO: filtering of events required according to information present in the changelog.
- Sync eventSignal;
- synchronized (updateMonitor) {
- new OperationVisitorImpl(sessionInfo).execute(changes);
- changes.persisted(cacheBehaviour);
- eventSignal = getEventPollingRequest();
- }
- try {
- // wait at most 10 seconds
- if (!eventSignal.attempt(10 * 1000)) {
- log.warn("No events received for batch");
- }
- } catch (InterruptedException e) {
- Thread.interrupted();
- log.warn("Interrupted while waiting for events from RepositoryService");
- }
- } else {
+ // block event delivery while changes are executed
+ try {
+ updateSync.acquire();
+ } catch (InterruptedException e) {
+ throw new RepositoryException(e);
+ }
+ try {
new OperationVisitorImpl(sessionInfo).execute(changes);
- changes.persisted(cacheBehaviour);
+ // TODO: remove parameter CacheBehaviour
+ changes.persisted(CacheBehaviour.INVALIDATE);
+ } finally {
+ updateSync.release();
}
}
@@ -483,11 +464,6 @@
public void dispose() {
if (changeFeed != null) {
changeFeed.interrupt();
- try {
- changeFeed.join();
- } catch (InterruptedException e) {
- log.warn("Interrupted while waiting for external change thread to terminate.");
- }
}
hierarchyManager.dispose();
try {
@@ -611,13 +587,55 @@
* as the effect of an local or external change.
*/
private void onEventReceived(EventBundle[] eventBundles) {
- // notify listener
- InternalEventListener[] lstnrs = (InternalEventListener[]) listeners.toArray(new InternalEventListener[listeners.size()]);
- for (int i = 0; i < eventBundles.length; i++) {
- for (int j = 0; j < lstnrs.length; j++) {
- lstnrs[j].onEvent(eventBundles[i]);
+ if (log.isDebugEnabled()) {
+ log.debug("received {} event bundles.", new Integer(eventBundles.length));
+ for (int i = 0; i < eventBundles.length; i++) {
+ log.debug("BundleId: {}", eventBundles[i].getBundleId());
+ log.debug("IsLocal: {}", new Boolean(eventBundles[i].isLocal()));
+ for (EventIterator it = eventBundles[i].getEvents(); it.hasNext(); ) {
+ Event e = it.nextEvent();
+ String type;
+ switch (e.getType()) {
+ case Event.NODE_ADDED:
+ type = "NodeAdded";
+ break;
+ case Event.NODE_REMOVED:
+ type = "NodeRemoved";
+ break;
+ case Event.PROPERTY_ADDED:
+ type = "PropertyAdded";
+ break;
+ case Event.PROPERTY_CHANGED:
+ type = "PropertyChanged";
+ break;
+ case Event.PROPERTY_REMOVED:
+ type = "PropertyRemoved";
+ break;
+ default:
+ type = "Unknown";
+ }
+ log.debug(" {}; {}", e.getQPath(), type);
+ }
}
}
+ // do not deliver events while an operation executes
+ try {
+ updateSync.acquire();
+ } catch (InterruptedException e) {
+ log.warn("Unable to acquire update mutext");
+ return;
+ }
+ try {
+ // notify listener
+ InternalEventListener[] lstnrs = (InternalEventListener[]) listeners.toArray(new InternalEventListener[listeners.size()]);
+ for (int i = 0; i < eventBundles.length; i++) {
+ for (int j = 0; j < lstnrs.length; j++) {
+ lstnrs[j].onEvent(eventBundles[i]);
+ }
+ }
+ } finally {
+ updateSync.release();
+ }
}
/**
@@ -946,103 +964,56 @@
}
/**
- * Requests an immediate poll for events. The returned Sync will be
- * released by the event polling thread when events have been retrieved.
- */
- private Sync getEventPollingRequest() {
- Sync signal;
- if (changeFeed != null) {
- // observation supported
- signal = new Latch();
- try {
- immediateEventRequests.put(signal);
- } catch (InterruptedException e) {
- log.warn("Unable to request immediate event poll: " + e);
- }
- } else {
- // no observation, return a dummy sync which can be acquired immediately
- signal = new Sync() {
- public void acquire() {
- }
- public boolean attempt(long l) {
- return true;
- }
- public void release() {
- throw new UnsupportedOperationException();
- }
- };
- }
- return signal;
- }
-
- /**
* Implements the polling for changes on the repository service.
*/
private final class ChangePolling implements Runnable {
/**
- * The polling interval in milliseconds.
+ * The polling timeout in milliseconds.
*/
- private final int pollingInterval;
+ private final int pollTimeout;
/**
- * Creates a new change polling with a given polling interval.
+ * Creates a new change polling with a given polling timeout.
*
- * @param pollingInterval the interval in milliseconds.
+ * @param pollTimeout the timeout in milliseconds.
*/
- private ChangePolling(int pollingInterval) {
- this.pollingInterval = pollingInterval;
+ private ChangePolling(int pollTimeout) {
+ this.pollTimeout = pollTimeout;
}
public void run() {
while (!Thread.interrupted()) {
try {
- // wait for a signal to do an immediate poll but wait at
- // most EXTERNAL_EVENT_POLLING_INTERVAL
- Sync signal = (Sync) immediateEventRequests.poll(pollingInterval);
-
- synchronized (updateMonitor) {
- // if this thread was waiting for updateMonitor and now
- // enters this synchronized block, then a user thread
- // has just finished an operation and will probably
- // request an immediate event poll. That's why we
- // check here again for a sync signal
- if (signal == null) {
- signal = (Sync) immediateEventRequests.poll(0);
- }
-
- if (signal != null) {
- log.debug("Request for immediate event poll");
- }
-
- long timeout = 0;
- // get filters from listeners
- List filters = new ArrayList();
- InternalEventListener[] iel = (InternalEventListener[]) listeners.toArray(new InternalEventListener[0]);
- for (int i = 0; i < iel.length; i++) {
- filters.addAll(iel[i].getEventFilters());
- }
- EventFilter[] filtArr = (EventFilter[]) filters.toArray(new EventFilter[filters.size()]);
-
- EventBundle[] bundles = service.getEvents(sessionInfo, timeout, filtArr);
- try {
- if (bundles.length > 0) {
- onEventReceived(bundles);
- }
- } finally {
- if (signal != null) {
- log.debug("About to signal that events have been delivered");
- signal.release();
- log.debug("Event delivery signaled");
- }
- }
+ // get filters from listeners
+ List filters = new ArrayList();
+ InternalEventListener[] iel = (InternalEventListener[]) listeners.toArray(new InternalEventListener[0]);
+ for (int i = 0; i < iel.length; i++) {
+ filters.addAll(iel[i].getEventFilters());
+ }
+ EventFilter[] filtArr = (EventFilter[]) filters.toArray(new EventFilter[filters.size()]);
+
+ log.debug("calling getEvents() (Workspace={})",
+ sessionInfo.getWorkspaceName());
+ EventBundle[] bundles = service.getEvents(sessionInfo, pollTimeout, filtArr);
+ log.debug("returned from getEvents() (Workspace={})",
+ sessionInfo.getWorkspaceName());
+ // check if thread had been interrupted while
+ // getting events
+ if (Thread.interrupted()) {
+ log.debug("Thread interrupted, terminating...");
+ break;
+ }
+ if (bundles.length > 0) {
+ onEventReceived(bundles);
}
} catch (UnsupportedRepositoryOperationException e) {
log.error("SPI implementation does not support observation: " + e);
// terminate
break;
} catch (RepositoryException e) {
- log.warn("Exception while retrieving event bundles: " + e);
+ log.info("Workspace=" + sessionInfo.getWorkspaceName() +
+ ": Exception while retrieving event bundles: " + e);
log.debug("Dump:", e);
} catch (InterruptedException e) {
// terminate
Modified: jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/config/RepositoryConfig.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/config/RepositoryConfig.java?view=diff&rev=509151&r1=509150&r2=509151
==============================================================================
--- jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/config/RepositoryConfig.java (original)
+++ jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/config/RepositoryConfig.java Mon Feb 19 02:49:39 2007
@@ -33,6 +33,4 @@
public String getDefaultWorkspaceName();
public CacheBehaviour getCacheBehaviour();
-
- public int getPollingInterval();
}
Modified: jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/state/NodeState.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/state/NodeState.java?view=diff&rev=509151&r1=509150&r2=509151
==============================================================================
--- jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/state/NodeState.java (original)
+++ jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/state/NodeState.java Mon Feb 19 02:49:39 2007
@@ -407,7 +407,7 @@
}
} else {
// Properties: push changes down to overlayed state
- ((PropertyState) modState.overlayedState).merge(modState, false);
+ modState.overlayedState.merge(modState, false);
modState.setStatus(Status.EXISTING);
// if property state defines a modified jcr:mixinTypes the parent
Modified: jackrabbit/trunk/contrib/spi/spi2dav/src/main/java/org/apache/jackrabbit/spi2dav/RepositoryServiceImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/contrib/spi/spi2dav/src/main/java/org/apache/jackrabbit/spi2dav/RepositoryServiceImpl.java?view=diff&rev=509151&r1=509150&r2=509151
==============================================================================
--- jackrabbit/trunk/contrib/spi/spi2dav/src/main/java/org/apache/jackrabbit/spi2dav/RepositoryServiceImpl.java (original)
+++ jackrabbit/trunk/contrib/spi/spi2dav/src/main/java/org/apache/jackrabbit/spi2dav/RepositoryServiceImpl.java Mon Feb 19 02:49:39 2007
@@ -23,6 +23,7 @@
import org.apache.commons.httpclient.Header;
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.commons.httpclient.HttpConnectionManager;
+import org.apache.commons.httpclient.params.HttpConnectionManagerParams;
import org.apache.commons.httpclient.auth.AuthScope;
import org.apache.commons.httpclient.methods.HeadMethod;
import org.apache.commons.httpclient.methods.InputStreamRequestEntity;
@@ -204,6 +205,7 @@
private final HostConfiguration hostConfig;
private final HashMap clients = new HashMap();
+ private final HttpConnectionManager connectionManager;
private final Map nodeTypeDefinitions = new HashMap();
@@ -236,6 +238,11 @@
} catch (URIException e) {
throw new RepositoryException(e);
}
+
+ this.connectionManager = new MultiThreadedHttpConnectionManager();
+ HttpConnectionManagerParams params = new HttpConnectionManagerParams();
+ params.setMaxConnectionsPerHost(hostConfig, 20);
+ this.connectionManager.setParams(params);
}
private static void checkSessionInfo(SessionInfo sessionInfo) throws RepositoryException {
@@ -279,8 +286,7 @@
HttpClient getClient(SessionInfo sessionInfo) throws RepositoryException {
HttpClient client = (HttpClient) clients.get(sessionInfo);
if (client == null) {
- HttpConnectionManager connMgr = new MultiThreadedHttpConnectionManager();
- client = new HttpClient(connMgr);
+ client = new HttpClient(connectionManager);
client.setHostConfiguration(hostConfig);
// always send authentication not waiting for 401
client.getParams().setAuthenticationPreemptive(true);
@@ -300,9 +306,6 @@
private void removeClient(SessionInfo sessionInfo) {
HttpClient cl = (HttpClient) clients.remove(sessionInfo);
- if (cl != null) {
- ((MultiThreadedHttpConnectionManager) cl.getHttpConnectionManager()).shutdown();
- }
log.debug("Removed Client " + cl + " for SessionInfo " + sessionInfo);
}