You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2013/04/16 16:28:58 UTC

svn commit: r1468434 - in /jackrabbit/oak/trunk: oak-core/ oak-core/src/main/java/org/apache/jackrabbit/oak/ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/nodetype/ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/ oak-j...

Author: mduerig
Date: Tue Apr 16 14:28:57 2013
New Revision: 1468434

URL: http://svn.apache.org/r1468434
Log:
OAK-775: Implement backward compatible observation
first implementation in a separate package for now

Added:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventCollector.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventFilter.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventImpl.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventQueueReader.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventQueueWriter.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventQueueWriterProvider.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/ObservationConstants.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/ObservationManagerImpl2.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-core/pom.xml
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/nodetype/EffectiveNodeTypeProvider.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/nodetype/ReadOnlyNodeTypeManager.java
    jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/SessionContext.java

Modified: jackrabbit/oak/trunk/oak-core/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/pom.xml?rev=1468434&r1=1468433&r2=1468434&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-core/pom.xml Tue Apr 16 14:28:57 2013
@@ -60,6 +60,7 @@
               org.apache.jackrabbit.oak.plugins.nodetype,
               org.apache.jackrabbit.oak.plugins.nodetype.write,
               org.apache.jackrabbit.oak.plugins.observation,
+              org.apache.jackrabbit.oak.plugins.observation2,
               org.apache.jackrabbit.oak.plugins.version,
               org.apache.jackrabbit.oak.spi.query,
               org.apache.jackrabbit.oak.spi.commit,

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java?rev=1468434&r1=1468433&r2=1468434&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java Tue Apr 16 14:28:57 2013
@@ -16,6 +16,10 @@
  */
 package org.apache.jackrabbit.oak;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Lists.newArrayList;
+import static java.util.concurrent.Executors.newScheduledThreadPool;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
@@ -37,9 +41,10 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.index.CompositeIndexHookProvider;
 import org.apache.jackrabbit.oak.plugins.index.IndexHookManager;
 import org.apache.jackrabbit.oak.plugins.index.IndexHookProvider;
+import org.apache.jackrabbit.oak.plugins.observation2.EventQueueWriterProvider;
 import org.apache.jackrabbit.oak.spi.commit.CommitHook;
-import org.apache.jackrabbit.oak.spi.commit.CompositeHook;
 import org.apache.jackrabbit.oak.spi.commit.CompositeEditorProvider;
+import org.apache.jackrabbit.oak.spi.commit.CompositeHook;
 import org.apache.jackrabbit.oak.spi.commit.ConflictHandler;
 import org.apache.jackrabbit.oak.spi.commit.Editor;
 import org.apache.jackrabbit.oak.spi.commit.EditorHook;
@@ -56,10 +61,6 @@ import org.apache.jackrabbit.oak.spi.sta
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.collect.Lists.newArrayList;
-import static java.util.concurrent.Executors.newScheduledThreadPool;
-
 /**
  * Builder class for constructing {@link ContentRepository} instances with
  * a set of specified plugin components. This class acts as a public facade
@@ -267,6 +268,7 @@ public class Oak {
 
         // add index hooks later to prevent the OakInitializer to do excessive indexing
         with(IndexHookManager.of(indexHooks));
+        with(new EventQueueWriterProvider());
         withEditorHook();
         CommitHook commitHook = CompositeHook.compose(commitHooks);
         return new ContentRepositoryImpl(

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/nodetype/EffectiveNodeTypeProvider.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/nodetype/EffectiveNodeTypeProvider.java?rev=1468434&r1=1468433&r2=1468434&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/nodetype/EffectiveNodeTypeProvider.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/nodetype/EffectiveNodeTypeProvider.java Tue Apr 16 14:28:57 2013
@@ -43,6 +43,16 @@ public interface EffectiveNodeTypeProvid
     boolean isNodeType(Tree tree, String nodeTypeName) throws NoSuchNodeTypeException, RepositoryException;
 
     /**
+     * Returns {@code true} if {@code typeName} is of the specified primary node
+     * type or mixin type, or a subtype thereof. Returns {@code false} otherwise.
+     *
+     * @param typeName  the internal oak name of the node type to test
+     * @param superName The internal oak name of the super type to be tested for.
+     * @return {@code true} if the specified node type is of the given node type.
+     */
+    boolean isNodeType(String typeName, String superName);
+
+    /**
      * Calculates and returns the effective node types of the given node.
      *
      * @param targetNode the node for which the types should be calculated.

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/nodetype/ReadOnlyNodeTypeManager.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/nodetype/ReadOnlyNodeTypeManager.java?rev=1468434&r1=1468433&r2=1468434&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/nodetype/ReadOnlyNodeTypeManager.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/nodetype/ReadOnlyNodeTypeManager.java Tue Apr 16 14:28:57 2013
@@ -318,6 +318,11 @@ public abstract class ReadOnlyNodeTypeMa
                 && contains(supertypes.getValue(Type.NAMES), superName);
     }
 
+    @Override
+    public boolean isNodeType(String typeName, String superName) {
+        return isa(getTypes(), typeName, superName);
+    }
+
     /**
      * Returns all the node types of the given node, in a breadth-first
      * traversal order of the type hierarchy.

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventCollector.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventCollector.java?rev=1468434&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventCollector.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventCollector.java Tue Apr 16 14:28:57 2013
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.observation2;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.jackrabbit.JcrConstants.JCR_SYSTEM;
+import static org.apache.jackrabbit.oak.plugins.observation2.ObservationConstants.LISTENERS;
+import static org.apache.jackrabbit.oak.plugins.observation2.ObservationConstants.LISTENER_PATH;
+import static org.apache.jackrabbit.oak.plugins.observation2.ObservationConstants.REP_OBSERVATION;
+import static org.apache.jackrabbit.oak.plugins.observation2.ObservationConstants.USER_DATA;
+import static org.apache.jackrabbit.oak.plugins.observation2.ObservationConstants.USER_ID;
+
+import java.util.Iterator;
+import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import javax.jcr.observation.Event;
+import javax.jcr.observation.EventListener;
+
+import org.apache.jackrabbit.commons.iterator.EventIteratorAdapter;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.api.Root;
+import org.apache.jackrabbit.oak.api.Tree;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TODO document
+ */
+class EventCollector implements Runnable {
+    private static final Logger log = LoggerFactory.getLogger(EventCollector.class);
+
+    private final ObservationManagerImpl2 observationManager;
+    private final EventQueueReader eventQueueReader;
+    private final EventListener listener;
+    private volatile boolean running;
+    private ScheduledFuture<?> future;
+    private String id;
+
+    public EventCollector(ObservationManagerImpl2 observationManager, EventListener listener, EventFilter filter)
+            throws CommitFailedException {
+        this.observationManager = observationManager;
+        this.listener = listener;
+        this.eventQueueReader = new EventQueueReader(
+                observationManager.getContentSession().getLatestRoot(),
+                observationManager.getNamePathMapper());
+        setFilterSpec(filter);
+    }
+
+    public void updateFilter(EventFilter filter) throws CommitFailedException {
+        updateFilterSpec(filter);
+    }
+
+    public void setUserData(String userData) throws CommitFailedException {
+        Root root = getLatestRoot();
+        Tree listenerSpec = getOrCreateListenerSpec(root);
+        if (userData == null) {
+            listenerSpec.removeProperty(USER_DATA);
+        } else {
+            listenerSpec.setProperty(USER_DATA, userData);
+        }
+        root.commit();
+    }
+
+    /**
+     * Stop this change processor if running. After returning from this methods no further
+     * events will be delivered.
+     * @throws IllegalStateException if not yet started or stopped already
+     */
+    public synchronized void stop() throws CommitFailedException {
+        if (future == null) {
+            throw new IllegalStateException("Change processor not started");
+        }
+
+        try {
+            future.cancel(true);
+            while (running) {
+                wait();
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+        finally {
+            future = null;
+            clearFilterSpec();
+        }
+    }
+
+    /**
+     * Start the change processor on the passed {@code executor}.
+     * @param executor
+     * @throws IllegalStateException if started already
+     */
+    public synchronized void start(ScheduledExecutorService executor) {
+        checkArgument(future == null, "Change processor started already");
+        future = executor.scheduleWithFixedDelay(this, 100, 1000, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void run() {
+        running = true;
+        try {
+            Iterator<Event> bundle = eventQueueReader.getEventBundle(getId());
+            // FIXME filter by session specific access restrictions
+            if (bundle != null) {
+                observationManager.setHasEvents();
+                listener.onEvent(new EventIteratorAdapter(bundle));
+            }
+        } catch (Exception e) {
+            log.error("Unable to generate or send events", e);
+        } finally {
+            synchronized (this) {
+                running = false;
+                notifyAll();
+            }
+        }
+    }
+
+    //------------------------------------------------------------< private >---
+
+    private String getId() {
+        if (id == null) {
+            id = UUID.randomUUID().toString();
+        }
+        return id;
+    }
+
+    private static Tree getOrCreate(Tree parent, String name) {
+        Tree child = parent.getChild(name);
+        if (child == null) {
+            child = parent.addChild(name);
+        }
+        return child;
+    }
+
+    private Tree getOrCreateListenerSpec(Root root) {
+        return getOrCreate(getOrCreate(getOrCreate(
+                root.getTree('/' + JCR_SYSTEM), REP_OBSERVATION), LISTENERS), getId());
+    }
+
+    private Root getLatestRoot() {
+        return observationManager.getContentSession().getLatestRoot();
+    }
+
+    private void setFilterSpec(EventFilter filter) throws CommitFailedException {
+        Root root = getLatestRoot();
+        Tree listenerSpec = getOrCreateListenerSpec(root);
+        String userId = observationManager.getContentSession().getAuthInfo().getUserID();
+        listenerSpec.setProperty(USER_ID, userId);
+        filter.persist(listenerSpec);
+        root.commit();
+    }
+
+    private void updateFilterSpec(EventFilter filter) throws CommitFailedException {
+        Root root = getLatestRoot();
+        Tree listenerSpec = getOrCreateListenerSpec(root);
+        filter.persist(listenerSpec);
+        root.commit();
+    }
+
+    private void clearFilterSpec() throws CommitFailedException {
+        Root root = getLatestRoot();
+        Tree listenerSpec = root.getTree(LISTENER_PATH + '/' + getId());
+        if  (listenerSpec != null) {
+            listenerSpec.remove();
+            root.commit();
+        }
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventCollector.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventCollector.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision Rev URL

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventFilter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventFilter.java?rev=1468434&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventFilter.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventFilter.java Tue Apr 16 14:28:57 2013
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.observation2;
+
+import static org.apache.jackrabbit.oak.plugins.observation2.ObservationConstants.DEEP;
+import static org.apache.jackrabbit.oak.plugins.observation2.ObservationConstants.NODE_TYPES;
+import static org.apache.jackrabbit.oak.plugins.observation2.ObservationConstants.NO_LOCAL;
+import static org.apache.jackrabbit.oak.plugins.observation2.ObservationConstants.PATH;
+import static org.apache.jackrabbit.oak.plugins.observation2.ObservationConstants.TYPE;
+import static org.apache.jackrabbit.oak.plugins.observation2.ObservationConstants.UUID;
+
+import java.util.Arrays;
+
+import javax.annotation.Nullable;
+
+import org.apache.jackrabbit.oak.api.Tree;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.plugins.nodetype.ReadOnlyNodeTypeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TODO document
+ */
+class EventFilter {
+    private static final Logger log = LoggerFactory.getLogger(EventFilter.class);
+
+    private final int eventTypes;
+    private final String path;
+    private final boolean deep;
+    private final String[] uuid;          // TODO implement filtering by uuid
+    private final String[] nodeTypeNames;
+    private final boolean noLocal;        // TODO implement filtering by noLocal
+
+    public EventFilter(int eventTypes, String path, boolean deep, String[] uuid,
+            String[] nodeTypeName, boolean noLocal) {
+        this.eventTypes = eventTypes;
+        this.path = path;
+        this.deep = deep;
+        this.uuid = uuid;
+        this.nodeTypeNames = nodeTypeName;
+        this.noLocal = noLocal;
+    }
+
+    public boolean include(int eventType, String path, @Nullable String[] associatedType, ReadOnlyNodeTypeManager ntMgr) {
+        return includeEventType(eventType)
+                && includePath(path)
+                && includeNodeType(associatedType, ntMgr);
+    }
+
+    //-----------------------------< internal >---------------------------------
+
+    private boolean includeEventType(int eventType) {
+        return (this.eventTypes & eventType) != 0;
+    }
+
+    private boolean includePath(String path) {
+        String app = PathUtils.getParentPath(path);
+        boolean equalPaths = this.path.equals(app);
+        if (!deep && !equalPaths) {
+            return false;
+        }
+        if (deep && !(PathUtils.isAncestor(this.path, app) || equalPaths)) {
+            return false;
+        }
+        return true;
+    }
+
+    private boolean includeNodeType(String[] associatedParentTypes, ReadOnlyNodeTypeManager ntMgr) {
+        if (nodeTypeNames == null || associatedParentTypes == null) {
+            return true;
+        }
+        for (String type : nodeTypeNames) {
+            for (String apt : associatedParentTypes) {
+                if (ntMgr.isNodeType(apt, type)) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    void persist(Tree tree) {
+        tree.setProperty(TYPE, eventTypes);
+        tree.setProperty(PATH, path);
+        tree.setProperty(DEEP, deep);
+        if (uuid != null) {
+            tree.setProperty(UUID, Arrays.asList(uuid), Type.STRINGS);
+        }
+        if (nodeTypeNames != null) {
+            tree.setProperty(NODE_TYPES, Arrays.asList(nodeTypeNames), Type.STRINGS);
+        }
+        tree.setProperty(NO_LOCAL, noLocal);
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventFilter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventFilter.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision Rev URL

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventImpl.java?rev=1468434&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventImpl.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventImpl.java Tue Apr 16 14:28:57 2013
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.observation2;
+
+import java.util.Collections;
+import java.util.Map;
+
+import javax.jcr.RepositoryException;
+import javax.jcr.observation.Event;
+
+/**
+ * TODO document
+ */
+public class EventImpl implements Event {
+    private final int type;
+    private final String path;
+    private final String userID;
+    private final String identifier;
+    private final Map<?, ?> info;
+    private final long date;
+    private final String userData;
+
+    public EventImpl(int type, String path, String userID, String identifier, Map<?, ?> info, long date, String userData) {
+        this.type = type;
+        this.path = path;
+        this.userID = userID;
+        this.identifier = identifier;
+        this.info = info == null ? Collections.emptyMap() : info;
+        this.date = date;
+        this.userData = userData;
+    }
+
+    @Override
+    public int getType() {
+        return type;
+    }
+
+    @Override
+    public String getPath() throws RepositoryException {
+        return path;
+    }
+
+    @Override
+    public String getUserID() {
+        return userID;
+    }
+
+    @Override
+    public String getIdentifier() throws RepositoryException {
+        return identifier;
+    }
+
+    @Override
+    public Map<?, ?> getInfo() throws RepositoryException {
+        return info;
+    }
+
+    @Override
+    public String getUserData() throws RepositoryException {
+        return userData;
+    }
+
+    @Override
+    public long getDate() throws RepositoryException {
+        return date;
+    }
+
+    @Override
+    public final boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (other == null || getClass() != other.getClass()) {
+            return false;
+        }
+
+        EventImpl that = (EventImpl) other;
+        return date == that.date && type == that.type &&
+                (identifier == null ? that.identifier == null : identifier.equals(that.identifier)) &&
+                (info == null ? that.info == null : info.equals(that.info)) &&
+                (path == null ? that.path == null : path.equals(that.path)) &&
+                (userID == null ? that.userID == null : userID.equals(that.userID)) &&
+                (userData == null ? that.userData == null : userData.equals(that.userData));
+
+    }
+
+    @Override
+    public final int hashCode() {
+        int result = type;
+        result = 31 * result + (path == null ? 0 : path.hashCode());
+        result = 31 * result + (userID == null ? 0 : userID.hashCode());
+        result = 31 * result + (identifier == null ? 0 : identifier.hashCode());
+        result = 31 * result + (info == null ? 0 : info.hashCode());
+        result = 31 * result + (int) (date ^ (date >>> 32));
+        result = 31 * result + (userData == null ? 0 :  userData.hashCode());
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "EventImpl{" +
+                "type=" + type +
+                ", path='" + path + '\'' +
+                ", userID='" + userID + '\'' +
+                ", identifier='" + identifier + '\'' +
+                ", info=" + info +
+                ", date=" + date +
+                ", userData=" + userData +
+                '}';
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventImpl.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision Rev URL

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventQueueReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventQueueReader.java?rev=1468434&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventQueueReader.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventQueueReader.java Tue Apr 16 14:28:57 2013
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.observation2;
+
+import static org.apache.jackrabbit.oak.plugins.observation2.ObservationConstants.DATE;
+import static org.apache.jackrabbit.oak.plugins.observation2.ObservationConstants.EVENTS_PATH;
+import static org.apache.jackrabbit.oak.plugins.observation2.ObservationConstants.PATH;
+import static org.apache.jackrabbit.oak.plugins.observation2.ObservationConstants.TYPE;
+import static org.apache.jackrabbit.oak.plugins.observation2.ObservationConstants.USER_DATA;
+import static org.apache.jackrabbit.oak.plugins.observation2.ObservationConstants.USER_ID;
+
+import java.util.Collections;
+import java.util.Iterator;
+
+import javax.jcr.observation.Event;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterators;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.api.Root;
+import org.apache.jackrabbit.oak.api.Tree;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.namepath.NamePathMapper;
+
+/**
+ * TODO document
+ */
+public class EventQueueReader {
+    private final Root root;
+    private final NamePathMapper namePathMapper;
+
+    private Tree bundles;
+    private long nextBundleId = EventQueueWriterProvider.BUNDLE_ID.get();
+
+    public EventQueueReader(Root root, NamePathMapper namePathMapper) {
+        this.root = root;
+        this.namePathMapper = namePathMapper;
+    }
+
+    public Iterator<Event> getEventBundle(final String id) {
+        root.refresh();
+
+        Iterator<Tree> events = getEvents(nextBundleId, id);
+        if (events == null) {
+            return null;
+        }
+
+        return Iterators.transform(events, new Function<Tree, Event>() {
+            @Override
+            public Event apply(Tree event) {
+                return createEvent(event, id);
+            }
+        });
+    }
+
+    private Iterator<Tree> getEvents(long next, final String id) {
+        if (bundles == null) {
+            bundles = root.getTree(EVENTS_PATH);
+        }
+
+        if (bundles != null) {
+            Tree bundle = bundles.getChild(String.valueOf(next));
+            if (bundle != null) {
+                nextBundleId++;
+                if (bundle.getChildrenCount() > 0) {
+                    return Iterators.filter(bundle.getChildren().iterator(),
+                            new Predicate<Tree>() {
+                        @Override
+                        public boolean apply(Tree tree) {
+                            return tree.hasChild(id);
+                        }
+                    });
+                }
+            }
+        }
+        return null;
+    }
+
+    private Event createEvent(Tree event, String id) {
+        int type = (int) getLong(event, TYPE, 0);
+        String path = getJcrPath(event);
+        String userId = getString(event.getChild(id), USER_ID);
+        long date = getLong(event, DATE, 0);
+        String userData = getString(event.getChild(id), USER_DATA);
+        return new EventImpl(type, path, userId, id, Collections.emptyMap(), date, userData);
+    }
+
+    private String getJcrPath(Tree event) {
+        String path = getString(event, PATH);
+        return path == null ? null : namePathMapper.getJcrPath(path);
+    }
+
+    private static long getLong(Tree event, String name, long defaultValue) {
+        PropertyState p = event.getProperty(name);
+        return p == null ? defaultValue : p.getValue(Type.LONG);
+    }
+
+    private static String getString(Tree event, String name) {
+        PropertyState p = event.getProperty(name);
+        return p == null ? null : p.getValue(Type.STRING);
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventQueueReader.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventQueueReader.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision Rev URL

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventQueueWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventQueueWriter.java?rev=1468434&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventQueueWriter.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventQueueWriter.java Tue Apr 16 14:28:57 2013
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.observation2;
+
+import static javax.jcr.observation.Event.NODE_ADDED;
+import static javax.jcr.observation.Event.NODE_REMOVED;
+import static javax.jcr.observation.Event.PROPERTY_ADDED;
+import static javax.jcr.observation.Event.PROPERTY_CHANGED;
+import static javax.jcr.observation.Event.PROPERTY_REMOVED;
+import static org.apache.jackrabbit.JcrConstants.JCR_MIXINTYPES;
+import static org.apache.jackrabbit.JcrConstants.JCR_PRIMARYTYPE;
+import static org.apache.jackrabbit.JcrConstants.JCR_SYSTEM;
+import static org.apache.jackrabbit.JcrConstants.JCR_UUID;
+import static org.apache.jackrabbit.oak.plugins.observation2.ObservationConstants.DATE;
+import static org.apache.jackrabbit.oak.plugins.observation2.ObservationConstants.DEEP;
+import static org.apache.jackrabbit.oak.plugins.observation2.ObservationConstants.NODE_TYPES;
+import static org.apache.jackrabbit.oak.plugins.observation2.ObservationConstants.NO_LOCAL;
+import static org.apache.jackrabbit.oak.plugins.observation2.ObservationConstants.PATH;
+import static org.apache.jackrabbit.oak.plugins.observation2.ObservationConstants.TYPE;
+import static org.apache.jackrabbit.oak.plugins.observation2.ObservationConstants.USER_DATA;
+import static org.apache.jackrabbit.oak.plugins.observation2.ObservationConstants.USER_ID;
+import static org.apache.jackrabbit.oak.plugins.observation2.ObservationConstants.UUID;
+
+import java.util.Set;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.plugins.nodetype.ReadOnlyNodeTypeManager;
+import org.apache.jackrabbit.oak.spi.commit.DefaultEditor;
+import org.apache.jackrabbit.oak.spi.commit.Editor;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TODO document
+ */
+public class EventQueueWriter extends DefaultEditor {
+    private static final Logger log = LoggerFactory.getLogger(EventQueueWriter.class);
+
+    private final NodeState parent;
+    private final EventRecorder eventRecorder;
+    private final String path;
+
+    private int eventId;
+
+    public EventQueueWriter(EventRecorder eventRecorder, String path, NodeState parent) {
+        this.parent = parent;
+        this.eventRecorder = eventRecorder;
+        this.path = path;
+    }
+
+    @Override
+    public void propertyAdded(PropertyState after) throws CommitFailedException {
+        eventRecorder.recordEvent(PROPERTY_ADDED, PathUtils.concat(path, after.getName()),
+                getIdentifier(), getAssociatedParentTypes());
+    }
+
+    @Override
+    public void propertyChanged(PropertyState before, PropertyState after) throws CommitFailedException {
+        eventRecorder.recordEvent(PROPERTY_CHANGED, PathUtils.concat(path, after.getName()),
+                getIdentifier(), getAssociatedParentTypes());
+    }
+
+    @Override
+    public void propertyDeleted(PropertyState before) throws CommitFailedException {
+        eventRecorder.recordEvent(PROPERTY_REMOVED, PathUtils.concat(path, before.getName()),
+                getIdentifier(), getAssociatedParentTypes());
+    }
+
+    @Override
+    public Editor childNodeAdded(String name, NodeState after) throws CommitFailedException {
+        eventRecorder.recordEvent(NODE_ADDED, PathUtils.concat(path, name),
+                getIdentifier(), getAssociatedParentTypes());
+        return getEventQueueWriter(name, after);
+    }
+
+    @Override
+    public Editor childNodeChanged(String name, NodeState before, NodeState after) throws CommitFailedException {
+        return getEventQueueWriter(name, after);
+    }
+
+    @Override
+    public Editor childNodeDeleted(String name, NodeState before) throws CommitFailedException {
+        eventRecorder.recordEvent(NODE_REMOVED, PathUtils.concat(path, name),
+                getIdentifier(), getAssociatedParentTypes());
+        return getEventQueueWriter(name, before);
+    }
+
+    private EventQueueWriter getEventQueueWriter(String name, NodeState after) {
+        String path = PathUtils.concat(this.path, name);
+        return PathUtils.isAncestor('/' + JCR_SYSTEM, path)
+            ? null
+            : new EventQueueWriter(eventRecorder, path, after);
+    }
+
+    private String[] getAssociatedParentTypes() {
+        Set<String> types = Sets.newHashSet();
+
+        PropertyState jcrPrimaryType = parent.getProperty(JCR_PRIMARYTYPE);
+        if (jcrPrimaryType != null) {
+            types.add(jcrPrimaryType.getValue(Type.NAME));
+        }
+
+        PropertyState jcrMixinTypes = parent.getProperty(JCR_MIXINTYPES);
+        if (jcrMixinTypes != null) {
+            Iterables.addAll(types, jcrMixinTypes.getValue(Type.NAMES));
+        }
+
+        return types.toArray(new String[types.size()]);
+    }
+
+    private String getIdentifier() {
+        PropertyState jcrUuid = parent.getProperty(JCR_UUID);
+        return jcrUuid == null ? null : jcrUuid.getValue(Type.STRING);
+    }
+
+    public static class ListenerSpec {
+        private final ReadOnlyNodeTypeManager ntMgr;
+        private final String id;
+        private final String userId;
+        private final String userData;
+        private final EventFilter filter;
+
+        public static ListenerSpec create(ReadOnlyNodeTypeManager ntMgr, String name, NodeState nodeState) {
+            String userId = getString(nodeState, USER_ID);
+            if (userId == null) {
+                log.warn("Invalid specification for observation event listener: userId missing");
+                return null;
+            }
+
+            EventFilter filter = getFilter(nodeState);
+            if (filter == null) {
+                log.warn("Invalid specification for observation event listener: filter missing");
+                return null;
+            }
+
+            String userData = getString(nodeState, USER_DATA);
+            return new ListenerSpec(ntMgr, name, userId, userData, filter);
+        }
+
+        private static EventFilter getFilter(NodeState nodeState) {
+            Long types = getLong(nodeState, TYPE);
+            if (types == null) {
+                return null;
+            }
+
+            String path = getString(nodeState, PATH);
+            if (path == null) {
+                return null;
+            }
+
+            Boolean deep = getBoolean(nodeState, DEEP);
+            if (deep == null) {
+                return null;
+            }
+
+
+            String[] uuids = getStrings(nodeState, UUID);
+            String[] nodeTypes = getStrings(nodeState, NODE_TYPES);
+            Boolean noLocal = getBoolean(nodeState, NO_LOCAL);
+
+            return new EventFilter((int)(long)types, path, deep, uuids, nodeTypes, noLocal);
+        }
+
+        private static String[] getStrings(NodeState nodeState, String name) {
+            PropertyState p = nodeState.getProperty(name);
+            if (p == null) {
+                return null;
+            }
+
+            return Iterables.toArray(p.getValue(Type.STRINGS), String.class);
+        }
+
+        private static Boolean getBoolean(NodeState node, String name) {
+            PropertyState p = node.getProperty(name);
+            return p == null ? null : p.getValue(Type.BOOLEAN);
+        }
+
+        private static String getString(NodeState node, String name) {
+            PropertyState p = node.getProperty(name);
+            return p == null ? null : p.getValue(Type.STRING);
+        }
+
+        private static Long getLong(NodeState node, String name) {
+            PropertyState p = node.getProperty(name);
+            return p == null ? null : p.getValue(Type.LONG);
+        }
+
+        private ListenerSpec(ReadOnlyNodeTypeManager ntMgr, String id, String userId, String userData,
+                EventFilter filter) {
+            this.ntMgr = ntMgr;
+            this.id = id;
+            this.userId = userId;
+            this.userData = userData;
+            this.filter = filter;
+        }
+
+        public boolean matches(int type, String path, String identifier, String[] associatedParentTypes) {
+            return filter.include(type, path, associatedParentTypes, ntMgr);
+        }
+
+        public void persist(NodeBuilder builder) {
+            NodeBuilder b = builder.child(id);
+            b.setProperty(USER_ID, userId);
+            if (userData != null) {
+                b.setProperty(USER_DATA, userData);
+            }
+        }
+    }
+
+    public static class EventRecorder {
+        private final NodeBuilder eventQueue;
+        private final Iterable<ListenerSpec> listenerSpecs;
+        private final long date;
+
+        private long eventId;
+
+        public EventRecorder(NodeBuilder eventQueue, Iterable<ListenerSpec> listenerSpecs) {
+            this.eventQueue = eventQueue;
+            this.listenerSpecs = listenerSpecs;
+            this.date = System.currentTimeMillis();
+        }
+
+        private void recordEvent(int type, String path, String identifier, String[] associatedParentTypes) {
+            // TODO record access restrictions
+            Set<ListenerSpec> receivers = Sets.newHashSet();
+            for (ListenerSpec spec : listenerSpecs) {
+                if (spec.matches(type, path, identifier, associatedParentTypes)) {
+                    receivers.add(spec);
+                }
+            }
+
+            if (!receivers.isEmpty()) {
+                NodeBuilder event = persistEvent(type, path, identifier);
+                for (ListenerSpec receiver : receivers) {
+                    receiver.persist(event);
+                }
+            }
+        }
+
+        private NodeBuilder persistEvent(int type, String path, String identifier) {
+            NodeBuilder event = eventQueue.child(String.valueOf(eventId++));
+            event.setProperty(TYPE, type);
+            event.setProperty(PATH, path);
+            if (identifier != null) {
+                event.setProperty(UUID, identifier);
+            }
+            event.setProperty(DATE, date);
+            return event;
+        }
+
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventQueueWriter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventQueueWriter.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision Rev URL

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventQueueWriterProvider.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventQueueWriterProvider.java?rev=1468434&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventQueueWriterProvider.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventQueueWriterProvider.java Tue Apr 16 14:28:57 2013
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.observation2;
+
+import static org.apache.jackrabbit.JcrConstants.JCR_SYSTEM;
+import static org.apache.jackrabbit.oak.plugins.observation2.ObservationConstants.EVENTS;
+import static org.apache.jackrabbit.oak.plugins.observation2.ObservationConstants.LISTENERS;
+import static org.apache.jackrabbit.oak.plugins.observation2.ObservationConstants.REP_OBSERVATION;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.collect.Sets;
+import org.apache.jackrabbit.oak.plugins.nodetype.ReadOnlyNodeTypeManager;
+import org.apache.jackrabbit.oak.plugins.observation2.EventQueueWriter.EventRecorder;
+import org.apache.jackrabbit.oak.plugins.observation2.EventQueueWriter.ListenerSpec;
+import org.apache.jackrabbit.oak.spi.commit.DefaultEditor;
+import org.apache.jackrabbit.oak.spi.commit.Editor;
+import org.apache.jackrabbit.oak.spi.commit.EditorProvider;
+import org.apache.jackrabbit.oak.spi.commit.VisibleEditor;
+import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+
+/**
+ * TODO document
+ */
+public class EventQueueWriterProvider implements EditorProvider {
+    public static final AtomicLong BUNDLE_ID = new AtomicLong();
+
+    @Override
+    public Editor getRootEditor(NodeState before, NodeState after, NodeBuilder builder) {
+        NodeBuilder queue = getEventQueue(builder);
+        if (queue == null) {
+            return DefaultEditor.INSTANCE;
+        }
+
+        Iterable<ListenerSpec> listenerSpecs = getListenerSpecs(after);
+        if (listenerSpecs == null) {
+            return DefaultEditor.INSTANCE;
+        }
+
+        EventRecorder eventRecorder = new EventRecorder(queue, listenerSpecs);
+        return VisibleEditor.wrap(new EventQueueWriter(eventRecorder, "/", after));
+    }
+
+    private static NodeBuilder getEventQueue(NodeBuilder builder) {
+        if (builder.hasChildNode(JCR_SYSTEM)) {
+            builder = builder.child(JCR_SYSTEM);
+            if (builder.hasChildNode(REP_OBSERVATION)) {
+                builder = builder.child(REP_OBSERVATION);
+                return builder.child(EVENTS).child(String.valueOf(BUNDLE_ID.getAndIncrement()));
+            }
+        }
+        return null;
+    }
+
+    private static Iterable<ListenerSpec> getListenerSpecs(NodeState after) {
+        NodeState listeners = after.getChildNode(JCR_SYSTEM)
+                .getChildNode(REP_OBSERVATION)
+                .getChildNode(LISTENERS);
+
+        ReadOnlyNodeTypeManager ntMgr = ReadOnlyNodeTypeManager.getInstance(after);
+        Set<ListenerSpec> specs = Sets.newHashSet();
+        for (ChildNodeEntry listener : listeners.getChildNodeEntries()) {
+            ListenerSpec spec = ListenerSpec.create(ntMgr, listener.getName(), listener.getNodeState());
+            if (spec != null) {
+                specs.add(spec);
+            }
+        }
+
+        return specs.isEmpty() ? null : specs;
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventQueueWriterProvider.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventQueueWriterProvider.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision Rev URL

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/ObservationConstants.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/ObservationConstants.java?rev=1468434&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/ObservationConstants.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/ObservationConstants.java Tue Apr 16 14:28:57 2013
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.observation2;
+
+import static org.apache.jackrabbit.JcrConstants.JCR_SYSTEM;
+
+/**
+ * TODO document
+ */
+public interface ObservationConstants {
+    String REP_OBSERVATION = "rep:observation";
+    String LISTENERS = "listeners";
+    String EVENTS = "events";
+
+    String USER_DATA = "userData";
+    String USER_ID = "userId";
+
+    String TYPE = "type";
+    String PATH = "path";
+    String DATE = "date";
+    String DEEP = "deep";
+    String UUID = "uuid";
+    String NODE_TYPES = "nodeTypes";
+    String NO_LOCAL = "noLocal";
+
+    String LISTENER_PATH = '/' + JCR_SYSTEM + '/' + REP_OBSERVATION + '/' + LISTENERS;
+    String EVENTS_PATH = '/' + JCR_SYSTEM + '/' + REP_OBSERVATION + '/' + EVENTS;
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/ObservationConstants.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/ObservationConstants.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision Rev URL

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/ObservationManagerImpl2.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/ObservationManagerImpl2.java?rev=1468434&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/ObservationManagerImpl2.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/ObservationManagerImpl2.java Tue Apr 16 14:28:57 2013
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.observation2;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nullable;
+import javax.jcr.RepositoryException;
+import javax.jcr.UnsupportedRepositoryOperationException;
+import javax.jcr.observation.EventJournal;
+import javax.jcr.observation.EventListener;
+import javax.jcr.observation.EventListenerIterator;
+import javax.jcr.observation.ObservationManager;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.jackrabbit.commons.iterator.EventListenerIteratorAdapter;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.api.ContentSession;
+import org.apache.jackrabbit.oak.api.Root;
+import org.apache.jackrabbit.oak.namepath.NamePathMapper;
+import org.apache.jackrabbit.oak.plugins.nodetype.ReadOnlyNodeTypeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TODO document
+ */
+public class ObservationManagerImpl2 implements ObservationManager {
+    private static final Logger log = LoggerFactory.getLogger(ObservationManagerImpl2.class);
+
+    private final Root root;
+    private final NamePathMapper namePathMapper;
+    private final ScheduledExecutorService executor;
+    private final ReadOnlyNodeTypeManager ntMgr;
+    private final Map<EventListener, EventCollector> collectors = new HashMap<EventListener, EventCollector>();
+    private final AtomicBoolean hasEvents = new AtomicBoolean(false);
+
+    public ObservationManagerImpl2(Root root, NamePathMapper namePathMapper, ScheduledExecutorService executor) {
+        this.root = checkNotNull(root);
+        this.namePathMapper = checkNotNull(namePathMapper);
+        this.executor = checkNotNull(executor);
+        this.ntMgr = ReadOnlyNodeTypeManager.getInstance(root, namePathMapper);
+    }
+
+    private static void stop(EventCollector collector) {
+        try {
+            collector.stop();
+        }
+        catch (CommitFailedException e) {
+            log.warn("Error while stopping event collector", e);
+        }
+    }
+
+    public synchronized void dispose() {
+        for (EventCollector collector : collectors.values()) {
+            stop(collector);
+        }
+        collectors.clear();
+    }
+
+    /**
+     * Determine whether events have been generated since the time this method has been called.
+     * @return  {@code true} if this {@code ObservationManager} instance has generated events
+     *          since the last time this method has been called, {@code false} otherwise.
+     */
+    public boolean hasEvents() {
+        return hasEvents.getAndSet(false);
+    }
+
+    /**
+     * Validates the given node type names.
+     *
+     * @param nodeTypeNames the node type names.
+     * @return the node type names as oak names.
+     * @throws javax.jcr.nodetype.NoSuchNodeTypeException if one of the node type
+     *         names refers to an non-existing node type.
+     * @throws javax.jcr.RepositoryException if an error occurs while reading from
+     *         the node type manager.
+     */
+    @CheckForNull
+    private String[] getOakTypes(@Nullable String[] nodeTypeNames) throws RepositoryException {
+        if (nodeTypeNames == null) {
+            return null;
+        }
+        String[] oakNames = new String[nodeTypeNames.length];
+        for (int i = 0; i < nodeTypeNames.length; i++) {
+            ntMgr.getNodeType(nodeTypeNames[i]);
+            oakNames[i] = namePathMapper.getOakName(nodeTypeNames[i]);
+        }
+        return oakNames;
+    }
+
+    @Override
+    public synchronized void addEventListener(EventListener listener, int eventTypes, String absPath,
+            boolean isDeep, String[] uuid, String[] nodeTypeName, boolean noLocal) throws RepositoryException {
+
+        String oakPath = namePathMapper.getOakPath(absPath);
+        if (oakPath == null) {
+            throw new RepositoryException("Invalid path: " + absPath);
+        }
+
+        String[] oakTypes = getOakTypes(nodeTypeName);
+        EventFilter filter = new EventFilter(eventTypes, oakPath, isDeep, uuid, oakTypes, noLocal);
+        EventCollector collector = collectors.get(listener);
+        try {
+            if (collector == null) {
+                collector = new EventCollector(this, listener, filter);
+                collectors.put(listener, collector);
+                collector.start(executor);
+            } else {
+                collector.updateFilter(filter);
+            }
+        } catch (CommitFailedException e) {
+            throw new RepositoryException(e);
+        }
+    }
+
+    @Override
+    public synchronized void removeEventListener(EventListener listener) {
+        EventCollector collector = collectors.remove(listener);
+
+        if (collector != null) {
+            stop(collector);
+        }
+    }
+
+    @Override
+    public synchronized EventListenerIterator getRegisteredEventListeners() throws RepositoryException {
+        return new EventListenerIteratorAdapter(ImmutableSet.copyOf(collectors.keySet()));
+    }
+
+    @Override
+    public synchronized void setUserData(String userData) throws RepositoryException {
+        try {
+            for (EventCollector collector : collectors.values()) {
+                collector.setUserData(userData);
+            }
+        } catch (CommitFailedException e) {
+            throw new RepositoryException(e);
+        }
+    }
+
+    @Override
+    public EventJournal getEventJournal() throws RepositoryException {
+        throw new UnsupportedRepositoryOperationException();
+    }
+
+    @Override
+    public EventJournal getEventJournal(int eventTypes, String absPath, boolean isDeep, String[] uuid, String[]
+            nodeTypeName) throws RepositoryException {
+        throw new UnsupportedRepositoryOperationException();
+    }
+
+    //------------------------------------------------------------< internal >---
+
+    void setHasEvents() {
+        hasEvents.set(true);
+    }
+
+    ContentSession getContentSession() {
+        return root.getContentSession();
+    }
+
+    NamePathMapper getNamePathMapper() {
+        return namePathMapper;
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/ObservationManagerImpl2.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/ObservationManagerImpl2.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision Rev URL

Modified: jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/SessionContext.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/SessionContext.java?rev=1468434&r1=1468433&r2=1468434&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/SessionContext.java (original)
+++ jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/SessionContext.java Tue Apr 16 14:28:57 2013
@@ -16,9 +16,12 @@
  */
 package org.apache.jackrabbit.oak.jcr;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+
 import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 import javax.jcr.PathNotFoundException;
@@ -44,14 +47,13 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.nodetype.DefinitionProvider;
 import org.apache.jackrabbit.oak.plugins.nodetype.EffectiveNodeTypeProvider;
 import org.apache.jackrabbit.oak.plugins.observation.ObservationManagerImpl;
+import org.apache.jackrabbit.oak.plugins.observation2.ObservationManagerImpl2;
 import org.apache.jackrabbit.oak.plugins.value.ValueFactoryImpl;
 import org.apache.jackrabbit.oak.spi.security.SecurityConfiguration;
 import org.apache.jackrabbit.oak.spi.security.SecurityProvider;
 import org.apache.jackrabbit.oak.spi.security.authorization.permission.PermissionProvider;
 import org.apache.jackrabbit.oak.spi.xml.ProtectedItemImporter;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-
 /**
  * Instances of this class are passed to all JCR implementation classes
  * (e.g. {@code SessionImpl}, {@code NodeImpl}, etc.) and provide access to
@@ -59,6 +61,9 @@ import static com.google.common.base.Pre
  * {@code ValueFactory}, etc.).
  */
 public abstract class SessionContext implements NamePathMapper {
+    // FIXME remove OAK-775 feature flag and unify ObservationManager implementations
+    private static final boolean OAK_775 = Boolean.getBoolean("OAK-775");
+
     private final RepositoryImpl repository;
     private final SessionDelegate delegate;
     private final SessionNamespaces namespaces;
@@ -70,7 +75,7 @@ public abstract class SessionContext imp
     private PrincipalManager principalManager;
     private UserManager userManager;
     private PrivilegeManager privilegeManager;
-    private ObservationManagerImpl observationManager;
+    private ObservationManager observationManager;
 
     private SessionContext(RepositoryImpl repository,
                            final SessionDelegate delegate) {
@@ -226,14 +231,22 @@ public abstract class SessionContext imp
     @Nonnull
     public ObservationManager getObservationManager() {
         if (observationManager == null) {
-            observationManager = new ObservationManagerImpl(
+            if (OAK_775) {
+                observationManager = new ObservationManagerImpl2(
+                        delegate.getRoot(), namePathMapper, repository.getObservationExecutor());
+            } else {
+                observationManager = new ObservationManagerImpl(
                     delegate.getRoot(), namePathMapper, repository.getObservationExecutor());
+            }
         }
         return observationManager;
     }
 
     public boolean hasPendingEvents() {
-        return observationManager != null && observationManager.hasEvents();
+        return observationManager != null &&
+                (OAK_775
+                    ? ((ObservationManagerImpl2) observationManager).hasEvents()
+                    : ((ObservationManagerImpl) observationManager).hasEvents());
     }
 
     //-----------------------------------------------------< NamePathMapper >---
@@ -318,7 +331,11 @@ public abstract class SessionContext imp
 
     void dispose() {
         if (observationManager != null) {
-            observationManager.dispose();
+            if (OAK_775) {
+                ((ObservationManagerImpl2) observationManager).dispose();
+            } else {
+                ((ObservationManagerImpl) observationManager).dispose();
+            }
         }
         namespaces.clear();
     }