You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2015/08/11 14:55:42 UTC
svn commit: r1695297 [3/3] - in /jackrabbit/oak/branches/1.0: ./
oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/
oak-core/src/main/java/org/apache/jackrabbit/oak/plugi...
Modified: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModificationsTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModificationsTest.java?rev=1695297&r1=1695296&r2=1695297&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModificationsTest.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModificationsTest.java Tue Aug 11 12:55:41 2015
@@ -35,6 +35,7 @@ import org.apache.jackrabbit.oak.plugins
import org.junit.Ignore;
import org.junit.Test;
+import static org.apache.jackrabbit.oak.plugins.document.UnsavedModifications.Snapshot.IGNORE;
import static org.junit.Assert.assertEquals;
public class UnsavedModificationsTest {
@@ -96,7 +97,7 @@ public class UnsavedModificationsTest {
public void run() {
while (exceptions.isEmpty()) {
try {
- mod.persist(ns, new ReentrantLock());
+ mod.persist(ns, IGNORE, new ReentrantLock());
Thread.sleep(10);
} catch (Exception e) {
exceptions.add(e);
@@ -169,7 +170,7 @@ public class UnsavedModificationsTest {
paths.clear();
}
if (random.nextFloat() < 0.00005) {
- pending.persist(ns, new ReentrantLock());
+ pending.persist(ns, IGNORE, new ReentrantLock());
}
}
}
@@ -220,7 +221,7 @@ public class UnsavedModificationsTest {
}
// drain pending, this will force it back to in-memory
- pending.persist(ns, new ReentrantLock());
+ pending.persist(ns, IGNORE, new ReentrantLock());
// loop over remaining paths
while (paths.hasNext()) {
@@ -257,7 +258,7 @@ public class UnsavedModificationsTest {
}
// drain pending, this will force it back to in-memory
- pending.persist(ns, new ReentrantLock());
+ pending.persist(ns, IGNORE, new ReentrantLock());
// loop over remaining paths
while (paths.hasNext()) {
Added: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/JournalIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/JournalIT.java?rev=1695297&view=auto
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/JournalIT.java (added)
+++ jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/JournalIT.java Tue Aug 11 12:55:41 2015
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.mongo;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import com.mongodb.DB;
+
+import org.apache.jackrabbit.oak.cache.CacheStats;
+import org.apache.jackrabbit.oak.plugins.document.AbstractJournalTest;
+import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
+import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
+import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
+import org.apache.jackrabbit.oak.plugins.document.JournalGarbageCollector;
+import org.apache.jackrabbit.oak.plugins.document.MongoUtils;
+import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
+import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection;
+import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+
+public class JournalIT extends AbstractJournalTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JournalIT.class);
+
+ @BeforeClass
+ public static void checkMongoDbAvailable() {
+ Assume.assumeNotNull(MongoUtils.getConnection());
+ }
+
+    /**
+     * Runs {@code MongoUtils.dropCollections} against the test database
+     * before and after every test.
+     *
+     * @throws Exception if dropping the collections fails
+     */
+    @Before
+    @After
+    public void dropCollections() throws Exception {
+        MongoConnection mongoConnection = MongoUtils.getConnection();
+        try {
+            MongoUtils.dropCollections(mongoConnection.getDB());
+        } finally {
+            // release the connection even when dropping the collections fails
+            mongoConnection.close();
+        }
+    }
+
+ @Test
+ public void cacheInvalidationTest() throws Exception {
+ final DocumentNodeStore ns1 = createMK(1, 0).getNodeStore();
+ final DocumentNodeStore ns2 = createMK(2, 0).getNodeStore();
+ LOG.info("cache size 1: "+(ns1.getDocumentStore().getCacheStats()==null ? "null" : ns1.getDocumentStore().getCacheStats().getElementCount()));
+
+ // invalidate both caches under test first
+ invalidateDocChildrenCache(ns1);
+ ns1.getDocumentStore().invalidateCache();
+
+ {
+ DocumentStore s = ns1.getDocumentStore();
+ CacheStats cacheStats = s.getCacheStats();
+ LOG.info("m.size="+(cacheStats==null ? "null" : cacheStats.getElementCount()));
+ }
+ LOG.info("cache size 2: "+(ns1.getDocumentStore().getCacheStats()==null ? "null" : ns1.getDocumentStore().getCacheStats().getElementCount()));
+
+ // first create child node in instance 1
+ final List<String> paths = createRandomPaths(1, 5000000, 1000);
+ int i=0;
+ for(String path : paths) {
+ if (i++%100==0) {
+ LOG.info("at "+i);
+ }
+ getOrCreate(ns1, path, false);
+ }
+ final List<String> paths2 = createRandomPaths(20, 2345, 100);
+ getOrCreate(ns1, paths2, false);
+ ns1.runBackgroundOperations();
+ for(String path : paths) {
+ assertDocCache(ns1, true, path);
+ }
+
+ {
+ DocumentStore s = ns1.getDocumentStore();
+ CacheStats cacheStats = s.getCacheStats();
+ LOG.info("m.size="+(cacheStats==null ? "null" : cacheStats.getElementCount()));
+ }
+
+ LOG.info("cache size 2: "+(ns1.getDocumentStore().getCacheStats()==null ? "null" : ns1.getDocumentStore().getCacheStats().getElementCount()));
+ long time = System.currentTimeMillis();
+ for(int j=0; j<100; j++) {
+ long now = System.currentTimeMillis();
+ LOG.info("loop "+j+", "+(now-time)+"ms");
+ time = now;
+ final Set<String> electedPaths = choose(paths2, random.nextInt(30));
+ {
+ // choose a random few from above created paths and modify them
+ final long t1 = System.currentTimeMillis();
+ ns2.runBackgroundOperations(); // make sure ns2 has the latest from ns1
+ final long t2 = System.currentTimeMillis();
+ LOG.info("ns2 background took "+(t2-t1)+"ms");
+
+ for(String electedPath : electedPaths) {
+ // modify /child in another instance 2
+ setProperty(ns2, electedPath, "p", "ns2"+System.currentTimeMillis(), false);
+ }
+ final long t3 = System.currentTimeMillis();
+ LOG.info("setting props "+(t3-t2)+"ms");
+
+ ns2.runBackgroundOperations();
+ final long t4 = System.currentTimeMillis();
+ LOG.info("ns2 background took2 "+(t4-t3)+"ms");
+ }
+
+ // that should not have changed the fact that we have it cached in 'ns1'
+ for(String electedPath : electedPaths) {
+ assertDocCache(ns1, true, electedPath);
+ }
+
+ // doing a backgroundOp now should trigger invalidation
+ // which thx to the external modification will remove the entry from the cache:
+ ns1.runBackgroundOperations();
+ for(String electedPath : electedPaths) {
+ assertDocCache(ns1, false, electedPath);
+ }
+
+ // when I access it again with 'ns1', then it gets cached again:
+ for(String electedPath : electedPaths) {
+ getOrCreate(ns1, electedPath, false);
+ assertDocCache(ns1, true, electedPath);
+ }
+ }
+ }
+
+ @Test
+ public void largeCleanupTest() throws Exception {
+ // create more than DELETE_BATCH_SIZE of entries and clean them up
+ // should make sure to loop in JournalGarbageCollector.gc such
+ // that it would find issue described here:
+ // https://issues.apache.org/jira/browse/OAK-2829?focusedCommentId=14585733&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14585733
+
+ doLargeCleanupTest(0, 100);
+ doLargeCleanupTest(200, 1000);// using offset as to not make sure to always create new entries
+ doLargeCleanupTest(2000, 10000);
+ doLargeCleanupTest(20000, 30000); // using 'size' much larger than 30k will be tremendously slow due to ordered node
+ }
+
+    /**
+     * Minimal two-instance scenario for document cache invalidation on a
+     * single node: a document cached in one instance stays cached after an
+     * external modification, is dropped from the cache by the next run of
+     * the background operations, and is cached again on the next access.
+     *
+     * @throws Exception if any repository operation fails
+     */
+    @Test
+    public void simpleCacheInvalidationTest() throws Exception {
+        final String childPath = "/child";
+        final DocumentNodeStore local = createMK(1, 0).getNodeStore();
+        final DocumentNodeStore remote = createMK(2, 0).getNodeStore();
+
+        // start with empty caches on the instance under test
+        invalidateDocChildrenCache(local);
+        local.getDocumentStore().invalidateCache();
+
+        // create the child node in the local instance, which caches it
+        getOrCreate(local, childPath, true);
+        assertDocCache(local, true, childPath);
+
+        // let the remote instance read the latest changes, then modify the child there
+        remote.runBackgroundOperations();
+        setProperty(remote, childPath, "p", "ns2" + System.currentTimeMillis(), true);
+
+        // the remote change alone must not evict the local cache entry
+        assertDocCache(local, true, childPath);
+
+        // the local background operations trigger the invalidation; because of
+        // the external modification the entry is removed from the cache
+        local.runBackgroundOperations();
+        assertDocCache(local, false, childPath);
+
+        // reading the node through the local instance caches it again
+        getOrCreate(local, childPath, false);
+        assertDocCache(local, true, childPath);
+    }
+
+ private void doLargeCleanupTest(int offset, int size) throws Exception {
+ Clock clock = new Clock.Virtual();
+ DocumentMK mk1 = createMK(0 /* clusterId: 0 => uses clusterNodes collection */, 0,
+ new MemoryDocumentStore(), new MemoryBlobStore());
+ DocumentNodeStore ns1 = mk1.getNodeStore();
+ // make sure we're visible and marked as active
+ renewClusterIdLease(ns1);
+ JournalGarbageCollector gc = new JournalGarbageCollector(ns1);
+ clock.getTimeIncreasing();
+ clock.getTimeIncreasing();
+ gc.gc(0, TimeUnit.MILLISECONDS); // cleanup everything that might still be there
+
+ // create entries as parametrized:
+ for(int i=offset; i<size+offset; i++) {
+ mk1.commit("/", "+\"regular"+i+"\": {}", null, null);
+ // always run background ops to 'flush' the change
+ // into the journal:
+ ns1.runBackgroundOperations();
+ }
+ Thread.sleep(100); // sleep 100millis
+ assertEquals(size, gc.gc(0, TimeUnit.MILLISECONDS)); // should now be able to clean up everything
+ }
+
+    /**
+     * Opens a {@link DocumentMK} on the MongoDB test database and passes it
+     * to {@code register}.
+     *
+     * @param clusterId the cluster id to configure on the builder
+     * @param asyncDelay the async delay to configure on the builder
+     * @return the value returned by {@code register} for the opened instance
+     */
+    protected DocumentMK createMK(int clusterId, int asyncDelay) {
+        DB db = MongoUtils.getConnection().getDB();
+        builder = newDocumentMKBuilder();
+        DocumentMK mk = builder
+                .setMongoDB(db)
+                .setClusterId(clusterId)
+                .setAsyncDelay(asyncDelay)
+                .open();
+        return register(mk);
+    }
+
+}
Propchange: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/JournalIT.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCacheTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCacheTest.java?rev=1695297&r1=1695296&r2=1695297&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCacheTest.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCacheTest.java Tue Aug 11 12:55:41 2015
@@ -86,7 +86,7 @@ public class MongoDiffCacheTest {
MongoDiffCache diffCache = new MongoDiffCache(db, 32, new DocumentMK.Builder());
DiffCache.Entry entry = diffCache.newEntry(
- new Revision(1, 0, 1), new Revision(2, 0, 1));
+ new Revision(1, 0, 1), new Revision(2, 0, 1), false);
for (int i = 0; i < 100; i++) {
for (int j = 0; j < 100; j++) {
for (int k = 0; k < 64; k++) {
Modified: jackrabbit/oak/branches/1.0/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java?rev=1695297&r1=1695296&r2=1695297&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java (original)
+++ jackrabbit/oak/branches/1.0/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java Tue Aug 11 12:55:41 2015
@@ -60,6 +60,7 @@ import org.apache.jackrabbit.oak.spi.whi
import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardExecutor;
import org.apache.jackrabbit.oak.stats.StatisticManager;
import org.apache.jackrabbit.oak.stats.TimeSeriesMax;
+import org.apache.jackrabbit.oak.util.PerfLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,6 +73,8 @@ import org.slf4j.LoggerFactory;
*/
class ChangeProcessor implements Observer {
private static final Logger LOG = LoggerFactory.getLogger(ChangeProcessor.class);
+ private static final PerfLogger PERF_LOGGER = new PerfLogger(
+ LoggerFactory.getLogger(ChangeProcessor.class.getName() + ".perf"));
/**
* Fill ratio of the revision queue at which commits should be delayed
@@ -280,6 +283,7 @@ class ChangeProcessor implements Observe
public void contentChanged(@Nonnull NodeState root, @Nullable CommitInfo info) {
if (previousRoot != null) {
try {
+ long start = PERF_LOGGER.start();
FilterProvider provider = filterProvider.get();
// FIXME don't rely on toString for session id
if (provider.includeCommit(contentSession.toString(), info)) {
@@ -297,6 +301,9 @@ class ChangeProcessor implements Observe
}
}
}
+ PERF_LOGGER.end(start, 100,
+ "Generated events (before: {}, after: {})",
+ previousRoot, root);
} catch (Exception e) {
LOG.warn("Error while dispatching observation events for " + tracker, e);
}
Modified: jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ObservationTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ObservationTest.java?rev=1695297&r1=1695296&r2=1695297&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ObservationTest.java (original)
+++ jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ObservationTest.java Tue Aug 11 12:55:41 2015
@@ -26,12 +26,19 @@ import static javax.jcr.observation.Even
import static javax.jcr.observation.Event.PROPERTY_ADDED;
import static javax.jcr.observation.Event.PROPERTY_CHANGED;
import static javax.jcr.observation.Event.PROPERTY_REMOVED;
+import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.getServices;
+import java.util.List;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
import javax.jcr.Node;
import javax.jcr.Repository;
import javax.jcr.RepositoryException;
@@ -41,8 +48,16 @@ import javax.jcr.observation.EventIterat
import javax.jcr.observation.EventListener;
import javax.jcr.observation.ObservationManager;
+import com.google.common.collect.Lists;
+
import org.apache.jackrabbit.commons.JcrUtils;
+import org.apache.jackrabbit.oak.Oak;
+import org.apache.jackrabbit.oak.fixture.JcrCreator;
+import org.apache.jackrabbit.oak.fixture.OakRepositoryFixture;
import org.apache.jackrabbit.oak.fixture.RepositoryFixture;
+import org.apache.jackrabbit.oak.jcr.Jcr;
+import org.apache.jackrabbit.oak.spi.commit.BackgroundObserverMBean;
+import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
public class ObservationTest extends Benchmark {
public static final int EVENT_TYPES = NODE_ADDED | NODE_REMOVED | NODE_MOVED |
@@ -51,6 +66,8 @@ public class ObservationTest extends Ben
private static final int SAVE_INTERVAL = Integer.getInteger("saveInterval", 100);
private static final int OUTPUT_RESOLUTION = 100;
private static final int LISTENER_COUNT = Integer.getInteger("listenerCount", 100);
+ private static final int WRITER_COUNT = Integer.getInteger("writerCount", 1);
+ private static final String PATH_FILTER = System.getProperty("pathFilter");
@Override
public void run(Iterable<RepositoryFixture> fixtures) {
@@ -58,9 +75,21 @@ public class ObservationTest extends Ben
if (fixture.isAvailable(1)) {
System.out.format("%s: Observation throughput benchmark%n", fixture);
try {
- Repository[] cluster = fixture.setUpCluster(1);
+ final AtomicReference<Whiteboard> whiteboardRef = new AtomicReference<Whiteboard>();
+ Repository[] cluster;
+ if (fixture instanceof OakRepositoryFixture) {
+ cluster = ((OakRepositoryFixture) fixture).setUpCluster(1, new JcrCreator() {
+ @Override
+ public Jcr customize(Oak oak) {
+ whiteboardRef.set(oak.getWhiteboard());
+ return new Jcr(oak);
+ }
+ });
+ } else {
+ cluster = fixture.setUpCluster(1);
+ }
try {
- run(cluster[0]);
+ run(cluster[0], whiteboardRef.get());
} finally {
fixture.tearDownCluster();
}
@@ -71,88 +100,160 @@ public class ObservationTest extends Ben
}
}
- private void run(Repository repository) throws RepositoryException, ExecutionException, InterruptedException {
+ private void run(Repository repository, @Nullable Whiteboard whiteboard)
+ throws RepositoryException, ExecutionException, InterruptedException {
Session session = createSession(repository);
long t0 = System.currentTimeMillis();
try {
- observationThroughput(repository);
+ observationThroughput(repository, whiteboard);
} finally {
System.out.println("Time elapsed: " + (System.currentTimeMillis() - t0) + " ms");
session.logout();
}
}
- public void observationThroughput(final Repository repository)
+ public void observationThroughput(final Repository repository,
+ @Nullable Whiteboard whiteboard)
throws RepositoryException, InterruptedException, ExecutionException {
long t = 0;
final AtomicInteger eventCount = new AtomicInteger();
final AtomicInteger nodeCount = new AtomicInteger();
- Session[] sessions = new Session[LISTENER_COUNT];
- EventListener[] listeners = new Listener[LISTENER_COUNT];
+ List<Session> sessions = Lists.newArrayList();
+ List<EventListener> listeners = Lists.newArrayList();
+ List<String> testPaths = Lists.newArrayList();
+ Session s = createSession(repository);
+ String path = "/path/to/observation/benchmark-" + AbstractTest.TEST_ID;
try {
- for (int k = 0; k < LISTENER_COUNT; k++) {
- sessions[k] = createSession(repository);
- listeners[k] = new Listener(eventCount);
- ObservationManager obsMgr = sessions[k].getWorkspace().getObservationManager();
- obsMgr.addEventListener(listeners[k], EVENT_TYPES, "/", true, null, null, false);
+ Node testRoot = JcrUtils.getOrCreateByPath(path, null, s);
+ for (int i = 0; i < WRITER_COUNT; i++) {
+ testPaths.add(testRoot.addNode("session-" + i).getPath());
}
+ s.save();
+ } finally {
+ s.logout();
+ }
- Future<?> createNodes = Executors.newSingleThreadExecutor().submit(new Runnable() {
- private final Session session = repository.login(new SimpleCredentials("admin", "admin".toCharArray()));
+ String pathFilter = PATH_FILTER == null ? path : PATH_FILTER;
+ System.out.println("Path filter for event listener: " + pathFilter);
+ ExecutorService service = Executors.newFixedThreadPool(WRITER_COUNT);
+ try {
+ for (int k = 0; k < LISTENER_COUNT; k++) {
+ sessions.add(createSession(repository));
+ listeners.add(new Listener(eventCount));
+ ObservationManager obsMgr = sessions.get(k).getWorkspace().getObservationManager();
+ obsMgr.addEventListener(listeners.get(k), EVENT_TYPES, pathFilter, true, null, null, false);
+ }
+ // also add a listener on the root node
+ addRootListener(repository, sessions, listeners);
- @Override
- public void run() {
- try {
- Node testRoot = session.getRootNode().addNode("observationBenchmark");
- createChildren(testRoot, 100);
- for (Node m : JcrUtils.getChildNodes(testRoot)) {
- createChildren(m, 100);
- for (Node n : JcrUtils.getChildNodes(m)) {
- createChildren(n, 5);
+ List<Future<Object>> createNodes = Lists.newArrayList();
+ for (final String p : testPaths) {
+ createNodes.add(service.submit(new Callable<Object>() {
+ private final Session session = createSession(repository);
+ private int numNodes = 0;
+
+ @Override
+ public Object call() throws Exception {
+ try {
+ Node testRoot = session.getNode(p);
+ createChildren(testRoot, 100);
+ for (Node m : JcrUtils.getChildNodes(testRoot)) {
+ createChildren(m, 100 / WRITER_COUNT);
+ for (Node n : JcrUtils.getChildNodes(m)) {
+ createChildren(n, 5);
+ }
}
+ session.save();
+ } finally {
+ session.logout();
}
- session.save();
- } catch (RepositoryException e) {
- throw new RuntimeException(e);
- } finally {
- session.logout();
+ return null;
}
- }
- private void createChildren(Node node, int count) throws RepositoryException {
- for (int c = 0; c < count; c++) {
- node.addNode("n" + c);
- if (nodeCount.incrementAndGet() % SAVE_INTERVAL == 0) {
- node.getSession().save();
+ private void createChildren(Node node, int count)
+ throws RepositoryException {
+ for (int c = 0; c < count; c++) {
+ node.addNode("n" + c);
+ nodeCount.incrementAndGet();
+ if (++numNodes % SAVE_INTERVAL == 0) {
+ node.getSession().save();
+ }
}
}
- }
- });
+ }));
+ }
- System.out.println("ms #node nodes/s #event event/s event ratio");
- while (!createNodes.isDone() || (eventCount.get() < nodeCount.get() * EVENTS_PER_NODE)) {
+ System.out.println("ms #node nodes/s #event event/s event-ratio queue external");
+ while (!isDone(createNodes) || (eventCount.get() / LISTENER_COUNT < nodeCount.get() * EVENTS_PER_NODE)) {
long t0 = System.currentTimeMillis();
Thread.sleep(OUTPUT_RESOLUTION);
t += System.currentTimeMillis() - t0;
int nc = nodeCount.get();
int ec = eventCount.get() / LISTENER_COUNT;
+ int[] ql = getObservationQueueLength(whiteboard);
double nps = (double) nc / t * 1000;
double eps = (double) ec / t * 1000;
double epn = (double) ec / nc / EVENTS_PER_NODE;
- System.out.format("%7d %7d %7.1f %7d %7.1f %1.2f%n", t, nc, nps, ec, eps, epn);
+ System.out.format(
+ "%7d %7d %7.1f %7d %7.1f %7.2f %7d %7d%n",
+ t, nc, nps, ec, eps, epn, ql[0], ql[1]);
}
- createNodes.get();
+ get(createNodes);
} finally {
- for (int k = 0; k < LISTENER_COUNT; k++) {
- sessions[k].getWorkspace().getObservationManager().removeEventListener(listeners[k]);
- sessions[k].logout();
+ for (int k = 0; k < sessions.size(); k++) {
+ sessions.get(k).getWorkspace().getObservationManager()
+ .removeEventListener(listeners.get(k));
+ sessions.get(k).logout();
+ }
+ service.shutdown();
+ service.awaitTermination(1, TimeUnit.MINUTES);
+ }
+ }
+
+    /**
+     * Registers one additional event listener on the root path {@code "/"},
+     * using a session of its own and a counter that is separate from the
+     * benchmark's event counter.
+     * <p>
+     * Session and listener are appended to the given lists so that the
+     * caller's cleanup loop removes the listener and logs out the session.
+     *
+     * @param repository the repository to log in to
+     * @param sessions list the new session is appended to
+     * @param listeners list the new listener is appended to
+     * @throws RepositoryException if login or listener registration fails
+     */
+    private void addRootListener(Repository repository,
+                                 List<Session> sessions,
+                                 List<EventListener> listeners)
+            throws RepositoryException {
+        Session s = createSession(repository);
+        Listener listener = new Listener(new AtomicInteger());
+        // Track both before registering (as the listener loop in
+        // observationThroughput does): the cleanup loop walks the two lists
+        // by the same index, so they must stay aligned even if the
+        // registration below throws.
+        sessions.add(s);
+        listeners.add(listener);
+        ObservationManager obsMgr = s.getWorkspace().getObservationManager();
+        obsMgr.addEventListener(listener, EVENT_TYPES, "/", true, null, null, false);
+    }
+
+    /**
+     * Determines the largest queue size and the largest external event count
+     * over all {@link BackgroundObserverMBean} services of the whiteboard.
+     *
+     * @param wb the whiteboard to query, may be {@code null}
+     * @return a two element array {@code {queueSize, externalEventCount}};
+     *         both values are {@code -1} when {@code wb} is {@code null} or
+     *         no such service is found
+     */
+    private static int[] getObservationQueueLength(@Nullable Whiteboard wb) {
+        int maxQueueSize = -1;
+        int maxExternalEvents = -1;
+        if (wb != null) {
+            for (BackgroundObserverMBean observer : getServices(wb, BackgroundObserverMBean.class)) {
+                maxQueueSize = Math.max(maxQueueSize, observer.getQueueSize());
+                maxExternalEvents = Math.max(maxExternalEvents, observer.getExternalEventCount());
+            }
+        }
+        return new int[]{maxQueueSize, maxExternalEvents};
+    }
+
+    /**
+     * Tells whether all of the given futures are done.
+     *
+     * @param futures the futures to check
+     * @return {@code false} as soon as one future is not done, {@code true}
+     *         otherwise (also when there are no futures at all)
+     */
+    private static boolean isDone(Iterable<Future<Object>> futures) {
+        // parameterized loop variable instead of the raw Future type
+        for (Future<Object> f : futures) {
             if (!f.isDone()) {
                 return false;
             }
         }
+        return true;
+    }
+
+    /**
+     * Waits for each of the given futures in iteration order and discards
+     * the results.
+     *
+     * @param futures the futures to wait for
+     * @throws ExecutionException if one of the tasks failed
+     * @throws InterruptedException if the current thread is interrupted
+     *         while waiting
+     */
+    private static void get(Iterable<Future<Object>> futures)
+            throws ExecutionException, InterruptedException {
+        // parameterized loop variable instead of the raw Future type
+        for (Future<Object> f : futures) {
+            f.get();
+        }
     }
private static Session createSession(Repository repository)
Added: jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/JcrCreator.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/JcrCreator.java?rev=1695297&view=auto
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/JcrCreator.java (added)
+++ jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/JcrCreator.java Tue Aug 11 12:55:41 2015
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.fixture;
+
+import org.apache.jackrabbit.oak.Oak;
+import org.apache.jackrabbit.oak.jcr.Jcr;
+
+/**
+ * Callback that creates the {@link Jcr} instance for a given {@link Oak}
+ * instance. It gives callers of a repository fixture a hook to inspect or
+ * customize the {@code Oak} instance before the repository is created
+ * from the returned {@code Jcr}.
+ */
+public interface JcrCreator {
+
+    /** Default creator: returns {@code new Jcr(oak)} without any customization. */
+    JcrCreator DEFAULT = new JcrCreator() {
+        @Override
+        public Jcr customize(Oak oak) {
+            return new Jcr(oak);
+        }
+    };
+
+    /**
+     * Creates the {@link Jcr} instance to use for the given {@link Oak}
+     * instance.
+     *
+     * @param oak the Oak instance the repository is going to be based on
+     * @return the {@code Jcr} instance for {@code oak}
+     */
+    Jcr customize(Oak oak);
+}
\ No newline at end of file
Propchange: jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/JcrCreator.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/OakRepositoryFixture.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/OakRepositoryFixture.java?rev=1695297&r1=1695296&r2=1695297&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/OakRepositoryFixture.java (original)
+++ jackrabbit/oak/branches/1.0/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/OakRepositoryFixture.java Tue Aug 11 12:55:41 2015
@@ -17,6 +17,7 @@
package org.apache.jackrabbit.oak.fixture;
import java.io.File;
+
import javax.jcr.Repository;
import org.apache.jackrabbit.api.JackrabbitRepository;
@@ -90,10 +91,14 @@ public class OakRepositoryFixture implem
@Override
public final Repository[] setUpCluster(int n) throws Exception {
+ return setUpCluster(n, JcrCreator.DEFAULT);
+ }
+
+ public Repository[] setUpCluster(int n, JcrCreator customizer) throws Exception {
Oak[] oaks = oakFixture.setUpCluster(n);
cluster = new Repository[oaks.length];
for (int i = 0; i < oaks.length; i++) {
- cluster[i] = new Jcr(oaks[i]).createRepository();;
+ cluster[i] = customizer.customize(oaks[i]).createRepository();
}
return cluster;
}