You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jackrabbit.apache.org by mr...@apache.org on 2008/11/14 11:10:19 UTC

svn commit: r713964 - in /jackrabbit/trunk/jackrabbit-core/src: main/java/org/apache/jackrabbit/core/ main/java/org/apache/jackrabbit/core/journal/ main/java/org/apache/jackrabbit/core/observation/ test/java/org/apache/jackrabbit/core/journal/ test/jav...

Author: mreutegg
Date: Fri Nov 14 02:10:15 2008
New Revision: 713964

URL: http://svn.apache.org/viewvc?rev=713964&view=rev
Log:
JCR-1849: EventJournal

Added:
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/EventJournalImpl.java   (with props)
    jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/observation/EventJournalTest.java   (with props)
Modified:
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/WorkspaceImpl.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileJournal.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/Journal.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/EventConsumer.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/EventFilter.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/FilteredEventIterator.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/ObservationManagerImpl.java
    jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/journal/MemoryJournal.java
    jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/observation/TestAll.java
    jackrabbit/trunk/jackrabbit-core/src/test/repository/repository.xml

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/WorkspaceImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/WorkspaceImpl.java?rev=713964&r1=713963&r2=713964&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/WorkspaceImpl.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/WorkspaceImpl.java Fri Nov 14 02:10:15 2008
@@ -17,11 +17,14 @@
 package org.apache.jackrabbit.core;
 
 import org.apache.jackrabbit.api.JackrabbitWorkspace;
+import org.apache.jackrabbit.api.jsr283.observation.EventJournal;
 import org.apache.jackrabbit.core.config.WorkspaceConfig;
 import org.apache.jackrabbit.core.lock.LockManager;
 import org.apache.jackrabbit.core.observation.EventStateCollection;
 import org.apache.jackrabbit.core.observation.EventStateCollectionFactory;
 import org.apache.jackrabbit.core.observation.ObservationManagerImpl;
+import org.apache.jackrabbit.core.observation.EventJournalImpl;
+import org.apache.jackrabbit.core.observation.EventFilter;
 import org.apache.jackrabbit.core.query.QueryManagerImpl;
 import org.apache.jackrabbit.core.state.LocalItemStateManager;
 import org.apache.jackrabbit.core.state.SharedItemStateManager;
@@ -31,6 +34,8 @@
 import org.apache.jackrabbit.core.xml.ImportHandler;
 import org.apache.jackrabbit.core.xml.Importer;
 import org.apache.jackrabbit.core.xml.WorkspaceImporter;
+import org.apache.jackrabbit.core.cluster.ClusterNode;
+import org.apache.jackrabbit.core.security.principal.AdminPrincipal;
 import org.apache.jackrabbit.commons.AbstractWorkspace;
 import org.apache.jackrabbit.spi.commons.conversion.NameException;
 import org.apache.jackrabbit.spi.Path;
@@ -57,6 +62,7 @@
 import javax.jcr.version.Version;
 import javax.jcr.version.VersionException;
 import javax.jcr.version.VersionHistory;
+import javax.security.auth.Subject;
 
 import java.util.HashMap;
 import java.util.Iterator;
@@ -732,6 +738,43 @@
     }
 
     /**
+     * Returns the event journal for this workspace. The events are filtered
+     * according to the passed criteria.
+     *
+     * @param eventTypes A combination of one or more event type constants encoded as a bitmask.
+     * @param absPath an absolute path.
+     * @param isDeep a <code>boolean</code>.
+     * @param uuid array of UUIDs.
+     * @param nodeTypeName array of node type names.
+     * @return the event journal for this repository.
+     * @throws UnsupportedRepositoryOperationException if this repository does
+     *          not support an event journal (cluster journal disabled).
+     * @throws RepositoryException if another error occurs.
+     */
+    public EventJournal getEventJournal(int eventTypes,
+                                        String absPath,
+                                        boolean isDeep,
+                                        String[] uuid,
+                                        String[] nodeTypeName)
+            throws RepositoryException {
+        Subject subject = ((SessionImpl) getSession()).getSubject();
+        if (subject.getPrincipals(AdminPrincipal.class).isEmpty()) {
+            throw new RepositoryException("Only administrator session may " +
+                    "access EventJournal");
+        }
+        ClusterNode clusterNode = rep.getClusterNode();
+        if (clusterNode == null) {
+            throw new UnsupportedRepositoryOperationException();
+        }
+
+        ObservationManagerImpl obsMgr = (ObservationManagerImpl) session.getWorkspace().getObservationManager();
+        EventFilter filter = obsMgr.createEventFilter(eventTypes, absPath,
+                isDeep, uuid, nodeTypeName, false);
+        return new EventJournalImpl(filter, clusterNode.getJournal(),
+                clusterNode.getId());
+    }
+
+    /**
      * {@inheritDoc}
      */
     public synchronized QueryManager getQueryManager() throws RepositoryException {

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java?rev=713964&r1=713963&r2=713964&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java Fri Nov 14 02:10:15 2008
@@ -230,16 +230,6 @@
     }
 
     /**
-     * Return an iterator over all records after the specified revision.
-     * Subclass responsibility.
-     *
-     * @param startRevision start point (exlusive)
-     * @throws JournalException if an error occurs
-     */
-    protected abstract RecordIterator getRecords(long startRevision)
-            throws JournalException;
-
-    /**
      * Lock the journal revision, disallowing changes from other sources until
      * {@link #unlock has been called, and synchronizes to the latest change.
      *

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java?rev=713964&r1=713963&r2=713964&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java Fri Nov 14 02:10:15 2008
@@ -435,7 +435,7 @@
     /**
      * {@inheritDoc}
      */
-    protected RecordIterator getRecords(long startRevision)
+    public RecordIterator getRecords(long startRevision)
             throws JournalException {
 
         try {
@@ -458,6 +458,28 @@
 
     /**
      * {@inheritDoc}
+     */
+    public RecordIterator getRecords() throws JournalException {
+        try {
+            checkConnection();
+
+            selectRevisionsStmt.clearParameters();
+            selectRevisionsStmt.clearWarnings();
+            selectRevisionsStmt.setLong(1, Long.MIN_VALUE);
+            selectRevisionsStmt.execute();
+
+            return new DatabaseRecordIterator(
+                    selectRevisionsStmt.getResultSet(), getResolver(), getNamePathResolver());
+        } catch (SQLException e) {
+            close(true);
+
+            String msg = "Unable to return record iterator.";
+            throw new JournalException(msg, e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
      * <p/>
      * This journal is locked by incrementing the current value in the table
      * named <code>GLOBAL_REVISION</code>, which effectively write-locks this

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileJournal.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileJournal.java?rev=713964&r1=713963&r2=713964&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileJournal.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileJournal.java Fri Nov 14 02:10:15 2008
@@ -154,7 +154,7 @@
     /**
      * {@inheritDoc}
      */
-    protected RecordIterator getRecords(long startRevision)
+    public RecordIterator getRecords(long startRevision)
             throws JournalException {
 
         long stopRevision = getGlobalRevision();
@@ -174,6 +174,32 @@
     /**
      * {@inheritDoc}
      */
+    public RecordIterator getRecords() throws JournalException {
+        long stopRevision = getGlobalRevision();
+        long startRevision = 0;
+
+        RotatingLogFile[] logFiles = RotatingLogFile.listFiles(rootDirectory, basename);
+        File[] files = new File[logFiles.length];
+        for (int i = 0; i < files.length; i++) {
+            files[i] = logFiles[i].getFile();
+            if (i == 0) {
+                try {
+                    FileRecordLog log = new FileRecordLog(files[i]);
+                    startRevision = log.getPreviousRevision();
+                } catch (IOException e) {
+                    String msg = "Unable to read startRevision from first " +
+                            "record log file";
+                    throw new JournalException(msg, e);
+                }
+            }
+        }
+        return new FileRecordIterator(files, startRevision, stopRevision,
+                getResolver(), getNamePathResolver());
+    }
+
+    /**
+     * {@inheritDoc}
+     */
     protected void doLock() throws JournalException {
         globalRevision.lock(false);
     }

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/Journal.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/Journal.java?rev=713964&r1=713963&r2=713964&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/Journal.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/Journal.java Fri Nov 14 02:10:15 2008
@@ -63,6 +63,7 @@
      * Return the record producer for a given identifier.
      *
      * @param identifier identifier
+     * @return the record producer for a given identifier.
      * @throws JournalException if an error occurs
      */
     RecordProducer getProducer(String identifier) throws JournalException;
@@ -79,4 +80,22 @@
      * @throws JournalException on error
      */
     public InstanceRevision getInstanceRevision() throws JournalException;
+
+    /**
+     * Return an iterator over all records after the specified revision.
+     *
+     * @param startRevision start point (exlusive)
+     * @return an iterator over all records after the specified revision.
+     * @throws JournalException if an error occurs
+     */
+    public RecordIterator getRecords(long startRevision)
+            throws JournalException;
+
+    /**
+     * Return an iterator over all available records in the journal.
+     *
+     * @return an iterator over all records.
+     * @throws JournalException if an error occurs
+     */
+    public RecordIterator getRecords() throws JournalException;
 }

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/EventConsumer.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/EventConsumer.java?rev=713964&r1=713963&r2=713964&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/EventConsumer.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/EventConsumer.java Fri Nov 14 02:10:15 2008
@@ -238,7 +238,8 @@
             return;
         }
         // check if filtered iterator has at least one event
-        EventIterator it = new FilteredEventIterator(events, filter, denied);
+        EventIterator it = new FilteredEventIterator(events.iterator(),
+                events.getTimestamp(), filter, denied);
         if (it.hasNext()) {
             listener.onEvent(it);
         } else {

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/EventFilter.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/EventFilter.java?rev=713964&r1=713963&r2=713964&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/EventFilter.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/EventFilter.java Fri Nov 14 02:10:15 2008
@@ -32,7 +32,7 @@
  * The <code>EventFilter</code> class implements the filter logic based
  * on the session's access rights and the specified filter rules.
  */
-class EventFilter {
+public class EventFilter {
 
     static final EventFilter BLOCK_ALL = new BlockAllFilter();
 
@@ -131,6 +131,7 @@
     }
 
     /**
+     * TODO: remove this unused method.
      * Returns the <code>ItemManager</code> associated with this
      * <code>EventFilter</code>.
      *

Added: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/EventJournalImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/EventJournalImpl.java?rev=713964&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/EventJournalImpl.java (added)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/EventJournalImpl.java Fri Nov 14 02:10:15 2008
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.observation;
+
+import java.util.List;
+import java.util.LinkedList;
+import java.util.NoSuchElementException;
+import java.util.Map;
+import java.util.WeakHashMap;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.Date;
+import java.util.Collections;
+import java.text.DateFormat;
+
+import javax.jcr.observation.Event;
+import javax.jcr.observation.EventIterator;
+
+import org.apache.jackrabbit.api.jsr283.observation.EventJournal;
+import org.apache.jackrabbit.core.journal.Journal;
+import org.apache.jackrabbit.core.journal.RecordIterator;
+import org.apache.jackrabbit.core.journal.JournalException;
+import org.apache.jackrabbit.core.journal.Record;
+import org.apache.jackrabbit.core.cluster.ClusterRecordDeserializer;
+import org.apache.jackrabbit.core.cluster.ClusterRecord;
+import org.apache.jackrabbit.core.cluster.ClusterRecordProcessor;
+import org.apache.jackrabbit.core.cluster.ChangeLogRecord;
+import org.apache.jackrabbit.core.cluster.LockRecord;
+import org.apache.jackrabbit.core.cluster.NamespaceRecord;
+import org.apache.jackrabbit.core.cluster.NodeTypeRecord;
+import org.apache.jackrabbit.core.cluster.WorkspaceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <code>EventJournalImpl</code> implements the JCR 2.0 {@link EventJournal}.
+ */
+public class EventJournalImpl implements EventJournal {
+
+    /**
+     * The logger instance for this class.
+     */
+    private static final Logger log = LoggerFactory.getLogger(EventJournalImpl.class);
+
+    /**
+     * The minimum buffer size for events in {@link #eventBundleBuffer}.
+     */
+    private static final int MIN_BUFFER_SIZE = 1024;
+
+    /**
+     * Map of skip maps. Key=Journal, Value=SortedMap
+     * </p>
+     * Each sorted map has the following structure:
+     * Key=Long (timestamp), Value=Long (revision)
+     */
+    private static final Map REVISION_SKIP_MAPS = new WeakHashMap();
+
+    /**
+     * Last revision seen by this event journal.
+     */
+    private Long lastRevision;
+
+    /**
+     * The event filter.
+     */
+    private final EventFilter filter;
+
+    /**
+     * The journal of this repository.
+     */
+    private final Journal journal;
+
+    /**
+     * The producer id to filter journal records.
+     */
+    private final String producerId;
+
+    /**
+     * The name of the workspace to filter journal records.
+     */
+    private final String wspName;
+
+    /**
+     * Buffer of {@link EventBundle}s.
+     */
+    private final List eventBundleBuffer = new LinkedList();
+
+    /**
+     * The current position of this iterator.
+     */
+    private long position;
+
+    /**
+     * Creates a new event journal.
+     *
+     * @param filter for filtering the events read from the journal.
+     * @param journal the cluster journal.
+     * @param producerId the producer id of the cluster node.
+     */
+    public EventJournalImpl(EventFilter filter,
+                            Journal journal,
+                            String producerId) {
+        this.filter = filter;
+        this.journal = journal;
+        this.producerId = producerId;
+        this.wspName = filter.getSession().getWorkspace().getName();
+    }
+
+    //------------------------< EventJournal >---------------------------------
+
+    /**
+     * {@inheritDoc}
+     */
+    public void skipTo(long date) {
+        long time = System.currentTimeMillis();
+
+        // get skip map for this journal
+        SortedMap skipMap = getSkipMap();
+        synchronized (skipMap) {
+            SortedMap head = skipMap.headMap(new Long(time));
+            if (!head.isEmpty()) {
+                eventBundleBuffer.clear();
+                lastRevision = (Long) head.get(head.lastKey());
+            }
+        }
+
+        try {
+            while (hasNext()) {
+                EventBundle bundle = getCurrentBundle();
+                if (bundle.timestamp <= date) {
+                    eventBundleBuffer.remove(0);
+                } else {
+                    break;
+                }
+            }
+        } finally {
+            time = System.currentTimeMillis() - time;
+            log.debug("Skipped event bundles in {} ms.", new Long(time));
+        }
+    }
+
+    //------------------------< EventIterator >---------------------------------
+
+    /**
+     * {@inheritDoc}
+     */
+    public Event nextEvent() {
+        // calling hasNext() will also trigger refill if necessary!
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+        EventBundle bundle = getCurrentBundle();
+        // above hasNext() call ensures that there is bundle with an event state
+        assert bundle != null && bundle.events.hasNext();
+
+        Event next = (Event) bundle.events.next();
+        if (!bundle.events.hasNext()) {
+            // done with this bundle -> remove from buffer
+            eventBundleBuffer.remove(0);
+        }
+        position++;
+        return next;
+    }
+
+    //------------------------< RangeIterator >---------------------------------
+
+    /**
+     * {@inheritDoc}
+     */
+    public void skip(long skipNum) {
+        while (skipNum-- > 0) {
+            nextEvent();
+        }
+    }
+
+    /**
+     * @return always -1.
+     */
+    public long getSize() {
+        return -1;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public long getPosition() {
+        // TODO: what happens to position when skipped
+        return position;
+    }
+
+    //--------------------------< Iterator >------------------------------------
+
+    /**
+     * @throws UnsupportedOperationException always.
+     */
+    public void remove() {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public boolean hasNext() {
+        if (!eventBundleBuffer.isEmpty()) {
+            return true;
+        }
+        // try refill
+        refill();
+        // check again
+        return !eventBundleBuffer.isEmpty();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public Object next() {
+        return nextEvent();
+    }
+
+    //----------------------< ClusterRecordProcessor >--------------------------
+
+    /**
+     * Implements {@link ClusterRecordProcessor} and keeps track of the number
+     * of events read and the timestamp of the last record processed.
+     */
+    private class RecordProcessor implements ClusterRecordProcessor {
+
+        /**
+         * Number of events read so far.
+         */
+        private int numEvents;
+
+        /**
+         * The timestamp of the last record processed.
+         */
+        private long lastTimestamp;
+
+        /**
+         * @return the number of events read so far.
+         */
+        private int getNumEvents() {
+            return numEvents;
+        }
+
+        /**
+         * @return the timestamp of the last record processed.
+         */
+        private long getLastTimestamp() {
+            return lastTimestamp;
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        public void process(ChangeLogRecord record) {
+            List events = record.getEvents();
+            if (!events.isEmpty()) {
+                EventBundle bundle = new EventBundle(events,
+                        record.getTimestamp(), filter);
+                if (bundle.events.hasNext()) {
+                    // only queue bundle if there is an event
+                    eventBundleBuffer.add(bundle);
+                    numEvents += events.size();
+                    lastTimestamp = record.getTimestamp();
+                }
+            }
+        }
+
+        public void process(LockRecord record) {
+            // ignore
+        }
+
+        public void process(NamespaceRecord record) {
+            // ignore
+        }
+
+        public void process(NodeTypeRecord record) {
+            // ignore
+        }
+
+        public void process(WorkspaceRecord record) {
+            // ignore
+        }
+    }
+
+    //-------------------------------< internal >-------------------------------
+
+    /**
+     * @return the current event bundle or <code>null</code> if there is none.
+     */
+    private EventBundle getCurrentBundle() {
+        while (!eventBundleBuffer.isEmpty()) {
+            EventBundle bundle = (EventBundle) eventBundleBuffer.get(0);
+            if (bundle.events.hasNext()) {
+                return bundle;
+            } else {
+                eventBundleBuffer.remove(0);
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Refills the {@link #eventBundleBuffer}.
+     */
+    private void refill() {
+        assert eventBundleBuffer.isEmpty();
+        try {
+            RecordProcessor processor = new RecordProcessor();
+            ClusterRecordDeserializer deserializer = new ClusterRecordDeserializer();
+            RecordIterator records;
+            if (lastRevision != null) {
+                log.debug("refilling event bundle buffer starting at revision {}",
+                        lastRevision);
+                records = journal.getRecords(lastRevision.longValue());
+            } else {
+                log.debug("refilling event bundle buffer starting at journal beginning");
+                records = journal.getRecords();
+            }
+            try {
+                while (processor.getNumEvents() < MIN_BUFFER_SIZE && records.hasNext()) {
+                    Record record = records.nextRecord();
+                    if (record.getProducerId().equals(producerId)) {
+                        ClusterRecord cr = deserializer.deserialize(record);
+                        if (!wspName.equals(cr.getWorkspace())) {
+                            continue;
+                        }
+                        cr.process(processor);
+                        lastRevision = new Long(cr.getRevision());
+                    }
+                }
+
+                if (processor.getNumEvents() >= MIN_BUFFER_SIZE) {
+                    // remember in skip map
+                    SortedMap skipMap = getSkipMap();
+                    Long timestamp = new Long(processor.getLastTimestamp());
+                    synchronized (skipMap) {
+                        if (log.isDebugEnabled()) {
+                            DateFormat df = DateFormat.getDateTimeInstance();
+                            log.debug("remember record in skip map: {} -> {}",
+                                    df.format(new Date(timestamp.longValue())),
+                                            lastRevision);
+                        }
+                        skipMap.put(timestamp, lastRevision);
+                    }
+                }
+            } finally {
+                records.close();
+            }
+        } catch (JournalException e) {
+            log.warn("Unable to read journal records", e);
+        }
+    }
+
+    /**
+     * @return the revision skip map for this journal.
+     */
+    private SortedMap getSkipMap() {
+        synchronized (REVISION_SKIP_MAPS) {
+            SortedMap map = (SortedMap) REVISION_SKIP_MAPS.get(journal);
+            if (map == null) {
+                map = new TreeMap();
+                REVISION_SKIP_MAPS.put(journal, map);
+            }
+            return map;
+        }
+    }
+
+    /**
+     * Simple class to associate an {@link EventState} iterator with a timestamp.
+     */
+    private static final class EventBundle {
+
+        /**
+         * An iterator of {@link Event}s.
+         */
+        final EventIterator events;
+
+        /**
+         * Timestamp when the events were created.
+         */
+        final long timestamp;
+
+        /**
+         * Creates a new event bundle.
+         *
+         * @param eventStates the {@link EventState}s that belong to this bundle.
+         * @param timestamp the timestamp when the events were created.
+         * @param filter the event filter.
+         */
+        private EventBundle(List eventStates,
+                            long timestamp,
+                            EventFilter filter) {
+            this.events = new FilteredEventIterator(eventStates.iterator(),
+                    timestamp, filter, Collections.EMPTY_SET);
+            this.timestamp = timestamp;
+        }
+    }
+}

Propchange: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/EventJournalImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/FilteredEventIterator.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/FilteredEventIterator.java?rev=713964&r1=713963&r2=713964&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/FilteredEventIterator.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/FilteredEventIterator.java Fri Nov 14 02:10:15 2008
@@ -70,20 +70,22 @@
     /**
      * Creates a new <code>FilteredEventIterator</code>.
      *
-     * @param c      an unmodifiable Collection of {@link javax.jcr.observation.Event}s.
+     * @param eventStates an iterator over unfiltered {@link EventState}s.
+     * @param timestamp the time when the event were created.
      * @param filter only event that pass the filter will be dispatched to the
      *               event listener.
      * @param denied <code>Set</code> of <code>ItemId</code>s of denied <code>ItemState</code>s
      *               rejected by the <code>AccessManager</code>. If
      *               <code>null</code> no <code>ItemState</code> is denied.
      */
-    public FilteredEventIterator(EventStateCollection c,
+    public FilteredEventIterator(Iterator eventStates,
+                                 long timestamp,
                                  EventFilter filter,
                                  Set denied) {
-        actualEvents = c.iterator();
+        this.actualEvents = eventStates;
         this.filter = filter;
         this.denied = denied;
-        this.timestamp = c.getTimestamp();
+        this.timestamp = timestamp;
         fetchNext();
     }
 

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/ObservationManagerImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/ObservationManagerImpl.java?rev=713964&r1=713963&r2=713964&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/ObservationManagerImpl.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/ObservationManagerImpl.java Fri Nov 14 02:10:15 2008
@@ -102,42 +102,9 @@
                                  boolean noLocal)
             throws RepositoryException {
 
-        // create NodeType instances from names
-        NodeTypeImpl[] nodeTypes;
-        if (nodeTypeName == null) {
-            nodeTypes = null;
-        } else {
-            NodeTypeManagerImpl ntMgr = session.getNodeTypeManager();
-            nodeTypes = new NodeTypeImpl[nodeTypeName.length];
-            for (int i = 0; i < nodeTypes.length; i++) {
-                nodeTypes[i] = (NodeTypeImpl) ntMgr.getNodeType(nodeTypeName[i]);
-            }
-        }
-
-        Path path;
-        try {
-            path = session.getQPath(absPath).getNormalizedPath();
-        } catch (NameException e) {
-            String msg = "invalid path syntax: " + absPath;
-            log.debug(msg);
-            throw new RepositoryException(msg, e);
-        }
-        NodeId[] ids = null;
-        if (uuid != null) {
-            ids = new NodeId[uuid.length];
-            for (int i = 0; i < uuid.length; i++) {
-                ids[i] = NodeId.valueOf(uuid[i]);
-            }
-        }
         // create filter
-        EventFilter filter = new EventFilter(itemMgr,
-                session,
-                eventTypes,
-                path,
-                isDeep,
-                ids,
-                nodeTypes,
-                noLocal);
+        EventFilter filter = createEventFilter(eventTypes, absPath,
+                isDeep, uuid, nodeTypeName, noLocal);
 
         dispatcher.addConsumer(new EventConsumer(session, listener, filter));
     }
@@ -180,6 +147,60 @@
 
     }
 
+    /**
+     * Creates a new event filter with the given restrictions.
+     *
+     * @param eventTypes A combination of one or more event type constants encoded as a bitmask.
+     * @param absPath an absolute path.
+     * @param isDeep a <code>boolean</code>.
+     * @param uuid array of UUIDs.
+     * @param nodeTypeName array of node type names.
+     * @param noLocal a <code>boolean</code>.
+     * @return the event filter with the given restrictions.
+     * @throws RepositoryException if an error occurs.
+     */
+    public EventFilter createEventFilter(int eventTypes,
+                                         String absPath,
+                                         boolean isDeep,
+                                         String[] uuid,
+                                         String[] nodeTypeName,
+                                         boolean noLocal)
+            throws RepositoryException {
+        // create NodeType instances from names
+        NodeTypeImpl[] nodeTypes;
+        if (nodeTypeName == null) {
+            nodeTypes = null;
+        } else {
+            NodeTypeManagerImpl ntMgr = session.getNodeTypeManager();
+            nodeTypes = new NodeTypeImpl[nodeTypeName.length];
+            for (int i = 0; i < nodeTypes.length; i++) {
+                nodeTypes[i] = (NodeTypeImpl) ntMgr.getNodeType(nodeTypeName[i]);
+            }
+        }
+
+        Path path;
+        try {
+            path = session.getQPath(absPath).getNormalizedPath();
+        } catch (NameException e) {
+            String msg = "invalid path syntax: " + absPath;
+            log.debug(msg);
+            throw new RepositoryException(msg, e);
+        }
+        if (!path.isAbsolute()) {
+            throw new RepositoryException("absPath must be absolute");
+        }
+        NodeId[] ids = null;
+        if (uuid != null) {
+            ids = new NodeId[uuid.length];
+            for (int i = 0; i < uuid.length; i++) {
+                ids[i] = NodeId.valueOf(uuid[i]);
+            }
+        }
+        // create filter
+        return new EventFilter(itemMgr, session, eventTypes, path,
+                isDeep, ids, nodeTypes, noLocal);
+    }
+
     //------------------------------------------< EventStateCollectionFactory >
 
     /**

Modified: jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/journal/MemoryJournal.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/journal/MemoryJournal.java?rev=713964&r1=713963&r2=713964&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/journal/MemoryJournal.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/journal/MemoryJournal.java Fri Nov 14 02:10:15 2008
@@ -23,11 +23,6 @@
 import java.util.ArrayList;
 import java.util.NoSuchElementException;
 
-import org.apache.jackrabbit.core.journal.AbstractJournal;
-import org.apache.jackrabbit.core.journal.AppendRecord;
-import org.apache.jackrabbit.core.journal.InstanceRevision;
-import org.apache.jackrabbit.core.journal.JournalException;
-import org.apache.jackrabbit.core.journal.RecordIterator;
 import org.apache.jackrabbit.spi.commons.namespace.NamespaceResolver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -149,7 +144,7 @@
     /**
      * {@inheritDoc}
      */
-    protected RecordIterator getRecords(long startRevision)
+    public RecordIterator getRecords(long startRevision)
             throws JournalException {
 
         checkState();
@@ -161,6 +156,13 @@
     }
 
     /**
+     * {@inheritDoc}
+     */
+    public RecordIterator getRecords() throws JournalException {
+        return new MemoryRecordIterator(0, records.size());
+    }
+
+    /**
      * Set records. Used to share records between two journal implementations.
      *
      * @param records array list that should back up this memory journal

Added: jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/observation/EventJournalTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/observation/EventJournalTest.java?rev=713964&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/observation/EventJournalTest.java (added)
+++ jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/observation/EventJournalTest.java Fri Nov 14 02:10:15 2008
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.observation;
+
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Arrays;
+
+import javax.jcr.RepositoryException;
+import javax.jcr.Node;
+import javax.jcr.Session;
+import javax.jcr.observation.Event;
+
+import org.apache.jackrabbit.test.AbstractJCRTest;
+import org.apache.jackrabbit.core.WorkspaceImpl;
+import org.apache.jackrabbit.api.jsr283.observation.EventJournal;
+
+/**
+ * <code>EventJournalTest</code> performs EventJournal tests.
+ */
+public class EventJournalTest extends AbstractJCRTest {
+
+    private static final int ALL_TYPES = Event.NODE_ADDED | Event.NODE_REMOVED | Event.PROPERTY_ADDED | Event.PROPERTY_CHANGED | Event.PROPERTY_REMOVED;
+
+    private EventJournal journal;
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        journal = getEventJournal(ALL_TYPES, "/", true, null, null);
+    }
+
+    public void testSkipToNow() throws RepositoryException {
+        // skip everything
+        journal.skipTo(System.currentTimeMillis());
+        assertFalse(journal.hasNext());
+    }
+
+    public void testSkipTo() throws Exception {
+        long time = System.currentTimeMillis();
+
+        // add some nodes
+        Node n1 = testRootNode.addNode(nodeName1);
+        Node n2 = testRootNode.addNode(nodeName2);
+
+        // make sure some time passed otherwise we might
+        // skip this change as well.
+        while (time == System.currentTimeMillis()) {
+            Thread.sleep(1);
+        }
+
+        // now save
+        superuser.save();
+
+        journal.skipTo(time);
+        // at least the two added nodes must be returned by the journal
+        checkJournal(new String[]{n1.getPath(), n2.getPath()}, new String[0]);
+    }
+
+    public void testLiveJournal() throws RepositoryException {
+        journal.skipTo(System.currentTimeMillis());
+        assertFalse(journal.hasNext());
+
+        testRootNode.addNode(nodeName1);
+        superuser.save();
+
+        assertTrue(journal.hasNext());
+    }
+
+    public void testWorkspaceSeparation() throws RepositoryException {
+        journal.skipTo(System.currentTimeMillis());
+        assertFalse(journal.hasNext());
+
+        Session session = helper.getSuperuserSession(workspaceName);
+        try {
+            Node rootNode = session.getRootNode();
+            if (rootNode.hasNode(nodeName1)) {
+                rootNode.getNode(nodeName1).remove();
+            } else {
+                rootNode.addNode(nodeName1);
+            }
+            session.save();
+        } finally {
+            session.logout();
+        }
+
+        assertFalse(journal.hasNext());
+    }
+
+    public void testEventType() throws RepositoryException {
+        Node n1 = testRootNode.addNode(nodeName1);
+
+        journal = getEventJournal(Event.PROPERTY_ADDED, testRoot, true, null, null);
+        journal.skipTo(System.currentTimeMillis());
+
+        superuser.save();
+
+        checkJournal(new String[]{n1.getPath() + "/" + jcrPrimaryType},
+                new String[]{n1.getPath()});
+    }
+
+    public void testPath() throws RepositoryException {
+        Node n1 = testRootNode.addNode(nodeName1);
+        Node n2 = n1.addNode(nodeName2);
+
+        journal = getEventJournal(ALL_TYPES, n1.getPath(), true, null, null);
+        journal.skipTo(System.currentTimeMillis());
+
+        superuser.save();
+
+        checkJournal(new String[]{n2.getPath()}, new String[]{n1.getPath()});
+    }
+
+    public void testIsDeepTrue() throws RepositoryException {
+        Node n1 = testRootNode.addNode(nodeName1);
+        Node n2 = n1.addNode(nodeName2);
+
+        journal = getEventJournal(ALL_TYPES, testRoot, true, null, null);
+        journal.skipTo(System.currentTimeMillis());
+
+        superuser.save();
+
+        checkJournal(new String[]{n1.getPath(), n2.getPath()}, new String[0]);
+    }
+
+    public void testIsDeepFalse() throws RepositoryException {
+        Node n1 = testRootNode.addNode(nodeName1);
+        Node n2 = n1.addNode(nodeName2);
+
+        journal = getEventJournal(ALL_TYPES, testRoot, false, null, null);
+        journal.skipTo(System.currentTimeMillis());
+
+        superuser.save();
+
+        checkJournal(new String[]{n1.getPath()}, new String[]{n2.getPath()});
+    }
+
+    public void testUUID() throws RepositoryException {
+        Node n1 = testRootNode.addNode(nodeName1);
+        if (!n1.isNodeType(mixReferenceable)) {
+            n1.addMixin(mixReferenceable);
+        }
+        superuser.save();
+
+        Node n2 = n1.addNode(nodeName2);
+
+        journal = getEventJournal(ALL_TYPES, "/", true, new String[]{n1.getUUID()}, null);
+        journal.skipTo(System.currentTimeMillis());
+
+        superuser.save();
+
+        checkJournal(new String[]{n2.getPath()}, new String[0]);
+    }
+
+    public void testNodeType() throws RepositoryException {
+        Node n1 = testRootNode.addNode(nodeName1, "nt:folder");
+        Node n2 = n1.addNode(nodeName2, "nt:folder");
+
+        journal = getEventJournal(ALL_TYPES, testRoot, true, null,
+                new String[]{"nt:folder"});
+        journal.skipTo(System.currentTimeMillis());
+
+        superuser.save();
+
+        checkJournal(new String[]{n2.getPath()}, new String[]{n1.getPath()});
+    }
+
+    //-------------------------------< internal >-------------------------------
+
+    private EventJournal getEventJournal(int eventTypes,
+                                        String absPath,
+                                        boolean isDeep,
+                                        String[] uuid,
+                                        String[] nodeTypeName)
+            throws RepositoryException {
+        return ((WorkspaceImpl) superuser.getWorkspace()).getEventJournal(
+                eventTypes, absPath, isDeep, uuid, nodeTypeName);
+    }
+
+    /**
+     * Checks the journal for events.
+     *
+     * @param allowed allowed paths for the returned events.
+     * @param denied denied paths for the returned events.
+     * @throws RepositoryException if an error occurs while reading the event
+     *          journal.
+     */
+    private void checkJournal(String[] allowed, String[] denied) throws RepositoryException {
+        Set allowedSet = new HashSet(Arrays.asList(allowed));
+        Set deniedSet = new HashSet(Arrays.asList(denied));
+        while (journal.hasNext()) {
+            String path = journal.nextEvent().getPath();
+            allowedSet.remove(path);
+            if (deniedSet.contains(path)) {
+                fail(path + " must not be present in journal");
+            }
+        }
+        assertTrue("Missing paths in journal: " + allowedSet, allowedSet.isEmpty());
+    }
+}

Propchange: jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/observation/EventJournalTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/observation/TestAll.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/observation/TestAll.java?rev=713964&r1=713963&r2=713964&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/observation/TestAll.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/observation/TestAll.java Fri Nov 14 02:10:15 2008
@@ -40,6 +40,7 @@
         suite.addTestSuite(MixinTest.class);
         suite.addTestSuite(VersionEventsTest.class);
         suite.addTestSuite(MoveInPlaceTest.class);
+        suite.addTestSuite(EventJournalTest.class);
 
         return suite;
     }

Modified: jackrabbit/trunk/jackrabbit-core/src/test/repository/repository.xml
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/test/repository/repository.xml?rev=713964&r1=713963&r2=713964&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/test/repository/repository.xml (original)
+++ jackrabbit/trunk/jackrabbit-core/src/test/repository/repository.xml Fri Nov 14 02:10:15 2008
@@ -123,4 +123,11 @@
     <SearchIndex class="org.apache.jackrabbit.core.query.lucene.SearchIndex">
         <param name="path" value="${rep.home}/repository/index"/>
     </SearchIndex>
+    
+    <!--
+        Run with a cluster journal
+    -->
+    <Cluster id="node1">
+        <Journal class="org.apache.jackrabbit.core.journal.MemoryJournal"/>
+    </Cluster>
 </Repository>