You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jackrabbit.apache.org by mr...@apache.org on 2007/02/19 11:49:40 UTC

svn commit: r509151 - in /jackrabbit/trunk/contrib/spi: client/src/test/java/org/apache/jackrabbit/jcr2spi/ jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/ jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/config/ jcr2spi/src/main/java/org/apach...

Author: mreutegg
Date: Mon Feb 19 02:49:39 2007
New Revision: 509151

URL: http://svn.apache.org/viewvc?view=rev&rev=509151
Log:
- Remove the immediate event request after an operation is executed. Executing operations is no longer tied to observation, even when CacheBehaviour = OBSERVATION.
- Event polling in spi2dav now implements a timeout (blocking until an event or the timeout occurs).

Modified:
    jackrabbit/trunk/contrib/spi/client/src/test/java/org/apache/jackrabbit/jcr2spi/JCR2SPIRepositoryStub.java
    jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceImpl.java
    jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceManager.java
    jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/config/RepositoryConfig.java
    jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/state/NodeState.java
    jackrabbit/trunk/contrib/spi/spi2dav/src/main/java/org/apache/jackrabbit/spi2dav/RepositoryServiceImpl.java

Modified: jackrabbit/trunk/contrib/spi/client/src/test/java/org/apache/jackrabbit/jcr2spi/JCR2SPIRepositoryStub.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/contrib/spi/client/src/test/java/org/apache/jackrabbit/jcr2spi/JCR2SPIRepositoryStub.java?view=diff&rev=509151&r1=509150&r2=509151
==============================================================================
--- jackrabbit/trunk/contrib/spi/client/src/test/java/org/apache/jackrabbit/jcr2spi/JCR2SPIRepositoryStub.java (original)
+++ jackrabbit/trunk/contrib/spi/client/src/test/java/org/apache/jackrabbit/jcr2spi/JCR2SPIRepositoryStub.java Mon Feb 19 02:49:39 2007
@@ -88,10 +88,6 @@
                     public CacheBehaviour getCacheBehaviour() {
                         return CacheBehaviour.INVALIDATE;
                     }
-
-                    public int getPollingInterval() {
-                        return 3 * 100000000;
-                    }
                 };
 
                 repository = RepositoryImpl.create(config);

Modified: jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceImpl.java?view=diff&rev=509151&r1=509150&r2=509151
==============================================================================
--- jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceImpl.java (original)
+++ jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceImpl.java Mon Feb 19 02:49:39 2007
@@ -100,7 +100,11 @@
     public WorkspaceImpl(String name, SessionImpl session, RepositoryConfig config, SessionInfo sessionInfo) throws RepositoryException {
         this.name = name;
         this.session = session;
-        wspManager = createManager(config.getRepositoryService(), sessionInfo, session.getCacheBehaviour(), config.getPollingInterval());
+        wspManager = createManager(
+                config.getRepositoryService(),
+                sessionInfo,
+                session.getCacheBehaviour(), 
+                3 * 1000); // 3 seconds poll timeout
     }
 
     //----------------------------------------------------------< Workspace >---
@@ -440,8 +444,8 @@
     protected WorkspaceManager createManager(RepositoryService service,
                                              SessionInfo sessionInfo,
                                              CacheBehaviour cacheBehaviour,
-                                             int pollingInterval) throws RepositoryException {
-        return new WorkspaceManager(service, sessionInfo, cacheBehaviour, pollingInterval);
+                                             int pollTimeout) throws RepositoryException {
+        return new WorkspaceManager(service, sessionInfo, cacheBehaviour, pollTimeout);
     }
 
     /**

Modified: jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceManager.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceManager.java?view=diff&rev=509151&r1=509150&r2=509151
==============================================================================
--- jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceManager.java (original)
+++ jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/WorkspaceManager.java Mon Feb 19 02:49:39 2007
@@ -78,6 +78,8 @@
 import org.apache.jackrabbit.spi.IdIterator;
 import org.apache.jackrabbit.spi.QNodeTypeDefinition;
 import org.apache.jackrabbit.spi.QValue;
+import org.apache.jackrabbit.spi.EventIterator;
+import org.apache.jackrabbit.spi.Event;
 import org.slf4j.LoggerFactory;
 import org.slf4j.Logger;
 
@@ -110,10 +112,8 @@
 import java.util.Collections;
 import java.io.InputStream;
 
-import EDU.oswego.cs.dl.util.concurrent.Channel;
 import EDU.oswego.cs.dl.util.concurrent.Sync;
-import EDU.oswego.cs.dl.util.concurrent.Latch;
-import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
+import EDU.oswego.cs.dl.util.concurrent.Mutex;
 
 /**
  * <code>WorkspaceManager</code>...
@@ -133,18 +133,11 @@
     private final NodeTypeRegistry ntRegistry;
 
     /**
-     * Monitor object to synchronize the feed thread with client
+     * Mutex to synchronize the feed thread with client
      * threads that call {@link #execute(Operation)} or {@link
      * #execute(ChangeLog)}.
      */
-    private final Object updateMonitor = new Object();
-
-    /**
-     * A producer for this channel can request an immediate poll for events
-     * by placing a Sync into the channel. The Sync is released when the event
-     * poll finished.
-     */
-    private final Channel immediateEventRequests = new LinkedQueue();
+    private final Sync updateSync = new Mutex();
 
     /**
      * This is the event polling for changes. If <code>null</code>
@@ -159,7 +152,7 @@
     private final Set listeners = Collections.synchronizedSet(new HashSet());
 
     public WorkspaceManager(RepositoryService service, SessionInfo sessionInfo,
-                            CacheBehaviour cacheBehaviour, int pollingInterval)
+                            CacheBehaviour cacheBehaviour, int pollTimeout)
         throws RepositoryException {
         this.service = service;
         this.sessionInfo = sessionInfo;
@@ -168,7 +161,7 @@
         Map repositoryDescriptors = service.getRepositoryDescriptors();
         nsRegistry = createNamespaceRegistry(repositoryDescriptors);
         ntRegistry = createNodeTypeRegistry(nsRegistry, repositoryDescriptors);
-        changeFeed = createChangeFeed(pollingInterval);
+        changeFeed = createChangeFeed(pollTimeout);
 
         TransientItemStateFactory stateFactory = createItemStateFactory(ntRegistry);
         this.isf = stateFactory;
@@ -397,17 +390,17 @@
      * Creates a background thread which polls for external changes on the
      * RepositoryService.
      *
-     * @param pollingInterval the polling interval in milliseconds.
+     * @param pollTimeout the polling timeout in milliseconds.
      * @return the background polling thread or <code>null</code> if the underlying
      *         <code>RepositoryService</code> does not support observation.
      */
-    private Thread createChangeFeed(int pollingInterval) {
+    private Thread createChangeFeed(int pollTimeout) {
         Thread t = null;
         try {
             String desc = (String) service.getRepositoryDescriptors().get(
                     Repository.OPTION_OBSERVATION_SUPPORTED);
             if ("true".equals(desc)) {
-                t = new Thread(new ChangePolling(pollingInterval));
+                t = new Thread(new ChangePolling(pollTimeout));
                 t.setName("Change Polling");
                 t.setDaemon(true);
                 t.start();
@@ -426,23 +419,20 @@
      * @see UpdatableItemStateManager#execute(Operation)
      */
     public void execute(Operation operation) throws RepositoryException {
-        if (cacheBehaviour == CacheBehaviour.OBSERVATION) {
-            Sync eventSignal;
-            synchronized (updateMonitor) {
-                new OperationVisitorImpl(sessionInfo).execute(operation);
-                eventSignal = getEventPollingRequest();
-            }
-            try {
-                eventSignal.acquire();
-            } catch (InterruptedException e) {
-                Thread.interrupted();
-                log.warn("Interrupted while waiting for events from RepositoryService");
-            }
-        } else {
+        // block event delivery while changes are executed
+        try {
+            updateSync.acquire();
+        } catch (InterruptedException e) {
+            throw new RepositoryException(e);
+        }
+        try {
             // execute operation and delegate invalidation of affected item
             // states to the operation.
             new OperationVisitorImpl(sessionInfo).execute(operation);
-            operation.persisted(cacheBehaviour);
+            // TODO: remove parameter CacheBehaviour
+            operation.persisted(CacheBehaviour.INVALIDATE);
+        } finally {
+            updateSync.release();
         }
     }
 
@@ -453,27 +443,18 @@
      * @throws RepositoryException
      */
     public void execute(ChangeLog changes) throws RepositoryException {
-        if (cacheBehaviour == CacheBehaviour.OBSERVATION) {
-            // TODO: TOBEFIXED. processing events after changelog may lead to consistency problems (duplicate processing) (e.g. removal of SNSs).
-            // TODO: filtering of events required according to information present in the changelog.
-            Sync eventSignal;
-            synchronized (updateMonitor) {
-                new OperationVisitorImpl(sessionInfo).execute(changes);
-                changes.persisted(cacheBehaviour);
-                eventSignal = getEventPollingRequest();
-            }
-            try {
-                // wait at most 10 seconds
-                if (!eventSignal.attempt(10 * 1000)) {
-                    log.warn("No events received for batch");
-                }
-            } catch (InterruptedException e) {
-                Thread.interrupted();
-                log.warn("Interrupted while waiting for events from RepositoryService");
-            }
-        } else {
+        // block event delivery while changes are executed
+        try {
+            updateSync.acquire();
+        } catch (InterruptedException e) {
+            throw new RepositoryException(e);
+        }
+        try {
             new OperationVisitorImpl(sessionInfo).execute(changes);
-            changes.persisted(cacheBehaviour);
+            // TODO: remove parameter CacheBehaviour
+            changes.persisted(CacheBehaviour.INVALIDATE);
+        } finally {
+            updateSync.release();
         }
     }
 
@@ -483,11 +464,6 @@
     public void dispose() {
         if (changeFeed != null) {
             changeFeed.interrupt();
-            try {
-                changeFeed.join();
-            } catch (InterruptedException e) {
-                log.warn("Interrupted while waiting for external change thread to terminate.");
-            }
         }
         hierarchyManager.dispose();
         try {
@@ -611,13 +587,55 @@
      * as the effect of an local or external change.
      */
     private void onEventReceived(EventBundle[] eventBundles) {
-        // notify listener
-        InternalEventListener[] lstnrs = (InternalEventListener[]) listeners.toArray(new InternalEventListener[listeners.size()]);
-        for (int i = 0; i < eventBundles.length; i++) {
-            for (int j = 0; j < lstnrs.length; j++) {
-                lstnrs[j].onEvent(eventBundles[i]);
+        if (log.isDebugEnabled()) {
+            log.debug("received {} event bundles.", new Integer(eventBundles.length));
+            for (int i = 0; i < eventBundles.length; i++) {
+                log.debug("BundleId: {}", eventBundles[i].getBundleId());
+                log.debug("IsLocal:  {}", new Boolean(eventBundles[i].isLocal()));
+                for (EventIterator it = eventBundles[i].getEvents(); it.hasNext(); ) {
+                    Event e = it.nextEvent();
+                    String type;
+                    switch (e.getType()) {
+                        case Event.NODE_ADDED:
+                            type = "NodeAdded";
+                            break;
+                        case Event.NODE_REMOVED:
+                            type = "NodeRemoved";
+                            break;
+                        case Event.PROPERTY_ADDED:
+                            type = "PropertyAdded";
+                            break;
+                        case Event.PROPERTY_CHANGED:
+                            type = "PropertyChanged";
+                            break;
+                        case Event.PROPERTY_REMOVED:
+                            type = "PropertyRemoved";
+                            break;
+                        default:
+                            type = "Unknown";
+                    }
+                    log.debug("  {}; {}", e.getQPath(), type);
+                }
             }
         }
+        // do not deliver events while an operation executes
+        try {
+            updateSync.acquire();
+        } catch (InterruptedException e) {
+            log.warn("Unable to acquire update mutext");
+            return;
+        }
+        try {
+            // notify listener
+            InternalEventListener[] lstnrs = (InternalEventListener[]) listeners.toArray(new InternalEventListener[listeners.size()]);
+            for (int i = 0; i < eventBundles.length; i++) {
+                for (int j = 0; j < lstnrs.length; j++) {
+                    lstnrs[j].onEvent(eventBundles[i]);
+                }
+            }
+        } finally {
+            updateSync.release();
+        }
     }
 
     /**
@@ -946,103 +964,56 @@
     }
 
     /**
-     * Requests an immediate poll for events. The returned Sync will be
-     * released by the event polling thread when events have been retrieved.
-     */
-    private Sync getEventPollingRequest() {
-        Sync signal;
-        if (changeFeed != null) {
-            // observation supported
-            signal = new Latch();
-            try {
-                immediateEventRequests.put(signal);
-            } catch (InterruptedException e) {
-                log.warn("Unable to request immediate event poll: " + e);
-            }
-        } else {
-            // no observation, return a dummy sync which can be acquired immediately
-            signal = new Sync() {
-                public void acquire() {
-                }
-                public boolean attempt(long l) {
-                    return true;
-                }
-                public void release() {
-                    throw new UnsupportedOperationException();
-                }
-            };
-        }
-        return signal;
-    }
-
-    /**
      * Implements the polling for changes on the repository service.
      */
     private final class ChangePolling implements Runnable {
 
         /**
-         * The polling interval in milliseconds.
+         * The polling timeout in milliseconds.
          */
-        private final int pollingInterval;
+        private final int pollTimeout;
 
         /**
-         * Creates a new change polling with a given polling interval.
+         * Creates a new change polling with a given polling timeout.
          *
-         * @param pollingInterval the interval in milliseconds.
+         * @param pollTimeout the timeout in milliseconds.
          */
-        private ChangePolling(int pollingInterval) {
-            this.pollingInterval = pollingInterval;
+        private ChangePolling(int pollTimeout) {
+            this.pollTimeout = pollTimeout;
         }
 
         public void run() {
             while (!Thread.interrupted()) {
                 try {
-                    // wait for a signal to do an immediate poll but wait at
-                    // most EXTERNAL_EVENT_POLLING_INTERVAL
-                    Sync signal = (Sync) immediateEventRequests.poll(pollingInterval);
-
-                    synchronized (updateMonitor) {
-                        // if this thread was waiting for updateMonitor and now
-                        // enters this synchronized block, then a user thread
-                        // has just finished an operation and will probably
-                        // request an immediate event poll. That's why we
-                        // check here again for a sync signal
-                        if (signal == null) {
-                            signal = (Sync) immediateEventRequests.poll(0);
-                        }
-
-                        if (signal != null) {
-                            log.debug("Request for immediate event poll");
-                        }
-
-                        long timeout = 0;
-                        // get filters from listeners
-                        List filters = new ArrayList();
-                        InternalEventListener[] iel = (InternalEventListener[]) listeners.toArray(new InternalEventListener[0]);
-                        for (int i = 0; i < iel.length; i++) {
-                            filters.addAll(iel[i].getEventFilters());
-                        }
-                        EventFilter[] filtArr = (EventFilter[]) filters.toArray(new EventFilter[filters.size()]);
-
-                        EventBundle[] bundles = service.getEvents(sessionInfo, timeout, filtArr);
-                        try {
-                            if (bundles.length > 0) {
-                                onEventReceived(bundles);
-                            }
-                        } finally {
-                            if (signal != null) {
-                                log.debug("About to signal that events have been delivered");
-                                signal.release();
-                                log.debug("Event delivery signaled");
-                            }
-                        }
+                    // get filters from listeners
+                    List filters = new ArrayList();
+                    InternalEventListener[] iel = (InternalEventListener[]) listeners.toArray(new InternalEventListener[0]);
+                    for (int i = 0; i < iel.length; i++) {
+                        filters.addAll(iel[i].getEventFilters());
+                    }
+                    EventFilter[] filtArr = (EventFilter[]) filters.toArray(new EventFilter[filters.size()]);
+
+                    log.debug("calling getEvents() (Workspace={})",
+                            sessionInfo.getWorkspaceName());
+                    EventBundle[] bundles = service.getEvents(sessionInfo, pollTimeout, filtArr);
+                    log.debug("returned from getEvents() (Workspace={})",
+                            sessionInfo.getWorkspaceName());
+                    // check if thread had been interrupted while
+                    // getting events
+                    if (Thread.interrupted()) {
+                        log.debug("Thread interrupted, terminating...");
+                        break;
+                    }
+                    if (bundles.length > 0) {
+                        onEventReceived(bundles);
                     }
                 } catch (UnsupportedRepositoryOperationException e) {
                     log.error("SPI implementation does not support observation: " + e);
                     // terminate
                     break;
                 } catch (RepositoryException e) {
-                    log.warn("Exception while retrieving event bundles: " + e);
+                    log.info("Workspace=" + sessionInfo.getWorkspaceName() +
+                            ": Exception while retrieving event bundles: " + e);
                     log.debug("Dump:", e);
                 } catch (InterruptedException e) {
                     // terminate

Modified: jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/config/RepositoryConfig.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/config/RepositoryConfig.java?view=diff&rev=509151&r1=509150&r2=509151
==============================================================================
--- jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/config/RepositoryConfig.java (original)
+++ jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/config/RepositoryConfig.java Mon Feb 19 02:49:39 2007
@@ -33,6 +33,4 @@
     public String getDefaultWorkspaceName();
 
     public CacheBehaviour getCacheBehaviour();
-
-    public int getPollingInterval();
 }

Modified: jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/state/NodeState.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/state/NodeState.java?view=diff&rev=509151&r1=509150&r2=509151
==============================================================================
--- jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/state/NodeState.java (original)
+++ jackrabbit/trunk/contrib/spi/jcr2spi/src/main/java/org/apache/jackrabbit/jcr2spi/state/NodeState.java Mon Feb 19 02:49:39 2007
@@ -407,7 +407,7 @@
                 }
             } else {
                 // Properties: push changes down to overlayed state
-                ((PropertyState) modState.overlayedState).merge(modState, false);
+                modState.overlayedState.merge(modState, false);
                 modState.setStatus(Status.EXISTING);
 
                 // if property state defines a modified jcr:mixinTypes the parent

Modified: jackrabbit/trunk/contrib/spi/spi2dav/src/main/java/org/apache/jackrabbit/spi2dav/RepositoryServiceImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/contrib/spi/spi2dav/src/main/java/org/apache/jackrabbit/spi2dav/RepositoryServiceImpl.java?view=diff&rev=509151&r1=509150&r2=509151
==============================================================================
--- jackrabbit/trunk/contrib/spi/spi2dav/src/main/java/org/apache/jackrabbit/spi2dav/RepositoryServiceImpl.java (original)
+++ jackrabbit/trunk/contrib/spi/spi2dav/src/main/java/org/apache/jackrabbit/spi2dav/RepositoryServiceImpl.java Mon Feb 19 02:49:39 2007
@@ -23,6 +23,7 @@
 import org.apache.commons.httpclient.Header;
 import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
 import org.apache.commons.httpclient.HttpConnectionManager;
+import org.apache.commons.httpclient.params.HttpConnectionManagerParams;
 import org.apache.commons.httpclient.auth.AuthScope;
 import org.apache.commons.httpclient.methods.HeadMethod;
 import org.apache.commons.httpclient.methods.InputStreamRequestEntity;
@@ -204,6 +205,7 @@
 
     private final HostConfiguration hostConfig;
     private final HashMap clients = new HashMap();
+    private final HttpConnectionManager connectionManager;
 
     private final Map nodeTypeDefinitions = new HashMap();
 
@@ -236,6 +238,11 @@
         } catch (URIException e) {
             throw new RepositoryException(e);
         }
+
+        this.connectionManager = new MultiThreadedHttpConnectionManager();
+        HttpConnectionManagerParams params = new HttpConnectionManagerParams();
+        params.setMaxConnectionsPerHost(hostConfig, 20);
+        this.connectionManager.setParams(params);
     }
 
     private static void checkSessionInfo(SessionInfo sessionInfo) throws RepositoryException {
@@ -279,8 +286,7 @@
     HttpClient getClient(SessionInfo sessionInfo) throws RepositoryException {
         HttpClient client = (HttpClient) clients.get(sessionInfo);
         if (client == null) {
-            HttpConnectionManager connMgr = new MultiThreadedHttpConnectionManager();
-            client = new HttpClient(connMgr);
+            client = new HttpClient(connectionManager);
             client.setHostConfiguration(hostConfig);
             // always send authentication not waiting for 401
             client.getParams().setAuthenticationPreemptive(true);
@@ -300,9 +306,6 @@
 
     private void removeClient(SessionInfo sessionInfo) {
         HttpClient cl = (HttpClient) clients.remove(sessionInfo);
-        if (cl != null) {
-            ((MultiThreadedHttpConnectionManager) cl.getHttpConnectionManager()).shutdown();
-        }
         log.debug("Removed Client " + cl + " for SessionInfo " + sessionInfo);
     }