You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2014/02/12 18:18:42 UTC

svn commit: r1567690 - /jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/LargeOperationIT.java

Author: mduerig
Date: Wed Feb 12 17:18:41 2014
New Revision: 1567690

URL: http://svn.apache.org/r1567690
Log:
OAK-1413: Add scalability tests for large operations
Delayed event handling: assert that slow event handlers do not impact the overall system

Modified:
    jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/LargeOperationIT.java

Modified: jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/LargeOperationIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/LargeOperationIT.java?rev=1567690&r1=1567689&r2=1567690&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/LargeOperationIT.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/LargeOperationIT.java Wed Feb 12 17:18:41 2014
@@ -19,6 +19,7 @@
 
 package org.apache.jackrabbit.oak.jcr;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static javax.jcr.observation.Event.NODE_ADDED;
 import static org.junit.Assert.fail;
 
@@ -28,7 +29,14 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.jcr.Node;
 import javax.jcr.NodeIterator;
@@ -344,6 +352,36 @@ public class LargeOperationIT {
         assertBounded("Processing pending events does not scale logarithmically.", quotients, bound);
     }
 
+    /**
+     * Checks that adding nodes still scales logarithmically while the
+     * observation listeners of a {@code DelayedEventHandling} instance lag
+     * behind the writer.
+     * <p>
+     * For every entry of {@code scales} the value reported by
+     * {@code ScalabilityTest.run()} is recorded while waiting for the
+     * background generator to add that many nodes. The quotients of these
+     * values are then checked against a logarithmic bound.
+     */
+    @Test
+    public void slowListener() throws RepositoryException, ExecutionException, InterruptedException {
+        Node parent = session.getRootNode().addNode("slow-events", "oak:Unstructured");
+        final DelayedEventHandling handling = new DelayedEventHandling(parent, 100, 10);
+        Future<Void> generator = handling.start();
+
+        try {
+            ArrayList<Double> timings = Lists.newArrayList();
+            for (int scale : scales) {
+                // A single measurement just blocks until the generator has
+                // added the requested number of nodes.
+                double perNode = new ScalabilityTest(scale) {
+                    @Override
+                    void run(int nodeCount) throws InterruptedException {
+                        handling.waitForNodes(nodeCount);
+                    }
+                }.run();
+                timings.add(perNode);
+                LOG.info("Adding {} nodes took {} ns/node", scale, perNode);
+            }
+
+            Iterable<Double> ratios = quotients(timings);
+            LOG.info("Scaling quotients: {}", ratios);
+            Iterable<Double> limit = getLogarithmicBound(scales, 10);
+            assertBounded("Adding nodes does not scale logarithmically in the face of slow " +
+                    "observation listeners.", ratios, limit);
+        } finally {
+            // Always halt the generator first, then surface any failure it hit.
+            handling.stop();
+            generator.get();
+        }
+    }
+
     //------------------------------------------------------------< ContentGenerator >---
 
     private static class ContentGenerator {
@@ -471,4 +509,90 @@ public class LargeOperationIT {
         }
     }
 
+    //------------------------------------------------------------< DelayedEventHandling >---
+
+    /**
+     * Event listener that consumes observation events at a throttled pace
+     * while a background task keeps adding nodes below a given parent node.
+     * <p>
+     * Usage: call {@link #start()} once, measure with
+     * {@link #waitForNodes(int)}, then call {@link #stop()} and wait for the
+     * future returned by {@code start()}.
+     */
+    private class DelayedEventHandling implements EventListener {
+        private final ExecutorService executor = Executors.newSingleThreadExecutor();
+
+        /**
+         * Permits for the listeners: consuming one event costs one permit.
+         * After every second added node one permit per listener session is
+         * released, which keeps the listeners behind the writer.
+         */
+        private final Semaphore openEvents = new Semaphore(0);
+
+        /** Latch counted down once per added node; replaced by {@link #waitForNodes(int)}. */
+        private final AtomicReference<CountDownLatch> nodeCounter =
+                new AtomicReference<CountDownLatch>(new CountDownLatch(0));
+
+        private final Node node;
+        private final int listenerCount;
+        private final int saveInterval;
+
+        /** Set by {@link #stop()}; read by the generator and by the listeners. */
+        private volatile boolean done;
+
+        /** The generator task once {@link #start()} has been called, {@code null} before. */
+        private volatile Future<Void> generator;
+
+        /**
+         * @param node           parent below which the generator adds nodes
+         * @param listenerCount  number of sessions that register this listener
+         * @param saveInterval   passed on to the {@code ContentGenerator}
+         */
+        private DelayedEventHandling(Node node, int listenerCount, int saveInterval) {
+            this.node = node;
+            this.listenerCount = listenerCount;
+            this.saveInterval = saveInterval;
+        }
+
+        /**
+         * Starts the background task. The task opens {@code listenerCount}
+         * admin sessions, registers this listener for {@code NODE_ADDED}
+         * events on each of them and then adds nodes until {@link #stop()}
+         * is called. All sessions are logged out when the task ends.
+         *
+         * @return future that completes when the task has ended; a failure
+         *         of the task is reported through {@link Future#get()}
+         */
+        public Future<Void> start() throws RepositoryException {
+            Future<Void> task = executor.submit(new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    final Session[] sessions = new Session[listenerCount];
+                    ContentGenerator contentGenerator = new ContentGenerator(saveInterval) {
+                        int nodeCount;
+
+                        @Override
+                        boolean addNode(Node node) throws RepositoryException {
+                            boolean result = super.addNode(node);
+                            // Only every second node hands out permits, so the
+                            // listeners can never keep up with the writer.
+                            if (++nodeCount % 2 == 0) {
+                                openEvents.release(sessions.length);
+                            }
+                            nodeCounter.get().countDown();
+                            return result;
+                        }
+
+                        @Override
+                        boolean isDone() {
+                            return done;
+                        }
+                    };
+
+                    try {
+                        // Register inside the try block: if opening a session
+                        // or adding a listener fails, the sessions opened so
+                        // far are still logged out below.
+                        for (int k = 0; k < sessions.length; k++) {
+                            sessions[k] = createAdminSession();
+                            sessions[k].getWorkspace().getObservationManager().addEventListener(
+                                    DelayedEventHandling.this, NODE_ADDED, "/", true, null, null, false);
+                        }
+                        contentGenerator.addNodes(node, Integer.MAX_VALUE);
+                    } finally {
+                        for (Session listenerSession : sessions) {
+                            // Slots stay null when registration failed part way.
+                            if (listenerSession != null) {
+                                listenerSession.logout();
+                            }
+                        }
+                    }
+                    return null;
+                }
+            });
+            generator = task;
+            return task;
+        }
+
+        /**
+         * Signals the generator and all listeners to finish and releases the
+         * worker thread once the generator task has ended. A task that is
+         * already running is not interrupted.
+         */
+        public void stop() {
+            done = true;
+            // Without the shutdown the executor's worker thread would outlive the test.
+            executor.shutdown();
+        }
+
+        /**
+         * Blocks until the generator has added {@code count} further nodes.
+         *
+         * @param count  number of node additions to wait for
+         * @throws InterruptedException   if the calling thread is interrupted
+         * @throws IllegalStateException  if the generator task ended before
+         *                                {@code count} nodes were added
+         */
+        public void waitForNodes(int count) throws InterruptedException {
+            CountDownLatch counter = new CountDownLatch(count);
+            nodeCounter.set(counter);
+            // Poll instead of waiting indefinitely: once the generator task
+            // has ended nobody counts down any more and a plain await()
+            // would never return.
+            while (!counter.await(100, MILLISECONDS)) {
+                Future<Void> task = generator;
+                if (task != null && task.isDone() && counter.getCount() > 0) {
+                    throw new IllegalStateException("Node generator terminated with "
+                            + counter.getCount() + " of " + count + " nodes still outstanding");
+                }
+            }
+        }
+
+        /**
+         * Consumes the passed events one by one, acquiring one permit from
+         * {@code openEvents} per event. Returns early once {@link #stop()}
+         * has been called. Failures are logged and not propagated.
+         */
+        @Override
+        public void onEvent(EventIterator events) {
+            try {
+                while (events.hasNext()) {
+                    // Wait in short intervals so that stop() is noticed promptly.
+                    while (!done && !openEvents.tryAcquire(10, MILLISECONDS)) {
+                        // no permit yet, keep polling
+                    }
+                    if (done) {
+                        break;
+                    }
+                    events.nextEvent();
+                }
+            } catch (InterruptedException e) {
+                // Keep the interrupt visible to the thread delivering the events.
+                Thread.currentThread().interrupt();
+                LOG.error("Interrupted while waiting to process events", e);
+            } catch (Exception e) {
+                LOG.error(e.getMessage(), e);
+            }
+        }
+
+    }
 }