You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2013/06/21 14:35:18 UTC

svn commit: r1495411 - /jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java

Author: mduerig
Date: Fri Jun 21 12:35:18 2013
New Revision: 1495411

URL: http://svn.apache.org/r1495411
Log:
OAK-144 Implement Observation
- Generalise ExpectationListener to accept any kind of expectations
- Simplify tests

Modified:
    jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java

Modified: jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java?rev=1495411&r1=1495410&r2=1495411&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java Fri Jun 21 12:35:18 2013
@@ -18,6 +18,7 @@
  */
 package org.apache.jackrabbit.oak.jcr.observation;
 
+import static com.google.common.base.Objects.equal;
 import static javax.jcr.observation.Event.NODE_ADDED;
 import static javax.jcr.observation.Event.NODE_MOVED;
 import static javax.jcr.observation.Event.NODE_REMOVED;
@@ -26,19 +27,17 @@ import static javax.jcr.observation.Even
 import static javax.jcr.observation.Event.PROPERTY_CHANGED;
 import static javax.jcr.observation.Event.PROPERTY_REMOVED;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
 
 import javax.jcr.Node;
 import javax.jcr.Property;
@@ -50,8 +49,10 @@ import javax.jcr.observation.EventListen
 import javax.jcr.observation.ObservationManager;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ForwardingListenableFuture;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import org.apache.jackrabbit.oak.jcr.AbstractRepositoryTest;
 import org.junit.After;
@@ -59,6 +60,8 @@ import org.junit.Before;
 import org.junit.Test;
 
 public class ObservationTest extends AbstractRepositoryTest {
+    public static final int ALL_EVENTS = NODE_ADDED | NODE_REMOVED | NODE_MOVED | PROPERTY_ADDED |
+            PROPERTY_REMOVED | PROPERTY_CHANGED | PERSIST;
     private static final String TEST_NODE = "test_node";
     private static final String TEST_PATH = '/' + TEST_NODE;
 
@@ -84,9 +87,7 @@ public class ObservationTest extends Abs
     @Test
     public void observation() throws RepositoryException, ExecutionException, InterruptedException {
         ExpectationListener listener = new ExpectationListener();
-        observationManager.addEventListener(
-                listener, NODE_ADDED | NODE_REMOVED | NODE_MOVED | PROPERTY_ADDED | PROPERTY_REMOVED |
-                PROPERTY_CHANGED | PERSIST, "/", true, null, null, false);
+        observationManager.addEventListener(listener, ALL_EVENTS, "/", true, null, null, false);
         try {
             Node n = getNode(TEST_PATH);
             listener.expectAdd(n.setProperty("p0", "v0"));
@@ -96,7 +97,7 @@ public class ObservationTest extends Abs
             listener.expectAdd(n.addNode("n2"));
             getAdminSession().save();
 
-            List<String> missing = listener.getMissing(2, TimeUnit.SECONDS);
+            List<Expectation> missing = listener.getMissing(2, TimeUnit.SECONDS);
             assertTrue("Missing events: " + missing, missing.isEmpty());
             List<Event> unexpected = listener.getUnexpected();
             assertTrue("Unexpected events: " + unexpected, unexpected.isEmpty());
@@ -123,15 +124,13 @@ public class ObservationTest extends Abs
     @Test
     public void observation2() throws RepositoryException, InterruptedException, ExecutionException {
         ExpectationListener listener = new ExpectationListener();
-        observationManager.addEventListener(
-                listener, NODE_ADDED | NODE_REMOVED | NODE_MOVED | PROPERTY_ADDED | PROPERTY_REMOVED |
-                PROPERTY_CHANGED | PERSIST, "/", true, null, null, false);
+        observationManager.addEventListener(listener, ALL_EVENTS, "/", true, null, null, false);
         try {
             Node n = getNode(TEST_PATH);
             listener.expectAdd(n.addNode("n1"));
             getAdminSession().save();
 
-            List<String> missing = listener.getMissing(2, TimeUnit.SECONDS);
+            List<Expectation> missing = listener.getMissing(2, TimeUnit.SECONDS);
             assertTrue("Missing events: " + missing, missing.isEmpty());
             List<Event> unexpected = listener.getUnexpected();
             assertTrue("Unexpected events: " + unexpected, unexpected.isEmpty());
@@ -160,7 +159,7 @@ public class ObservationTest extends Abs
             listener.expectAdd(root.setProperty("prop", "value"));
             root.getSession().save();
 
-            List<String> missing = listener.getMissing(2, TimeUnit.SECONDS);
+            List<Expectation> missing = listener.getMissing(2, TimeUnit.SECONDS);
             assertTrue("Missing events: " + missing, missing.isEmpty());
             List<Event> unexpected = listener.getUnexpected();
             assertTrue("Unexpected events: " + unexpected, unexpected.isEmpty());
@@ -174,82 +173,64 @@ public class ObservationTest extends Abs
     public void observationDispose()
             throws RepositoryException, InterruptedException, ExecutionException, TimeoutException {
 
-        final AtomicReference<Boolean> stopGeneratingEvents = new AtomicReference<Boolean>(false);
-        final AtomicReference<CountDownLatch> hasEvents = new AtomicReference<CountDownLatch>(new CountDownLatch(1));
-        try {
-            final EventListener listener = new EventListener() {
-                @Override
-                public void onEvent(EventIterator events) {
-                    while (events.hasNext()) {
-                        events.next();
-                        hasEvents.get().countDown();
-                    }
-                }
-            };
+        final ExpectationListener listener = new ExpectationListener();
+        Expectation hasEvents = listener.expect(
+                new Expectation("has events after registering"));
+        final Expectation noEvents = listener.expect(
+                new Expectation("has no more events after unregistering", false));
 
-            observationManager.addEventListener(
-                    listener, NODE_ADDED | NODE_REMOVED | NODE_MOVED | PROPERTY_ADDED | PROPERTY_REMOVED |
-                    PROPERTY_CHANGED | PERSIST, "/", true, null, null, false);
+        observationManager.addEventListener(listener, ALL_EVENTS, "/", true, null, null, false);
 
-            // Generate events
-            Executors.newSingleThreadExecutor().submit(new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
-                    Node n = getNode(TEST_PATH);
-                    for (int c = 0; !stopGeneratingEvents.get() ; c++) {
-                        n.addNode("c" + c);
-                        n.getSession().save();
-                    }
-                    return null;
+        // Generate events
+        Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(new Runnable() {
+            private int c;
+
+            @Override
+            public void run() {
+                try {
+                    getNode(TEST_PATH)
+                        .addNode("c" + c++)
+                        .getSession()
+                        .save();
                 }
-            });
+                catch (RepositoryException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }, 10, 10, TimeUnit.MILLISECONDS);
 
-            // Make sure we see the events
-            assertTrue(hasEvents.get().await(2, TimeUnit.SECONDS));
+        // Make sure we see the events
+        assertNotNull(hasEvents.get(2, TimeUnit.SECONDS));
 
-            // Remove event listener
-            Executors.newSingleThreadExecutor().submit(new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
-                    observationManager.removeEventListener(listener);
-                    hasEvents.set(new CountDownLatch(1));
-                    return null;
-                }
-            }).get(10, TimeUnit.SECONDS);
+        // Remove event listener
+        Executors.newSingleThreadExecutor().submit(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                observationManager.removeEventListener(listener);
+                noEvents.enable(true);
+                return null;
+            }
+        }).get(10, TimeUnit.SECONDS);
 
-            // Make sure we don't see any more events
-            assertFalse(hasEvents.get().await(2, TimeUnit.SECONDS));
-        }
-        finally {
-            stopGeneratingEvents.set(true);
-        }
+        // Make sure we see no more events
+        assertFalse(noEvents.wait(2, TimeUnit.SECONDS));
     }
 
     @Test
     public void observationDisposeFromListener()
             throws RepositoryException, InterruptedException, ExecutionException, TimeoutException {
 
-        final AtomicReference<RepositoryException> repositoryException = new AtomicReference<RepositoryException>(null);
-        final AtomicReference<CountDownLatch> unregistered = new AtomicReference<CountDownLatch>(new CountDownLatch(1));
-        final EventListener listener = new EventListener() {
+        final ExpectationListener listener = new ExpectationListener();
+        Expectation unregistered = listener.expect(new Expectation
+                ("Unregistering listener from event handler should not block") {
             @Override
-            public void onEvent(EventIterator events) {
-                try {
-                    // Unregistering listener from event handler should not block
-                    observationManager.removeEventListener(this);
-                }
-                catch (RepositoryException e) {
-                    repositoryException.set(e);
-                }
-                finally {
-                    unregistered.get().countDown();
-                }
+            public boolean onEvent(Event event) throws Exception {
+                observationManager.removeEventListener(listener);
+                return true;
             }
-        };
+        });
 
-        observationManager.addEventListener(
-                listener, NODE_ADDED | NODE_REMOVED | NODE_MOVED | PROPERTY_ADDED | PROPERTY_REMOVED |
-                PROPERTY_CHANGED | PERSIST, "/", true, null, null, false);
+        observationManager.addEventListener(listener, ALL_EVENTS, "/", true, null, null, false);
 
         // Ensure the listener is there
         assertTrue(observationManager.getRegisteredEventListeners().hasNext());
@@ -260,10 +241,7 @@ public class ObservationTest extends Abs
         n.getSession().save();
 
         // Make sure we see the events and the listener is gone
-        assertTrue(unregistered.get().await(2, TimeUnit.SECONDS));
-        if (repositoryException.get() != null) {
-            throw repositoryException.get();
-        }
+        assertNotNull(unregistered.get(2, TimeUnit.SECONDS));
         assertFalse(observationManager.getRegisteredEventListeners().hasNext());
     }
 
@@ -275,19 +253,84 @@ public class ObservationTest extends Abs
 
     //------------------------------------------------------------< ExpectationListener >---
 
+    private static class Expectation extends ForwardingListenableFuture<Event> {
+        private final SettableFuture<Event> future = SettableFuture.create();
+        private final String name;
+
+        private volatile boolean enabled = true;
+
+        Expectation(String name, boolean enabled) {
+            this.name = name;
+            this.enabled = enabled;
+        }
+
+        Expectation(String name) {
+            this(name, true);
+        }
+
+        @Override
+        protected ListenableFuture<Event> delegate() {
+            return future;
+        }
+
+        public void enable(boolean enabled) {
+            this.enabled = enabled;
+        }
+
+        public boolean isEnabled() {
+            return enabled;
+        }
+
+        public void complete(Event event) {
+            future.set(event);
+        }
+
+        public void fail(Exception e) {
+            future.setException(e);
+        }
+
+        public boolean wait(long timeout, TimeUnit unit) {
+            try {
+                future.get(timeout, unit);
+                return true;
+            }
+            catch (Exception e) {
+                return false;
+            }
+        }
+
+        public boolean onEvent(Event event) throws Exception {
+            return true;
+        }
+
+        @Override
+        public String toString() {
+            return name;
+        }
+    }
+
     private static class ExpectationListener implements EventListener {
-        private final Map<String, SettableFuture<Event>> expected = Maps.newConcurrentMap();
+        private final Set<Expectation> expected = Sets.newCopyOnWriteArraySet();
         private final List<Event> unexpected = Lists.newCopyOnWriteArrayList();
+
         private volatile Exception failed;
 
-        public Future<Event> expect(String path, int type) {
+        public Expectation expect(Expectation expectation) {
             if (failed == null) {
-                SettableFuture<Event> expect = SettableFuture.create();
-                expected.put(key(path, type), expect);
-                return expect;
+                expected.add(expectation);
             } else {
-                return Futures.immediateFailedFuture(failed);
+                expectation.fail(failed);
             }
+            return expectation;
+        }
+
+        public Future<Event> expect(final String path, final int type) {
+            return expect(new Expectation("path = " + path + ", type = " + type) {
+                @Override
+                public boolean onEvent(Event event) throws RepositoryException {
+                    return type == event.getType() && equal(path, event.getPath());
+                }
+            });
         }
 
         public Node expectAdd(Node node) throws RepositoryException {
@@ -317,16 +360,16 @@ public class ObservationTest extends Abs
             return property;
         }
 
-        public List<String> getMissing(int time, TimeUnit timeUnit)
+        public List<Expectation> getMissing(int time, TimeUnit timeUnit)
                 throws ExecutionException, InterruptedException {
-            List<String> missing = Lists.newArrayList();
+            List<Expectation> missing = Lists.newArrayList();
             try {
-                Futures.allAsList(expected.values()).get(time, timeUnit);
+                Futures.allAsList(expected).get(time, timeUnit);
             }
             catch (TimeoutException e) {
-                for (Entry<String, SettableFuture<Event>> entry : expected.entrySet()) {
-                    if (!entry.getValue().isDone()) {
-                        missing.add(entry.getKey());
+                for (Expectation exp : expected) {
+                    if (!exp.isDone()) {
+                        missing.add(exp);
                     }
                 }
             }
@@ -342,16 +385,22 @@ public class ObservationTest extends Abs
             try {
                 while (events.hasNext() && failed == null) {
                     Event event = events.nextEvent();
-                    SettableFuture<Event> f = expected.get(key(event.getPath(), event.getType()));
-                    if (f != null) {
-                        f.set(event);
-                    } else {
+                    boolean found = false;
+                    for (Expectation exp : expected) {
+                        if (exp.isEnabled() && exp.onEvent(event)) {
+                            found = true;
+                            expected.remove(exp);
+                            exp.complete(event);
+                        }
+                    }
+                    if (!found) {
                         unexpected.add(event);
                     }
+
                 }
             } catch (Exception e) {
-                for (SettableFuture<Event> f : expected.values()) {
-                    f.setException(e);
+                for (Expectation exp : expected) {
+                    exp.fail(e);
                 }
                 failed = e;
             }