You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2013/06/05 09:21:15 UTC

svn commit: r1489727 - in /jackrabbit/oak/trunk: oak-core/src/main/java/org/apache/jackrabbit/oak/ oak-core/src/main/java/org/apache/jackrabbit/oak/core/ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ oak-core/src/main/java/org/a...

Author: mduerig
Date: Wed Jun  5 07:21:14 2013
New Revision: 1489727

URL: http://svn.apache.org/r1489727
Log:
OAK-775 Implement backward compatible observation
- Initial implementation of observation where cluster local events support session specific information
- Remove previous observation implementations

Added:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeDispatcher.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/EventFilter.java   (contents, props changed)
      - copied, changed from r1489515, jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeFilter.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ObservationConstants.java
      - copied, changed from r1489515, jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/ObservationConstants.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/PostCommitHook.java   (contents, props changed)
      - copied, changed from r1489515, jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/ObservationConstants.java
Removed:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeFilter.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventCollector.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventFilter.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventImpl.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventQueueReader.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventQueueWriter.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventQueueWriterProvider.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/ObservationConstants.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/ObservationManagerImpl2.java
Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core/ContentRepositoryImpl.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core/ContentSessionImpl.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core/RootImpl.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/EventImpl.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ObservationManagerImpl.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/security/user/UserInitializer.java
    jackrabbit/oak/trunk/oak-jcr/pom.xml
    jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/RepositoryImpl.java
    jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/SessionContext.java
    jackrabbit/oak/trunk/oak-solr-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/solr/index/SolrCommitHookIT.java
    jackrabbit/oak/trunk/oak-solr-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/solr/query/SolrQueryEngineIT.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java?rev=1489727&r1=1489726&r2=1489727&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java Wed Jun  5 07:21:14 2013
@@ -16,9 +16,14 @@
  */
 package org.apache.jackrabbit.oak;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Lists.newArrayList;
+import static java.util.concurrent.Executors.newScheduledThreadPool;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
+
 import javax.annotation.Nonnull;
 import javax.jcr.NoSuchWorkspaceException;
 import javax.security.auth.login.LoginException;
@@ -37,7 +42,6 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.index.CompositeIndexEditorProvider;
 import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
 import org.apache.jackrabbit.oak.plugins.index.IndexUpdateProvider;
-import org.apache.jackrabbit.oak.plugins.observation2.EventQueueWriterProvider;
 import org.apache.jackrabbit.oak.spi.commit.CommitHook;
 import org.apache.jackrabbit.oak.spi.commit.CompositeEditorProvider;
 import org.apache.jackrabbit.oak.spi.commit.CompositeHook;
@@ -58,10 +62,6 @@ import org.apache.jackrabbit.oak.spi.sta
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.collect.Lists.newArrayList;
-import static java.util.concurrent.Executors.newScheduledThreadPool;
-
 /**
  * Builder class for constructing {@link ContentRepository} instances with
  * a set of specified plugin components. This class acts as a public facade
@@ -293,7 +293,6 @@ public class Oak {
 
         // add index hooks later to prevent the OakInitializer to do excessive indexing
         with(new IndexUpdateProvider(indexEditors));
-        with(new EventQueueWriterProvider());
         withEditorHook();
         CommitHook commitHook = CompositeHook.compose(commitHooks);
         return new ContentRepositoryImpl(

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core/ContentRepositoryImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core/ContentRepositoryImpl.java?rev=1489727&r1=1489726&r2=1489727&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core/ContentRepositoryImpl.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core/ContentRepositoryImpl.java Wed Jun  5 07:21:14 2013
@@ -16,6 +16,8 @@
  */
 package org.apache.jackrabbit.oak.core;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.jcr.Credentials;
@@ -24,6 +26,8 @@ import javax.security.auth.login.LoginEx
 
 import org.apache.jackrabbit.oak.api.ContentRepository;
 import org.apache.jackrabbit.oak.api.ContentSession;
+import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher;
+import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher.Listener;
 import org.apache.jackrabbit.oak.spi.commit.CommitHook;
 import org.apache.jackrabbit.oak.spi.query.CompositeQueryIndexProvider;
 import org.apache.jackrabbit.oak.spi.query.QueryIndexProvider;
@@ -33,8 +37,6 @@ import org.apache.jackrabbit.oak.spi.sec
 import org.apache.jackrabbit.oak.spi.security.authentication.LoginContextProvider;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-
 /**
  * {@code MicroKernel}-based implementation of
  * the {@link ContentRepository} interface.
@@ -46,6 +48,7 @@ public class ContentRepositoryImpl imple
     private final String defaultWorkspaceName;
     private final SecurityProvider securityProvider;
     private final QueryIndexProvider indexProvider;
+    private final ChangeDispatcher changeDispatcher;
 
     /**
      * Creates an content repository instance based on the given, already
@@ -67,6 +70,7 @@ public class ContentRepositoryImpl imple
         this.defaultWorkspaceName = checkNotNull(defaultWorkspaceName);
         this.securityProvider = checkNotNull(securityProvider);
         this.indexProvider = indexProvider != null ? indexProvider : new CompositeQueryIndexProvider();
+        this.changeDispatcher = new ChangeDispatcher(nodeStore);
     }
 
     @Nonnull
@@ -87,11 +91,15 @@ public class ContentRepositoryImpl imple
         loginContext.login();
 
         return new ContentSessionImpl(loginContext, securityProvider, workspaceName,
-                nodeStore, commitHook, indexProvider);
+                nodeStore, commitHook, changeDispatcher, indexProvider);
     }
 
     public NodeStore getNodeStore() {
         return nodeStore;
     }
 
+    public Listener newListener() {
+        return changeDispatcher.newListener();
+    }
+
 }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core/ContentSessionImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core/ContentSessionImpl.java?rev=1489727&r1=1489726&r2=1489727&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core/ContentSessionImpl.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core/ContentSessionImpl.java Wed Jun  5 07:21:14 2013
@@ -27,6 +27,7 @@ import javax.security.auth.login.LoginEx
 import org.apache.jackrabbit.oak.api.AuthInfo;
 import org.apache.jackrabbit.oak.api.ContentSession;
 import org.apache.jackrabbit.oak.api.Root;
+import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher;
 import org.apache.jackrabbit.oak.spi.commit.CommitHook;
 import org.apache.jackrabbit.oak.spi.query.QueryIndexProvider;
 import org.apache.jackrabbit.oak.spi.security.SecurityProvider;
@@ -47,6 +48,7 @@ class ContentSessionImpl implements Cont
     private final String workspaceName;
     private final NodeStore store;
     private final CommitHook hook;
+    private final ChangeDispatcher changeDispatcher;
     private final QueryIndexProvider indexProvider;
 
     private volatile boolean live = true;
@@ -56,12 +58,14 @@ class ContentSessionImpl implements Cont
                               @Nonnull String workspaceName,
                               @Nonnull NodeStore store,
                               @Nonnull CommitHook hook,
+                              @Nonnull ChangeDispatcher changeDispatcher,
                               @Nonnull QueryIndexProvider indexProvider) {
         this.loginContext = loginContext;
         this.securityProvider = securityProvider;
         this.workspaceName = workspaceName;
         this.store = store;
         this.hook = hook;
+        this.changeDispatcher = changeDispatcher;
         this.indexProvider = indexProvider;
     }
 
@@ -91,9 +95,8 @@ class ContentSessionImpl implements Cont
     @Override
     public Root getLatestRoot() {
         checkLive();
-        RootImpl root = new RootImpl(
-                store, hook, workspaceName, loginContext.getSubject(),
-                securityProvider, indexProvider) {
+        RootImpl root = new RootImpl(store, hook, changeDispatcher.createHook(this), workspaceName,
+                loginContext.getSubject(), securityProvider, indexProvider) {
             @Override
             protected void checkLive() {
                 ContentSessionImpl.this.checkLive();

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core/RootImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core/RootImpl.java?rev=1489727&r1=1489726&r2=1489727&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core/RootImpl.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/core/RootImpl.java Wed Jun  5 07:21:14 2013
@@ -18,12 +18,17 @@
  */
 package org.apache.jackrabbit.oak.core;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.jackrabbit.oak.commons.PathUtils.getName;
+import static org.apache.jackrabbit.oak.commons.PathUtils.getParentPath;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+
 import javax.annotation.Nonnull;
 import javax.security.auth.Subject;
 
@@ -37,6 +42,7 @@ import org.apache.jackrabbit.oak.api.Roo
 import org.apache.jackrabbit.oak.api.Tree;
 import org.apache.jackrabbit.oak.commons.PathUtils;
 import org.apache.jackrabbit.oak.plugins.index.diffindex.UUIDDiffIndexProviderWrapper;
+import org.apache.jackrabbit.oak.plugins.observation.PostCommitHook;
 import org.apache.jackrabbit.oak.query.QueryEngineImpl;
 import org.apache.jackrabbit.oak.security.authentication.SystemSubject;
 import org.apache.jackrabbit.oak.spi.commit.CommitHook;
@@ -61,10 +67,6 @@ import org.apache.jackrabbit.oak.spi.sta
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
 import org.apache.jackrabbit.oak.spi.state.NodeStoreBranch;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.jackrabbit.oak.commons.PathUtils.getName;
-import static org.apache.jackrabbit.oak.commons.PathUtils.getParentPath;
-
 public class RootImpl implements Root {
 
     /**
@@ -79,6 +81,8 @@ public class RootImpl implements Root {
 
     private final CommitHook hook;
 
+    private final PostCommitHook postHook;
+
     private final String workspaceName;
 
     private final Subject subject;
@@ -125,12 +129,14 @@ public class RootImpl implements Root {
      */
     public RootImpl(NodeStore store,
                     CommitHook hook,
+                    PostCommitHook postHook,
                     String workspaceName,
                     Subject subject,
                     SecurityProvider securityProvider,
                     QueryIndexProvider indexProvider) {
         this.store = checkNotNull(store);
         this.hook = checkNotNull(hook);
+        this.postHook = postHook;
         this.workspaceName = checkNotNull(workspaceName);
         this.subject = checkNotNull(subject);
         this.securityProvider = checkNotNull(securityProvider);
@@ -149,7 +155,7 @@ public class RootImpl implements Root {
 
     public RootImpl(NodeStore store, CommitHook hook) {
         // FIXME: define proper default or pass workspace name with the constructor
-        this(store, hook, Oak.DEFAULT_WORKSPACE_NAME, SystemSubject.INSTANCE,
+        this(store, hook, PostCommitHook.EMPTY, Oak.DEFAULT_WORKSPACE_NAME, SystemSubject.INSTANCE,
                 new OpenSecurityProvider(), new CompositeQueryIndexProvider());
     }
 
@@ -241,7 +247,9 @@ public class RootImpl implements Root {
             @Override
             public CommitFailedException run() {
                 try {
-                    branch.merge(getCommitHook());
+                    NodeState base = branch.getBase();
+                    NodeState newHead = branch.merge(getCommitHook());
+                    postHook.contentChanged(base, newHead);
                     return null;
                 } catch (CommitFailedException e) {
                     return e;

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeDispatcher.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeDispatcher.java?rev=1489727&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeDispatcher.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeDispatcher.java Wed Jun  5 07:21:14 2013
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.observation;
+
+import static com.google.common.base.Objects.toStringHelper;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.jackrabbit.oak.plugins.observation.ObservationConstants.OAK_UNKNOWN;
+
+import java.util.Queue;
+import java.util.Set;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+
+import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
+import org.apache.jackrabbit.oak.api.ContentSession;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStateDiff;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.apache.jackrabbit.oak.util.TODO;
+
+public class ChangeDispatcher {
+    private final NodeStore store;
+    private final Set<Listener> listeners = Sets.newHashSet();
+
+    private NodeState previous;
+
+    public ChangeDispatcher(NodeStore store) {
+        this.store = store;
+        previous = checkNotNull(store.getRoot());
+    }
+
+    @Nonnull
+    public Hook createHook(ContentSession contentSession) {
+        return new Hook(contentSession);
+    }
+
+    @Nonnull
+    public Listener newListener() {
+        Listener listener = new Listener();
+        register(listener);
+        return listener;
+    }
+
+    private void contentChanged(@Nonnull NodeState before, @Nonnull NodeState after, ContentSession contentSession) {
+        ChangeSet extChanges;
+        ChangeSet intChange;
+        synchronized (this) {
+            extChanges = externalChange(checkNotNull(before));
+            intChange = internalChange(checkNotNull(after), contentSession);
+        }
+        if (extChanges != null) {
+            add(extChanges);
+        }
+        add(intChange);
+    }
+
+    @CheckForNull
+    private synchronized ChangeSet externalChange(NodeState root) {
+        if (root != previous) {
+            ChangeSet changeSet = ChangeSet.external(previous, root);
+            previous = root;
+            return changeSet;
+        }
+        return null;
+    }
+
+    @Nonnull
+    private synchronized ChangeSet internalChange(NodeState root, ContentSession contentSession) {
+        ChangeSet changeSet = ChangeSet.local(previous, root, contentSession);
+        previous = root;
+        return changeSet;
+    }
+
+    private void register(Listener listener) {
+        synchronized (listeners) {
+            listeners.add(listener);
+        }
+    }
+
+    private void unregister(Listener listener) {
+        synchronized (listeners) {
+            listeners.remove(listener);
+        }
+    }
+
+    private void add(ChangeSet changeSet) {
+        for (Listener l : getListeners()) {
+            l.add(changeSet);
+        }
+    }
+
+    private Listener[] getListeners() {
+        synchronized (listeners) {
+            return listeners.toArray(new Listener[listeners.size()]);
+        }
+    }
+
+    //------------------------------------------------------------< Sink >---
+
+    public class Hook implements PostCommitHook {
+        private final ContentSession contentSession;
+
+        private Hook(ContentSession contentSession) {
+            this.contentSession = contentSession;
+        }
+
+        @Override
+        public void contentChanged(@Nonnull NodeState before, @Nonnull NodeState after) {
+            ChangeDispatcher.this.contentChanged(before, after, contentSession);
+        }
+    }
+
+    //------------------------------------------------------------< Listener >---
+
+    public class Listener {
+        private final Queue<ChangeSet> changeSets = Queues.newLinkedBlockingQueue();
+
+        public void dispose() {
+            unregister(this);
+        }
+
+        @CheckForNull
+        public ChangeSet getChanges() {
+            if (changeSets.isEmpty()) {
+                add(externalChange(store.getRoot()));
+            }
+            return changeSets.isEmpty() ? null : changeSets.remove();
+        }
+
+        private void add(ChangeSet changeSet) {
+            changeSets.add(changeSet);
+        }
+    }
+
+    //------------------------------------------------------------< ChangeSet >---
+
+    public abstract static class ChangeSet {
+        private final NodeState before;
+        private final NodeState after;
+
+        public static ChangeSet local(NodeState base, NodeState head, ContentSession contentSession) {
+            return new InternalChangeSet(base, head, contentSession, System.currentTimeMillis());
+        }
+
+        public static ChangeSet external(NodeState base, NodeState head) {
+            return new ExternalChangeSet(base, head);
+        }
+
+        protected ChangeSet(NodeState before, NodeState after) {
+            this.before = before;
+            this.after = after;
+        }
+
+        public abstract boolean isExternal();
+        public abstract boolean isLocal(ContentSession contentSession);
+        public abstract String getUserId();
+        public abstract String getUserData();
+        public abstract long getDate();
+
+        public void diff(NodeStateDiff diff) {
+            after.compareAgainstBaseState(before, diff);
+        }
+
+        @Override
+        public String toString() {
+            return toStringHelper(this)
+                .add("base", before)
+                .add("head", after)
+                .add("userId", getUserId())
+                .add("userData", getUserData())
+                .add("date", getDate())
+                .toString();
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            if (this == other) {
+                return true;
+            }
+            if (other.getClass() != this.getClass()) {
+                return false;
+            }
+
+            ChangeSet that = (ChangeSet) other;
+            return before.equals(that.before) && after.equals(that.after);
+        }
+
+        @Override
+        public int hashCode() {
+            return 31 * before.hashCode() + after.hashCode();
+        }
+
+        private static class InternalChangeSet extends ChangeSet {
+            private static final String DUMMY_USER_DATA =
+                    TODO.dummyImplementation().returnValueOrNull("oak:not implemented");
+
+            private final ContentSession contentSession;
+            private final String userId;
+            private final long date;
+
+            InternalChangeSet(NodeState base, NodeState head, ContentSession contentSession, long date) {
+                super(base, head);
+                this.contentSession = contentSession;
+                this.userId = contentSession.getAuthInfo().getUserID();
+                this.date = date;
+            }
+
+            @Override
+            public boolean isExternal() {
+                return false;
+            }
+
+            @Override
+            public boolean isLocal(ContentSession contentSession) {
+                return this.contentSession == contentSession;
+            }
+
+            @Override
+            public String getUserId() {
+                return userId;
+            }
+
+            @Override
+            public String getUserData() {
+                // TODO implement getUserData
+                return DUMMY_USER_DATA;
+            }
+
+            @Override
+            public long getDate() {
+                return date;
+            }
+
+            @Override
+            public boolean equals(Object other) {
+                if (!super.equals(other)) {
+                    return false;
+                }
+
+                InternalChangeSet that = (InternalChangeSet) other;
+                return date == that.date && contentSession == that.contentSession;
+            }
+        }
+
+        private static class ExternalChangeSet extends ChangeSet {
+            ExternalChangeSet(NodeState base, NodeState head) {
+                super(base, head);
+            }
+
+            @Override
+            public boolean isExternal() {
+                return true;
+            }
+
+            @Override
+            public boolean isLocal(ContentSession contentSession) {
+                return false;
+            }
+
+            @Override
+            public String getUserId() {
+                return OAK_UNKNOWN;
+            }
+
+            @Override
+            public String getUserData() {
+                return OAK_UNKNOWN;
+            }
+
+            @Override
+            public long getDate() {
+                return 0;
+            }
+        }
+
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeDispatcher.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeDispatcher.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision Rev URL

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java?rev=1489727&r1=1489726&r2=1489727&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeProcessor.java Wed Jun  5 07:21:14 2013
@@ -1,18 +1,20 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 package org.apache.jackrabbit.oak.plugins.observation;
 
@@ -34,12 +36,12 @@ import org.apache.jackrabbit.commons.ite
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.commons.PathUtils;
 import org.apache.jackrabbit.oak.namepath.NamePathMapper;
-import org.apache.jackrabbit.oak.spi.observation.ChangeExtractor;
+import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher.ChangeSet;
+import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher.Listener;
 import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.state.NodeStateDiff;
 import org.apache.jackrabbit.oak.spi.state.NodeStateUtils;
-import org.apache.jackrabbit.oak.util.TODO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.Marker;
@@ -49,42 +51,51 @@ import org.slf4j.MarkerFactory;
  * TODO document
  */
 class ChangeProcessor implements Runnable {
+    private static final Logger log = LoggerFactory.getLogger(ChangeProcessor.class);
+    private static final Marker DEPRECATED = MarkerFactory.getMarker("deprecated");
 
-    private static final Logger log =
-            LoggerFactory.getLogger(ChangeProcessor.class);
-
-    private static final Marker DEPRECATED =
-            MarkerFactory.getMarker("deprecated");
-
-    private static final String DUMMY_USER_ID = TODO.dummyImplementation().returnValueOrNull("oak:unknown");
     private final ObservationManagerImpl observationManager;
     private final NamePathMapper namePathMapper;
-    private final ChangeExtractor changeExtractor;
     private final EventListener listener;
-    private final AtomicReference<ChangeFilter> filterRef;
+    private final AtomicReference<EventFilter> filterRef;
+
     private volatile boolean running;
     private volatile boolean stopping;
     private ScheduledFuture<?> future;
+    private Listener changeListener;
 
-    private boolean userIDAccessed = false;
-    private boolean userDataAccessed = false;
-    private boolean isExternalAccessed = false;
-    private boolean userInfoAccessedWithoutExternalsCheck = false;
-    private boolean userInfoAccessedFromExternalEvent = false;
+    private boolean userInfoAccessedWithoutExternalsCheck;
+    private boolean userInfoAccessedFromExternalEvent;
+    private boolean dateAccessedWithoutExternalsCheck;
+    private boolean dateAccessedFromExternalEvent;
 
-    public ChangeProcessor(ObservationManagerImpl observationManager, EventListener listener, ChangeFilter filter) {
+    public ChangeProcessor(ObservationManagerImpl observationManager, EventListener listener, EventFilter filter) {
         this.observationManager = observationManager;
         this.namePathMapper = observationManager.getNamePathMapper();
-        this.changeExtractor = observationManager.getChangeExtractor();
         this.listener = listener;
-        filterRef = new AtomicReference<ChangeFilter>(filter);
+        filterRef = new AtomicReference<EventFilter>(filter);
     }
 
-    public void setFilter(ChangeFilter filter) {
+    public void setFilter(EventFilter filter) {
         filterRef.set(filter);
     }
 
     /**
+     * Start the change processor on the passed {@code executor}.
+     * @param executor
+     * @throws IllegalStateException if started already
+     */
+    public synchronized void start(ScheduledExecutorService executor) {
+        if (future != null) {
+            throw new IllegalStateException("Change processor started already");
+        }
+        stopping = false;
+        changeListener = observationManager.newChangeListener();
+        future = executor.scheduleWithFixedDelay(this, 100, 1000, TimeUnit.MILLISECONDS);
+    }
+
+
+    /**
      * Stop this change processor if running. After returning from this methods no further
      * events will be delivered.
      * @throws IllegalStateException if not yet started or stopped already
@@ -104,31 +115,22 @@ class ChangeProcessor implements Runnabl
             Thread.currentThread().interrupt();
         }
         finally {
+            changeListener.dispose();
             future = null;
         }
     }
 
-    /**
-     * Start the change processor on the passed {@code executor}.
-     * @param executor
-     * @throws IllegalStateException if started already
-     */
-    public synchronized void start(ScheduledExecutorService executor) {
-        if (future != null) {
-            throw new IllegalStateException("Change processor started already");
-        }
-        stopping = false;
-        future = executor.scheduleWithFixedDelay(this, 100, 1000, TimeUnit.MILLISECONDS);
-    }
-
     @Override
     public void run() {
         running = true;
         try{
-            EventGeneratingNodeStateDiff diff = new EventGeneratingNodeStateDiff();
-            changeExtractor.getChanges(diff);
-            if (!stopping) {
-                diff.sendEvents();
+            ChangeSet changes = changeListener.getChanges();
+            if (changes != null) {
+                EventGeneratingNodeStateDiff diff = new EventGeneratingNodeStateDiff(changes);
+                changes.diff(diff);
+                if (!stopping) {
+                    diff.sendEvents();
+                }
             }
         } catch (Exception e) {
             log.error("Unable to generate or send events", e);
@@ -140,18 +142,6 @@ class ChangeProcessor implements Runnabl
         }
     }
 
-    synchronized void userIDAccessed() {
-        userIDAccessed = true;
-    }
-
-    synchronized void userDataAccessed() {
-        userDataAccessed = true;
-    }
-
-    synchronized void externalAccessed() {
-        isExternalAccessed = true;
-    }
-
     synchronized void userInfoAccessedWithoutExternalCheck() {
         if (!userInfoAccessedWithoutExternalsCheck) {
             log.warn(DEPRECATED,
@@ -171,25 +161,48 @@ class ChangeProcessor implements Runnabl
         }
     }
 
+    synchronized void dateAccessedWithoutExternalCheck() {
+        if (!dateAccessedWithoutExternalsCheck) {
+            log.warn(DEPRECATED,
+                    "Event listener " + listener + " is trying to access"
+                    + " event date information without checking for whether"
+                    + " the event is external");
+            dateAccessedWithoutExternalsCheck = true;
+        }
+    }
+
+    synchronized void dateAccessedFromExternalEvent() {
+        if (!dateAccessedFromExternalEvent) {
+            log.warn(DEPRECATED,
+                    "Event listener " + listener + " is trying to access"
+                    + " event date information from an external event");
+            dateAccessedFromExternalEvent = true;
+        }
+    }
+
     //------------------------------------------------------------< private >---
 
     private class EventGeneratingNodeStateDiff implements NodeStateDiff {
         public static final int PURGE_LIMIT = 8192;
 
+        private final ChangeSet changes;
         private final String path;
         private final NodeState associatedParentNode;
 
-        private int childNodeCount;
         private List<Iterator<Event>> events;
+        private int childNodeCount;
+
+        EventGeneratingNodeStateDiff(ChangeSet changes, String path, List<Iterator<Event>> events,
+                NodeState associatedParentNode) {
 
-        EventGeneratingNodeStateDiff(String path, List<Iterator<Event>> events, NodeState associatedParentNode) {
+            this.changes = changes;
             this.path = path;
-            this.associatedParentNode = associatedParentNode;
             this.events = events;
+            this.associatedParentNode = associatedParentNode;
         }
 
-        public EventGeneratingNodeStateDiff() {
-            this("/", new ArrayList<Iterator<Event>>(PURGE_LIMIT), null);
+        public EventGeneratingNodeStateDiff(ChangeSet changes) {
+            this(changes, "/", new ArrayList<Iterator<Event>>(PURGE_LIMIT), null);
         }
 
         public void sendEvents() {
@@ -268,7 +281,7 @@ class ChangeProcessor implements Runnabl
             if (!NodeStateUtils.isHidden(name)
                     && filterRef.get().includeChildren(jcrPath())) {
                 EventGeneratingNodeStateDiff diff = new EventGeneratingNodeStateDiff(
-                        PathUtils.concat(path, name), events, after);
+                        changes, PathUtils.concat(path, name), events, after);
                 if (!after.compareAgainstBaseState(before, diff)) {
                     return false;
                 }
@@ -279,23 +292,26 @@ class ChangeProcessor implements Runnabl
             return !stopping;
         }
 
+        private EventImpl createEvent(int eventType, String jcrPath) {
+            // TODO support, identifier, info
+            return new EventImpl(ChangeProcessor.this, eventType, jcrPath, changes.getUserId(),
+                    null, null, changes.getDate(), changes.getUserData(), changes.isExternal());
+        }
+
         private Event generatePropertyEvent(int eventType, String parentPath, PropertyState property) {
             String jcrPath = namePathMapper.getJcrPath(PathUtils.concat(parentPath, property.getName()));
-
-            // TODO support userId, identifier, info, date
-            return new EventImpl(ChangeProcessor.this, eventType, jcrPath, DUMMY_USER_ID, null, null, 0, null, false);
+            return createEvent(eventType, jcrPath);
         }
 
         private Iterator<Event> generateNodeEvents(int eventType, String parentPath, String name, NodeState node) {
-            ChangeFilter filter = filterRef.get();
+            EventFilter filter = filterRef.get();
             final String path = PathUtils.concat(parentPath, name);
             String jcrParentPath = namePathMapper.getJcrPath(parentPath);
             String jcrPath = namePathMapper.getJcrPath(path);
 
             Iterator<Event> nodeEvent;
             if (filter.include(eventType, jcrParentPath, associatedParentNode)) {
-                // TODO support userId, identifier, info, date
-                Event event = new EventImpl(ChangeProcessor.this, eventType, jcrPath, DUMMY_USER_ID, null, null, 0, null, false);
+                Event event = createEvent(eventType, jcrPath);
                 nodeEvent = Iterators.singletonIterator(event);
             } else {
                 nodeEvent = Iterators.emptyIterator();

Copied: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/EventFilter.java (from r1489515, jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeFilter.java)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/EventFilter.java?p2=jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/EventFilter.java&p1=jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeFilter.java&r1=1489515&r2=1489727&rev=1489727&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ChangeFilter.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/EventFilter.java Wed Jun  5 07:21:14 2013
@@ -1,18 +1,20 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 package org.apache.jackrabbit.oak.plugins.observation;
 
@@ -33,7 +35,7 @@ import org.apache.jackrabbit.oak.spi.sta
 /**
  * TODO document
  */
-class ChangeFilter {
+class EventFilter {
 
     private final ReadOnlyNodeTypeManager ntMgr;
     private final NamePathMapper namePathMapper;
@@ -44,10 +46,10 @@ class ChangeFilter {
     private final String[] nodeTypeOakName;
     private final boolean noLocal;        // TODO implement filtering by noLocal
 
-    public ChangeFilter(ReadOnlyNodeTypeManager ntMgr,
-                        NamePathMapper namePathMapper, int eventTypes,
-                        String path, boolean deep, String[] uuid,
-                        String[] nodeTypeName, boolean noLocal)
+    public EventFilter(ReadOnlyNodeTypeManager ntMgr,
+            NamePathMapper namePathMapper, int eventTypes,
+            String path, boolean deep, String[] uuid,
+            String[] nodeTypeName, boolean noLocal)
             throws NoSuchNodeTypeException, RepositoryException {
         this.ntMgr = ntMgr;
         this.namePathMapper = namePathMapper;
@@ -128,9 +130,9 @@ class ChangeFilter {
      *
      * @param nodeTypeNames the node type names.
      * @return the node type names as oak names.
-     * @throws NoSuchNodeTypeException if one of the node type names refers to
+     * @throws javax.jcr.nodetype.NoSuchNodeTypeException if one of the node type names refers to
      *                                 an non-existing node type.
-     * @throws RepositoryException     if an error occurs while reading from the
+     * @throws javax.jcr.RepositoryException     if an error occurs while reading from the
      *                                 node type manager.
      */
     @CheckForNull

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/EventFilter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/EventFilter.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision Rev URL

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/EventImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/EventImpl.java?rev=1489727&r1=1489726&r2=1489727&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/EventImpl.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/EventImpl.java Wed Jun  5 07:21:14 2013
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.jackrabbit.oak.plugins.observation;
 
 import java.util.Collections;
@@ -29,9 +30,8 @@ import org.apache.jackrabbit.api.observa
  * TODO document
  */
 public class EventImpl implements JackrabbitEvent {
-
     private final ChangeProcessor collector;
-    private boolean externalAccessed = false;
+    private boolean externalAccessed;
 
     private final int type;
     private final String path;
@@ -43,10 +43,10 @@ public class EventImpl implements Jackra
     private final boolean external;
 
     public EventImpl(
-            ChangeProcessor collector,
+            ChangeProcessor processor,
             int type, String path, String userID, String identifier,
             Map<?, ?> info, long date, String userData, boolean external) {
-        this.collector = collector;
+        this.collector = processor;
         this.type = type;
         this.path = path;
         this.userID = userID;
@@ -75,7 +75,6 @@ public class EventImpl implements Jackra
         if (external) {
             collector.userInfoAccessedFromExternalEvent();
         }
-        collector.userIDAccessed();
         return userID;
     }
 
@@ -97,19 +96,23 @@ public class EventImpl implements Jackra
         if (external) {
             collector.userInfoAccessedFromExternalEvent();
         }
-        collector.userDataAccessed();
         return userData;
     }
 
     @Override
     public long getDate() throws RepositoryException {
+        if (!externalAccessed) {
+            collector.dateAccessedWithoutExternalCheck();
+        }
+        if (external) {
+            collector.dateAccessedFromExternalEvent();
+        }
         return date;
     }
 
     @Override
     public synchronized boolean isExternal() {
         externalAccessed = true;
-        collector.externalAccessed();
         return external;
     }
 

Copied: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ObservationConstants.java (from r1489515, jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/ObservationConstants.java)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ObservationConstants.java?p2=jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ObservationConstants.java&p1=jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/ObservationConstants.java&r1=1489515&r2=1489727&rev=1489727&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/ObservationConstants.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ObservationConstants.java Wed Jun  5 07:21:14 2013
@@ -17,29 +17,11 @@
  * under the License.
  */
 
-package org.apache.jackrabbit.oak.plugins.observation2;
+package org.apache.jackrabbit.oak.plugins.observation;
 
-import static org.apache.jackrabbit.JcrConstants.JCR_SYSTEM;
+public final class ObservationConstants {
+    public static final String OAK_UNKNOWN = "oak:unknown";
 
-/**
- * TODO document
- */
-public interface ObservationConstants {
-    String REP_OBSERVATION = "rep:observation";
-    String LISTENERS = "listeners";
-    String EVENTS = "events";
-
-    String USER_DATA = "userData";
-    String USER_ID = "userId";
-
-    String TYPE = "type";
-    String PATH = "path";
-    String DATE = "date";
-    String DEEP = "deep";
-    String UUID = "uuid";
-    String NODE_TYPES = "nodeTypes";
-    String NO_LOCAL = "noLocal";
-
-    String LISTENER_PATH = '/' + JCR_SYSTEM + '/' + REP_OBSERVATION + '/' + LISTENERS;
-    String EVENTS_PATH = '/' + JCR_SYSTEM + '/' + REP_OBSERVATION + '/' + EVENTS;
+    private ObservationConstants() {
+    }
 }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ObservationManagerImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ObservationManagerImpl.java?rev=1489727&r1=1489726&r2=1489727&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ObservationManagerImpl.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/ObservationManagerImpl.java Wed Jun  5 07:21:14 2013
@@ -1,18 +1,20 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 package org.apache.jackrabbit.oak.plugins.observation;
 
@@ -28,36 +30,34 @@ import javax.jcr.observation.EventListen
 import javax.jcr.observation.EventListenerIterator;
 import javax.jcr.observation.ObservationManager;
 
-import com.google.common.base.Preconditions;
 import org.apache.jackrabbit.commons.iterator.EventListenerIteratorAdapter;
-import org.apache.jackrabbit.oak.api.Root;
-import org.apache.jackrabbit.oak.core.RootImpl;
+import org.apache.jackrabbit.oak.core.ContentRepositoryImpl;
 import org.apache.jackrabbit.oak.namepath.NamePathMapper;
 import org.apache.jackrabbit.oak.plugins.nodetype.ReadOnlyNodeTypeManager;
-import org.apache.jackrabbit.oak.spi.observation.ChangeExtractor;
+import org.apache.jackrabbit.oak.plugins.observation.ChangeDispatcher.Listener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.Marker;
 import org.slf4j.MarkerFactory;
 
-/**
- * TODO document
- */
 public class ObservationManagerImpl implements ObservationManager {
     private static final Logger log = LoggerFactory.getLogger(ObservationManagerImpl.class);
+    public static final Marker OBSERVATION = MarkerFactory.getMarker("observation");
 
-    private final RootImpl root;
-    private final NamePathMapper namePathMapper;
-    private final ScheduledExecutorService executor;
     private final Map<EventListener, ChangeProcessor> processors = new HashMap<EventListener, ChangeProcessor>();
     private final AtomicBoolean hasEvents = new AtomicBoolean(false);
+    private final ContentRepositoryImpl contentRepository;
     private final ReadOnlyNodeTypeManager ntMgr;
+    private final NamePathMapper namePathMapper;
+    private final ScheduledExecutorService executor;
 
-    public ObservationManagerImpl(Root root, NamePathMapper namePathMapper, ScheduledExecutorService executor) {
-        Preconditions.checkArgument(root instanceof RootImpl, "root must be of actual type RootImpl");
-        this.root = ((RootImpl) root);
+    public ObservationManagerImpl(ContentRepositoryImpl contentRepository, ReadOnlyNodeTypeManager nodeTypeManager,
+            NamePathMapper namePathMapper, ScheduledExecutorService executor) {
+
+        this.contentRepository = contentRepository;
+        this.ntMgr = nodeTypeManager;
         this.namePathMapper = namePathMapper;
         this.executor = executor;
-        this.ntMgr = ReadOnlyNodeTypeManager.getInstance(root, namePathMapper);
     }
 
     public synchronized void dispose() {
@@ -79,18 +79,16 @@ public class ObservationManagerImpl impl
     @Override
     public synchronized void addEventListener(EventListener listener, int eventTypes, String absPath,
             boolean isDeep, String[] uuid, String[] nodeTypeName, boolean noLocal) throws RepositoryException {
-        ChangeFilter filter = new ChangeFilter(ntMgr, namePathMapper, eventTypes,
+        EventFilter filter = new EventFilter(ntMgr, namePathMapper, eventTypes,
                 absPath, isDeep, uuid, nodeTypeName, noLocal);
         ChangeProcessor processor = processors.get(listener);
         if (processor == null) {
-            log.error(MarkerFactory.getMarker("observation"),
-                    "Registering event listener {} with filter {}", listener, filter);
+            log.error(OBSERVATION, "Registering event listener {} with filter {}", listener, filter);
             processor = new ChangeProcessor(this, listener, filter);
             processors.put(listener, processor);
             processor.start(executor);
         } else {
-            log.debug(MarkerFactory.getMarker("observation"),
-                    "Changing event listener {} to filter {}", listener, filter);
+            log.debug(OBSERVATION, "Changing event listener {} to filter {}", listener, filter);
             processor.setFilter(filter);
         }
     }
@@ -131,11 +129,11 @@ public class ObservationManagerImpl impl
         return namePathMapper;
     }
 
-    ChangeExtractor getChangeExtractor() {
-        return root.getChangeExtractor();
-    }
-
     void setHasEvents() {
         hasEvents.set(true);
     }
+
+    Listener newChangeListener() {
+        return contentRepository.newListener();
+    }
 }

Copied: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/PostCommitHook.java (from r1489515, jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/ObservationConstants.java)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/PostCommitHook.java?p2=jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/PostCommitHook.java&p1=jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/ObservationConstants.java&r1=1489515&r2=1489727&rev=1489727&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/ObservationConstants.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/PostCommitHook.java Wed Jun  5 07:21:14 2013
@@ -17,29 +17,20 @@
  * under the License.
  */
 
-package org.apache.jackrabbit.oak.plugins.observation2;
+package org.apache.jackrabbit.oak.plugins.observation;
 
-import static org.apache.jackrabbit.JcrConstants.JCR_SYSTEM;
+import javax.annotation.Nonnull;
+
+import org.apache.jackrabbit.oak.spi.state.NodeState;
 
 /**
- * TODO document
+ * TODO unify with Observer or CommitHook
  */
-public interface ObservationConstants {
-    String REP_OBSERVATION = "rep:observation";
-    String LISTENERS = "listeners";
-    String EVENTS = "events";
-
-    String USER_DATA = "userData";
-    String USER_ID = "userId";
-
-    String TYPE = "type";
-    String PATH = "path";
-    String DATE = "date";
-    String DEEP = "deep";
-    String UUID = "uuid";
-    String NODE_TYPES = "nodeTypes";
-    String NO_LOCAL = "noLocal";
+public interface PostCommitHook {
+    PostCommitHook EMPTY = new PostCommitHook() {
+        @Override
+        public void contentChanged(@Nonnull NodeState before, @Nonnull NodeState after) { }
+    };
 
-    String LISTENER_PATH = '/' + JCR_SYSTEM + '/' + REP_OBSERVATION + '/' + LISTENERS;
-    String EVENTS_PATH = '/' + JCR_SYSTEM + '/' + REP_OBSERVATION + '/' + EVENTS;
+    void contentChanged(@Nonnull NodeState before, @Nonnull NodeState after);
 }

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/PostCommitHook.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/PostCommitHook.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision Rev URL

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/security/user/UserInitializer.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/security/user/UserInitializer.java?rev=1489727&r1=1489726&r2=1489727&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/security/user/UserInitializer.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/security/user/UserInitializer.java Wed Jun  5 07:21:14 2013
@@ -16,6 +16,8 @@
  */
 package org.apache.jackrabbit.oak.security.user;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import javax.annotation.Nonnull;
 import javax.jcr.RepositoryException;
 
@@ -28,6 +30,7 @@ import org.apache.jackrabbit.oak.namepat
 import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
 import org.apache.jackrabbit.oak.plugins.index.IndexUtils;
 import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
+import org.apache.jackrabbit.oak.plugins.observation.PostCommitHook;
 import org.apache.jackrabbit.oak.security.authentication.SystemSubject;
 import org.apache.jackrabbit.oak.spi.commit.CommitHook;
 import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
@@ -44,8 +47,6 @@ import org.apache.jackrabbit.oak.util.No
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-
 /**
  * Creates initial set of users to be present in a given workspace. This
  * implementation uses the {@code UserManager} such as defined by the
@@ -98,7 +99,7 @@ class UserInitializer implements Workspa
             throw new RuntimeException(e);
         }
         // TODO reconsider
-        Root root = new RootImpl(store, commitHook, workspaceName, SystemSubject.INSTANCE, new OpenSecurityProvider(), indexProvider);
+        Root root = new RootImpl(store, commitHook, PostCommitHook.EMPTY, workspaceName, SystemSubject.INSTANCE, new OpenSecurityProvider(), indexProvider);
 
         UserConfiguration userConfiguration = securityProvider.getConfiguration(UserConfiguration.class);
         UserManager userManager = userConfiguration.getUserManager(root, NamePathMapper.DEFAULT);

Modified: jackrabbit/oak/trunk/oak-jcr/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/pom.xml?rev=1489727&r1=1489726&r2=1489727&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-jcr/pom.xml Wed Jun  5 07:21:14 2013
@@ -222,7 +222,6 @@
       org.apache.jackrabbit.test.api.query.SQLJoinTest#testJoinSNS                                   <!-- OAK-474 -->
       org.apache.jackrabbit.test.api.query.qom.DescendantNodeJoinConditionTest#testInnerJoin         <!-- OAK-852 -->
       org.apache.jackrabbit.test.api.query.qom.DescendantNodeJoinConditionTest#testLeftOuterJoin     <!-- OAK-852 -->
-      org.apache.jackrabbit.test.api.observation.EventTest#testGetUserId
       org.apache.jackrabbit.test.api.observation.NodeMovedTest#testMoveNode
       org.apache.jackrabbit.test.api.observation.NodeMovedTest#testMoveTree
       org.apache.jackrabbit.test.api.observation.NodeMovedTest#testMoveWithRemove

Modified: jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/RepositoryImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/RepositoryImpl.java?rev=1489727&r1=1489726&r2=1489727&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/RepositoryImpl.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/RepositoryImpl.java Wed Jun  5 07:21:14 2013
@@ -194,4 +194,8 @@ public class RepositoryImpl implements R
         return executor;
     }
 
+    ContentRepository getContentRepository() {
+        return contentRepository;
+    }
+
 }
\ No newline at end of file

Modified: jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/SessionContext.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/SessionContext.java?rev=1489727&r1=1489726&r2=1489727&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/SessionContext.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/SessionContext.java Wed Jun  5 07:21:14 2013
@@ -16,9 +16,12 @@
  */
 package org.apache.jackrabbit.oak.jcr;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+
 import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 import javax.jcr.PathNotFoundException;
@@ -37,6 +40,8 @@ import org.apache.jackrabbit.JcrConstant
 import org.apache.jackrabbit.api.security.authorization.PrivilegeManager;
 import org.apache.jackrabbit.api.security.principal.PrincipalManager;
 import org.apache.jackrabbit.api.security.user.UserManager;
+import org.apache.jackrabbit.oak.api.ContentRepository;
+import org.apache.jackrabbit.oak.core.ContentRepositoryImpl;
 import org.apache.jackrabbit.oak.jcr.delegate.NodeDelegate;
 import org.apache.jackrabbit.oak.jcr.delegate.PropertyDelegate;
 import org.apache.jackrabbit.oak.jcr.delegate.SessionDelegate;
@@ -50,8 +55,8 @@ import org.apache.jackrabbit.oak.namepat
 import org.apache.jackrabbit.oak.plugins.name.Namespaces;
 import org.apache.jackrabbit.oak.plugins.nodetype.DefinitionProvider;
 import org.apache.jackrabbit.oak.plugins.nodetype.EffectiveNodeTypeProvider;
+import org.apache.jackrabbit.oak.plugins.nodetype.ReadOnlyNodeTypeManager;
 import org.apache.jackrabbit.oak.plugins.observation.ObservationManagerImpl;
-import org.apache.jackrabbit.oak.plugins.observation2.ObservationManagerImpl2;
 import org.apache.jackrabbit.oak.plugins.value.ValueFactoryImpl;
 import org.apache.jackrabbit.oak.spi.security.SecurityConfiguration;
 import org.apache.jackrabbit.oak.spi.security.authorization.AccessControlConfiguration;
@@ -61,8 +66,6 @@ import org.apache.jackrabbit.oak.spi.sec
 import org.apache.jackrabbit.oak.spi.security.user.UserConfiguration;
 import org.apache.jackrabbit.oak.spi.xml.ProtectedItemImporter;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-
 /**
  * Instances of this class are passed to all JCR implementation classes
  * (e.g. {@code SessionImpl}, {@code NodeImpl}, etc.) and provide access to
@@ -70,9 +73,6 @@ import static com.google.common.base.Pre
  * {@code ValueFactory}, etc.).
  */
 public abstract class SessionContext implements NamePathMapper {
-    // FIXME remove OAK-775 feature flag and unify ObservationManager implementations
-    private static final boolean OAK_775 = Boolean.getBoolean("OAK-775");
-
     private final RepositoryImpl repository;
     private final SessionDelegate delegate;
     private final SessionNamespaces namespaces;
@@ -244,22 +244,18 @@ public abstract class SessionContext imp
     @Nonnull
     public ObservationManager getObservationManager() {
         if (observationManager == null) {
-            if (OAK_775) {
-                observationManager = new ObservationManagerImpl2(
-                        delegate.getRoot(), namePathMapper, repository.getObservationExecutor());
-            } else {
-                observationManager = new ObservationManagerImpl(
-                    delegate.getRoot(), namePathMapper, repository.getObservationExecutor());
-            }
+            ContentRepository contentRepository = repository.getContentRepository();
+            observationManager = new ObservationManagerImpl(
+                // FIXME don't cast
+                ((ContentRepositoryImpl) contentRepository),
+                ReadOnlyNodeTypeManager.getInstance(delegate.getRoot(), namePathMapper),
+                namePathMapper, repository.getObservationExecutor());
         }
         return observationManager;
     }
 
     public boolean hasPendingEvents() {
-        return observationManager != null &&
-                (OAK_775
-                    ? ((ObservationManagerImpl2) observationManager).hasEvents()
-                    : ((ObservationManagerImpl) observationManager).hasEvents());
+        return observationManager != null && (((ObservationManagerImpl) observationManager).hasEvents());
     }
 
     //-----------------------------------------------------< NamePathMapper >---
@@ -348,11 +344,7 @@ public abstract class SessionContext imp
 
     void dispose() {
         if (observationManager != null) {
-            if (OAK_775) {
-                ((ObservationManagerImpl2) observationManager).dispose();
-            } else {
-                ((ObservationManagerImpl) observationManager).dispose();
-            }
+            ((ObservationManagerImpl) observationManager).dispose();
         }
         namespaces.clear();
     }

Modified: jackrabbit/oak/trunk/oak-solr-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/solr/index/SolrCommitHookIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-solr-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/solr/index/SolrCommitHookIT.java?rev=1489727&r1=1489726&r2=1489727&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-solr-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/solr/index/SolrCommitHookIT.java (original)
+++ jackrabbit/oak/trunk/oak-solr-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/solr/index/SolrCommitHookIT.java Wed Jun  5 07:21:14 2013
@@ -23,6 +23,7 @@ import javax.security.auth.Subject;
 import org.apache.jackrabbit.oak.api.Root;
 import org.apache.jackrabbit.oak.core.RootImpl;
 import org.apache.jackrabbit.oak.plugins.index.solr.SolrBaseTest;
+import org.apache.jackrabbit.oak.plugins.observation.PostCommitHook;
 import org.apache.jackrabbit.oak.spi.query.CompositeQueryIndexProvider;
 import org.apache.jackrabbit.oak.spi.security.OpenSecurityProvider;
 import org.apache.solr.client.solrj.SolrQuery;
@@ -37,7 +38,7 @@ public class SolrCommitHookIT extends So
 
     @Override
     protected RootImpl createRootImpl() {
-        return new RootImpl(store, new SolrCommitHook(server), "solr-commit-hook-it", new Subject(),
+        return new RootImpl(store, new SolrCommitHook(server), PostCommitHook.EMPTY, "solr-commit-hook-it", new Subject(),
                 new OpenSecurityProvider(), new CompositeQueryIndexProvider());
     }
 

Modified: jackrabbit/oak/trunk/oak-solr-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/solr/query/SolrQueryEngineIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-solr-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/solr/query/SolrQueryEngineIT.java?rev=1489727&r1=1489726&r2=1489727&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-solr-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/solr/query/SolrQueryEngineIT.java (original)
+++ jackrabbit/oak/trunk/oak-solr-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/solr/query/SolrQueryEngineIT.java Wed Jun  5 07:21:14 2013
@@ -16,6 +16,11 @@
  */
 package org.apache.jackrabbit.oak.plugins.index.solr.query;
 
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertFalse;
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
+
 import javax.security.auth.Subject;
 
 import org.apache.jackrabbit.oak.api.Root;
@@ -23,10 +28,11 @@ import org.apache.jackrabbit.oak.api.Tre
 import org.apache.jackrabbit.oak.core.RootImpl;
 import org.apache.jackrabbit.oak.plugins.index.IndexDefinition;
 import org.apache.jackrabbit.oak.plugins.index.IndexDefinitionImpl;
-import org.apache.jackrabbit.oak.query.ast.Operator;
-import org.apache.jackrabbit.oak.query.index.FilterImpl;
 import org.apache.jackrabbit.oak.plugins.index.solr.SolrBaseTest;
 import org.apache.jackrabbit.oak.plugins.index.solr.index.SolrCommitHook;
+import org.apache.jackrabbit.oak.plugins.observation.PostCommitHook;
+import org.apache.jackrabbit.oak.query.ast.Operator;
+import org.apache.jackrabbit.oak.query.index.FilterImpl;
 import org.apache.jackrabbit.oak.spi.query.CompositeQueryIndexProvider;
 import org.apache.jackrabbit.oak.spi.query.Cursor;
 import org.apache.jackrabbit.oak.spi.query.Filter;
@@ -35,11 +41,6 @@ import org.apache.jackrabbit.oak.spi.que
 import org.apache.jackrabbit.oak.spi.security.OpenSecurityProvider;
 import org.junit.Test;
 
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertFalse;
-import static junit.framework.Assert.assertNotNull;
-import static junit.framework.Assert.assertTrue;
-
 /**
  * Integration test for {@link org.apache.jackrabbit.oak.plugins.index.solr.query.SolrQueryIndex} and {@link org.apache.jackrabbit.oak.plugins.index.solr.index.SolrCommitHook} working
  * together
@@ -48,7 +49,7 @@ public class SolrQueryEngineIT extends S
 
     @Override
     protected RootImpl createRootImpl() {
-        return new RootImpl(store, new SolrCommitHook(server), "solr-query-engine-it", new Subject(),
+        return new RootImpl(store, new SolrCommitHook(server), PostCommitHook.EMPTY, "solr-query-engine-it", new Subject(),
                 new OpenSecurityProvider(), new CompositeQueryIndexProvider());
     }