You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2013/12/11 10:45:37 UTC

svn commit: r1550090 - in /jackrabbit/oak/trunk: oak-core/src/main/java/org/apache/jackrabbit/oak/ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/ oak-jcr/src/main/java/org/apach...

Author: mduerig
Date: Wed Dec 11 09:45:37 2013
New Revision: 1550090

URL: http://svn.apache.org/r1550090
Log:
OAK-1120: Enhance observation mechanism to support Apache Sling. Execute observer on a thread pool instead of creating separate threads.

Added:
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java
Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/CommitInfo.java
    jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java?rev=1550090&r1=1550089&r2=1550090&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java Wed Dec 11 09:45:37 2013
@@ -19,15 +19,18 @@ package org.apache.jackrabbit.oak;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.collect.Lists.newArrayList;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -40,6 +43,7 @@ import javax.security.auth.login.LoginEx
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
+import com.google.common.io.Closer;
 import org.apache.jackrabbit.mk.api.MicroKernel;
 import org.apache.jackrabbit.mk.core.MicroKernelImpl;
 import org.apache.jackrabbit.oak.api.ContentRepository;
@@ -52,6 +56,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.index.CompositeIndexEditorProvider;
 import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
 import org.apache.jackrabbit.oak.plugins.index.IndexUpdateProvider;
+import org.apache.jackrabbit.oak.spi.commit.BackgroundObserver;
 import org.apache.jackrabbit.oak.spi.commit.CommitHook;
 import org.apache.jackrabbit.oak.spi.commit.CompositeEditorProvider;
 import org.apache.jackrabbit.oak.spi.commit.CompositeHook;
@@ -108,7 +113,9 @@ public class Oak {
 
     private SecurityProvider securityProvider;
 
-    private ScheduledExecutorService executor = defaultExecutor();
+    private ScheduledExecutorService scheduledExecutor = defaultScheduledExecutor();
+
+    private Executor executor;
 
     /**
      * Default {@code ScheduledExecutorService} used for scheduling background tasks.
@@ -116,7 +123,7 @@ public class Oak {
      * threads are pruned after one minute.
      * @return  fresh ScheduledExecutorService
      */
-    public static ScheduledExecutorService defaultExecutor() {
+    public static ScheduledExecutorService defaultScheduledExecutor() {
         ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(32, new ThreadFactory() {
             private final AtomicInteger counter = new AtomicInteger();
 
@@ -128,6 +135,34 @@ public class Oak {
             }
 
             private String createName() {
+                return "oak-scheduled-executor-" + counter.getAndIncrement();
+            }
+        });
+        executor.setKeepAliveTime(1, TimeUnit.MINUTES);
+        executor.allowCoreThreadTimeOut(true);
+        return executor;
+    }
+
+    /**
+     * Default {@code ExecutorService} used for scheduling concurrent tasks.
+     * This default spawns as many threads as required with a priority of
+     * {@code Thread.MIN_PRIORITY}. Idle threads are pruned after one minute.
+     * @return  fresh ExecutorService
+     */
+    public static ExecutorService defaultExecutorService() {
+        ThreadPoolExecutor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
+                new SynchronousQueue<Runnable>(), new ThreadFactory() {
+            private final AtomicInteger counter = new AtomicInteger();
+
+            @Override
+            public Thread newThread(Runnable r) {
+                Thread thread = new Thread(r, createName());
+                thread.setDaemon(true);
+                thread.setPriority(Thread.MIN_PRIORITY);
+                return thread;
+            }
+
+            private String createName() {
                 return "oak-executor-" + counter.getAndIncrement();
             }
         });
@@ -160,26 +195,31 @@ public class Oak {
         @Override
         public <T> Registration register(
                 Class<T> type, T service, Map<?, ?> properties) {
-            Closeable subscription = null;
+            final Closer observerSubscription = Closer.create();
             Future<?> future = null;
-            if (executor != null && type == Runnable.class) {
+            if (scheduledExecutor != null && type == Runnable.class) {
                 Runnable runnable = (Runnable) service;
-                Long period =
-                        getValue(properties, "scheduler.period", Long.class);
+                Long period = getValue(properties, "scheduler.period", Long.class);
                 if (period != null) {
                     Boolean concurrent = getValue(
                             properties, "scheduler.concurrent",
                             Boolean.class, Boolean.FALSE);
                     if (concurrent) {
-                        future = executor.scheduleAtFixedRate(
+                        future = scheduledExecutor.scheduleAtFixedRate(
                                 runnable, period, period, TimeUnit.SECONDS);
                     } else {
-                        future = executor.scheduleWithFixedDelay(
+                        future = scheduledExecutor.scheduleWithFixedDelay(
                                 runnable, period, period, TimeUnit.SECONDS);
                     }
                 }
             } else if (type == Observer.class && store instanceof Observable) {
-                subscription = ((Observable) store).addObserver((Observer) service);
+                Executor executor = Oak.this.executor == null
+                        ? defaultExecutorService()
+                        : Oak.this.executor;
+                BackgroundObserver backgroundObserver =
+                        new BackgroundObserver((Observer) service, executor);
+                observerSubscription.register(backgroundObserver);
+                observerSubscription.register(((Observable) store).addObserver(backgroundObserver));
             }
 
             ObjectName objectName = null;
@@ -199,7 +239,6 @@ public class Oak {
 
             final Future<?> f = future;
             final ObjectName on = objectName;
-            final Closeable sub = subscription;
             return new Registration() {
                 @Override
                 public void unregister() {
@@ -213,12 +252,10 @@ public class Oak {
                             // ignore
                         }
                     }
-                    if (sub != null) {
-                        try {
-                            sub.close();
-                        } catch (IOException e) {
-                            LOG.warn("Unexpected IOException while unsubscribing observer", e);
-                        }
+                    try {
+                        observerSubscription.close();
+                    } catch (IOException e) {
+                        LOG.warn("Unexpected IOException while unsubscribing observer", e);
                     }
                 }
             };
@@ -257,7 +294,7 @@ public class Oak {
      */
     @Nonnull
     public Oak with(@Nonnull String defaultWorkspaceName) {
-        this.defaultWorkspaceName = defaultWorkspaceName;
+        this.defaultWorkspaceName = checkNotNull(defaultWorkspaceName);
         return this;
     }
 
@@ -276,7 +313,7 @@ public class Oak {
      */
     @Nonnull
     public Oak with(@Nonnull QueryIndexProvider provider) {
-        queryIndexProviders.add(provider);
+        queryIndexProviders.add(checkNotNull(provider));
         return this;
     }
 
@@ -289,7 +326,7 @@ public class Oak {
      */
     @Nonnull
     public Oak with(@Nonnull IndexEditorProvider provider) {
-        indexEditorProviders.add(provider);
+        indexEditorProviders.add(checkNotNull(provider));
         return this;
     }
 
@@ -301,6 +338,7 @@ public class Oak {
      */
     @Nonnull
     public Oak with(@Nonnull CommitHook hook) {
+        checkNotNull(hook);
         withEditorHook();
         commitHooks.add(hook);
         return this;
@@ -372,26 +410,33 @@ public class Oak {
      */
     @Nonnull
     public Oak with(@Nonnull ConflictHandler conflictHandler) {
+        checkNotNull(conflictHandler);
         withEditorHook();
         commitHooks.add(new ConflictHook(conflictHandler));
         return this;
     }
 
     @Nonnull
-    public Oak with(@Nonnull ScheduledExecutorService executorService) {
-        this.executor = executorService;
+    public Oak with(@Nonnull ScheduledExecutorService scheduledExecutor) {
+        this.scheduledExecutor = checkNotNull(scheduledExecutor);
+        return this;
+    }
+
+    @Nonnull
+    public Oak with(@Nonnull Executor executor) {
+        this.executor = checkNotNull(executor);
         return this;
     }
 
     @Nonnull
     public Oak with(@Nonnull MBeanServer mbeanServer) {
-        this.mbeanServer = mbeanServer;
+        this.mbeanServer = checkNotNull(mbeanServer);
         return this;
     }
 
     @Nonnull
     public Oak with(@Nonnull Whiteboard whiteboard) {
-        this.whiteboard = whiteboard;
+        this.whiteboard = checkNotNull(whiteboard);
         return this;
     }
 

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java?rev=1550090&r1=1550089&r2=1550090&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java Wed Dec 11 09:45:37 2013
@@ -1,27 +1,35 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
+
 package org.apache.jackrabbit.oak.spi.commit;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.Queues.newArrayBlockingQueue;
 
+import java.io.Closeable;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -42,94 +50,118 @@ import org.slf4j.LoggerFactory;
  * the background observer thread has yet to process are automatically merged
  * to just one change.
  */
-public class BackgroundObserver implements Observer {
-
-    private static class ContentChange {
-        private final NodeState root;
-        private final CommitInfo info;
-        ContentChange(NodeState root, CommitInfo info) {
-            this.root = root;
-            this.info = info;
-        }
-    }
-
+public class BackgroundObserver implements Observer, Closeable {
     /**
      * Signal for the background thread to stop processing changes.
      */
     private static final ContentChange STOP = new ContentChange(null, null);
 
-    private static Logger getLogger(@Nonnull Observer observer) {
-        return LoggerFactory.getLogger(checkNotNull(observer).getClass());
-    }
+    /**
+     * The receiving observer being notified off the background thread.
+     */
+    private final Observer observer;
+
+    /**
+     * Executor used to dispatch events
+     */
+    private final Executor executor;
+
+    /**
+     * Handler for uncaught exception on the background thread
+     */
+    private final UncaughtExceptionHandler exceptionHandler;
 
     /**
      * The queue of content changes to be processed.
      */
     private final BlockingQueue<ContentChange> queue;
 
+    private static class ContentChange {
+        private final NodeState root;
+        private final CommitInfo info;
+        ContentChange(NodeState root, CommitInfo info) {
+            this.root = root;
+            this.info = info;
+        }
+    }
+
     /**
      * The content change that was last added to the queue.
      * Used to compact external changes.
      */
-    private ContentChange last = null;
+    private ContentChange last;
 
     /**
      * Flag to indicate that some content changes were dropped because
      * the queue was full.
      */
-    private boolean full = false;
+    private boolean full;
 
     /**
-     * The background thread used to process changes.
+     * Current background task
      */
-    private final Thread thread;
-
-    public BackgroundObserver(
-            @Nonnull final Observer observer, int queueLength,
-            @Nonnull UncaughtExceptionHandler exceptionHandler) {
-        checkNotNull(observer);
-        checkArgument(queueLength > 0);
-        checkNotNull(exceptionHandler);
-
-        this.queue = newArrayBlockingQueue(queueLength);
+    private volatile ListenableFutureTask currentTask = ListenableFutureTask.completed();
 
-        this.thread = new Thread(new Runnable() {
+    /**
+     * Completion handler: sets the current task to the next task and schedules that one
+     * on the background thread.
+     */
+    private final Runnable completionHandler = new Runnable() {
+        Callable<Void> task = new Callable<Void>() {
             @Override
-            public void run() {
+            public Void call() throws Exception {
                 try {
-                    while (true) {
-                        ContentChange change = queue.take();
-                        if (change == STOP) {
-                            return;
-                        }
+                    ContentChange change = queue.poll();
+                    while (change != null && change != STOP) {
                         observer.contentChanged(change.root, change.info);
+                        change = queue.poll();
                     }
-                } catch (InterruptedException e) {
-                    getLogger(observer).warn(
-                            "Event processing interrupted for " + observer, e);
+                } catch (Throwable t) {
+                    exceptionHandler.uncaughtException(Thread.currentThread(), t);
                 }
+                return null;
             }
-        });
-        thread.setName(observer.toString());
-        thread.setPriority(Thread.MIN_PRIORITY);
-        thread.setDaemon(true);
-        thread.setUncaughtExceptionHandler(exceptionHandler);
-        thread.start();
+        };
+
+        @Override
+        public void run() {
+            currentTask = new ListenableFutureTask(task);
+            executor.execute(currentTask);
+        }
+    };
+
+    /**
+     * {@code true} after this observer has been stopped
+     */
+    private volatile boolean stopped;
+
+    public BackgroundObserver(
+            @Nonnull Observer observer,
+            @Nonnull Executor executor,
+            int queueLength,
+            @Nonnull UncaughtExceptionHandler exceptionHandler) {
+        this.observer = checkNotNull(observer);
+        this.executor = checkNotNull(executor);
+        this.exceptionHandler = checkNotNull(exceptionHandler);
+        this.queue = newArrayBlockingQueue(queueLength);
     }
 
     public BackgroundObserver(
-            @Nonnull final Observer observer, int queueLength) {
-        this(observer, queueLength, new UncaughtExceptionHandler() {
+            @Nonnull final Observer observer,
+            @Nonnull Executor executor,
+            int queueLength) {
+        this(observer, executor, queueLength, new UncaughtExceptionHandler() {
             @Override
             public void uncaughtException(Thread t, Throwable e) {
-                getLogger(observer).error(
-                        "Uncaught exception in " + observer, e);
+                getLogger(observer).error("Uncaught exception in " + observer, e);
             }
         });
     }
 
-    public BackgroundObserver(@Nonnull Observer observer) {
-        this(observer, 1000);
+    public BackgroundObserver(
+            @Nonnull Observer observer,
+            @Nonnull Executor executor) {
+        this(observer, executor, 1000);
     }
 
     /**
@@ -139,18 +171,25 @@ public class BackgroundObserver implemen
      * middle of such a call, then that call is allowed to complete; i.e.
      * the thread is not forcibly interrupted. This method returns immediately
      * without blocking to wait for the thread to finish.
+     * <p>
+     * After a call to this method further calls to {@link #contentChanged(NodeState, CommitInfo)}
+     * will throw an {@code IllegalStateException}.
      */
-    public synchronized void stop() {
+    @Override
+    public synchronized void close() {
         queue.clear();
         queue.add(STOP);
-        // no need to join the thread; it will stop when encountering the STOP
+        stopped = true;
     }
 
     //----------------------------------------------------------< Observer >--
 
+    /**
+     * @throws IllegalStateException  if {@link #close()} has already been called.
+     */
     @Override
-    public synchronized void contentChanged(
-            @Nonnull NodeState root, @Nullable CommitInfo info) {
+    public synchronized void contentChanged(@Nonnull NodeState root, @Nullable CommitInfo info) {
+        checkState(!stopped);
         checkNotNull(root);
 
         if (info == null && last != null && last.info == null) {
@@ -181,13 +220,73 @@ public class BackgroundObserver implemen
             // compacting of external changes shown above.
             last = change;
         }
+
+        // Set the completion handler on the currently running task. Multiple calls
+        // to onComplete are not a problem here since we always pass the same value.
+        // Thus there is no question as to which of the handlers will effectively run.
+        currentTask.onComplete(completionHandler);
     }
 
-    //------------------------------------------------------------< Object >--
+    //------------------------------------------------------------< internal >---
 
-    @Override
-    public String toString() {
-        return thread.getName() + " &";
+    private static Logger getLogger(@Nonnull Observer observer) {
+        return LoggerFactory.getLogger(checkNotNull(observer).getClass());
+    }
+
+    /**
+     * A future task with an on-complete handler.
+     */
+    private static class ListenableFutureTask extends FutureTask<Void> {
+        private final AtomicBoolean ran = new AtomicBoolean(false);
+
+        private volatile Runnable task;
+
+        public ListenableFutureTask(Callable<Void> callable) {
+            super(callable);
+        }
+
+        public ListenableFutureTask(Runnable task) {
+            super(task, null);
+        }
+
+        /**
+         * Set the on complete handler. The handler will run exactly once after
+         * the task terminated. If the task has already terminated at the time of
+         * this method call the handler will execute immediately.
+         * <p>
+         * Note: there is no guarantee as to which handler will run when the method
+         * is called multiple times with different arguments.
+         * @param task
+         */
+        public void onComplete(Runnable task) {
+            this.task = task;
+            if (isDone()) {
+                run(task);
+            }
+        }
+
+        @Override
+        protected void done() {
+            run(task);
+        }
+
+        private void run(Runnable runnable) {
+            if (runnable != null && ran.compareAndSet(false, true)) {
+                runnable.run();
+            }
+        }
+
+        private static final Runnable NOP = new Runnable() {
+            @Override
+            public void run() {
+            }
+        };
+
+        public static ListenableFutureTask completed() {
+            ListenableFutureTask f = new ListenableFutureTask(NOP);
+            f.run();
+            return f;
+        }
     }
 
 }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java?rev=1550090&r1=1550089&r2=1550090&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java Wed Dec 11 09:45:37 2013
@@ -37,9 +37,6 @@ import org.apache.jackrabbit.oak.spi.sta
  * notifications for all changes reported to this instance.
  */
 public class ChangeDispatcher implements Observable, Observer {
-    // TODO make the queue size configurable
-    private static final int QUEUE_SIZE = 8192;
-
     private final CompositeObserver observers = new CompositeObserver();
 
     @Nonnull
@@ -55,23 +52,23 @@ public class ChangeDispatcher implements
 
     /**
      * Register a new {@link Observer} for receiving notifications about changes reported to
-     * this change dispatcher. Changes are reported asynchronously. Clients need to
-     * call {@link java.io.Closeable#close()} close} on the returned {@code Closeable} instance
-     * to stop receiving notifications.
+     * this change dispatcher. Changes are reported synchronously, so clients need to ensure
+     * they do not block for any length of time (e.g. by relaying through a {@link BackgroundObserver}).
+     * <p>
+     * Clients need to call {@link java.io.Closeable#close() close} on the returned
+     * {@code Closeable} instance to stop receiving notifications.
      *
      * @return  a {@link Closeable} instance
      */
     @Override
     @Nonnull
-    public Closeable addObserver(Observer observer) {
-        final BackgroundObserver backgroundObserver = new BackgroundObserver(observer, QUEUE_SIZE);
-        backgroundObserver.contentChanged(root, null);
-        observers.addObserver(backgroundObserver);
+    public Closeable addObserver(final Observer observer) {
+        observer.contentChanged(root, null);
+        observers.addObserver(observer);
         return new Closeable() {
             @Override
             public void close() {
-                backgroundObserver.stop();
-                observers.removeObserver(backgroundObserver);
+                observers.removeObserver(observer);
             }
         };
     }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/CommitInfo.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/CommitInfo.java?rev=1550090&r1=1550089&r2=1550090&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/CommitInfo.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/CommitInfo.java Wed Dec 11 09:45:37 2013
@@ -19,14 +19,15 @@
 
 package org.apache.jackrabbit.oak.spi.commit;
 
+import static com.google.common.base.Objects.toStringHelper;
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import org.apache.jackrabbit.oak.spi.security.authorization.permission.PermissionProvider;
 
-import static com.google.common.base.Objects.toStringHelper;
-
 /**
  * Commit info instances associate some meta data with a commit.
  */
@@ -59,11 +60,11 @@ public class CommitInfo {
     public CommitInfo(@Nonnull String sessionId, @Nullable String userId,
                       @Nonnull PermissionProvider permissionProvider,
                       @Nonnull MoveTracker moveTracker, @Nullable String message) {
-        this.sessionId = sessionId;
+        this.sessionId = checkNotNull(sessionId);
         this.userId = (userId == null) ? OAK_UNKNOWN : userId;
-        this.permissionProvider = permissionProvider;
+        this.permissionProvider = checkNotNull(permissionProvider);
+        this.moveTracker = checkNotNull(moveTracker);
         this.message = message;
-        this.moveTracker = moveTracker;
     }
 
     /**

Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java?rev=1550090&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java Wed Dec 11 09:45:37 2013
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.spi.commit;
+
+import static com.google.common.collect.Iterables.concat;
+import static java.util.concurrent.Executors.newFixedThreadPool;
+import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import com.google.common.collect.Lists;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.spi.security.authorization.permission.OpenPermissionProvider;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.junit.Test;
+
+public class BackgroundObserverTest {
+    private static final CommitInfo COMMIT_INFO = new CommitInfo
+            ("no-session", null, OpenPermissionProvider.getInstance(), new MoveTracker(), null);
+
+    private final List<List<Runnable>> assertionLists = Lists.newArrayList();
+    private CountDownLatch doneCounter;
+
+    /**
+     * Assert that each of many concurrently running observers sees the same
+     * linear sequence of commits (i.e. sees the commits in the correct order).
+     */
+    @Test
+    public void concurrentObservers() throws InterruptedException {
+        Observer observer = createCompositeObserver(newFixedThreadPool(32), 128);
+
+        for (int k = 0; k < 1024; k++) {
+            contentChanged(observer, k);
+        }
+        done(observer);
+
+        assertTrue(doneCounter.await(5, TimeUnit.SECONDS));
+
+        for (Runnable assertion : concat(assertionLists)) {
+            assertion.run();
+        }
+    }
+
+    private static void contentChanged(Observer observer, long value) {
+        NodeState node = EMPTY_NODE.builder().setProperty("p", value).getNodeState();
+        observer.contentChanged(node, COMMIT_INFO);
+    }
+
+    private static void done(Observer observer) {
+        NodeState node = EMPTY_NODE.builder().setProperty("done", true).getNodeState();
+        observer.contentChanged(node, COMMIT_INFO);
+    }
+
+    private CompositeObserver createCompositeObserver(ExecutorService executor, int count) {
+        CompositeObserver observer = new CompositeObserver();
+
+        for (int k = 0; k < count; k++) {
+            observer.addObserver(createBackgroundObserver(executor));
+        }
+        doneCounter = new CountDownLatch(count);
+        return observer;
+    }
+
+    private Observer createBackgroundObserver(ExecutorService executor) {
+        return new BackgroundObserver(new Observer() {
+            final List<Runnable> assertions = newAssertionList();
+
+            private List<Runnable> newAssertionList() {
+                ArrayList<Runnable> assertionList = Lists.newArrayList();
+                assertionLists.add(assertionList);
+                return assertionList;
+            }
+
+            NodeState previous;
+
+            @Override
+            public void contentChanged(@Nonnull final NodeState root, @Nullable CommitInfo info) {
+                if (root.hasProperty("done")) {
+                    doneCounter.countDown();
+                } else if (previous != null) {
+                    // Copy previous to avoid closing over it
+                    final NodeState p = previous;
+                    assertions.add(new Runnable() {
+                        @Override
+                        public void run() {
+                            assertEquals(getP(p) + 1, (long) getP(root));
+                        }
+                    });
+
+                }
+                previous = root;
+            }
+
+            private Long getP(NodeState previous) {
+                return previous.getProperty("p").getValue(Type.LONG);
+            }
+        }, executor, 1024);
+    }
+
+}

Modified: jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java?rev=1550090&r1=1550089&r2=1550090&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java Wed Dec 11 09:45:37 2013
@@ -18,6 +18,7 @@ package org.apache.jackrabbit.oak.jcr;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
 import javax.annotation.Nonnull;
@@ -148,6 +149,12 @@ public class Jcr {
         return this;
     }
 
+    @Nonnull
+    public final Jcr with(@Nonnull Executor executor) {
+        oak.with(checkNotNull(executor));
+        return this;
+    }
+
     public Jcr withAsyncIndexing() {
         oak.withAsyncIndexing();
         return this;