You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2013/06/14 09:51:01 UTC

svn commit: r1492987 - in /jackrabbit/oak/trunk: oak-core/src/main/java/org/apache/jackrabbit/oak/ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/ oak-jcr/src/main/j...

Author: jukka
Date: Fri Jun 14 07:51:00 2013
New Revision: 1492987

URL: http://svn.apache.org/r1492987
Log:
OAK-867: Oak whiteboard

Use the whiteboard to handle scheduled tasks

Added:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/WhiteboardUtils.java
      - copied, changed from r1492831, jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/Whiteboard.java
Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ObservationManagerImpl.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/DefaultWhiteboard.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/OsgiWhiteboard.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/Whiteboard.java
    jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java
    jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/RepositoryImpl.java
    jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/SessionContext.java
    jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/osgi/Activator.java
    jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/osgi/OsgiRepository.java
    jackrabbit/oak/trunk/oak-sling/src/main/java/org/apache/jackrabbit/oak/sling/Activator.java
    jackrabbit/oak/trunk/oak-sling/src/main/java/org/apache/jackrabbit/oak/sling/SlingRepositoryImpl.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java?rev=1492987&r1=1492986&r2=1492987&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java Fri Jun 14 07:51:00 2013
@@ -22,6 +22,8 @@ import static java.util.concurrent.Execu
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -31,6 +33,7 @@ import javax.security.auth.login.LoginEx
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
+
 import org.apache.jackrabbit.mk.api.MicroKernel;
 import org.apache.jackrabbit.mk.core.MicroKernelImpl;
 import org.apache.jackrabbit.oak.api.ContentRepository;
@@ -62,6 +65,9 @@ import org.apache.jackrabbit.oak.spi.sec
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
+import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
+import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils;
 
 /**
  * Builder class for constructing {@link ContentRepository} instances with
@@ -96,6 +102,58 @@ public class Oak {
 
     private String defaultWorkspaceName = DEFAULT_WORKSPACE_NAME;
 
+    @SuppressWarnings("unchecked")
+    private static <T> T getValue(
+            Map<?, ?> properties, String name, Class<T> type, T def) {
+        Object value = properties.get(name);
+        if (type.isInstance(value)) {
+            return (T) value;
+        } else {
+            return def;
+        }
+    }
+
+    private static <T> T getValue(
+            Map<?, ?> properties, String name, Class<T> type) {
+        return getValue(properties, name, type, null);
+    }
+
+    private Whiteboard whiteboard = new Whiteboard() {
+        @Override
+        public <T> Registration register(
+                Class<T> type, T service, Map<?, ?> properties) {
+            Future<?> future = null;
+            if (type == Runnable.class) {
+                Runnable runnable = (Runnable) service;
+                Long period =
+                        getValue(properties, "scheduler.period", Long.class);
+                if (period != null) {
+
+                    Boolean concurrent = getValue(
+                            properties, "scheduler.concurrent",
+                            Boolean.class, Boolean.FALSE);
+                    if (concurrent) {
+                        future = executor.scheduleAtFixedRate(
+                                runnable, period, period, TimeUnit.SECONDS);
+                    } else {
+                        future = executor.scheduleWithFixedDelay(
+                                runnable, period, period, TimeUnit.SECONDS);
+                    }
+                }
+            }
+
+            final Future<?> f = future;
+            return new Registration() {
+                @Override
+                public void unregister() {
+                    if (f != null) {
+                        f.cancel(false);
+                    }
+                }
+            };
+        }
+    };
+
     /**
      * Flag controlling the asynchronous indexing behavior. If false (default)
      * there will be no background indexing happening.
@@ -249,6 +307,12 @@ public class Oak {
         return this;
     }
 
+    @Nonnull
+    public Oak with(@Nonnull Whiteboard whiteboard) {
+        this.whiteboard = whiteboard;
+        return this;
+    }
+
     /**
      * Enable the asynchronous (background) indexing behavior.
      * 
@@ -263,8 +327,8 @@ public class Oak {
     }
 
     @Nonnull
-    public ScheduledExecutorService getExecutorService() {
-        return this.executor;
+    public Whiteboard getWhiteboard() {
+        return this.whiteboard;
     }
 
     public ContentRepository createContentRepository() {
@@ -278,9 +342,8 @@ public class Oak {
                 .compose(editorProviders)));
 
         if (asyncIndexing) {
-            executor.scheduleWithFixedDelay(
-                    new AsyncIndexUpdate("async", store, indexEditors),
-                    1, 5, TimeUnit.SECONDS);
+            Runnable task = new AsyncIndexUpdate("async", store, indexEditors);
+            WhiteboardUtils.scheduleWithFixedDelay(whiteboard, task, 5);
         }
 
         // FIXME: OAK-810 move to proper workspace initialization

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java?rev=1492987&r1=1492986&r2=1492987&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java Fri Jun 14 07:51:00 2013
@@ -18,12 +18,11 @@
  */
 package org.apache.jackrabbit.oak.plugins.observation;
 
+import static com.google.common.base.Preconditions.checkState;
+
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.annotation.Nonnull;
@@ -45,6 +44,9 @@ import org.apache.jackrabbit.oak.spi.sta
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.state.NodeStateUtils;
 import org.apache.jackrabbit.oak.spi.state.VisibleDiff;
+import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
+import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
+import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.Marker;
@@ -62,9 +64,8 @@ class ChangeProcessor implements Runnabl
 
     private final Exception initStacktrace;
 
-    private volatile boolean running;
     private volatile boolean stopping;
-    private ScheduledFuture<?> future;
+    private Registration registration;
     private Listener changeListener;
 
     private boolean userInfoAccessedWithoutExternalsCheck;
@@ -90,16 +91,16 @@ class ChangeProcessor implements Runnabl
 
     /**
      * Start the change processor on the passed {@code executor}.
-     * @param executor
+     * @param whiteboard
      * @throws IllegalStateException if started already
      */
-    public synchronized void start(ScheduledExecutorService executor) {
-        if (future != null) {
-            throw new IllegalStateException("Change processor started already");
-        }
+    public synchronized void start(Whiteboard whiteboard) {
+        checkState(registration == null, "Change processor started already");
+
         stopping = false;
         changeListener = observationManager.newChangeListener();
-        future = executor.scheduleWithFixedDelay(this, 100, 1000, TimeUnit.MILLISECONDS);
+        registration =
+                WhiteboardUtils.scheduleWithFixedDelay(whiteboard, this, 1);
     }
 
 
@@ -108,29 +109,17 @@ class ChangeProcessor implements Runnabl
      * events will be delivered.
      * @throws IllegalStateException if not yet started or stopped already
      */
-    public synchronized void stop() {
-        if (future == null) {
-            throw new IllegalStateException("Change processor not started");
-        }
-
-        try {
-            stopping = true;
-            future.cancel(true);
-            while (running) {
-                wait();
-            }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        }
-        finally {
+    public void stop() {
+        stopping = true; // do this outside synchronization
+        synchronized (this) {
+            checkState(registration != null, "Change processor not started");
             changeListener.dispose();
-            future = null;
+            registration.unregister();
         }
     }
 
     @Override
-    public void run() {
-        running = true;
+    public synchronized void run() {
         try{
             ChangeSet changes = changeListener.getChanges();
             if (changes != null &&
@@ -143,11 +132,6 @@ class ChangeProcessor implements Runnabl
             }
         } catch (Exception e) {
             log.error("Unable to generate or send events", e);
-        } finally {
-            synchronized (this) {
-                running = false;
-                notifyAll();
-            }
         }
     }
 

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ObservationManagerImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ObservationManagerImpl.java?rev=1492987&r1=1492986&r2=1492987&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ObservationManagerImpl.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ObservationManagerImpl.java Fri Jun 14 07:51:00 2013
@@ -20,7 +20,6 @@ package org.apache.jackrabbit.oak.plugin
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jcr.RepositoryException;
@@ -36,6 +35,7 @@ import org.apache.jackrabbit.oak.api.Con
 import org.apache.jackrabbit.oak.namepath.NamePathMapper;
 import org.apache.jackrabbit.oak.plugins.nodetype.ReadOnlyNodeTypeManager;
 import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher.Listener;
+import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.Marker;
@@ -50,17 +50,17 @@ public class ObservationManagerImpl impl
     private final ContentSession contentSession;
     private final ReadOnlyNodeTypeManager ntMgr;
     private final NamePathMapper namePathMapper;
-    private final ScheduledExecutorService executor;
+    private final Whiteboard whiteboard;
 
     public ObservationManagerImpl(ContentSession contentSession, ReadOnlyNodeTypeManager nodeTypeManager,
-            NamePathMapper namePathMapper, ScheduledExecutorService executor) {
+            NamePathMapper namePathMapper, Whiteboard whiteboard) {
 
         Preconditions.checkArgument(contentSession instanceof Observable);
 
         this.contentSession = contentSession;
         this.ntMgr = nodeTypeManager;
         this.namePathMapper = namePathMapper;
-        this.executor = executor;
+        this.whiteboard = whiteboard;
     }
 
     public synchronized void dispose() {
@@ -89,7 +89,7 @@ public class ObservationManagerImpl impl
             log.error(OBSERVATION, "Registering event listener {} with filter {}", listener, filter);
             processor = new ChangeProcessor(this, listener, filter);
             processors.put(listener, processor);
-            processor.start(executor);
+            processor.start(whiteboard);
         } else {
             log.debug(OBSERVATION, "Changing event listener {} to filter {}", listener, filter);
             processor.setFilter(filter);
@@ -97,11 +97,13 @@ public class ObservationManagerImpl impl
     }
 
     @Override
-    public synchronized void removeEventListener(EventListener listener) {
-        ChangeProcessor processor = processors.remove(listener);
-
+    public void removeEventListener(EventListener listener) {
+        ChangeProcessor processor;
+        synchronized (this) {
+            processor = processors.remove(listener);
+        }
         if (processor != null) {
-            processor.stop();
+            processor.stop(); // needs to happen outside synchronization
         }
     }
 

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/DefaultWhiteboard.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/DefaultWhiteboard.java?rev=1492987&r1=1492986&r2=1492987&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/DefaultWhiteboard.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/DefaultWhiteboard.java Fri Jun 14 07:51:00 2013
@@ -16,14 +16,13 @@
  */
 package org.apache.jackrabbit.oak.spi.whiteboard;
 
-import java.util.Dictionary;
+import java.util.Map;
 
 public class DefaultWhiteboard implements Whiteboard {
 
     @Override
     public <T> Registration register(
-            final Class<T> type, final T service,
-            final Dictionary<?, ?> properties) {
+            final Class<T> type, final T service, final Map<?, ?> properties) {
         registered(type, service, properties);
         return new Registration() {
             @Override
@@ -36,11 +35,11 @@ public class DefaultWhiteboard implement
     //---------------------------------------------------------< protected >--
 
     protected void registered(
-            Class<?> type, Object service, Dictionary<?, ?> properties) {
+            Class<?> type, Object service, Map<?, ?> properties) {
     }
 
     protected void unregistered(
-            Class<?> type, Object service, Dictionary<?, ?> properties) {
+            Class<?> type, Object service, Map<?, ?> properties) {
     }
 
 }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/OsgiWhiteboard.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/OsgiWhiteboard.java?rev=1492987&r1=1492986&r2=1492987&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/OsgiWhiteboard.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/OsgiWhiteboard.java Fri Jun 14 07:51:00 2013
@@ -19,6 +19,8 @@ package org.apache.jackrabbit.oak.spi.wh
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.Map;
 
 import javax.annotation.Nonnull;
 
@@ -38,9 +40,14 @@ public class OsgiWhiteboard implements W
 
     @Override
     public <T> Registration register(
-            Class<T> type, T service, Dictionary<?, ?> properties) {
+            Class<T> type, T service, Map<?, ?> properties) {
+        Dictionary<Object, Object> dictionary = new Hashtable<Object, Object>();
+        for (Map.Entry<?, ?> entry : properties.entrySet()) {
+            dictionary.put(entry.getKey(), entry.getValue());
+        }
+
         final ServiceRegistration registration =
-                context.registerService(type.getName(), service, properties);
+                context.registerService(type.getName(), service, dictionary);
         return new Registration() {
             @Override
             public void unregister() {

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/Whiteboard.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/Whiteboard.java?rev=1492987&r1=1492986&r2=1492987&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/Whiteboard.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/Whiteboard.java Fri Jun 14 07:51:00 2013
@@ -16,7 +16,7 @@
  */
 package org.apache.jackrabbit.oak.spi.whiteboard;
 
-import java.util.Dictionary;
+import java.util.Map;
 
 public interface Whiteboard {
 
@@ -29,7 +29,6 @@ public interface Whiteboard {
      * @param properties service properties
      * @return service registration
      */
-    <T> Registration register(
-            Class<T> type, T service, Dictionary<?, ?> properties);
+    <T> Registration register(Class<T> type, T service, Map<?, ?> properties);
 
 }

Copied: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/WhiteboardUtils.java (from r1492831, jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/Whiteboard.java)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/WhiteboardUtils.java?p2=jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/WhiteboardUtils.java&p1=jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/Whiteboard.java&r1=1492831&r2=1492987&rev=1492987&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/Whiteboard.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/WhiteboardUtils.java Fri Jun 14 07:51:00 2013
@@ -16,20 +16,17 @@
  */
 package org.apache.jackrabbit.oak.spi.whiteboard;
 
-import java.util.Dictionary;
+import com.google.common.collect.ImmutableMap;
 
-public interface Whiteboard {
+public class WhiteboardUtils {
 
-    /**
-     * Publishes the given service to the whiteboard. Use the returned
-     * registration object to unregister the service.
-     *
-     * @param type type of the service
-     * @param service service instance
-     * @param properties service properties
-     * @return service registration
-     */
-    <T> Registration register(
-            Class<T> type, T service, Dictionary<?, ?> properties);
+    public static Registration scheduleWithFixedDelay(
+            Whiteboard whiteboard, Runnable runnable, long delay) {
+        return whiteboard.register(
+                Runnable.class, runnable, ImmutableMap.builder()
+                    .put("scheduler.period", delay)
+                    .put("scheduler.concurrent", false)
+                    .build());
+    }
 
 }

Modified: jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java?rev=1492987&r1=1492986&r2=1492987&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java Fri Jun 14 07:51:00 2013
@@ -146,7 +146,7 @@ public class Jcr {
     public Repository createRepository() {
         return new RepositoryImpl(
                 oak.createContentRepository(), 
-                oak.getExecutorService(), 
+                oak.getWhiteboard(),
                 securityProvider);
     }
 

Modified: jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/RepositoryImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/RepositoryImpl.java?rev=1492987&r1=1492986&r2=1492987&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/RepositoryImpl.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/RepositoryImpl.java Fri Jun 14 07:51:00 2013
@@ -18,8 +18,6 @@ package org.apache.jackrabbit.oak.jcr;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import java.util.concurrent.ScheduledExecutorService;
-
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.jcr.Credentials;
@@ -34,6 +32,7 @@ import org.apache.jackrabbit.oak.api.Con
 import org.apache.jackrabbit.oak.api.ContentSession;
 import org.apache.jackrabbit.oak.jcr.delegate.SessionDelegate;
 import org.apache.jackrabbit.oak.spi.security.SecurityProvider;
+import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,14 +48,14 @@ public class RepositoryImpl implements R
 
     private final Descriptors descriptors = new Descriptors(new SimpleValueFactory());
     private final ContentRepository contentRepository;
-    private final ScheduledExecutorService executor;
+    private final Whiteboard whiteboard;
     private final SecurityProvider securityProvider;
 
     public RepositoryImpl(@Nonnull ContentRepository contentRepository,
-                          @Nonnull ScheduledExecutorService executor,
+                          @Nonnull Whiteboard whiteboard,
                           @Nonnull SecurityProvider securityProvider) {
         this.contentRepository = checkNotNull(contentRepository);
-        this.executor = checkNotNull(executor);
+        this.whiteboard = checkNotNull(whiteboard);
         this.securityProvider = checkNotNull(securityProvider);
     }
 
@@ -139,7 +138,7 @@ public class RepositoryImpl implements R
                 }
             };
 
-            context[0] = SessionContext.create(sessionDelegate, this);
+            context[0] = new SessionContext(this, whiteboard, sessionDelegate);
             return context[0].getSession();
         } catch (LoginException e) {
             throw new javax.jcr.LoginException(e.getMessage(), e);
@@ -190,10 +189,6 @@ public class RepositoryImpl implements R
         return securityProvider;
     }
 
-    ScheduledExecutorService getObservationExecutor() {
-        return executor;
-    }
-
     ContentRepository getContentRepository() {
         return contentRepository;
     }

Modified: jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/SessionContext.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/SessionContext.java?rev=1492987&r1=1492986&r2=1492987&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/SessionContext.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/SessionContext.java Fri Jun 14 07:51:00 2013
@@ -66,6 +66,7 @@ import org.apache.jackrabbit.oak.spi.sec
 import org.apache.jackrabbit.oak.spi.security.principal.PrincipalConfiguration;
 import org.apache.jackrabbit.oak.spi.security.privilege.PrivilegeConfiguration;
 import org.apache.jackrabbit.oak.spi.security.user.UserConfiguration;
+import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
 import org.apache.jackrabbit.oak.spi.xml.ProtectedItemImporter;
 
 /**
@@ -74,12 +75,15 @@ import org.apache.jackrabbit.oak.spi.xml
  * the session scoped instances generally needed (e.g. {@code NamePathMapper},
  * {@code ValueFactory}, etc.).
  */
-public abstract class SessionContext implements NamePathMapper {
+public class SessionContext implements NamePathMapper {
     private final RepositoryImpl repository;
+    private final Whiteboard whiteboard;
     private final SessionDelegate delegate;
     private final SessionNamespaces namespaces;
     private final NamePathMapper namePathMapper;
     private final ValueFactory valueFactory;
+    private final SessionImpl session;
+    private final WorkspaceImpl workspace;
 
     private AccessControlManager accessControlManager;
     private PermissionProvider permissionProvider;
@@ -88,10 +92,12 @@ public abstract class SessionContext imp
     private PrivilegeManager privilegeManager;
     private ObservationManager observationManager;
 
-    private SessionContext(RepositoryImpl repository,
-                           final SessionDelegate delegate) {
-        this.delegate = delegate;
-        this.repository = repository;
+    SessionContext(
+            RepositoryImpl repository, Whiteboard whiteboard,
+            final SessionDelegate delegate) {
+        this.repository = checkNotNull(repository);
+        this.whiteboard = checkNotNull(whiteboard);
+        this.delegate = checkNotNull(delegate);
         this.namespaces = new SessionNamespaces(this);
         LocalNameMapper nameMapper = new LocalNameMapper() {
             @Override
@@ -108,48 +114,37 @@ public abstract class SessionContext imp
                 nameMapper, delegate.getIdManager());
         this.valueFactory = new ValueFactoryImpl(
                 delegate.getRoot().getBlobFactory(), namePathMapper);
-    }
 
-    public static SessionContext create(final SessionDelegate delegate, RepositoryImpl repository) {
-        return new SessionContext(checkNotNull(repository), checkNotNull(delegate)) {
-            private final SessionImpl session = new SessionImpl(this);
-            private final WorkspaceImpl workspace = new WorkspaceImpl(this);
+        this.session = new SessionImpl(this);
+        this.workspace = new WorkspaceImpl(this);
+    }
 
-            @Override
-            public Session getSession() {
-                return session;
-            }
+    public Session getSession() {
+        return session;
+    }
 
-            @Override
-            public Workspace getWorkspace() {
-                return workspace;
-            }
+    public Workspace getWorkspace() {
+        return workspace;
+    }
 
-            @Override
-            public LockManager getLockManager() {
-                return workspace.getLockManager();
-            }
+    public LockManager getLockManager() {
+        return workspace.getLockManager();
+    }
 
-            @Override
-            public NodeTypeManager getNodeTypeManager() {
-                return workspace.getNodeTypeManager();
-            }
+    public NodeTypeManager getNodeTypeManager() {
+        return workspace.getNodeTypeManager();
+    }
 
-            @Override
-            public VersionManager getVersionManager() throws RepositoryException {
-                return workspace.getVersionManager();
-            }
+    public VersionManager getVersionManager() throws RepositoryException {
+        return workspace.getVersionManager();
+    }
 
-            @Override
-            public EffectiveNodeTypeProvider getEffectiveNodeTypeProvider() {
-                return workspace.getReadWriteNodeTypeManager();
-            }
+    public EffectiveNodeTypeProvider getEffectiveNodeTypeProvider() {
+        return workspace.getReadWriteNodeTypeManager();
+    }
 
-            @Override
-            public DefinitionProvider getDefinitionProvider() {
-                return workspace.getReadWriteNodeTypeManager();
-            }
-        };
+    public DefinitionProvider getDefinitionProvider() {
+        return workspace.getReadWriteNodeTypeManager();
     }
 
     public Repository getRepository() {
@@ -164,20 +159,6 @@ public abstract class SessionContext imp
         return namespaces;
     }
 
-    public abstract Session getSession();
-
-    public abstract Workspace getWorkspace();
-
-    public abstract LockManager getLockManager();
-
-    public abstract NodeTypeManager getNodeTypeManager();
-
-    public abstract VersionManager getVersionManager() throws RepositoryException;
-
-    public abstract EffectiveNodeTypeProvider getEffectiveNodeTypeProvider();
-
-    public abstract DefinitionProvider getDefinitionProvider();
-
     public NodeImpl createNodeOrNull(NodeDelegate nd) throws RepositoryException {
         if (nd == null) {
             return null;
@@ -255,7 +236,7 @@ public abstract class SessionContext imp
             observationManager = new ObservationManagerImpl(
                 contentSession,
                 ReadOnlyNodeTypeManager.getInstance(delegate.getRoot(), namePathMapper),
-                namePathMapper, repository.getObservationExecutor());
+                namePathMapper, whiteboard);
         }
         return observationManager;
     }

Modified: jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/osgi/Activator.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/osgi/Activator.java?rev=1492987&r1=1492986&r2=1492987&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/osgi/Activator.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/osgi/Activator.java Fri Jun 14 07:51:00 2013
@@ -19,14 +19,14 @@ package org.apache.jackrabbit.oak.jcr.os
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 
 import javax.jcr.Repository;
 
 import org.apache.jackrabbit.oak.api.ContentRepository;
 import org.apache.jackrabbit.oak.spi.security.OpenSecurityProvider;
 import org.apache.jackrabbit.oak.spi.security.SecurityProvider;
+import org.apache.jackrabbit.oak.spi.whiteboard.OsgiWhiteboard;
+import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
 import org.osgi.framework.BundleActivator;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceReference;
@@ -38,7 +38,7 @@ public class Activator implements Bundle
 
     private BundleContext context;
 
-    private ScheduledExecutorService executor;
+    private Whiteboard whiteboard;
 
     private SecurityProvider securityProvider = new OpenSecurityProvider(); // TODO review
 
@@ -52,7 +52,7 @@ public class Activator implements Bundle
     @Override
     public void start(BundleContext bundleContext) throws Exception {
         context = bundleContext;
-        executor = Executors.newScheduledThreadPool(1);
+        whiteboard = new OsgiWhiteboard(context);
         tracker = new ServiceTracker(
                 context, ContentRepository.class.getName(), this);
         tracker.open();
@@ -61,7 +61,6 @@ public class Activator implements Bundle
     @Override
     public void stop(BundleContext bundleContext) throws Exception {
         tracker.close();
-        executor.shutdown();
     }
 
     //--------------------------------------------< ServiceTrackerCustomizer >--
@@ -73,7 +72,7 @@ public class Activator implements Bundle
             ContentRepository repository = (ContentRepository) service;
             services.put(reference, context.registerService(
                     Repository.class.getName(),
-                    new OsgiRepository(repository, executor, securityProvider),
+                    new OsgiRepository(repository, whiteboard, securityProvider),
                     new Properties()));
             return service;
         } else {

Modified: jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/osgi/OsgiRepository.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/osgi/OsgiRepository.java?rev=1492987&r1=1492986&r2=1492987&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/osgi/OsgiRepository.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/osgi/OsgiRepository.java Fri Jun 14 07:51:00 2013
@@ -16,8 +16,6 @@
  */
 package org.apache.jackrabbit.oak.jcr.osgi;
 
-import java.util.concurrent.ScheduledExecutorService;
-
 import javax.jcr.Credentials;
 import javax.jcr.RepositoryException;
 import javax.jcr.Session;
@@ -26,6 +24,7 @@ import org.apache.jackrabbit.oak.Oak;
 import org.apache.jackrabbit.oak.api.ContentRepository;
 import org.apache.jackrabbit.oak.jcr.RepositoryImpl;
 import org.apache.jackrabbit.oak.spi.security.SecurityProvider;
+import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
 
 /**
  * Workaround to a JAAS class loading issue in OSGi environments.
@@ -35,9 +34,9 @@ import org.apache.jackrabbit.oak.spi.sec
 public class OsgiRepository extends RepositoryImpl {
 
     public OsgiRepository(ContentRepository repository,
-                          ScheduledExecutorService executor,
+                          Whiteboard whiteboard,
                           SecurityProvider securityProvider) {
-        super(repository, executor, securityProvider);
+        super(repository, whiteboard, securityProvider);
     }
 
     @Override

Modified: jackrabbit/oak/trunk/oak-sling/src/main/java/org/apache/jackrabbit/oak/sling/Activator.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-sling/src/main/java/org/apache/jackrabbit/oak/sling/Activator.java?rev=1492987&r1=1492986&r2=1492987&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-sling/src/main/java/org/apache/jackrabbit/oak/sling/Activator.java (original)
+++ jackrabbit/oak/trunk/oak-sling/src/main/java/org/apache/jackrabbit/oak/sling/Activator.java Fri Jun 14 07:51:00 2013
@@ -19,14 +19,13 @@ package org.apache.jackrabbit.oak.sling;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 
 import javax.jcr.Repository;
 
 import org.apache.jackrabbit.oak.api.ContentRepository;
 import org.apache.jackrabbit.oak.security.SecurityProviderImpl;
 import org.apache.jackrabbit.oak.spi.security.SecurityProvider;
+import org.apache.jackrabbit.oak.spi.whiteboard.OsgiWhiteboard;
 import org.apache.sling.jcr.api.SlingRepository;
 import org.osgi.framework.BundleActivator;
 import org.osgi.framework.BundleContext;
@@ -39,8 +38,6 @@ public class Activator implements Bundle
 
     private BundleContext context;
 
-    private ScheduledExecutorService executor;
-
     private SecurityProvider securityProvider;
 
     private ServiceTracker tracker;
@@ -56,7 +53,6 @@ public class Activator implements Bundle
     @Override
     public void start(BundleContext bundleContext) throws Exception {
         context = bundleContext;
-        executor = Executors.newScheduledThreadPool(1);
         securityProvider = new SecurityProviderImpl(); // TODO
         tracker = new ServiceTracker(
                 context, ContentRepository.class.getName(), this);
@@ -66,7 +62,6 @@ public class Activator implements Bundle
     @Override
     public void stop(BundleContext bundleContext) throws Exception {
         tracker.close();
-        executor.shutdown();
     }
 
     //--------------------------------------------< ServiceTrackerCustomizer >--
@@ -76,7 +71,9 @@ public class Activator implements Bundle
         Object service = context.getService(reference);
         if (service instanceof ContentRepository) {
             SlingRepository repository = new SlingRepositoryImpl(
-                    (ContentRepository) service, executor, securityProvider);
+                    (ContentRepository) service,
+                    new OsgiWhiteboard(context),
+                    securityProvider);
             jcrRepositories.put(reference, context.registerService(
                     Repository.class.getName(),
                     repository, new Properties()));

Modified: jackrabbit/oak/trunk/oak-sling/src/main/java/org/apache/jackrabbit/oak/sling/SlingRepositoryImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-sling/src/main/java/org/apache/jackrabbit/oak/sling/SlingRepositoryImpl.java?rev=1492987&r1=1492986&r2=1492987&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-sling/src/main/java/org/apache/jackrabbit/oak/sling/SlingRepositoryImpl.java (original)
+++ jackrabbit/oak/trunk/oak-sling/src/main/java/org/apache/jackrabbit/oak/sling/SlingRepositoryImpl.java Fri Jun 14 07:51:00 2013
@@ -16,23 +16,22 @@
  */
 package org.apache.jackrabbit.oak.sling;
 
-import java.util.concurrent.ScheduledExecutorService;
-
 import javax.jcr.RepositoryException;
 import javax.jcr.Session;
 
 import org.apache.jackrabbit.oak.api.ContentRepository;
 import org.apache.jackrabbit.oak.jcr.osgi.OsgiRepository;
 import org.apache.jackrabbit.oak.spi.security.SecurityProvider;
+import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
 import org.apache.sling.jcr.api.SlingRepository;
 
 public class SlingRepositoryImpl
         extends OsgiRepository implements SlingRepository {
 
     public SlingRepositoryImpl(ContentRepository repository,
-                               ScheduledExecutorService executor,
+                               Whiteboard whiteboard,
                                SecurityProvider securityProvider) {
-        super(repository, executor, securityProvider);
+        super(repository, whiteboard, securityProvider);
     }
 
     @Override



Re: svn commit: r1492987 - in /jackrabbit/oak/trunk: oak-core/src/main/java/org/apache/jackrabbit/oak/ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/ oak-jcr/src/main/j...

Posted by Jukka Zitting <ju...@gmail.com>.
Hi,

On Fri, Jun 14, 2013 at 12:51 PM, Michael Dürig <md...@apache.org> wrote:
> Sounds good. I try to come up with something and change the expectation for
> the RepositoryTest.observationDispose test case.

OK, great!

BR,

Jukka Zitting

Re: svn commit: r1492987 - in /jackrabbit/oak/trunk: oak-core/src/main/java/org/apache/jackrabbit/oak/ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/ oak-jcr/src/main/j...

Posted by Michael Dürig <md...@apache.org>.

On 14.6.13 10:45, Jukka Zitting wrote:
> Hi,
>
> On Fri, Jun 14, 2013 at 12:41 PM, Michael Dürig <md...@apache.org> wrote:
>> AFAIU to comply with the removeEventListener contract we either have to
>> interrupt the observation thread (like my implementation did) or live with
>> the deadlock caused by client handlers blocking on events (like you
>> experienced with RepositoryTest.observationDispose).
>
> I would rather do the latter. Interrupting threads is nasty business,

Yes and clients usually just swallow the InterruptedException instead of 
properly handling it and setting the thread's interrupted status... 
which might lead to deadlocks as well.

> and the potential blocking would only affect the client that's in
> charge of the troublesome listener.

Sounds good. I'll try to come up with something and change the expectation
for the RepositoryTest.observationDispose test case.

Michael

Re: svn commit: r1492987 - in /jackrabbit/oak/trunk: oak-core/src/main/java/org/apache/jackrabbit/oak/ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/ oak-jcr/src/main/j...

Posted by Jukka Zitting <ju...@gmail.com>.
Hi,

On Fri, Jun 14, 2013 at 12:41 PM, Michael Dürig <md...@apache.org> wrote:
> AFAIU to comply with the removeEventListener contract we either have to
> interrupt the observation thread (like my implementation did) or live with
> the deadlock caused by client handlers blocking on events (like you
> experienced with RepositoryTest.observationDispose).

I would rather do the latter. Interrupting threads is nasty business,
and the potential blocking would only affect the client that's in
charge of the troublesome listener.

BR,

Jukka Zitting

Re: svn commit: r1492987 - in /jackrabbit/oak/trunk: oak-core/src/main/java/org/apache/jackrabbit/oak/ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/ oak-jcr/src/main/j...

Posted by Michael Dürig <md...@apache.org>.


AFAIU, to comply with the removeEventListener contract we have to either
interrupt the observation thread (like my implementation did) or live
with the deadlock caused by client handlers blocking on events (like you
experienced with RepositoryTest.observationDispose).

Michael

On 14.6.13 10:23, Michael Dürig wrote:
>
> Hi,
>
> Below code breaks stop()'s contract, which states that no further events
> will be delivered after calling stop. It will be that way probably most
> of the time but not necessarily always.
>
> Changing stop()'s contract escalates to
> ObservationManagerImpl.removeEventListener, which explicitly states "The
> deregistration method will block until the listener has completed
> executing." It also escalates to Session.logout leading to the (rare but
> confusing) possibility of events being still delivered to "logged out"
> listeners.
>
> Michael
>
>
> On 14.6.13 8:51, jukka@apache.org wrote:
>> @@ -108,29 +109,17 @@ class ChangeProcessor implements Runnabl
>>        * events will be delivered.
>>        * @throws IllegalStateException if not yet started or stopped
>> already
>>        */
>> -    public synchronized void stop() {
>> -        if (future == null) {
>> -            throw new IllegalStateException("Change processor not
>> started");
>> -        }
>> -
>> -        try {
>> -            stopping = true;
>> -            future.cancel(true);
>> -            while (running) {
>> -                wait();
>> -            }
>> -        } catch (InterruptedException e) {
>> -            Thread.currentThread().interrupt();
>> -        }
>> -        finally {
>> +    public void stop() {
>> +        stopping = true; // do this outside synchronization
>> +        synchronized (this) {
>> +            checkState(registration != null, "Change processor not
>> started");
>>               changeListener.dispose();
>> -            future = null;
>> +            registration.unregister();
>>           }
>>       }

Re: svn commit: r1492987 - in /jackrabbit/oak/trunk: oak-core/src/main/java/org/apache/jackrabbit/oak/ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/whiteboard/ oak-jcr/src/main/j...

Posted by Michael Dürig <md...@apache.org>.
Hi,

The code below breaks stop()'s contract, which states that no further events
will be delivered after calling stop. In practice that will probably hold most
of the time, but it is not guaranteed.

Changing stop()'s contract escalates to 
ObservationManagerImpl.removeEventListener, which explicitly states "The 
deregistration method will block until the listener has completed 
executing." It also escalates to Session.logout leading to the (rare but 
confusing) possibility of events being still delivered to "logged out" 
listeners.

Michael


On 14.6.13 8:51, jukka@apache.org wrote:
> @@ -108,29 +109,17 @@ class ChangeProcessor implements Runnabl
>        * events will be delivered.
>        * @throws IllegalStateException if not yet started or stopped already
>        */
> -    public synchronized void stop() {
> -        if (future == null) {
> -            throw new IllegalStateException("Change processor not started");
> -        }
> -
> -        try {
> -            stopping = true;
> -            future.cancel(true);
> -            while (running) {
> -                wait();
> -            }
> -        } catch (InterruptedException e) {
> -            Thread.currentThread().interrupt();
> -        }
> -        finally {
> +    public void stop() {
> +        stopping = true; // do this outside synchronization
> +        synchronized (this) {
> +            checkState(registration != null, "Change processor not started");
>               changeListener.dispose();
> -            future = null;
> +            registration.unregister();
>           }
>       }