You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2014/02/12 18:18:42 UTC
svn commit: r1567690 -
/jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/LargeOperationIT.java
Author: mduerig
Date: Wed Feb 12 17:18:41 2014
New Revision: 1567690
URL: http://svn.apache.org/r1567690
Log:
OAK-1413: Add scalability tests for large operations
Delayed event handling: assert that slow event handlers do not impact the overall system
Modified:
jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/LargeOperationIT.java
Modified: jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/LargeOperationIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/LargeOperationIT.java?rev=1567690&r1=1567689&r2=1567690&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/LargeOperationIT.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/LargeOperationIT.java Wed Feb 12 17:18:41 2014
@@ -19,6 +19,7 @@
package org.apache.jackrabbit.oak.jcr;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static javax.jcr.observation.Event.NODE_ADDED;
import static org.junit.Assert.fail;
@@ -28,7 +29,14 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicReference;
import javax.jcr.Node;
import javax.jcr.NodeIterator;
@@ -344,6 +352,36 @@ public class LargeOperationIT {
assertBounded("Processing pending events does not scale logarithmically.", quotients, bound);
}
+    /**
+     * Scalability check for node creation while observation listeners are slow.
+     * A {@code DelayedEventHandling} instance is started on a fresh
+     * "slow-events" node; for every entry of {@code scales} the time reported by
+     * a {@code ScalabilityTest} that waits for that many nodes is recorded. The
+     * quotients of those timings must stay within the bound obtained from
+     * {@code getLogarithmicBound(scales, 10)}.
+     */
+    @Test
+    public void slowListener() throws RepositoryException, ExecutionException, InterruptedException {
+        Node parent = session.getRootNode().addNode("slow-events", "oak:Unstructured");
+        // Constructor arguments are (node, listenerCount, saveInterval).
+        final DelayedEventHandling slowHandlers = new DelayedEventHandling(parent, 100, 10);
+        Future<Void> generator = slowHandlers.start();
+
+        try {
+            ArrayList<Double> timings = Lists.newArrayList();
+            for (int scale : scales) {
+                // The measured operation is just "block until that many nodes were added".
+                double perNode = new ScalabilityTest(scale) {
+                    @Override
+                    void run(int nodes) throws InterruptedException {
+                        slowHandlers.waitForNodes(nodes);
+                    }
+                }.run();
+                timings.add(perNode);
+                LOG.info("Adding {} nodes took {} ns/node", scale, perNode);
+            }
+
+            Iterable<Double> quotients = quotients(timings);
+            LOG.info("Scaling quotients: {}", quotients);
+            Iterable<Double> bound = getLogarithmicBound(scales, 10);
+            assertBounded("Adding nodes does not scale logarithmically in the face of slow " +
+                    "observation listeners.", quotients, bound);
+        } finally {
+            // Ask the background generator to finish, then surface any failure it hit.
+            slowHandlers.stop();
+            generator.get();
+        }
+    }
+
//------------------------------------------------------------< ContentGenerator >---
private static class ContentGenerator {
@@ -471,4 +509,90 @@ public class LargeOperationIT {
}
}
+ //------------------------------------------------------------< DelayedEventHandling >---
+
+    /**
+     * Event listener that processes events more slowly than they are produced.
+     * <p>
+     * {@link #start()} runs a {@code ContentGenerator} on a background thread
+     * and registers this listener on {@code listenerCount} admin sessions. The
+     * generator releases one permit per registered session for every second
+     * node it adds; {@link #onEvent(EventIterator)} consumes one permit per
+     * event, so event delivery is throttled by the permits available.
+     */
+    private class DelayedEventHandling implements EventListener {
+        private final ExecutorService executor = Executors.newSingleThreadExecutor();
+
+        /** Permits handed out by the generator; each handled event consumes one. */
+        private final Semaphore openEvents = new Semaphore(0);
+
+        /** Latch counted down once per added node; replaced by {@link #waitForNodes(int)}. */
+        private final AtomicReference<CountDownLatch> nodeCounter =
+                new AtomicReference<CountDownLatch>(new CountDownLatch(0));
+        private final Node node;
+        private final int listenerCount;
+        private final int saveInterval;
+
+        /** Set by {@link #stop()}; read by the generator thread and the listeners. */
+        private volatile boolean done;
+
+        /**
+         * @param node           node passed to {@code ContentGenerator.addNodes}
+         * @param listenerCount  number of admin sessions this listener is registered on
+         * @param saveInterval   value handed to the {@code ContentGenerator} constructor
+         */
+        private DelayedEventHandling(Node node, int listenerCount, int saveInterval) {
+            this.node = node;
+            this.listenerCount = listenerCount;
+            this.saveInterval = saveInterval;
+        }
+
+        /**
+         * Starts adding nodes in the background until {@link #stop()} is called.
+         *
+         * @return future of the background task; {@code get()} rethrows any
+         *         failure of the task wrapped in an {@code ExecutionException}
+         */
+        public Future<Void> start() throws RepositoryException {
+            return executor.submit(new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    final Session[] sessions = new Session[listenerCount];
+                    ContentGenerator contentGenerator = new ContentGenerator(saveInterval) {
+                        int nodeCount;
+
+                        @Override
+                        boolean addNode(Node node) throws RepositoryException {
+                            boolean result = super.addNode(node);
+                            // Only every second node frees permits, one per session.
+                            if (++nodeCount % 2 == 0) {
+                                openEvents.release(sessions.length);
+                            }
+                            nodeCounter.get().countDown();
+                            return result;
+                        }
+
+                        @Override
+                        boolean isDone() {
+                            return done;
+                        }
+                    };
+
+                    try {
+                        // Set up inside the try block so that sessions opened before
+                        // a failure are still logged out below.
+                        for (int k = 0; k < sessions.length; k++) {
+                            sessions[k] = createAdminSession();
+                            sessions[k].getWorkspace().getObservationManager().addEventListener(
+                                    DelayedEventHandling.this, NODE_ADDED, "/", true, null, null, false);
+                        }
+                        contentGenerator.addNodes(node, Integer.MAX_VALUE);
+                    } finally {
+                        for (Session listenerSession : sessions) {
+                            // Slots stay null if set-up failed part way through.
+                            if (listenerSession != null) {
+                                listenerSession.logout();
+                            }
+                        }
+                    }
+                    return null;
+                }
+            });
+        }
+
+        /**
+         * Signals the generator and the listeners to finish and releases the
+         * worker thread. {@code shutdown()} does not cancel the running task: it
+         * completes normally and its future can still be queried afterwards.
+         */
+        public void stop() {
+            done = true;
+            executor.shutdown();
+        }
+
+        /**
+         * Blocks until {@code count} further nodes have been added by the
+         * background generator.
+         */
+        public void waitForNodes(int count) throws InterruptedException {
+            CountDownLatch counter = new CountDownLatch(count);
+            nodeCounter.set(counter);
+            counter.await();
+        }
+
+        /**
+         * Consumes the events one by one, waiting for a permit before each of
+         * them. Gives up as soon as {@link #stop()} has been called.
+         */
+        @Override
+        public void onEvent(EventIterator events) {
+            try {
+                while (events.hasNext()) {
+                    while (!done && !openEvents.tryAcquire(10, MILLISECONDS)) {
+                        // Poll in short intervals so a stop request is noticed quickly.
+                    }
+                    if (done) {
+                        break;
+                    }
+                    events.nextEvent();
+                }
+            } catch (InterruptedException e) {
+                // Keep the interrupt visible to the thread that delivers the events.
+                Thread.currentThread().interrupt();
+                LOG.error(e.getMessage(), e);
+            } catch (Exception e) {
+                LOG.error(e.getMessage(), e);
+            }
+        }
+
+    }
}