You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jackrabbit.apache.org by mr...@apache.org on 2008/11/14 11:10:19 UTC
svn commit: r713964 - in /jackrabbit/trunk/jackrabbit-core/src:
main/java/org/apache/jackrabbit/core/
main/java/org/apache/jackrabbit/core/journal/
main/java/org/apache/jackrabbit/core/observation/
test/java/org/apache/jackrabbit/core/journal/ test/jav...
Author: mreutegg
Date: Fri Nov 14 02:10:15 2008
New Revision: 713964
URL: http://svn.apache.org/viewvc?rev=713964&view=rev
Log:
JCR-1849: EventJournal
Added:
jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/EventJournalImpl.java (with props)
jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/observation/EventJournalTest.java (with props)
Modified:
jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/WorkspaceImpl.java
jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java
jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java
jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileJournal.java
jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/Journal.java
jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/EventConsumer.java
jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/EventFilter.java
jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/FilteredEventIterator.java
jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/ObservationManagerImpl.java
jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/journal/MemoryJournal.java
jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/observation/TestAll.java
jackrabbit/trunk/jackrabbit-core/src/test/repository/repository.xml
Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/WorkspaceImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/WorkspaceImpl.java?rev=713964&r1=713963&r2=713964&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/WorkspaceImpl.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/WorkspaceImpl.java Fri Nov 14 02:10:15 2008
@@ -17,11 +17,14 @@
package org.apache.jackrabbit.core;
import org.apache.jackrabbit.api.JackrabbitWorkspace;
+import org.apache.jackrabbit.api.jsr283.observation.EventJournal;
import org.apache.jackrabbit.core.config.WorkspaceConfig;
import org.apache.jackrabbit.core.lock.LockManager;
import org.apache.jackrabbit.core.observation.EventStateCollection;
import org.apache.jackrabbit.core.observation.EventStateCollectionFactory;
import org.apache.jackrabbit.core.observation.ObservationManagerImpl;
+import org.apache.jackrabbit.core.observation.EventJournalImpl;
+import org.apache.jackrabbit.core.observation.EventFilter;
import org.apache.jackrabbit.core.query.QueryManagerImpl;
import org.apache.jackrabbit.core.state.LocalItemStateManager;
import org.apache.jackrabbit.core.state.SharedItemStateManager;
@@ -31,6 +34,8 @@
import org.apache.jackrabbit.core.xml.ImportHandler;
import org.apache.jackrabbit.core.xml.Importer;
import org.apache.jackrabbit.core.xml.WorkspaceImporter;
+import org.apache.jackrabbit.core.cluster.ClusterNode;
+import org.apache.jackrabbit.core.security.principal.AdminPrincipal;
import org.apache.jackrabbit.commons.AbstractWorkspace;
import org.apache.jackrabbit.spi.commons.conversion.NameException;
import org.apache.jackrabbit.spi.Path;
@@ -57,6 +62,7 @@
import javax.jcr.version.Version;
import javax.jcr.version.VersionException;
import javax.jcr.version.VersionHistory;
+import javax.security.auth.Subject;
import java.util.HashMap;
import java.util.Iterator;
@@ -732,6 +738,43 @@
}
/**
+ * Returns the event journal for this workspace. The events are filtered
+ * according to the passed criteria.
+ *
+ * @param eventTypes A combination of one or more event type constants encoded as a bitmask.
+ * @param absPath an absolute path.
+ * @param isDeep a <code>boolean</code>.
+ * @param uuid array of UUIDs.
+ * @param nodeTypeName array of node type names.
+ * @return the event journal for this repository.
+ * @throws UnsupportedRepositoryOperationException if this repository does
+ * not support an event journal (cluster journal disabled).
+ * @throws RepositoryException if another error occurs.
+ */
+ public EventJournal getEventJournal(int eventTypes,
+ String absPath,
+ boolean isDeep,
+ String[] uuid,
+ String[] nodeTypeName)
+ throws RepositoryException {
+ Subject subject = ((SessionImpl) getSession()).getSubject();
+ if (subject.getPrincipals(AdminPrincipal.class).isEmpty()) {
+ throw new RepositoryException("Only administrator session may " +
+ "access EventJournal");
+ }
+ ClusterNode clusterNode = rep.getClusterNode();
+ if (clusterNode == null) {
+ throw new UnsupportedRepositoryOperationException();
+ }
+
+ ObservationManagerImpl obsMgr = (ObservationManagerImpl) session.getWorkspace().getObservationManager();
+ EventFilter filter = obsMgr.createEventFilter(eventTypes, absPath,
+ isDeep, uuid, nodeTypeName, false);
+ return new EventJournalImpl(filter, clusterNode.getJournal(),
+ clusterNode.getId());
+ }
+
+ /**
* {@inheritDoc}
*/
public synchronized QueryManager getQueryManager() throws RepositoryException {
Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java?rev=713964&r1=713963&r2=713964&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java Fri Nov 14 02:10:15 2008
@@ -230,16 +230,6 @@
}
/**
- * Return an iterator over all records after the specified revision.
- * Subclass responsibility.
- *
- * @param startRevision start point (exlusive)
- * @throws JournalException if an error occurs
- */
- protected abstract RecordIterator getRecords(long startRevision)
- throws JournalException;
-
- /**
* Lock the journal revision, disallowing changes from other sources until
* {@link #unlock has been called, and synchronizes to the latest change.
*
Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java?rev=713964&r1=713963&r2=713964&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java Fri Nov 14 02:10:15 2008
@@ -435,7 +435,7 @@
/**
* {@inheritDoc}
*/
- protected RecordIterator getRecords(long startRevision)
+ public RecordIterator getRecords(long startRevision)
throws JournalException {
try {
@@ -458,6 +458,28 @@
/**
* {@inheritDoc}
+ */
+ public RecordIterator getRecords() throws JournalException {
+ try {
+ checkConnection();
+
+ selectRevisionsStmt.clearParameters();
+ selectRevisionsStmt.clearWarnings();
+ selectRevisionsStmt.setLong(1, Long.MIN_VALUE);
+ selectRevisionsStmt.execute();
+
+ return new DatabaseRecordIterator(
+ selectRevisionsStmt.getResultSet(), getResolver(), getNamePathResolver());
+ } catch (SQLException e) {
+ close(true);
+
+ String msg = "Unable to return record iterator.";
+ throw new JournalException(msg, e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
* <p/>
* This journal is locked by incrementing the current value in the table
* named <code>GLOBAL_REVISION</code>, which effectively write-locks this
Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileJournal.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileJournal.java?rev=713964&r1=713963&r2=713964&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileJournal.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileJournal.java Fri Nov 14 02:10:15 2008
@@ -154,7 +154,7 @@
/**
* {@inheritDoc}
*/
- protected RecordIterator getRecords(long startRevision)
+ public RecordIterator getRecords(long startRevision)
throws JournalException {
long stopRevision = getGlobalRevision();
@@ -174,6 +174,32 @@
/**
* {@inheritDoc}
*/
+ public RecordIterator getRecords() throws JournalException {
+ long stopRevision = getGlobalRevision();
+ long startRevision = 0;
+
+ RotatingLogFile[] logFiles = RotatingLogFile.listFiles(rootDirectory, basename);
+ File[] files = new File[logFiles.length];
+ for (int i = 0; i < files.length; i++) {
+ files[i] = logFiles[i].getFile();
+ if (i == 0) {
+ try {
+ FileRecordLog log = new FileRecordLog(files[i]);
+ startRevision = log.getPreviousRevision();
+ } catch (IOException e) {
+ String msg = "Unable to read startRevision from first " +
+ "record log file";
+ throw new JournalException(msg, e);
+ }
+ }
+ }
+ return new FileRecordIterator(files, startRevision, stopRevision,
+ getResolver(), getNamePathResolver());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
protected void doLock() throws JournalException {
globalRevision.lock(false);
}
Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/Journal.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/Journal.java?rev=713964&r1=713963&r2=713964&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/Journal.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/Journal.java Fri Nov 14 02:10:15 2008
@@ -63,6 +63,7 @@
* Return the record producer for a given identifier.
*
* @param identifier identifier
+ * @return the record producer for a given identifier.
* @throws JournalException if an error occurs
*/
RecordProducer getProducer(String identifier) throws JournalException;
@@ -79,4 +80,22 @@
* @throws JournalException on error
*/
public InstanceRevision getInstanceRevision() throws JournalException;
+
+ /**
+ * Return an iterator over all records after the specified revision.
+ *
+ * @param startRevision start point (exlusive)
+ * @return an iterator over all records after the specified revision.
+ * @throws JournalException if an error occurs
+ */
+ public RecordIterator getRecords(long startRevision)
+ throws JournalException;
+
+ /**
+ * Return an iterator over all available records in the journal.
+ *
+ * @return an iterator over all records.
+ * @throws JournalException if an error occurs
+ */
+ public RecordIterator getRecords() throws JournalException;
}
Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/EventConsumer.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/EventConsumer.java?rev=713964&r1=713963&r2=713964&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/EventConsumer.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/EventConsumer.java Fri Nov 14 02:10:15 2008
@@ -238,7 +238,8 @@
return;
}
// check if filtered iterator has at least one event
- EventIterator it = new FilteredEventIterator(events, filter, denied);
+ EventIterator it = new FilteredEventIterator(events.iterator(),
+ events.getTimestamp(), filter, denied);
if (it.hasNext()) {
listener.onEvent(it);
} else {
Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/EventFilter.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/EventFilter.java?rev=713964&r1=713963&r2=713964&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/EventFilter.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/EventFilter.java Fri Nov 14 02:10:15 2008
@@ -32,7 +32,7 @@
* The <code>EventFilter</code> class implements the filter logic based
* on the session's access rights and the specified filter rules.
*/
-class EventFilter {
+public class EventFilter {
static final EventFilter BLOCK_ALL = new BlockAllFilter();
@@ -131,6 +131,7 @@
}
/**
+ * TODO: remove this unused method.
* Returns the <code>ItemManager</code> associated with this
* <code>EventFilter</code>.
*
Added: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/EventJournalImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/EventJournalImpl.java?rev=713964&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/EventJournalImpl.java (added)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/EventJournalImpl.java Fri Nov 14 02:10:15 2008
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.observation;
+
+import java.util.List;
+import java.util.LinkedList;
+import java.util.NoSuchElementException;
+import java.util.Map;
+import java.util.WeakHashMap;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.Date;
+import java.util.Collections;
+import java.text.DateFormat;
+
+import javax.jcr.observation.Event;
+import javax.jcr.observation.EventIterator;
+
+import org.apache.jackrabbit.api.jsr283.observation.EventJournal;
+import org.apache.jackrabbit.core.journal.Journal;
+import org.apache.jackrabbit.core.journal.RecordIterator;
+import org.apache.jackrabbit.core.journal.JournalException;
+import org.apache.jackrabbit.core.journal.Record;
+import org.apache.jackrabbit.core.cluster.ClusterRecordDeserializer;
+import org.apache.jackrabbit.core.cluster.ClusterRecord;
+import org.apache.jackrabbit.core.cluster.ClusterRecordProcessor;
+import org.apache.jackrabbit.core.cluster.ChangeLogRecord;
+import org.apache.jackrabbit.core.cluster.LockRecord;
+import org.apache.jackrabbit.core.cluster.NamespaceRecord;
+import org.apache.jackrabbit.core.cluster.NodeTypeRecord;
+import org.apache.jackrabbit.core.cluster.WorkspaceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <code>EventJournalImpl</code> implements the JCR 2.0 {@link EventJournal}.
+ */
+public class EventJournalImpl implements EventJournal {
+
+ /**
+ * The logger instance for this class.
+ */
+ private static final Logger log = LoggerFactory.getLogger(EventJournalImpl.class);
+
+ /**
+ * The minimum buffer size for events in {@link #eventBundleBuffer}.
+ */
+ private static final int MIN_BUFFER_SIZE = 1024;
+
+ /**
+ * Map of skip maps. Key=Journal, Value=SortedMap
+ * </p>
+ * Each sorted map has the following structure:
+ * Key=Long (timestamp), Value=Long (revision)
+ */
+ private static final Map REVISION_SKIP_MAPS = new WeakHashMap();
+
+ /**
+ * Last revision seen by this event journal.
+ */
+ private Long lastRevision;
+
+ /**
+ * The event filter.
+ */
+ private final EventFilter filter;
+
+ /**
+ * The journal of this repository.
+ */
+ private final Journal journal;
+
+ /**
+ * The producer id to filter journal records.
+ */
+ private final String producerId;
+
+ /**
+ * The name of the workspace to filter journal records.
+ */
+ private final String wspName;
+
+ /**
+ * Buffer of {@link EventBundle}s.
+ */
+ private final List eventBundleBuffer = new LinkedList();
+
+ /**
+ * The current position of this iterator.
+ */
+ private long position;
+
+ /**
+ * Creates a new event journal.
+ *
+ * @param filter for filtering the events read from the journal.
+ * @param journal the cluster journal.
+ * @param producerId the producer id of the cluster node.
+ */
+ public EventJournalImpl(EventFilter filter,
+ Journal journal,
+ String producerId) {
+ this.filter = filter;
+ this.journal = journal;
+ this.producerId = producerId;
+ this.wspName = filter.getSession().getWorkspace().getName();
+ }
+
+ //------------------------< EventJournal >---------------------------------
+
+ /**
+ * {@inheritDoc}
+ */
+ public void skipTo(long date) {
+ long time = System.currentTimeMillis();
+
+ // get skip map for this journal
+ SortedMap skipMap = getSkipMap();
+ synchronized (skipMap) {
+ SortedMap head = skipMap.headMap(new Long(time));
+ if (!head.isEmpty()) {
+ eventBundleBuffer.clear();
+ lastRevision = (Long) head.get(head.lastKey());
+ }
+ }
+
+ try {
+ while (hasNext()) {
+ EventBundle bundle = getCurrentBundle();
+ if (bundle.timestamp <= date) {
+ eventBundleBuffer.remove(0);
+ } else {
+ break;
+ }
+ }
+ } finally {
+ time = System.currentTimeMillis() - time;
+ log.debug("Skipped event bundles in {} ms.", new Long(time));
+ }
+ }
+
+ //------------------------< EventIterator >---------------------------------
+
+ /**
+ * {@inheritDoc}
+ */
+ public Event nextEvent() {
+ // calling hasNext() will also trigger refill if necessary!
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ EventBundle bundle = getCurrentBundle();
+ // above hasNext() call ensures that there is bundle with an event state
+ assert bundle != null && bundle.events.hasNext();
+
+ Event next = (Event) bundle.events.next();
+ if (!bundle.events.hasNext()) {
+ // done with this bundle -> remove from buffer
+ eventBundleBuffer.remove(0);
+ }
+ position++;
+ return next;
+ }
+
+ //------------------------< RangeIterator >---------------------------------
+
+ /**
+ * {@inheritDoc}
+ */
+ public void skip(long skipNum) {
+ while (skipNum-- > 0) {
+ nextEvent();
+ }
+ }
+
+ /**
+ * @return always -1.
+ */
+ public long getSize() {
+ return -1;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public long getPosition() {
+ // TODO: what happens to position when skipped
+ return position;
+ }
+
+ //--------------------------< Iterator >------------------------------------
+
+ /**
+ * @throws UnsupportedOperationException always.
+ */
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean hasNext() {
+ if (!eventBundleBuffer.isEmpty()) {
+ return true;
+ }
+ // try refill
+ refill();
+ // check again
+ return !eventBundleBuffer.isEmpty();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public Object next() {
+ return nextEvent();
+ }
+
+ //----------------------< ClusterRecordProcessor >--------------------------
+
+ /**
+ * Implements {@link ClusterRecordProcessor} and keeps track of the number
+ * of events read and the timestamp of the last record processed.
+ */
+ private class RecordProcessor implements ClusterRecordProcessor {
+
+ /**
+ * Number of events read so far.
+ */
+ private int numEvents;
+
+ /**
+ * The timestamp of the last record processed.
+ */
+ private long lastTimestamp;
+
+ /**
+ * @return the number of events read so far.
+ */
+ private int getNumEvents() {
+ return numEvents;
+ }
+
+ /**
+ * @return the timestamp of the last record processed.
+ */
+ private long getLastTimestamp() {
+ return lastTimestamp;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void process(ChangeLogRecord record) {
+ List events = record.getEvents();
+ if (!events.isEmpty()) {
+ EventBundle bundle = new EventBundle(events,
+ record.getTimestamp(), filter);
+ if (bundle.events.hasNext()) {
+ // only queue bundle if there is an event
+ eventBundleBuffer.add(bundle);
+ numEvents += events.size();
+ lastTimestamp = record.getTimestamp();
+ }
+ }
+ }
+
+ public void process(LockRecord record) {
+ // ignore
+ }
+
+ public void process(NamespaceRecord record) {
+ // ignore
+ }
+
+ public void process(NodeTypeRecord record) {
+ // ignore
+ }
+
+ public void process(WorkspaceRecord record) {
+ // ignore
+ }
+ }
+
+ //-------------------------------< internal >-------------------------------
+
+ /**
+ * @return the current event bundle or <code>null</code> if there is none.
+ */
+ private EventBundle getCurrentBundle() {
+ while (!eventBundleBuffer.isEmpty()) {
+ EventBundle bundle = (EventBundle) eventBundleBuffer.get(0);
+ if (bundle.events.hasNext()) {
+ return bundle;
+ } else {
+ eventBundleBuffer.remove(0);
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Refills the {@link #eventBundleBuffer}.
+ */
+ private void refill() {
+ assert eventBundleBuffer.isEmpty();
+ try {
+ RecordProcessor processor = new RecordProcessor();
+ ClusterRecordDeserializer deserializer = new ClusterRecordDeserializer();
+ RecordIterator records;
+ if (lastRevision != null) {
+ log.debug("refilling event bundle buffer starting at revision {}",
+ lastRevision);
+ records = journal.getRecords(lastRevision.longValue());
+ } else {
+ log.debug("refilling event bundle buffer starting at journal beginning");
+ records = journal.getRecords();
+ }
+ try {
+ while (processor.getNumEvents() < MIN_BUFFER_SIZE && records.hasNext()) {
+ Record record = records.nextRecord();
+ if (record.getProducerId().equals(producerId)) {
+ ClusterRecord cr = deserializer.deserialize(record);
+ if (!wspName.equals(cr.getWorkspace())) {
+ continue;
+ }
+ cr.process(processor);
+ lastRevision = new Long(cr.getRevision());
+ }
+ }
+
+ if (processor.getNumEvents() >= MIN_BUFFER_SIZE) {
+ // remember in skip map
+ SortedMap skipMap = getSkipMap();
+ Long timestamp = new Long(processor.getLastTimestamp());
+ synchronized (skipMap) {
+ if (log.isDebugEnabled()) {
+ DateFormat df = DateFormat.getDateTimeInstance();
+ log.debug("remember record in skip map: {} -> {}",
+ df.format(new Date(timestamp.longValue())),
+ lastRevision);
+ }
+ skipMap.put(timestamp, lastRevision);
+ }
+ }
+ } finally {
+ records.close();
+ }
+ } catch (JournalException e) {
+ log.warn("Unable to read journal records", e);
+ }
+ }
+
+ /**
+ * @return the revision skip map for this journal.
+ */
+ private SortedMap getSkipMap() {
+ synchronized (REVISION_SKIP_MAPS) {
+ SortedMap map = (SortedMap) REVISION_SKIP_MAPS.get(journal);
+ if (map == null) {
+ map = new TreeMap();
+ REVISION_SKIP_MAPS.put(journal, map);
+ }
+ return map;
+ }
+ }
+
+ /**
+ * Simple class to associate an {@link EventState} iterator with a timestamp.
+ */
+ private static final class EventBundle {
+
+ /**
+ * An iterator of {@link Event}s.
+ */
+ final EventIterator events;
+
+ /**
+ * Timestamp when the events were created.
+ */
+ final long timestamp;
+
+ /**
+ * Creates a new event bundle.
+ *
+ * @param eventStates the {@link EventState}s that belong to this bundle.
+ * @param timestamp the timestamp when the events were created.
+ * @param filter the event filter.
+ */
+ private EventBundle(List eventStates,
+ long timestamp,
+ EventFilter filter) {
+ this.events = new FilteredEventIterator(eventStates.iterator(),
+ timestamp, filter, Collections.EMPTY_SET);
+ this.timestamp = timestamp;
+ }
+ }
+}
Propchange: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/EventJournalImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/FilteredEventIterator.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/FilteredEventIterator.java?rev=713964&r1=713963&r2=713964&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/FilteredEventIterator.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/FilteredEventIterator.java Fri Nov 14 02:10:15 2008
@@ -70,20 +70,22 @@
/**
* Creates a new <code>FilteredEventIterator</code>.
*
- * @param c an unmodifiable Collection of {@link javax.jcr.observation.Event}s.
+ * @param eventStates an iterator over unfiltered {@link EventState}s.
+ * @param timestamp the time when the event were created.
* @param filter only event that pass the filter will be dispatched to the
* event listener.
* @param denied <code>Set</code> of <code>ItemId</code>s of denied <code>ItemState</code>s
* rejected by the <code>AccessManager</code>. If
* <code>null</code> no <code>ItemState</code> is denied.
*/
- public FilteredEventIterator(EventStateCollection c,
+ public FilteredEventIterator(Iterator eventStates,
+ long timestamp,
EventFilter filter,
Set denied) {
- actualEvents = c.iterator();
+ this.actualEvents = eventStates;
this.filter = filter;
this.denied = denied;
- this.timestamp = c.getTimestamp();
+ this.timestamp = timestamp;
fetchNext();
}
Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/ObservationManagerImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/ObservationManagerImpl.java?rev=713964&r1=713963&r2=713964&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/ObservationManagerImpl.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/observation/ObservationManagerImpl.java Fri Nov 14 02:10:15 2008
@@ -102,42 +102,9 @@
boolean noLocal)
throws RepositoryException {
- // create NodeType instances from names
- NodeTypeImpl[] nodeTypes;
- if (nodeTypeName == null) {
- nodeTypes = null;
- } else {
- NodeTypeManagerImpl ntMgr = session.getNodeTypeManager();
- nodeTypes = new NodeTypeImpl[nodeTypeName.length];
- for (int i = 0; i < nodeTypes.length; i++) {
- nodeTypes[i] = (NodeTypeImpl) ntMgr.getNodeType(nodeTypeName[i]);
- }
- }
-
- Path path;
- try {
- path = session.getQPath(absPath).getNormalizedPath();
- } catch (NameException e) {
- String msg = "invalid path syntax: " + absPath;
- log.debug(msg);
- throw new RepositoryException(msg, e);
- }
- NodeId[] ids = null;
- if (uuid != null) {
- ids = new NodeId[uuid.length];
- for (int i = 0; i < uuid.length; i++) {
- ids[i] = NodeId.valueOf(uuid[i]);
- }
- }
// create filter
- EventFilter filter = new EventFilter(itemMgr,
- session,
- eventTypes,
- path,
- isDeep,
- ids,
- nodeTypes,
- noLocal);
+ EventFilter filter = createEventFilter(eventTypes, absPath,
+ isDeep, uuid, nodeTypeName, noLocal);
dispatcher.addConsumer(new EventConsumer(session, listener, filter));
}
@@ -180,6 +147,60 @@
}
+ /**
+ * Creates a new event filter with the given restrictions.
+ *
+ * @param eventTypes A combination of one or more event type constants encoded as a bitmask.
+ * @param absPath an absolute path.
+ * @param isDeep a <code>boolean</code>.
+ * @param uuid array of UUIDs.
+ * @param nodeTypeName array of node type names.
+ * @param noLocal a <code>boolean</code>.
+ * @return the event filter with the given restrictions.
+ * @throws RepositoryException if an error occurs.
+ */
+ public EventFilter createEventFilter(int eventTypes,
+ String absPath,
+ boolean isDeep,
+ String[] uuid,
+ String[] nodeTypeName,
+ boolean noLocal)
+ throws RepositoryException {
+ // create NodeType instances from names
+ NodeTypeImpl[] nodeTypes;
+ if (nodeTypeName == null) {
+ nodeTypes = null;
+ } else {
+ NodeTypeManagerImpl ntMgr = session.getNodeTypeManager();
+ nodeTypes = new NodeTypeImpl[nodeTypeName.length];
+ for (int i = 0; i < nodeTypes.length; i++) {
+ nodeTypes[i] = (NodeTypeImpl) ntMgr.getNodeType(nodeTypeName[i]);
+ }
+ }
+
+ Path path;
+ try {
+ path = session.getQPath(absPath).getNormalizedPath();
+ } catch (NameException e) {
+ String msg = "invalid path syntax: " + absPath;
+ log.debug(msg);
+ throw new RepositoryException(msg, e);
+ }
+ if (!path.isAbsolute()) {
+ throw new RepositoryException("absPath must be absolute");
+ }
+ NodeId[] ids = null;
+ if (uuid != null) {
+ ids = new NodeId[uuid.length];
+ for (int i = 0; i < uuid.length; i++) {
+ ids[i] = NodeId.valueOf(uuid[i]);
+ }
+ }
+ // create filter
+ return new EventFilter(itemMgr, session, eventTypes, path,
+ isDeep, ids, nodeTypes, noLocal);
+ }
+
//------------------------------------------< EventStateCollectionFactory >
/**
Modified: jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/journal/MemoryJournal.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/journal/MemoryJournal.java?rev=713964&r1=713963&r2=713964&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/journal/MemoryJournal.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/journal/MemoryJournal.java Fri Nov 14 02:10:15 2008
@@ -23,11 +23,6 @@
import java.util.ArrayList;
import java.util.NoSuchElementException;
-import org.apache.jackrabbit.core.journal.AbstractJournal;
-import org.apache.jackrabbit.core.journal.AppendRecord;
-import org.apache.jackrabbit.core.journal.InstanceRevision;
-import org.apache.jackrabbit.core.journal.JournalException;
-import org.apache.jackrabbit.core.journal.RecordIterator;
import org.apache.jackrabbit.spi.commons.namespace.NamespaceResolver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -149,7 +144,7 @@
/**
* {@inheritDoc}
*/
- protected RecordIterator getRecords(long startRevision)
+ public RecordIterator getRecords(long startRevision)
throws JournalException {
checkState();
@@ -161,6 +156,13 @@
}
/**
+ * {@inheritDoc}
+ */
+ public RecordIterator getRecords() throws JournalException {
+ return new MemoryRecordIterator(0, records.size());
+ }
+
+ /**
* Set records. Used to share records between two journal implementations.
*
* @param records array list that should back up this memory journal
Added: jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/observation/EventJournalTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/observation/EventJournalTest.java?rev=713964&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/observation/EventJournalTest.java (added)
+++ jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/observation/EventJournalTest.java Fri Nov 14 02:10:15 2008
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.observation;
+
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Arrays;
+
+import javax.jcr.RepositoryException;
+import javax.jcr.Node;
+import javax.jcr.Session;
+import javax.jcr.observation.Event;
+
+import org.apache.jackrabbit.test.AbstractJCRTest;
+import org.apache.jackrabbit.core.WorkspaceImpl;
+import org.apache.jackrabbit.api.jsr283.observation.EventJournal;
+
+/**
+ * <code>EventJournalTest</code> performs EventJournal tests.
+ */
+public class EventJournalTest extends AbstractJCRTest {
+
+ private static final int ALL_TYPES = Event.NODE_ADDED | Event.NODE_REMOVED | Event.PROPERTY_ADDED | Event.PROPERTY_CHANGED | Event.PROPERTY_REMOVED;
+
+ private EventJournal journal;
+
+ protected void setUp() throws Exception {
+ super.setUp();
+ journal = getEventJournal(ALL_TYPES, "/", true, null, null);
+ }
+
+ public void testSkipToNow() throws RepositoryException {
+ // skip everything
+ journal.skipTo(System.currentTimeMillis());
+ assertFalse(journal.hasNext());
+ }
+
+ public void testSkipTo() throws Exception {
+ long time = System.currentTimeMillis();
+
+ // add some nodes
+ Node n1 = testRootNode.addNode(nodeName1);
+ Node n2 = testRootNode.addNode(nodeName2);
+
+ // make sure some time passed otherwise we might
+ // skip this change as well.
+ while (time == System.currentTimeMillis()) {
+ Thread.sleep(1);
+ }
+
+ // now save
+ superuser.save();
+
+ journal.skipTo(time);
+ // at least the two added nodes must be returned by the journal
+ checkJournal(new String[]{n1.getPath(), n2.getPath()}, new String[0]);
+ }
+
+ public void testLiveJournal() throws RepositoryException {
+ journal.skipTo(System.currentTimeMillis());
+ assertFalse(journal.hasNext());
+
+ testRootNode.addNode(nodeName1);
+ superuser.save();
+
+ assertTrue(journal.hasNext());
+ }
+
+ public void testWorkspaceSeparation() throws RepositoryException {
+ journal.skipTo(System.currentTimeMillis());
+ assertFalse(journal.hasNext());
+
+ Session session = helper.getSuperuserSession(workspaceName);
+ try {
+ Node rootNode = session.getRootNode();
+ if (rootNode.hasNode(nodeName1)) {
+ rootNode.getNode(nodeName1).remove();
+ } else {
+ rootNode.addNode(nodeName1);
+ }
+ session.save();
+ } finally {
+ session.logout();
+ }
+
+ assertFalse(journal.hasNext());
+ }
+
+ public void testEventType() throws RepositoryException {
+ Node n1 = testRootNode.addNode(nodeName1);
+
+ journal = getEventJournal(Event.PROPERTY_ADDED, testRoot, true, null, null);
+ journal.skipTo(System.currentTimeMillis());
+
+ superuser.save();
+
+ checkJournal(new String[]{n1.getPath() + "/" + jcrPrimaryType},
+ new String[]{n1.getPath()});
+ }
+
+ public void testPath() throws RepositoryException {
+ Node n1 = testRootNode.addNode(nodeName1);
+ Node n2 = n1.addNode(nodeName2);
+
+ journal = getEventJournal(ALL_TYPES, n1.getPath(), true, null, null);
+ journal.skipTo(System.currentTimeMillis());
+
+ superuser.save();
+
+ checkJournal(new String[]{n2.getPath()}, new String[]{n1.getPath()});
+ }
+
+ public void testIsDeepTrue() throws RepositoryException {
+ Node n1 = testRootNode.addNode(nodeName1);
+ Node n2 = n1.addNode(nodeName2);
+
+ journal = getEventJournal(ALL_TYPES, testRoot, true, null, null);
+ journal.skipTo(System.currentTimeMillis());
+
+ superuser.save();
+
+ checkJournal(new String[]{n1.getPath(), n2.getPath()}, new String[0]);
+ }
+
+ public void testIsDeepFalse() throws RepositoryException {
+ Node n1 = testRootNode.addNode(nodeName1);
+ Node n2 = n1.addNode(nodeName2);
+
+ journal = getEventJournal(ALL_TYPES, testRoot, false, null, null);
+ journal.skipTo(System.currentTimeMillis());
+
+ superuser.save();
+
+ checkJournal(new String[]{n1.getPath()}, new String[]{n2.getPath()});
+ }
+
+ public void testUUID() throws RepositoryException {
+ Node n1 = testRootNode.addNode(nodeName1);
+ if (!n1.isNodeType(mixReferenceable)) {
+ n1.addMixin(mixReferenceable);
+ }
+ superuser.save();
+
+ Node n2 = n1.addNode(nodeName2);
+
+ journal = getEventJournal(ALL_TYPES, "/", true, new String[]{n1.getUUID()}, null);
+ journal.skipTo(System.currentTimeMillis());
+
+ superuser.save();
+
+ checkJournal(new String[]{n2.getPath()}, new String[0]);
+ }
+
+ public void testNodeType() throws RepositoryException {
+ Node n1 = testRootNode.addNode(nodeName1, "nt:folder");
+ Node n2 = n1.addNode(nodeName2, "nt:folder");
+
+ journal = getEventJournal(ALL_TYPES, testRoot, true, null,
+ new String[]{"nt:folder"});
+ journal.skipTo(System.currentTimeMillis());
+
+ superuser.save();
+
+ checkJournal(new String[]{n2.getPath()}, new String[]{n1.getPath()});
+ }
+
+ //-------------------------------< internal >-------------------------------
+
+ private EventJournal getEventJournal(int eventTypes,
+ String absPath,
+ boolean isDeep,
+ String[] uuid,
+ String[] nodeTypeName)
+ throws RepositoryException {
+ return ((WorkspaceImpl) superuser.getWorkspace()).getEventJournal(
+ eventTypes, absPath, isDeep, uuid, nodeTypeName);
+ }
+
+ /**
+ * Checks the journal for events.
+ *
+ * @param allowed allowed paths for the returned events.
+ * @param denied denied paths for the returned events.
+ * @throws RepositoryException if an error occurs while reading the event
+ * journal.
+ */
+ private void checkJournal(String[] allowed, String[] denied) throws RepositoryException {
+ Set allowedSet = new HashSet(Arrays.asList(allowed));
+ Set deniedSet = new HashSet(Arrays.asList(denied));
+ while (journal.hasNext()) {
+ String path = journal.nextEvent().getPath();
+ allowedSet.remove(path);
+ if (deniedSet.contains(path)) {
+ fail(path + " must not be present in journal");
+ }
+ }
+ assertTrue("Missing paths in journal: " + allowedSet, allowedSet.isEmpty());
+ }
+}
Propchange: jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/observation/EventJournalTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/observation/TestAll.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/observation/TestAll.java?rev=713964&r1=713963&r2=713964&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/observation/TestAll.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/observation/TestAll.java Fri Nov 14 02:10:15 2008
@@ -40,6 +40,7 @@
suite.addTestSuite(MixinTest.class);
suite.addTestSuite(VersionEventsTest.class);
suite.addTestSuite(MoveInPlaceTest.class);
+ suite.addTestSuite(EventJournalTest.class);
return suite;
}
Modified: jackrabbit/trunk/jackrabbit-core/src/test/repository/repository.xml
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/test/repository/repository.xml?rev=713964&r1=713963&r2=713964&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/test/repository/repository.xml (original)
+++ jackrabbit/trunk/jackrabbit-core/src/test/repository/repository.xml Fri Nov 14 02:10:15 2008
@@ -123,4 +123,11 @@
<SearchIndex class="org.apache.jackrabbit.core.query.lucene.SearchIndex">
<param name="path" value="${rep.home}/repository/index"/>
</SearchIndex>
+
+ <!--
+ Run with a cluster journal
+ -->
+ <Cluster id="node1">
+ <Journal class="org.apache.jackrabbit.core.journal.MemoryJournal"/>
+ </Cluster>
</Repository>