You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2013/12/11 10:45:37 UTC
svn commit: r1550090 - in /jackrabbit/oak/trunk:
oak-core/src/main/java/org/apache/jackrabbit/oak/
oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/
oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/
oak-jcr/src/main/java/org/apach...
Author: mduerig
Date: Wed Dec 11 09:45:37 2013
New Revision: 1550090
URL: http://svn.apache.org/r1550090
Log:
OAK-1120: Enhance observation mechanism to support Apache Sling Execute observer on a thread pool instead of creating separate threads
Added:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/CommitInfo.java
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java?rev=1550090&r1=1550089&r2=1550090&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java Wed Dec 11 09:45:37 2013
@@ -19,15 +19,18 @@ package org.apache.jackrabbit.oak;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Lists.newArrayList;
-import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -40,6 +43,7 @@ import javax.security.auth.login.LoginEx
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
+import com.google.common.io.Closer;
import org.apache.jackrabbit.mk.api.MicroKernel;
import org.apache.jackrabbit.mk.core.MicroKernelImpl;
import org.apache.jackrabbit.oak.api.ContentRepository;
@@ -52,6 +56,7 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.index.CompositeIndexEditorProvider;
import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
import org.apache.jackrabbit.oak.plugins.index.IndexUpdateProvider;
+import org.apache.jackrabbit.oak.spi.commit.BackgroundObserver;
import org.apache.jackrabbit.oak.spi.commit.CommitHook;
import org.apache.jackrabbit.oak.spi.commit.CompositeEditorProvider;
import org.apache.jackrabbit.oak.spi.commit.CompositeHook;
@@ -108,7 +113,9 @@ public class Oak {
private SecurityProvider securityProvider;
- private ScheduledExecutorService executor = defaultExecutor();
+ private ScheduledExecutorService scheduledExecutor = defaultScheduledExecutor();
+
+ private Executor executor;
/**
* Default {@code ScheduledExecutorService} used for scheduling background tasks.
@@ -116,7 +123,7 @@ public class Oak {
* threads are pruned after one minute.
* @return fresh ScheduledExecutorService
*/
- public static ScheduledExecutorService defaultExecutor() {
+ public static ScheduledExecutorService defaultScheduledExecutor() {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(32, new ThreadFactory() {
private final AtomicInteger counter = new AtomicInteger();
@@ -128,6 +135,34 @@ public class Oak {
}
private String createName() {
+ return "oak-scheduled-executor-" + counter.getAndIncrement();
+ }
+ });
+ executor.setKeepAliveTime(1, TimeUnit.MINUTES);
+ executor.allowCoreThreadTimeOut(true);
+ return executor;
+ }
+
+ /**
+ * Default {@code ExecutorService} used for scheduling concurrent tasks.
+ * This default spawns as many threads as required with a priority of
+ * {@code Thread.MIN_PRIORITY}. Idle threads are pruned after one minute.
+ * @return fresh ExecutorService
+ */
+ public static ExecutorService defaultExecutorService() {
+ ThreadPoolExecutor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
+ new SynchronousQueue<Runnable>(), new ThreadFactory() {
+ private final AtomicInteger counter = new AtomicInteger();
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread thread = new Thread(r, createName());
+ thread.setDaemon(true);
+ thread.setPriority(Thread.MIN_PRIORITY);
+ return thread;
+ }
+
+ private String createName() {
return "oak-executor-" + counter.getAndIncrement();
}
});
@@ -160,26 +195,31 @@ public class Oak {
@Override
public <T> Registration register(
Class<T> type, T service, Map<?, ?> properties) {
- Closeable subscription = null;
+ final Closer observerSubscription = Closer.create();
Future<?> future = null;
- if (executor != null && type == Runnable.class) {
+ if (scheduledExecutor != null && type == Runnable.class) {
Runnable runnable = (Runnable) service;
- Long period =
- getValue(properties, "scheduler.period", Long.class);
+ Long period = getValue(properties, "scheduler.period", Long.class);
if (period != null) {
Boolean concurrent = getValue(
properties, "scheduler.concurrent",
Boolean.class, Boolean.FALSE);
if (concurrent) {
- future = executor.scheduleAtFixedRate(
+ future = scheduledExecutor.scheduleAtFixedRate(
runnable, period, period, TimeUnit.SECONDS);
} else {
- future = executor.scheduleWithFixedDelay(
+ future = scheduledExecutor.scheduleWithFixedDelay(
runnable, period, period, TimeUnit.SECONDS);
}
}
} else if (type == Observer.class && store instanceof Observable) {
- subscription = ((Observable) store).addObserver((Observer) service);
+ Executor executor = Oak.this.executor == null
+ ? defaultExecutorService()
+ : Oak.this.executor;
+ BackgroundObserver backgroundObserver =
+ new BackgroundObserver((Observer) service, executor);
+ observerSubscription.register(backgroundObserver);
+ observerSubscription.register(((Observable) store).addObserver(backgroundObserver));
}
ObjectName objectName = null;
@@ -199,7 +239,6 @@ public class Oak {
final Future<?> f = future;
final ObjectName on = objectName;
- final Closeable sub = subscription;
return new Registration() {
@Override
public void unregister() {
@@ -213,12 +252,10 @@ public class Oak {
// ignore
}
}
- if (sub != null) {
- try {
- sub.close();
- } catch (IOException e) {
- LOG.warn("Unexpected IOException while unsubscribing observer", e);
- }
+ try {
+ observerSubscription.close();
+ } catch (IOException e) {
+ LOG.warn("Unexpected IOException while unsubscribing observer", e);
}
}
};
@@ -257,7 +294,7 @@ public class Oak {
*/
@Nonnull
public Oak with(@Nonnull String defaultWorkspaceName) {
- this.defaultWorkspaceName = defaultWorkspaceName;
+ this.defaultWorkspaceName = checkNotNull(defaultWorkspaceName);
return this;
}
@@ -276,7 +313,7 @@ public class Oak {
*/
@Nonnull
public Oak with(@Nonnull QueryIndexProvider provider) {
- queryIndexProviders.add(provider);
+ queryIndexProviders.add(checkNotNull(provider));
return this;
}
@@ -289,7 +326,7 @@ public class Oak {
*/
@Nonnull
public Oak with(@Nonnull IndexEditorProvider provider) {
- indexEditorProviders.add(provider);
+ indexEditorProviders.add(checkNotNull(provider));
return this;
}
@@ -301,6 +338,7 @@ public class Oak {
*/
@Nonnull
public Oak with(@Nonnull CommitHook hook) {
+ checkNotNull(hook);
withEditorHook();
commitHooks.add(hook);
return this;
@@ -372,26 +410,33 @@ public class Oak {
*/
@Nonnull
public Oak with(@Nonnull ConflictHandler conflictHandler) {
+ checkNotNull(conflictHandler);
withEditorHook();
commitHooks.add(new ConflictHook(conflictHandler));
return this;
}
@Nonnull
- public Oak with(@Nonnull ScheduledExecutorService executorService) {
- this.executor = executorService;
+ public Oak with(@Nonnull ScheduledExecutorService scheduledExecutor) {
+ this.scheduledExecutor = checkNotNull(scheduledExecutor);
+ return this;
+ }
+
+ @Nonnull
+ public Oak with(@Nonnull Executor executor) {
+ this.executor = checkNotNull(executor);
return this;
}
@Nonnull
public Oak with(@Nonnull MBeanServer mbeanServer) {
- this.mbeanServer = mbeanServer;
+ this.mbeanServer = checkNotNull(mbeanServer);
return this;
}
@Nonnull
public Oak with(@Nonnull Whiteboard whiteboard) {
- this.whiteboard = whiteboard;
+ this.whiteboard = checkNotNull(whiteboard);
return this;
}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java?rev=1550090&r1=1550089&r2=1550090&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java Wed Dec 11 09:45:37 2013
@@ -1,27 +1,35 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
+
package org.apache.jackrabbit.oak.spi.commit;
-import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Queues.newArrayBlockingQueue;
+import java.io.Closeable;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -42,94 +50,118 @@ import org.slf4j.LoggerFactory;
* the background observer thread has yet to process are automatically merged
* to just one change.
*/
-public class BackgroundObserver implements Observer {
-
- private static class ContentChange {
- private final NodeState root;
- private final CommitInfo info;
- ContentChange(NodeState root, CommitInfo info) {
- this.root = root;
- this.info = info;
- }
- }
-
+public class BackgroundObserver implements Observer, Closeable {
/**
* Signal for the background thread to stop processing changes.
*/
private static final ContentChange STOP = new ContentChange(null, null);
- private static Logger getLogger(@Nonnull Observer observer) {
- return LoggerFactory.getLogger(checkNotNull(observer).getClass());
- }
+ /**
+ * The receiving observer being notified off the background thread.
+ */
+ private final Observer observer;
+
+ /**
+ * Executor used to dispatch events
+ */
+ private final Executor executor;
+
+ /**
+ * Handler for uncaught exception on the background thread
+ */
+ private final UncaughtExceptionHandler exceptionHandler;
/**
* The queue of content changes to be processed.
*/
private final BlockingQueue<ContentChange> queue;
+ private static class ContentChange {
+ private final NodeState root;
+ private final CommitInfo info;
+ ContentChange(NodeState root, CommitInfo info) {
+ this.root = root;
+ this.info = info;
+ }
+ }
+
/**
* The content change that was last added to the queue.
* Used to compact external changes.
*/
- private ContentChange last = null;
+ private ContentChange last;
/**
* Flag to indicate that some content changes were dropped because
* the queue was full.
*/
- private boolean full = false;
+ private boolean full;
/**
- * The background thread used to process changes.
+ * Current background task
*/
- private final Thread thread;
-
- public BackgroundObserver(
- @Nonnull final Observer observer, int queueLength,
- @Nonnull UncaughtExceptionHandler exceptionHandler) {
- checkNotNull(observer);
- checkArgument(queueLength > 0);
- checkNotNull(exceptionHandler);
-
- this.queue = newArrayBlockingQueue(queueLength);
+ private volatile ListenableFutureTask currentTask = ListenableFutureTask.completed();
- this.thread = new Thread(new Runnable() {
+ /**
+ * Completion handler: set the current task to the next task and schedules that one
+ * on the background thread.
+ */
+ private final Runnable completionHandler = new Runnable() {
+ Callable<Void> task = new Callable<Void>() {
@Override
- public void run() {
+ public Void call() throws Exception {
try {
- while (true) {
- ContentChange change = queue.take();
- if (change == STOP) {
- return;
- }
+ ContentChange change = queue.poll();
+ while (change != null && change != STOP) {
observer.contentChanged(change.root, change.info);
+ change = queue.poll();
}
- } catch (InterruptedException e) {
- getLogger(observer).warn(
- "Event processing interrupted for " + observer, e);
+ } catch (Throwable t) {
+ exceptionHandler.uncaughtException(Thread.currentThread(), t);
}
+ return null;
}
- });
- thread.setName(observer.toString());
- thread.setPriority(Thread.MIN_PRIORITY);
- thread.setDaemon(true);
- thread.setUncaughtExceptionHandler(exceptionHandler);
- thread.start();
+ };
+
+ @Override
+ public void run() {
+ currentTask = new ListenableFutureTask(task);
+ executor.execute(currentTask);
+ }
+ };
+
+ /**
+ * {@code true} after this observer has been stopped
+ */
+ private volatile boolean stopped;
+
+ public BackgroundObserver(
+ @Nonnull Observer observer,
+ @Nonnull Executor executor,
+ int queueLength,
+ @Nonnull UncaughtExceptionHandler exceptionHandler) {
+ this.observer = checkNotNull(observer);
+ this.executor = checkNotNull(executor);
+ this.exceptionHandler = checkNotNull(exceptionHandler);
+ this.queue = newArrayBlockingQueue(queueLength);
}
public BackgroundObserver(
- @Nonnull final Observer observer, int queueLength) {
- this(observer, queueLength, new UncaughtExceptionHandler() {
+ @Nonnull final Observer observer,
+ @Nonnull Executor executor,
+ int queueLength) {
+ this(observer, executor, queueLength, new UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
- getLogger(observer).error(
- "Uncaught exception in " + observer, e);
+ getLogger(observer).error("Uncaught exception in " + observer, e);
}
});
}
- public BackgroundObserver(@Nonnull Observer observer) {
- this(observer, 1000);
+ public BackgroundObserver(
+ @Nonnull Observer observer,
+ @Nonnull Executor executor) {
+ this(observer, executor, 1000);
}
/**
@@ -139,18 +171,25 @@ public class BackgroundObserver implemen
* middle of such a call, then that call is allowed to complete; i.e.
* the thread is not forcibly interrupted. This method returns immediately
* without blocking to wait for the thread to finish.
+ * <p>
+ * After a call to this method further calls to {@link #contentChanged(NodeState, CommitInfo)}
+ * will throw a {@code IllegalStateException}.
*/
- public synchronized void stop() {
+ @Override
+ public synchronized void close() {
queue.clear();
queue.add(STOP);
- // no need to join the thread; it will stop when encountering the STOP
+ stopped = true;
}
//----------------------------------------------------------< Observer >--
+ /**
+ * @throws IllegalStateException if {@link #close()} has already been called.
+ */
@Override
- public synchronized void contentChanged(
- @Nonnull NodeState root, @Nullable CommitInfo info) {
+ public synchronized void contentChanged(@Nonnull NodeState root, @Nullable CommitInfo info) {
+ checkState(!stopped);
checkNotNull(root);
if (info == null && last != null && last.info == null) {
@@ -181,13 +220,73 @@ public class BackgroundObserver implemen
// compacting of external changes shown above.
last = change;
}
+
+ // Set the completion handler on the currently running task. Multiple calls
+ // to onComplete are not a problem here since we always pass the same value.
+ // Thus there is no question as to which of the handlers will effectively run.
+ currentTask.onComplete(completionHandler);
}
- //------------------------------------------------------------< Object >--
+ //------------------------------------------------------------< internal >---
- @Override
- public String toString() {
- return thread.getName() + " &";
+ private static Logger getLogger(@Nonnull Observer observer) {
+ return LoggerFactory.getLogger(checkNotNull(observer).getClass());
+ }
+
+ /**
+ * A future task with a on complete handler.
+ */
+ private static class ListenableFutureTask extends FutureTask<Void> {
+ private final AtomicBoolean ran = new AtomicBoolean(false);
+
+ private volatile Runnable task;
+
+ public ListenableFutureTask(Callable<Void> callable) {
+ super(callable);
+ }
+
+ public ListenableFutureTask(Runnable task) {
+ super(task, null);
+ }
+
+ /**
+ * Set the on complete handler. The handler will run exactly once after
+ * the task terminated. If the task has already terminated at the time of
+ * this method call the handler will execute immediately.
+ * <p>
+ * Note: there is no guarantee to which handler will run when the method
+ * is called multiple times with different arguments.
+ * @param task
+ */
+ public void onComplete(Runnable task) {
+ this.task = task;
+ if (isDone()) {
+ run(task);
+ }
+ }
+
+ @Override
+ protected void done() {
+ run(task);
+ }
+
+ private void run(Runnable runnable) {
+ if (runnable != null && ran.compareAndSet(false, true)) {
+ runnable.run();
+ }
+ }
+
+ private static final Runnable NOP = new Runnable() {
+ @Override
+ public void run() {
+ }
+ };
+
+ public static ListenableFutureTask completed() {
+ ListenableFutureTask f = new ListenableFutureTask(NOP);
+ f.run();
+ return f;
+ }
}
}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java?rev=1550090&r1=1550089&r2=1550090&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ChangeDispatcher.java Wed Dec 11 09:45:37 2013
@@ -37,9 +37,6 @@ import org.apache.jackrabbit.oak.spi.sta
* notifications for all changes reported to this instance.
*/
public class ChangeDispatcher implements Observable, Observer {
- // TODO make the queue size configurable
- private static final int QUEUE_SIZE = 8192;
-
private final CompositeObserver observers = new CompositeObserver();
@Nonnull
@@ -55,23 +52,23 @@ public class ChangeDispatcher implements
/**
* Register a new {@link Observer} for receiving notifications about changes reported to
- * this change dispatcher. Changes are reported asynchronously. Clients need to
- * call {@link java.io.Closeable#close()} close} on the returned {@code Closeable} instance
- * to stop receiving notifications.
+ * this change dispatcher. Changes are reported synchronously and clients need to ensure
+ * to no block any length of time (e.g. by relaying through a {@link BackgroundObserver}).
+ * <p>
+ * Clients need to call {@link java.io.Closeable#close()} close} on the returned
+ * {@code Closeable} instance to stop receiving notifications.
*
* @return a {@link Closeable} instance
*/
@Override
@Nonnull
- public Closeable addObserver(Observer observer) {
- final BackgroundObserver backgroundObserver = new BackgroundObserver(observer, QUEUE_SIZE);
- backgroundObserver.contentChanged(root, null);
- observers.addObserver(backgroundObserver);
+ public Closeable addObserver(final Observer observer) {
+ observer.contentChanged(root, null);
+ observers.addObserver(observer);
return new Closeable() {
@Override
public void close() {
- backgroundObserver.stop();
- observers.removeObserver(backgroundObserver);
+ observers.removeObserver(observer);
}
};
}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/CommitInfo.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/CommitInfo.java?rev=1550090&r1=1550089&r2=1550090&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/CommitInfo.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/CommitInfo.java Wed Dec 11 09:45:37 2013
@@ -19,14 +19,15 @@
package org.apache.jackrabbit.oak.spi.commit;
+import static com.google.common.base.Objects.toStringHelper;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.jackrabbit.oak.spi.security.authorization.permission.PermissionProvider;
-import static com.google.common.base.Objects.toStringHelper;
-
/**
* Commit info instances associate some meta data with a commit.
*/
@@ -59,11 +60,11 @@ public class CommitInfo {
public CommitInfo(@Nonnull String sessionId, @Nullable String userId,
@Nonnull PermissionProvider permissionProvider,
@Nonnull MoveTracker moveTracker, @Nullable String message) {
- this.sessionId = sessionId;
+ this.sessionId = checkNotNull(sessionId);
this.userId = (userId == null) ? OAK_UNKNOWN : userId;
- this.permissionProvider = permissionProvider;
+ this.permissionProvider = checkNotNull(permissionProvider);
+ this.moveTracker = checkNotNull(moveTracker);
this.message = message;
- this.moveTracker = moveTracker;
}
/**
Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java?rev=1550090&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java Wed Dec 11 09:45:37 2013
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.spi.commit;
+
+import static com.google.common.collect.Iterables.concat;
+import static java.util.concurrent.Executors.newFixedThreadPool;
+import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import com.google.common.collect.Lists;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.spi.security.authorization.permission.OpenPermissionProvider;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.junit.Test;
+
+public class BackgroundObserverTest {
+ private static final CommitInfo COMMIT_INFO = new CommitInfo
+ ("no-session", null, OpenPermissionProvider.getInstance(), new MoveTracker(), null);
+
+ private final List<List<Runnable>> assertionLists = Lists.newArrayList();
+ private CountDownLatch doneCounter;
+
+ /**
+ * Assert that each observer of many running concurrently sees the same
+ * linearly sequence of commits (i.e. sees the commits in the correct order).
+ */
+ @Test
+ public void concurrentObservers() throws InterruptedException {
+ Observer observer = createCompositeObserver(newFixedThreadPool(32), 128);
+
+ for (int k = 0; k < 1024; k++) {
+ contentChanged(observer, k);
+ }
+ done(observer);
+
+ assertTrue(doneCounter.await(5, TimeUnit.SECONDS));
+
+ for (Runnable assertion : concat(assertionLists)) {
+ assertion.run();
+ }
+ }
+
+ private static void contentChanged(Observer observer, long value) {
+ NodeState node = EMPTY_NODE.builder().setProperty("p", value).getNodeState();
+ observer.contentChanged(node, COMMIT_INFO);
+ }
+
+ private static void done(Observer observer) {
+ NodeState node = EMPTY_NODE.builder().setProperty("done", true).getNodeState();
+ observer.contentChanged(node, COMMIT_INFO);
+ }
+
+ private CompositeObserver createCompositeObserver(ExecutorService executor, int count) {
+ CompositeObserver observer = new CompositeObserver();
+
+ for (int k = 0; k < count; k++) {
+ observer.addObserver(createBackgroundObserver(executor));
+ }
+ doneCounter = new CountDownLatch(count);
+ return observer;
+ }
+
+ private Observer createBackgroundObserver(ExecutorService executor) {
+ return new BackgroundObserver(new Observer() {
+ final List<Runnable> assertions = newAssertionList();
+
+ private List<Runnable> newAssertionList() {
+ ArrayList<Runnable> assertionList = Lists.newArrayList();
+ assertionLists.add(assertionList);
+ return assertionList;
+ }
+
+ NodeState previous;
+
+ @Override
+ public void contentChanged(@Nonnull final NodeState root, @Nullable CommitInfo info) {
+ if (root.hasProperty("done")) {
+ doneCounter.countDown();
+ } else if (previous != null) {
+ // Copy previous to avoid closing over it
+ final NodeState p = previous;
+ assertions.add(new Runnable() {
+ @Override
+ public void run() {
+ assertEquals(getP(p) + 1, (long) getP(root));
+ }
+ });
+
+ }
+ previous = root;
+ }
+
+ private Long getP(NodeState previous) {
+ return previous.getProperty("p").getValue(Type.LONG);
+ }
+ }, executor, 1024);
+ }
+
+}
Modified: jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java?rev=1550090&r1=1550089&r2=1550090&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java Wed Dec 11 09:45:37 2013
@@ -18,6 +18,7 @@ package org.apache.jackrabbit.oak.jcr;
import static com.google.common.base.Preconditions.checkNotNull;
+import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nonnull;
@@ -148,6 +149,12 @@ public class Jcr {
return this;
}
+ @Nonnull
+ public final Jcr with(@Nonnull Executor executor) {
+ oak.with(checkNotNull(executor));
+ return this;
+ }
+
public Jcr withAsyncIndexing() {
oak.withAsyncIndexing();
return this;