You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2015/08/11 14:55:42 UTC

svn commit: r1695297 [3/3] - in /jackrabbit/oak/branches/1.0: ./ oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ oak-core/src/main/java/org/apache/jackrabbit/oak/plugi...

Modified: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModificationsTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModificationsTest.java?rev=1695297&r1=1695296&r2=1695297&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModificationsTest.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModificationsTest.java Tue Aug 11 12:55:41 2015
@@ -35,6 +35,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.junit.Ignore;
 import org.junit.Test;
 
+import static org.apache.jackrabbit.oak.plugins.document.UnsavedModifications.Snapshot.IGNORE;
 import static org.junit.Assert.assertEquals;
 
 public class UnsavedModificationsTest {
@@ -96,7 +97,7 @@ public class UnsavedModificationsTest {
             public void run() {
                 while (exceptions.isEmpty()) {
                     try {
-                        mod.persist(ns, new ReentrantLock());
+                        mod.persist(ns, IGNORE, new ReentrantLock());
                         Thread.sleep(10);
                     } catch (Exception e) {
                         exceptions.add(e);
@@ -169,7 +170,7 @@ public class UnsavedModificationsTest {
                                 paths.clear();
                             }
                             if (random.nextFloat() < 0.00005) {
-                                pending.persist(ns, new ReentrantLock());
+                                pending.persist(ns, IGNORE, new ReentrantLock());
                             }
                         }
                     }
@@ -220,7 +221,7 @@ public class UnsavedModificationsTest {
         }
 
         // drain pending, this will force it back to in-memory
-        pending.persist(ns, new ReentrantLock());
+        pending.persist(ns, IGNORE, new ReentrantLock());
 
         // loop over remaining paths
         while (paths.hasNext()) {
@@ -257,7 +258,7 @@ public class UnsavedModificationsTest {
         }
 
         // drain pending, this will force it back to in-memory
-        pending.persist(ns, new ReentrantLock());
+        pending.persist(ns, IGNORE, new ReentrantLock());
 
         // loop over remaining paths
         while (paths.hasNext()) {

Added: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/JournalIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/JournalIT.java?rev=1695297&view=auto
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/JournalIT.java (added)
+++ jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/JournalIT.java Tue Aug 11 12:55:41 2015
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.mongo;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import com.mongodb.DB;
+
+import org.apache.jackrabbit.oak.cache.CacheStats;
+import org.apache.jackrabbit.oak.plugins.document.AbstractJournalTest;
+import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
+import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
+import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
+import org.apache.jackrabbit.oak.plugins.document.JournalGarbageCollector;
+import org.apache.jackrabbit.oak.plugins.document.MongoUtils;
+import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
+import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection;
+import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+
+public class JournalIT extends AbstractJournalTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JournalIT.class);
+
+    @BeforeClass
+    public static void checkMongoDbAvailable() {
+        Assume.assumeNotNull(MongoUtils.getConnection());
+    }
+
+    @Before
+    @After
+    public void dropCollections() throws Exception {
+        MongoConnection mongoConnection = MongoUtils.getConnection();
+        MongoUtils.dropCollections(mongoConnection.getDB());
+        mongoConnection.close();
+    }
+
+    @Test
+    public void cacheInvalidationTest() throws Exception {
+        final DocumentNodeStore ns1 = createMK(1, 0).getNodeStore();
+        final DocumentNodeStore ns2 = createMK(2, 0).getNodeStore();
+        LOG.info("cache size 1: "+(ns1.getDocumentStore().getCacheStats()==null ? "null" : ns1.getDocumentStore().getCacheStats().getElementCount()));
+
+        // invalidate both caches under test first
+        invalidateDocChildrenCache(ns1);
+        ns1.getDocumentStore().invalidateCache();
+
+        {
+            DocumentStore s = ns1.getDocumentStore();
+            CacheStats cacheStats = s.getCacheStats();
+            LOG.info("m.size="+(cacheStats==null ? "null" : cacheStats.getElementCount()));
+        }
+        LOG.info("cache size 2: "+(ns1.getDocumentStore().getCacheStats()==null ? "null" : ns1.getDocumentStore().getCacheStats().getElementCount()));
+
+        // first create child node in instance 1
+        final List<String> paths = createRandomPaths(1, 5000000, 1000);
+        int i=0;
+        for(String path : paths) {
+            if (i++%100==0) {
+                LOG.info("at "+i);
+            }
+            getOrCreate(ns1, path, false);
+        }
+        final List<String> paths2 = createRandomPaths(20, 2345, 100);
+        getOrCreate(ns1, paths2, false);
+        ns1.runBackgroundOperations();
+        for(String path : paths) {
+            assertDocCache(ns1, true, path);
+        }
+
+        {
+            DocumentStore s = ns1.getDocumentStore();
+            CacheStats cacheStats = s.getCacheStats();
+            LOG.info("m.size="+(cacheStats==null ? "null" : cacheStats.getElementCount()));
+        }
+
+        LOG.info("cache size 2: "+(ns1.getDocumentStore().getCacheStats()==null ? "null" : ns1.getDocumentStore().getCacheStats().getElementCount()));
+        long time = System.currentTimeMillis();
+        for(int j=0; j<100; j++) {
+            long now = System.currentTimeMillis();
+            LOG.info("loop "+j+", "+(now-time)+"ms");
+            time = now;
+            final Set<String> electedPaths = choose(paths2, random.nextInt(30));
+            {
+                // choose a random few from above created paths and modify them
+                final long t1 = System.currentTimeMillis();
+                ns2.runBackgroundOperations(); // make sure ns2 has the latest from ns1
+                final long t2 = System.currentTimeMillis();
+                LOG.info("ns2 background took "+(t2-t1)+"ms");
+
+                for(String electedPath : electedPaths) {
+                    // modify the elected path in the other instance (ns2)
+                    setProperty(ns2, electedPath, "p", "ns2"+System.currentTimeMillis(), false);
+                }
+                final long t3 = System.currentTimeMillis();
+                LOG.info("setting props "+(t3-t2)+"ms");
+
+                ns2.runBackgroundOperations();
+                final long t4 = System.currentTimeMillis();
+                LOG.info("ns2 background took2 "+(t4-t3)+"ms");
+            }
+
+            // that should not have changed the fact that we have it cached in 'ns1'
+            for(String electedPath : electedPaths) {
+                assertDocCache(ns1, true, electedPath);
+            }
+
+            // running the background operations now should trigger invalidation
+            // which, thanks to the external modification, removes the entry from the cache:
+            ns1.runBackgroundOperations();
+            for(String electedPath : electedPaths) {
+                assertDocCache(ns1, false, electedPath);
+            }
+
+            // when I access it again with 'ns1', then it gets cached again:
+            for(String electedPath : electedPaths) {
+                getOrCreate(ns1, electedPath, false);
+                assertDocCache(ns1, true, electedPath);
+            }
+        }
+    }
+
+    @Test
+    public void largeCleanupTest() throws Exception {
+        // create more than DELETE_BATCH_SIZE entries and clean them up;
+        // this should force JournalGarbageCollector.gc to loop, so that
+        // it would catch the issue described here:
+        // https://issues.apache.org/jira/browse/OAK-2829?focusedCommentId=14585733&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14585733
+
+        doLargeCleanupTest(0, 100);
+        doLargeCleanupTest(200, 1000);// using offset so as to make sure to always create new entries
+        doLargeCleanupTest(2000, 10000);
+        doLargeCleanupTest(20000, 30000); // using 'size' much larger than 30k will be tremendously slow due to ordered node
+    }
+
+    @Test
+    public void simpleCacheInvalidationTest() throws Exception {
+        final DocumentNodeStore ns1 = createMK(1, 0).getNodeStore();
+        final DocumentNodeStore ns2 = createMK(2, 0).getNodeStore();
+
+        // invalidate both caches under test first
+        invalidateDocChildrenCache(ns1);
+        ns1.getDocumentStore().invalidateCache();
+
+        // first create child node in instance 1
+        getOrCreate(ns1, "/child", true);
+        assertDocCache(ns1, true, "/child");
+
+        {
+            // modify /child in another instance 2
+            ns2.runBackgroundOperations(); // read latest changes from ns1
+            setProperty(ns2, "/child", "p", "ns2"+System.currentTimeMillis(), true);
+        }
+        // that should not have changed the fact that we have it cached in 'ns1'
+        assertDocCache(ns1, true, "/child");
+
+        // running the background operations now should trigger invalidation
+        // which, thanks to the external modification, removes the entry from the cache:
+        ns1.runBackgroundOperations();
+        assertDocCache(ns1, false, "/child");
+
+        // when I access it again with 'ns1', then it gets cached again:
+        getOrCreate(ns1, "/child", false);
+        assertDocCache(ns1, true, "/child");
+    }
+
+    private void doLargeCleanupTest(int offset, int size) throws Exception {
+        Clock clock = new Clock.Virtual();
+        DocumentMK mk1 = createMK(0 /* clusterId: 0 => uses clusterNodes collection */, 0,
+                new MemoryDocumentStore(), new MemoryBlobStore());
+        DocumentNodeStore ns1 = mk1.getNodeStore();
+        // make sure we're visible and marked as active
+        renewClusterIdLease(ns1);
+        JournalGarbageCollector gc = new JournalGarbageCollector(ns1);
+        clock.getTimeIncreasing();
+        clock.getTimeIncreasing();
+        gc.gc(0, TimeUnit.MILLISECONDS); // cleanup everything that might still be there
+
+        // create entries as parametrized:
+        for(int i=offset; i<size+offset; i++) {
+            mk1.commit("/", "+\"regular"+i+"\": {}", null, null);
+            // always run background ops to 'flush' the change
+            // into the journal:
+            ns1.runBackgroundOperations();
+        }
+        Thread.sleep(100); // sleep 100millis
+        assertEquals(size, gc.gc(0, TimeUnit.MILLISECONDS)); // should now be able to clean up everything
+    }
+
+    protected DocumentMK createMK(int clusterId, int asyncDelay) {
+        DB db = MongoUtils.getConnection().getDB();
+        builder = newDocumentMKBuilder();
+        return register(builder.setMongoDB(db)
+                .setClusterId(clusterId).setAsyncDelay(asyncDelay).open());
+    }
+
+}

Propchange: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/JournalIT.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCacheTest.java?rev=1695297&r1=1695296&r2=1695297&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCacheTest.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCacheTest.java Tue Aug 11 12:55:41 2015
@@ -86,7 +86,7 @@ public class MongoDiffCacheTest {
         
         MongoDiffCache diffCache = new MongoDiffCache(db, 32, new DocumentMK.Builder());
         DiffCache.Entry entry = diffCache.newEntry(
-                new Revision(1, 0, 1), new Revision(2, 0, 1));
+                new Revision(1, 0, 1), new Revision(2, 0, 1), false);
         for (int i = 0; i < 100; i++) {
             for (int j = 0; j < 100; j++) {
                 for (int k = 0; k < 64; k++) {

Modified: jackrabbit/oak/branches/1.0/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java?rev=1695297&r1=1695296&r2=1695297&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java (original)
+++ jackrabbit/oak/branches/1.0/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java Tue Aug 11 12:55:41 2015
@@ -60,6 +60,7 @@ import org.apache.jackrabbit.oak.spi.whi
 import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardExecutor;
 import org.apache.jackrabbit.oak.stats.StatisticManager;
 import org.apache.jackrabbit.oak.stats.TimeSeriesMax;
+import org.apache.jackrabbit.oak.util.PerfLogger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,6 +73,8 @@ import org.slf4j.LoggerFactory;
  */
 class ChangeProcessor implements Observer {
     private static final Logger LOG = LoggerFactory.getLogger(ChangeProcessor.class);
+    private static final PerfLogger PERF_LOGGER = new PerfLogger(
+            LoggerFactory.getLogger(ChangeProcessor.class.getName() + ".perf"));
 
     /**
      * Fill ratio of the revision queue at which commits should be delayed
@@ -280,6 +283,7 @@ class ChangeProcessor implements Observe
     public void contentChanged(@Nonnull NodeState root, @Nullable CommitInfo info) {
         if (previousRoot != null) {
             try {
+                long start = PERF_LOGGER.start();
                 FilterProvider provider = filterProvider.get();
                 // FIXME don't rely on toString for session id
                 if (provider.includeCommit(contentSession.toString(), info)) {
@@ -297,6 +301,9 @@ class ChangeProcessor implements Observe
                         }
                     }
                 }
+                PERF_LOGGER.end(start, 100,
+                        "Generated events (before: {}, after: {})",
+                        previousRoot, root);
             } catch (Exception e) {
                 LOG.warn("Error while dispatching observation events for " + tracker, e);
             }

Modified: jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ObservationTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ObservationTest.java?rev=1695297&r1=1695296&r2=1695297&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ObservationTest.java (original)
+++ jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ObservationTest.java Tue Aug 11 12:55:41 2015
@@ -26,12 +26,19 @@ import static javax.jcr.observation.Even
 import static javax.jcr.observation.Event.PROPERTY_ADDED;
 import static javax.jcr.observation.Event.PROPERTY_CHANGED;
 import static javax.jcr.observation.Event.PROPERTY_REMOVED;
+import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.getServices;
 
+import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
+import javax.annotation.Nullable;
 import javax.jcr.Node;
 import javax.jcr.Repository;
 import javax.jcr.RepositoryException;
@@ -41,8 +48,16 @@ import javax.jcr.observation.EventIterat
 import javax.jcr.observation.EventListener;
 import javax.jcr.observation.ObservationManager;
 
+import com.google.common.collect.Lists;
+
 import org.apache.jackrabbit.commons.JcrUtils;
+import org.apache.jackrabbit.oak.Oak;
+import org.apache.jackrabbit.oak.fixture.JcrCreator;
+import org.apache.jackrabbit.oak.fixture.OakRepositoryFixture;
 import org.apache.jackrabbit.oak.fixture.RepositoryFixture;
+import org.apache.jackrabbit.oak.jcr.Jcr;
+import org.apache.jackrabbit.oak.spi.commit.BackgroundObserverMBean;
+import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
 
 public class ObservationTest extends Benchmark {
     public static final int EVENT_TYPES = NODE_ADDED | NODE_REMOVED | NODE_MOVED |
@@ -51,6 +66,8 @@ public class ObservationTest extends Ben
     private static final int SAVE_INTERVAL = Integer.getInteger("saveInterval", 100);
     private static final int OUTPUT_RESOLUTION = 100;
     private static final int LISTENER_COUNT = Integer.getInteger("listenerCount", 100);
+    private static final int WRITER_COUNT = Integer.getInteger("writerCount", 1);
+    private static final String PATH_FILTER = System.getProperty("pathFilter");
 
     @Override
     public void run(Iterable<RepositoryFixture> fixtures) {
@@ -58,9 +75,21 @@ public class ObservationTest extends Ben
             if (fixture.isAvailable(1)) {
                 System.out.format("%s: Observation throughput benchmark%n", fixture);
                 try {
-                    Repository[] cluster = fixture.setUpCluster(1);
+                    final AtomicReference<Whiteboard> whiteboardRef = new AtomicReference<Whiteboard>();
+                    Repository[] cluster;
+                    if (fixture instanceof OakRepositoryFixture) {
+                        cluster = ((OakRepositoryFixture) fixture).setUpCluster(1, new JcrCreator() {
+                            @Override
+                            public Jcr customize(Oak oak) {
+                                whiteboardRef.set(oak.getWhiteboard());
+                                return new Jcr(oak);
+                            }
+                        });
+                    } else {
+                        cluster = fixture.setUpCluster(1);
+                    }
                     try {
-                        run(cluster[0]);
+                        run(cluster[0], whiteboardRef.get());
                     } finally {
                         fixture.tearDownCluster();
                     }
@@ -71,88 +100,160 @@ public class ObservationTest extends Ben
         }
     }
 
-    private void run(Repository repository) throws RepositoryException, ExecutionException, InterruptedException {
+    private void run(Repository repository, @Nullable Whiteboard whiteboard)
+            throws RepositoryException, ExecutionException, InterruptedException {
         Session session = createSession(repository);
         long t0 = System.currentTimeMillis();
         try {
-            observationThroughput(repository);
+            observationThroughput(repository, whiteboard);
         } finally {
             System.out.println("Time elapsed: " + (System.currentTimeMillis() - t0) + " ms");
             session.logout();
         }
     }
 
-    public void observationThroughput(final Repository repository)
+    public void observationThroughput(final Repository repository,
+                                      @Nullable Whiteboard whiteboard)
             throws RepositoryException, InterruptedException, ExecutionException {
         long t = 0;
         final AtomicInteger eventCount = new AtomicInteger();
         final AtomicInteger nodeCount = new AtomicInteger();
 
-        Session[] sessions = new Session[LISTENER_COUNT];
-        EventListener[] listeners = new Listener[LISTENER_COUNT];
+        List<Session> sessions = Lists.newArrayList();
+        List<EventListener> listeners = Lists.newArrayList();
 
+        List<String> testPaths = Lists.newArrayList();
+        Session s = createSession(repository);
+        String path = "/path/to/observation/benchmark-" + AbstractTest.TEST_ID;
         try {
-            for (int k = 0; k < LISTENER_COUNT; k++) {
-                sessions[k] = createSession(repository);
-                listeners[k] = new Listener(eventCount);
-                ObservationManager obsMgr = sessions[k].getWorkspace().getObservationManager();
-                obsMgr.addEventListener(listeners[k], EVENT_TYPES, "/", true, null, null, false);
+            Node testRoot = JcrUtils.getOrCreateByPath(path, null, s);
+            for (int i = 0; i < WRITER_COUNT; i++) {
+                testPaths.add(testRoot.addNode("session-" + i).getPath());
             }
+            s.save();
+        } finally {
+            s.logout();
+        }
 
-            Future<?> createNodes = Executors.newSingleThreadExecutor().submit(new Runnable() {
-                private final Session session = repository.login(new SimpleCredentials("admin", "admin".toCharArray()));
+        String pathFilter = PATH_FILTER == null ? path : PATH_FILTER;
+        System.out.println("Path filter for event listener: " + pathFilter);
+        ExecutorService service = Executors.newFixedThreadPool(WRITER_COUNT);
+        try {
+            for (int k = 0; k < LISTENER_COUNT; k++) {
+                sessions.add(createSession(repository));
+                listeners.add(new Listener(eventCount));
+                ObservationManager obsMgr = sessions.get(k).getWorkspace().getObservationManager();
+                obsMgr.addEventListener(listeners.get(k), EVENT_TYPES, pathFilter, true, null, null, false);
+            }
+            // also add a listener on the root node
+            addRootListener(repository, sessions, listeners);
 
-                @Override
-                public void run() {
-                    try {
-                        Node testRoot = session.getRootNode().addNode("observationBenchmark");
-                        createChildren(testRoot, 100);
-                        for (Node m : JcrUtils.getChildNodes(testRoot)) {
-                            createChildren(m, 100);
-                            for (Node n : JcrUtils.getChildNodes(m)) {
-                                createChildren(n, 5);
+            List<Future<Object>> createNodes = Lists.newArrayList();
+            for (final String p : testPaths) {
+                createNodes.add(service.submit(new Callable<Object>() {
+                    private final Session session = createSession(repository);
+                    private int numNodes = 0;
+
+                    @Override
+                    public Object call() throws Exception {
+                        try {
+                            Node testRoot = session.getNode(p);
+                            createChildren(testRoot, 100);
+                            for (Node m : JcrUtils.getChildNodes(testRoot)) {
+                                createChildren(m, 100 / WRITER_COUNT);
+                                for (Node n : JcrUtils.getChildNodes(m)) {
+                                    createChildren(n, 5);
+                                }
                             }
+                            session.save();
+                        } finally {
+                            session.logout();
                         }
-                        session.save();
-                    } catch (RepositoryException e) {
-                        throw new RuntimeException(e);
-                    } finally {
-                        session.logout();
+                        return null;
                     }
-                }
 
-                private void createChildren(Node node, int count) throws RepositoryException {
-                    for (int c = 0; c < count; c++) {
-                        node.addNode("n" + c);
-                        if (nodeCount.incrementAndGet() % SAVE_INTERVAL == 0) {
-                            node.getSession().save();
+                    private void createChildren(Node node, int count)
+                            throws RepositoryException {
+                        for (int c = 0; c < count; c++) {
+                            node.addNode("n" + c);
+                            nodeCount.incrementAndGet();
+                            if (++numNodes % SAVE_INTERVAL == 0) {
+                                node.getSession().save();
+                            }
                         }
                     }
-                }
-            });
+                }));
+            }
 
-            System.out.println("ms      #node   nodes/s #event  event/s event ratio");
-            while (!createNodes.isDone() || (eventCount.get() < nodeCount.get() * EVENTS_PER_NODE)) {
+            System.out.println("ms      #node   nodes/s #event  event/s event-ratio queue external");
+            while (!isDone(createNodes) || (eventCount.get() / LISTENER_COUNT < nodeCount.get() * EVENTS_PER_NODE)) {
                 long t0 = System.currentTimeMillis();
                 Thread.sleep(OUTPUT_RESOLUTION);
                 t += System.currentTimeMillis() - t0;
 
                 int nc = nodeCount.get();
                 int ec = eventCount.get() / LISTENER_COUNT;
+                int[] ql = getObservationQueueLength(whiteboard);
 
                 double nps = (double) nc / t * 1000;
                 double eps = (double) ec / t * 1000;
                 double epn = (double) ec / nc / EVENTS_PER_NODE;
 
-                System.out.format("%7d %7d %7.1f %7d %7.1f %1.2f%n", t, nc, nps, ec, eps, epn);
+                System.out.format(
+                        "%7d %7d %7.1f %7d %7.1f %7.2f %7d %7d%n",
+                           t, nc,  nps, ec,  eps,  epn, ql[0], ql[1]);
             }
-            createNodes.get();
+            get(createNodes);
         } finally {
-            for (int k = 0; k < LISTENER_COUNT; k++) {
-                sessions[k].getWorkspace().getObservationManager().removeEventListener(listeners[k]);
-                sessions[k].logout();
+            for (int k = 0; k < sessions.size(); k++) {
+                sessions.get(k).getWorkspace().getObservationManager()
+                        .removeEventListener(listeners.get(k));
+                sessions.get(k).logout();
+            }
+            service.shutdown();
+            service.awaitTermination(1, TimeUnit.MINUTES);
+        }
+    }
+
+    private void addRootListener(Repository repository,
+                                 List<Session> sessions,
+                                 List<EventListener> listeners)
+            throws RepositoryException {
+        Session s = createSession(repository);
+        sessions.add(s);
+        Listener listener = new Listener(new AtomicInteger());
+        ObservationManager obsMgr = s.getWorkspace().getObservationManager();
+        obsMgr.addEventListener(listener, EVENT_TYPES, "/", true, null, null, false);
+        listeners.add(listener);
+    }
+
+    private static int[] getObservationQueueLength(@Nullable Whiteboard wb) {
+        if (wb == null) {
+            return new int[]{-1, -1};
+        }
+        int len = -1;
+        int ext = -1;
+        for (BackgroundObserverMBean bean : getServices(wb, BackgroundObserverMBean.class)) {
+            len = Math.max(bean.getQueueSize(), len);
+            ext = Math.max(bean.getExternalEventCount(), ext);
+        }
+        return new int[]{len, ext};
+    }
+
+    private static boolean isDone(Iterable<Future<Object>> futures) {
+        for (Future f : futures) {
+            if (!f.isDone()) {
+                return false;
             }
         }
+        return true;
+    }
+
+    private static void get(Iterable<Future<Object>> futures)
+            throws ExecutionException, InterruptedException {
+        for (Future f : futures) {
+            f.get();
+        }
     }
 
     private static Session createSession(Repository repository)

Added: jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/JcrCreator.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/JcrCreator.java?rev=1695297&view=auto
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/JcrCreator.java (added)
+++ jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/JcrCreator.java Tue Aug 11 12:55:41 2015
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.jackrabbit.oak.fixture;
+
+import org.apache.jackrabbit.oak.Oak;
+import org.apache.jackrabbit.oak.jcr.Jcr;
+
+public interface JcrCreator {
+    JcrCreator DEFAULT = new JcrCreator() {
+        @Override
+        public Jcr customize(Oak oak) {
+            return new Jcr(oak);
+        }
+    };
+
+    public Jcr customize(Oak oak);
+}
\ No newline at end of file

Propchange: jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/JcrCreator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/OakRepositoryFixture.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/OakRepositoryFixture.java?rev=1695297&r1=1695296&r2=1695297&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/OakRepositoryFixture.java (original)
+++ jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/OakRepositoryFixture.java Tue Aug 11 12:55:41 2015
@@ -17,6 +17,7 @@
 package org.apache.jackrabbit.oak.fixture;
 
 import java.io.File;
+
 import javax.jcr.Repository;
 
 import org.apache.jackrabbit.api.JackrabbitRepository;
@@ -90,10 +91,14 @@ public class OakRepositoryFixture implem
 
     @Override
     public final Repository[] setUpCluster(int n) throws Exception {
+        return setUpCluster(n, JcrCreator.DEFAULT);
+    }
+
+    public Repository[] setUpCluster(int n, JcrCreator customizer) throws Exception {
         Oak[] oaks = oakFixture.setUpCluster(n);
         cluster = new Repository[oaks.length];
         for (int i = 0; i < oaks.length; i++) {
-            cluster[i] = new Jcr(oaks[i]).createRepository();;
+            cluster[i] = customizer.customize(oaks[i]).createRepository();
         }
         return cluster;
     }