You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2015/05/19 17:30:50 UTC

svn commit: r1680315 - in /jackrabbit/oak/branches/1.2: ./ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/ oak-core/src/main/java/org/apache/jackrabbit/oak/plu...

Author: mreutegg
Date: Tue May 19 15:30:49 2015
New Revision: 1680315

URL: http://svn.apache.org/r1680315
Log:
OAK-2804: Conditional remove on DocumentStore

Merged revisions 1675382 and 1675555 from trunk

Added:
    jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/UpdateUtilsTest.java
      - copied unchanged from r1675382, jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/UpdateUtilsTest.java
Modified:
    jackrabbit/oak/branches/1.2/   (props changed)
    jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentStore.java
    jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
    jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateOp.java
    jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateUtils.java
    jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java
    jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
    jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentSerializer.java
    jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
    jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java
    jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java
    jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java
    jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java

Propchange: jackrabbit/oak/branches/1.2/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue May 19 15:30:49 2015
@@ -1,3 +1,3 @@
 /jackrabbit/oak/branches/1.0:1665962
-/jackrabbit/oak/trunk:1672350,1672468,1672537,1672603,1672642,1672644,1672834-1672835,1673351,1673410,1673414,1673436,1673644,1673662-1673664,1673669,1673695,1674046,1674065,1674075,1674107,1674228,1674880,1675055,1675332,1675354,1675357,1675593,1676198,1676237,1676407,1676458,1676539,1676670,1676725,1677579,1677581,1677609,1677611,1677939,1677991,1678173,1678323,1678758,1678938,1678954,1679144,1679165,1679191,1679235,1680182,1680222,1680232,1680236
+/jackrabbit/oak/trunk:1672350,1672468,1672537,1672603,1672642,1672644,1672834-1672835,1673351,1673410,1673414,1673436,1673644,1673662-1673664,1673669,1673695,1674046,1674065,1674075,1674107,1674228,1674880,1675055,1675332,1675354,1675357,1675382,1675555,1675593,1676198,1676237,1676407,1676458,1676539,1676670,1676725,1677579,1677581,1677609,1677611,1677939,1677991,1678173,1678323,1678758,1678938,1678954,1679144,1679165,1679191,1679235,1680182,1680222,1680232,1680236
 /jackrabbit/trunk:1345480

Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentStore.java?rev=1680315&r1=1680314&r2=1680315&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentStore.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentStore.java Tue May 19 15:30:49 2015
@@ -138,7 +138,7 @@ public interface DocumentStore {
     <T extends Document> void remove(Collection<T> collection, String key);
 
     /**
-     * Batch remove documents with given key. Keys for documents that do not
+     * Batch remove documents with given keys. Keys for documents that do not
      * exist are simply ignored. If this method fails with an exception, then
      * only some of the documents identified by {@code keys} may have been
      * removed.
@@ -150,6 +150,21 @@ public interface DocumentStore {
     <T extends Document> void remove(Collection<T> collection, List<String> keys);
 
     /**
+     * Batch remove documents with given keys and corresponding conditions. Keys
+     * for documents that do not exist are simply ignored. A document is only
+     * removed if the corresponding conditions are met. If this method fails
+     * with an exception, then only some of the documents may have been removed.
+     *
+     * @param <T> the document type
+     * @param collection the collection.
+     * @param toRemove the keys of the documents to remove with the
+     *                 corresponding conditions.
+     * @return the number of removed documents.
+     */
+    <T extends Document> int remove(Collection<T> collection,
+                                     Map<String, Map<UpdateOp.Key, UpdateOp.Condition>> toRemove);
+
+    /**
      * Try to create a list of documents. This method returns {@code true} iff
      * none of the documents existed before and the create was successful. This
      * method will return {@code false} if one of the documents already exists
@@ -198,7 +213,7 @@ public interface DocumentStore {
 
     /**
      * Performs a conditional update (e.g. using
-     * {@link UpdateOp.Operation.Type#CONTAINS_MAP_ENTRY} and only updates the
+     * {@link UpdateOp.Condition.Type#EXISTS} and only updates the
      * document if the condition is <code>true</code>. The returned document is
      * immutable.
      *

Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java?rev=1680315&r1=1680314&r2=1680315&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java Tue May 19 15:30:49 2015
@@ -371,6 +371,17 @@ public final class NodeDocument extends
     }
 
     /**
+     * See also {@link #MODIFIED_IN_SECS}.
+     *
+     * @return the time in seconds this document was last modified with five
+     *          seconds precision. Returns {@code null} if none is set.
+     */
+    @CheckForNull
+    public Long getModified() {
+        return (Long) get(MODIFIED_IN_SECS);
+    }
+
+    /**
      * Returns <tt>true</tt> if this node possibly has children.
      * If false then that indicates that there are no child
      *

Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateOp.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateOp.java?rev=1680315&r1=1680314&r2=1680315&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateOp.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateOp.java Tue May 19 15:30:49 2015
@@ -16,6 +16,7 @@
  */
 package org.apache.jackrabbit.oak.plugins.document;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -39,6 +40,7 @@ public final class UpdateOp {
     private boolean isDelete;
 
     private final Map<Key, Operation> changes;
+    private Map<Key, Condition> conditions;
 
     /**
      * Create an update operation for the document with the given id. The commit
@@ -112,6 +114,14 @@ public final class UpdateOp {
         return changes;
     }
 
+    public Map<Key, Condition> getConditions() {
+        if (conditions == null) {
+            return Collections.emptyMap();
+        } else {
+            return conditions;
+        }
+    }
+
     /**
      * Checks if the UpdateOp has any change operation registered with the
      * current update operation
@@ -198,8 +208,26 @@ public final class UpdateOp {
         if (isNew) {
             throw new IllegalStateException("Cannot use containsMapEntry() on new document");
         }
-        Operation op = new Operation(Operation.Type.CONTAINS_MAP_ENTRY, exists);
-        changes.put(new Key(property, checkNotNull(revision)), op);
+        Condition c = exists ? Condition.EXISTS : Condition.MISSING;
+        getOrCreateConditions().put(new Key(property, checkNotNull(revision)), c);
+    }
+
+    /**
+     * Checks if the property or map entry is equal to the given value.
+     *
+     * @param property the name of the property or map.
+     * @param revision the revision within the map or {@code null} if this check
+     *                 is for a property.
+     * @param value the value to compare to.
+     */
+    void equals(@Nonnull String property,
+                @Nullable Revision revision,
+                @Nonnull Object value) {
+        if (isNew) {
+            throw new IllegalStateException("Cannot perform equals check on new document");
+        }
+        getOrCreateConditions().put(new Key(property, revision),
+                Condition.newEqualsCondition(value));
     }
 
     /**
@@ -229,6 +257,13 @@ public final class UpdateOp {
         return "key: " + id + " " + (isNew ? "new" : "update") + " " + changes;
     }
 
+    private Map<Key, Condition> getOrCreateConditions() {
+        if (conditions == null) {
+            conditions = Maps.newHashMap();
+        }
+        return conditions;
+    }
+
     /**
      * A DocumentStore operation for a given key within a document.
      */
@@ -269,12 +304,7 @@ public final class UpdateOp {
              * Remove the sub-key / value pair.
              * The value in the stored node is a map.
              */
-            REMOVE_MAP_ENTRY,
-
-            /**
-             * Checks if the sub-key is present in a map or not.
-             */
-            CONTAINS_MAP_ENTRY
+            REMOVE_MAP_ENTRY
 
          }
 
@@ -308,7 +338,6 @@ public final class UpdateOp {
             case SET:
             case MAX:
             case REMOVE_MAP_ENTRY:
-            case CONTAINS_MAP_ENTRY:
                 // nothing to do
                 break;
             case SET_MAP_ENTRY:
@@ -321,6 +350,66 @@ public final class UpdateOp {
     }
 
     /**
+     * A condition to check before an update is applied.
+     */
+    public static final class Condition {
+
+        /**
+         * Check if a sub-key exists in a map.
+         */
+        public static final Condition EXISTS = new Condition(Type.EXISTS, true);
+
+        /**
+         * Check if a sub-key is missing in a map.
+         */
+        public static final Condition MISSING = new Condition(Type.EXISTS, false);
+
+        public enum Type {
+
+            /**
+             * Checks if the sub-key is present in a map or not.
+             */
+            EXISTS,
+
+            /**
+             * Checks if a property or map entry equals a given value.
+             */
+            EQUALS
+
+        }
+
+        /**
+         * The condition type.
+         */
+        public final Type type;
+
+        /**
+         * The value.
+         */
+        public final Object value;
+
+        private Condition(Type type, Object value) {
+            this.type = type;
+            this.value = value;
+        }
+
+        /**
+         * Creates a new equals condition with the given value.
+         *
+         * @param value the value to compare to.
+         * @return the equals condition.
+         */
+        public static Condition newEqualsCondition(@Nonnull Object value) {
+            return new Condition(Type.EQUALS, checkNotNull(value));
+        }
+
+        @Override
+        public String toString() {
+            return type + " " + value;
+        }
+    }
+
+    /**
      * A key for an operation consists of a property name and an optional
      * revision. The revision is only set if the value for the operation is
      * set for a certain revision.

Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateUtils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateUtils.java?rev=1680315&r1=1680314&r2=1680315&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateUtils.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateUtils.java Tue May 19 15:30:49 2015
@@ -25,6 +25,9 @@ import java.util.Map.Entry;
 
 import javax.annotation.Nonnull;
 
+import com.google.common.base.Objects;
+
+import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Condition;
 import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
 import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Operation;
 
@@ -96,31 +99,29 @@ public class UpdateUtils {
                     }
                     break;
                 }
-                case CONTAINS_MAP_ENTRY:
-                    // no effect
-                    break;
             }
         }
     }
 
-    public static boolean checkConditions(@Nonnull Document doc, @Nonnull UpdateOp update) {
-        for (Map.Entry<Key, Operation> change : update.getChanges().entrySet()) {
-            Operation op = change.getValue();
-            if (op.type == Operation.Type.CONTAINS_MAP_ENTRY) {
-                Key k = change.getKey();
-                Revision r = k.getRevision();
+    public static boolean checkConditions(@Nonnull Document doc,
+                                          @Nonnull Map<Key, Condition> conditions) {
+        for (Map.Entry<Key, Condition> entry : conditions.entrySet()) {
+            Condition c = entry.getValue();
+            Key k = entry.getKey();
+            Object value = doc.get(k.getName());
+            Revision r = k.getRevision();
+            if (c.type == Condition.Type.EXISTS) {
                 if (r == null) {
-                    throw new IllegalStateException("CONTAINS_MAP_ENTRY must not contain null revision");
+                    throw new IllegalStateException("EXISTS must not contain null revision");
                 }
-                Object value = doc.get(k.getName());
                 if (value == null) {
-                    if (Boolean.TRUE.equals(op.value)) {
+                    if (Boolean.TRUE.equals(c.value)) {
                         return false;
                     }
                 } else {
                     if (value instanceof Map) {
                         Map<?, ?> map = (Map<?, ?>) value;
-                        if (Boolean.TRUE.equals(op.value)) {
+                        if (Boolean.TRUE.equals(c.value)) {
                             if (!map.containsKey(r)) {
                                 return false;
                             }
@@ -133,6 +134,19 @@ public class UpdateUtils {
                         return false;
                     }
                 }
+            } else if (c.type == Condition.Type.EQUALS) {
+                if (r != null) {
+                    if (value instanceof Map) {
+                        value = ((Map) value).get(r);
+                    } else {
+                        value = null;
+                    }
+                }
+                if (!Objects.equal(value, c.value)) {
+                    return false;
+                }
+            } else {
+                throw new IllegalArgumentException("Unknown condition: " + c.type);
             }
         }
         return true;

Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java?rev=1680315&r1=1680314&r2=1680315&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java Tue May 19 15:30:49 2015
@@ -39,6 +39,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.document.Revision;
 import org.apache.jackrabbit.oak.plugins.document.StableRevisionComparator;
 import org.apache.jackrabbit.oak.plugins.document.UpdateOp;
+import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Condition;
 import org.apache.jackrabbit.oak.plugins.document.UpdateUtils;
 
 import com.google.common.base.Splitter;
@@ -46,6 +47,8 @@ import com.mongodb.ReadPreference;
 import com.mongodb.WriteConcern;
 import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats;
 
+import static org.apache.jackrabbit.oak.plugins.document.UpdateUtils.checkConditions;
+
 /**
  * Emulates a MongoDB store (possibly consisting of multiple shards and
  * replicas).
@@ -161,11 +164,11 @@ public class MemoryDocumentStore impleme
     }
 
     @Override
-    public <T extends Document> void remove(Collection<T> collection, String path) {
+    public <T extends Document> void remove(Collection<T> collection, String key) {
         Lock lock = rwLock.writeLock();
         lock.lock();
         try {
-            getMap(collection).remove(path);
+            getMap(collection).remove(key);
         } finally {
             lock.unlock();
         }
@@ -178,6 +181,28 @@ public class MemoryDocumentStore impleme
         }
     }
 
+    @Override
+    public <T extends Document> int remove(Collection<T> collection,
+                                           Map<String, Map<UpdateOp.Key, Condition>> toRemove) {
+        int num = 0;
+        ConcurrentSkipListMap<String, T> map = getMap(collection);
+        for (Map.Entry<String, Map<UpdateOp.Key, Condition>> entry : toRemove.entrySet()) {
+            Lock lock = rwLock.writeLock();
+            lock.lock();
+            try {
+                T doc = map.get(entry.getKey());
+                if (doc != null && checkConditions(doc, entry.getValue())) {
+                    if (map.remove(entry.getKey()) != null) {
+                        num++;
+                    }
+                }
+            } finally {
+                lock.unlock();
+            }
+        }
+        return num;
+    }
+
     @CheckForNull
     @Override
     public <T extends Document> T createOrUpdate(Collection<T> collection, UpdateOp update) {
@@ -230,7 +255,7 @@ public class MemoryDocumentStore impleme
             } else {
                 oldDoc.deepCopy(doc);
             }
-            if (checkConditions && !UpdateUtils.checkConditions(doc, update)) {
+            if (checkConditions && !checkConditions(doc, update.getConditions())) {
                 return null;
             }
             // update the document

Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java?rev=1680315&r1=1680314&r2=1680315&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java Tue May 19 15:30:49 2015
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -42,6 +43,7 @@ import com.google.common.collect.Immutab
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.UncheckedExecutionException;
 import com.mongodb.MongoClientURI;
+import com.mongodb.QueryOperators;
 import com.mongodb.ReadPreference;
 
 import org.apache.jackrabbit.oak.cache.CacheStats;
@@ -56,6 +58,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.document.Revision;
 import org.apache.jackrabbit.oak.plugins.document.StableRevisionComparator;
 import org.apache.jackrabbit.oak.plugins.document.UpdateOp;
+import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Condition;
 import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
 import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Operation;
 import org.apache.jackrabbit.oak.plugins.document.UpdateUtils;
@@ -101,7 +104,7 @@ public class MongoDocumentStore implemen
 
     private static final DBObject BY_ID_ASC = new BasicDBObject(Document.ID, 1);
 
-    static enum DocumentReadPreference {
+    enum DocumentReadPreference {
         PRIMARY,
         PREFER_PRIMARY,
         PREFER_SECONDARY,
@@ -114,11 +117,6 @@ public class MongoDocumentStore implemen
     private final DBCollection clusterNodes;
     private final DBCollection settings;
 
-    /**
-     * The sum of all milliseconds this class waited for MongoDB.
-     */
-    private long timeSum;
-
     private final Cache<CacheValue, NodeDocument> nodesCache;
     private final CacheStats cacheStats;
 
@@ -593,12 +591,11 @@ public class MongoDocumentStore implemen
         DBCollection dbCollection = getDBCollection(collection);
         long start = PERFLOG.start();
         try {
-            WriteResult writeResult = dbCollection.remove(getByKeyQuery(key).get());
-            invalidateCache(collection, key);
-            if (writeResult.getError() != null) {
-                throw new DocumentStoreException("Remove failed: " + writeResult.getError());
-            }
+            dbCollection.remove(getByKeyQuery(key).get());
+        } catch (Exception e) {
+            throw DocumentStoreException.convert(e, "Remove failed for " + key);
         } finally {
+            invalidateCache(collection, key);
             PERFLOG.end(start, 1, "remove key={}", key);
         }
     }
@@ -607,15 +604,58 @@ public class MongoDocumentStore implemen
     public <T extends Document> void remove(Collection<T> collection, List<String> keys) {
         log("remove", keys);
         DBCollection dbCollection = getDBCollection(collection);
-        for(List<String> keyBatch : Lists.partition(keys, IN_CLAUSE_BATCH_SIZE)){
-            DBObject query = QueryBuilder.start(Document.ID).in(keyBatch).get();
-            WriteResult writeResult = dbCollection.remove(query);
-            invalidateCache(collection, keyBatch);
-            if (writeResult.getError() != null) {
-                throw new DocumentStoreException("Remove failed: " + writeResult.getError());
+        long start = PERFLOG.start();
+        try {
+            for(List<String> keyBatch : Lists.partition(keys, IN_CLAUSE_BATCH_SIZE)){
+                DBObject query = QueryBuilder.start(Document.ID).in(keyBatch).get();
+                try {
+                    dbCollection.remove(query);
+                } catch (Exception e) {
+                    throw DocumentStoreException.convert(e, "Remove failed for " + keyBatch);
+                } finally {
+                    invalidateCache(collection, keyBatch);
+                }
             }
+        } finally {
+            PERFLOG.end(start, 1, "remove keys={}", keys);
         }
+    }
 
+    @Override
+    public <T extends Document> int remove(Collection<T> collection,
+                                           Map<String, Map<Key, Condition>> toRemove) {
+        log("remove", toRemove);
+        int num = 0;
+        DBCollection dbCollection = getDBCollection(collection);
+        long start = PERFLOG.start();
+        try {
+            List<String> batchIds = Lists.newArrayList();
+            List<DBObject> batch = Lists.newArrayList();
+            Iterator<Entry<String, Map<Key, Condition>>> it = toRemove.entrySet().iterator();
+            while (it.hasNext()) {
+                Entry<String, Map<Key, Condition>> entry = it.next();
+                QueryBuilder query = createQueryForUpdate(
+                        entry.getKey(), entry.getValue());
+                batchIds.add(entry.getKey());
+                batch.add(query.get());
+                if (!it.hasNext() || batch.size() == IN_CLAUSE_BATCH_SIZE) {
+                    DBObject q = new BasicDBObject();
+                    q.put(QueryOperators.OR, batch);
+                    try {
+                        num += dbCollection.remove(q).getN();
+                    } catch (Exception e) {
+                        throw DocumentStoreException.convert(e, "Remove failed for " + batch);
+                    } finally {
+                        invalidateCache(collection, Lists.newArrayList(batchIds));
+                    }
+                    batchIds.clear();
+                    batch.clear();
+                }
+            }
+        } finally {
+            PERFLOG.end(start, 1, "remove keys={}", toRemove);
+        }
+        return num;
     }
 
     @CheckForNull
@@ -646,7 +686,9 @@ public class MongoDocumentStore implemen
             // perform a conditional update with limited result
             // if we have a matching modCount
             if (modCount != null) {
-                QueryBuilder query = createQueryForUpdate(updateOp, checkConditions);
+
+                QueryBuilder query = createQueryForUpdate(updateOp.getId(),
+                        updateOp.getConditions());
                 query.and(Document.MOD_COUNT).is(modCount);
                 DBObject fields = new BasicDBObject();
                 // return _id only
@@ -665,10 +707,8 @@ public class MongoDocumentStore implemen
 
             // conditional update failed or not possible
             // perform operation and get complete document
-            QueryBuilder query = createQueryForUpdate(updateOp, checkConditions);
-            DBObject oldNode = dbCollection.findAndModify(query.get(), null,
-                    null /*sort*/, false /*remove*/, update, false /*returnNew*/,
-                    upsert);
+            QueryBuilder query = createQueryForUpdate(updateOp.getId(), updateOp.getConditions());
+            DBObject oldNode = dbCollection.findAndModify(query.get(), null, null /*sort*/, false /*remove*/, update, false /*returnNew*/, upsert);
             if (checkConditions && oldNode == null) {
                 return null;
             }
@@ -740,9 +780,6 @@ public class MongoDocumentStore implemen
                     case REMOVE_MAP_ENTRY:
                         // nothing to do for new entries
                         break;
-                    case CONTAINS_MAP_ENTRY:
-                        // no effect
-                        break;
                 }
             }
             if (!inserts[i].containsField(Document.MOD_COUNT)) {
@@ -943,9 +980,6 @@ public class MongoDocumentStore implemen
 
     @Override
     public void dispose() {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("MongoDB time: " + timeSum);
-        }
         nodes.getDB().getMongo().close();
 
         if (nodesCache instanceof Closeable) {
@@ -1112,22 +1146,23 @@ public class MongoDocumentStore implemen
     }
 
     @Nonnull
-    private static QueryBuilder createQueryForUpdate(UpdateOp updateOp,
-                                              boolean checkConditions) {
-        QueryBuilder query = getByKeyQuery(updateOp.getId());
+    private static QueryBuilder createQueryForUpdate(String key,
+                                                     Map<Key, Condition> conditions) {
+        QueryBuilder query = getByKeyQuery(key);
 
-        for (Entry<Key, Operation> entry : updateOp.getChanges().entrySet()) {
+        for (Entry<Key, Condition> entry : conditions.entrySet()) {
             Key k = entry.getKey();
-            Operation op = entry.getValue();
-            switch (op.type) {
-                case CONTAINS_MAP_ENTRY: {
-                    if (checkConditions) {
-                        query.and(k.toString()).exists(op.value);
-                    }
+            Condition c = entry.getValue();
+            switch (c.type) {
+                case EXISTS:
+                    query.and(k.toString()).exists(c.value);
+                    break;
+                case EQUALS:
+                    query.and(k.toString()).is(c.value);
                     break;
-                }
             }
         }
+
         return query;
     }
 

Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentSerializer.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentSerializer.java?rev=1680315&r1=1680314&r2=1680315&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentSerializer.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentSerializer.java Tue May 19 15:30:49 2015
@@ -102,10 +102,6 @@ public class RDBDocumentSerializer {
             if (columnProperties.contains(key.getName()) && null == key.getRevision())
                 continue;
 
-            // already checked
-            if (op.type == UpdateOp.Operation.Type.CONTAINS_MAP_ENTRY)
-                continue;
-
             if (needComma) {
                 sb.append(",");
             }
@@ -177,7 +173,7 @@ public class RDBDocumentSerializer {
     }
 
     /**
-     * Reconstructs a {@link Document) based on the persisted {@link DBRow}.
+     * Reconstructs a {@link Document} based on the persisted {@link RDBRow}.
      */
     public <T extends Document> T fromRow(@Nonnull Collection<T> collection, @Nonnull RDBRow row) throws DocumentStoreException {
         T doc = collection.newDocument(store);

Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java?rev=1680315&r1=1680314&r2=1680315&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java Tue May 19 15:30:49 2015
@@ -17,6 +17,7 @@
 package org.apache.jackrabbit.oak.plugins.document.rdb;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.jackrabbit.oak.plugins.document.UpdateUtils.checkConditions;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -35,6 +36,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -60,6 +62,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.document.Revision;
 import org.apache.jackrabbit.oak.plugins.document.StableRevisionComparator;
 import org.apache.jackrabbit.oak.plugins.document.UpdateOp;
+import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Condition;
 import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
 import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Operation;
 import org.apache.jackrabbit.oak.plugins.document.UpdateUtils;
@@ -73,10 +76,11 @@ import com.google.common.base.Objects;
 import com.google.common.cache.Cache;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.Striped;
 
 /**
- * Implementation of {@link CachingDocumentStore} for relational databases.
+ * Implementation of {@link DocumentStore} for relational databases.
  * 
  * <h3>Supported Databases</h3>
  * <p>
@@ -242,6 +246,16 @@ public class RDBDocumentStore implements
     }
 
     @Override
+    public <T extends Document> int remove(Collection<T> collection,
+                                            Map<String, Map<Key, Condition>> toRemove) {
+        int num = delete(collection, toRemove);
+        for (String id : toRemove.keySet()) {
+            invalidateCache(collection, id, true);
+        }
+        return num;
+    }
+
+    @Override
     public <T extends Document> boolean create(Collection<T> collection, List<UpdateOp> updateOps) {
         return internalCreate(collection, updateOps);
     }
@@ -1005,7 +1019,7 @@ public class RDBDocumentStore implements
                 throw new DocumentStoreException("Document does not exist: " + update.getId());
             }
             T doc = collection.newDocument(this);
-            if (checkConditions && !UpdateUtils.checkConditions(doc, update)) {
+            if (checkConditions && !checkConditions(doc, update.getConditions())) {
                 return null;
             }
             update.increment(MODCOUNT, 1);
@@ -1105,7 +1119,7 @@ public class RDBDocumentStore implements
     private <T extends Document> T applyChanges(Collection<T> collection, T oldDoc, UpdateOp update, boolean checkConditions) {
         T doc = collection.newDocument(this);
         oldDoc.deepCopy(doc);
-        if (checkConditions && !UpdateUtils.checkConditions(doc, update)) {
+        if (checkConditions && !checkConditions(doc, update.getConditions())) {
             return null;
         }
         if (hasChangesToCollisions(update)) {
@@ -1263,13 +1277,14 @@ public class RDBDocumentStore implements
         }
     }
 
-    private <T extends Document> void delete(Collection<T> collection, List<String> ids) {
+    private <T extends Document> int delete(Collection<T> collection, List<String> ids) {
+        int numDeleted = 0;
         for (List<String> sublist : Lists.partition(ids, 64)) {
             Connection connection = null;
             String tableName = getTable(collection);
             try {
                 connection = this.ch.getRWConnection();
-                dbDelete(connection, tableName, sublist);
+                numDeleted += dbDelete(connection, tableName, sublist);
                 connection.commit();
             } catch (Exception ex) {
                 throw new DocumentStoreException(ex);
@@ -1277,6 +1292,33 @@ public class RDBDocumentStore implements
                 this.ch.closeConnection(connection);
             }
         }
+        return numDeleted;
+    }
+
+    private <T extends Document> int delete(Collection<T> collection,
+                                            Map<String, Map<Key, Condition>> toRemove) {
+        int numDeleted = 0;
+        String tableName = getTable(collection);
+        Map<String, Map<Key, Condition>> subMap = Maps.newHashMap();
+        Iterator<Entry<String, Map<Key, Condition>>> it = toRemove.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, Map<Key, Condition>> entry = it.next();
+            subMap.put(entry.getKey(), entry.getValue());
+            if (subMap.size() == 64 || !it.hasNext()) {
+                Connection connection = null;
+                try {
+                    connection = this.ch.getRWConnection();
+                    numDeleted += dbDelete(connection, tableName, subMap);
+                    connection.commit();
+                } catch (Exception ex) {
+                    throw DocumentStoreException.convert(ex);
+                } finally {
+                    this.ch.closeConnection(connection);
+                }
+                subMap.clear();
+            }
+        }
+        return numDeleted;
     }
 
     private <T extends Document> boolean updateDocument(@Nonnull Collection<T> collection, @Nonnull T document,
@@ -1346,12 +1388,7 @@ public class RDBDocumentStore implements
      * state
      */
     private static boolean requiresPreviousState(UpdateOp update) {
-        for (Map.Entry<Key, Operation> change : update.getChanges().entrySet()) {
-            Operation op = change.getValue();
-            if (op.type == UpdateOp.Operation.Type.CONTAINS_MAP_ENTRY)
-                return true;
-        }
-        return false;
+        return !update.getConditions().isEmpty();
     }
 
     private static long getModifiedFromUpdate(UpdateOp update) {
@@ -1735,7 +1772,7 @@ public class RDBDocumentStore implements
         }
     }
 
-    private void dbDelete(Connection connection, String tableName, List<String> ids) throws SQLException {
+    private int dbDelete(Connection connection, String tableName, List<String> ids) throws SQLException {
 
         PreparedStatement stmt;
         int cnt = ids.size();
@@ -1761,6 +1798,52 @@ public class RDBDocumentStore implements
             if (result != cnt) {
                 LOG.debug("DB delete failed for " + tableName + "/" + ids);
             }
+            return result;
+        } finally {
+            stmt.close();
+        }
+    }
+
+    private int dbDelete(Connection connection, String tableName,
+                         Map<String, Map<Key, Condition>> toDelete)
+            throws SQLException, DocumentStoreException {
+        String or = "";
+        StringBuilder whereClause = new StringBuilder();
+        for (Entry<String, Map<Key, Condition>> entry : toDelete.entrySet()) {
+            whereClause.append(or);
+            or = " or ";
+            whereClause.append("ID=?");
+            for (Entry<Key, Condition> c : entry.getValue().entrySet()) {
+                if (!c.getKey().getName().equals(MODIFIED)) {
+                    throw new DocumentStoreException(
+                            "Unsupported condition: " + c);
+                }
+                whereClause.append(" and MODIFIED");
+                if (c.getValue().type == Condition.Type.EQUALS
+                        && c.getValue().value instanceof Long) {
+                    whereClause.append("=?");
+                } else if (c.getValue().type == Condition.Type.EXISTS) {
+                    whereClause.append(" is not null");
+                } else {
+                    throw new DocumentStoreException(
+                            "Unsupported condition: " + c);
+                }
+            }
+        }
+
+        PreparedStatement stmt= connection.prepareStatement(
+                "delete from " + tableName + " where " + whereClause);
+        try {
+            int i = 1;
+            for (Entry<String, Map<Key, Condition>> entry : toDelete.entrySet()) {
+                setIdInStatement(stmt, i++, entry.getKey());
+                for (Entry<Key, Condition> c : entry.getValue().entrySet()) {
+                    if (c.getValue().type == Condition.Type.EQUALS) {
+                        stmt.setLong(i++, (Long) c.getValue().value);
+                    }
+                }
+            }
+            return stmt.executeUpdate();
         } finally {
             stmt.close();
         }

Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java?rev=1680315&r1=1680314&r2=1680315&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java Tue May 19 15:30:49 2015
@@ -144,9 +144,29 @@ public class LoggingDocumentStoreWrapper
 
     @Override
     public <T extends Document> void remove(Collection<T> collection, List<String> keys) {
-        //TODO Logging
-        for(String key : keys){
-            remove(collection, key);
+        try {
+            logMethod("remove", collection, keys);
+            store.remove(collection, keys);
+        } catch (Exception e) {
+            logException(e);
+            throw convert(e);
+        }
+    }
+
+    @Override
+    public <T extends Document> int remove(final Collection<T> collection,
+                                           final Map<String, Map<UpdateOp.Key, UpdateOp.Condition>> toRemove) {
+        try {
+            logMethod("remove", collection, toRemove);
+            return logResult(new Callable<Integer>() {
+                @Override
+                public Integer call() throws Exception {
+                    return store.remove(collection, toRemove);
+                }
+            });
+        } catch (Exception e) {
+            logException(e);
+            throw convert(e);
         }
     }
 

Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java?rev=1680315&r1=1680314&r2=1680315&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java Tue May 19 15:30:49 2015
@@ -19,6 +19,8 @@ package org.apache.jackrabbit.oak.plugin
 import java.util.List;
 import java.util.Map;
 
+import javax.annotation.Nonnull;
+
 import org.apache.jackrabbit.oak.cache.CacheStats;
 import org.apache.jackrabbit.oak.plugins.document.Collection;
 import org.apache.jackrabbit.oak.plugins.document.Document;
@@ -49,12 +51,14 @@ public class SynchronizingDocumentStoreW
     }
 
     @Override
+    @Nonnull
     public synchronized <T extends Document> List<T> query(final Collection<T> collection, final String fromKey,
             final String toKey, final int limit) {
         return store.query(collection, fromKey, toKey, limit);
     }
 
     @Override
+    @Nonnull
     public synchronized <T extends Document> List<T> query(final Collection<T> collection, final String fromKey,
             final String toKey, final String indexedProperty, final long startValue, final int limit) {
         return store.query(collection, fromKey, toKey, indexedProperty, startValue, limit);
@@ -67,9 +71,13 @@ public class SynchronizingDocumentStoreW
 
     @Override
     public synchronized <T extends Document> void remove(Collection<T> collection, List<String> keys) {
-        for(String key : keys){
-            remove(collection, key);
-        }
+        store.remove(collection, keys);
+    }
+
+    @Override
+    public synchronized <T extends Document> int remove(Collection<T> collection,
+                                                        Map<String, Map<UpdateOp.Key, UpdateOp.Condition>> toRemove) {
+        return store.remove(collection, toRemove);
     }
 
     @Override

Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java?rev=1680315&r1=1680314&r2=1680315&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java Tue May 19 15:30:49 2015
@@ -180,9 +180,31 @@ public class TimingDocumentStoreWrapper
 
     @Override
     public <T extends Document> void remove(Collection<T> collection, List<String> keys) {
-        //TODO Timing
-        for(String key : keys){
-            remove(collection, key);
+        try {
+            long start = now();
+            base.remove(collection, keys);
+            updateAndLogTimes("remove", start, 0, 0);
+            if (logCommonCall()) {
+                logCommonCall(start, "remove " + collection + " " + keys);
+            }
+        } catch (Exception e) {
+            throw convert(e);
+        }
+    }
+
+    @Override
+    public <T extends Document> int remove(Collection<T> collection,
+                                           Map<String, Map<UpdateOp.Key, UpdateOp.Condition>> toRemove) {
+        try {
+            long start = now();
+            int result = base.remove(collection, toRemove);
+            updateAndLogTimes("remove", start, 0, 0);
+            if (logCommonCall()) {
+                logCommonCall(start, "remove " + collection + " " + toRemove);
+            }
+            return result;
+        } catch (Exception e) {
+            throw convert(e);
         }
     }
 

Modified: jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java?rev=1680315&r1=1680314&r2=1680315&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java Tue May 19 15:30:49 2015
@@ -37,6 +37,12 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Condition;
+import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -428,8 +434,7 @@ public class BasicDocumentStoreTest exte
             cnt += 1;
         }
 
-        LOG.info("document creation with property of size " + size + " and batch size " + amount + " for " + super.dsname + " was "
-                + cnt + " in " + duration + "ms (" + (cnt / (duration / 1000f)) + "/s)");
+        LOG.info("document creation with property of size " + size + " and batch size " + amount + " for " + super.dsname + " was " + cnt + " in " + duration + "ms (" + (cnt / (duration / 1000f)) + "/s)");
     }
 
     @Test
@@ -644,8 +649,7 @@ public class BasicDocumentStoreTest exte
             assertTrue(m.keySet().equals(expectedRevs));
         }
 
-        LOG.info("document updates with property of size " + size + (growing ? " (growing)" : "") + " for " + super.dsname
-                + " was " + cnt + " in " + duration + "ms (" + (cnt / (duration / 1000f)) + "/s)");
+        LOG.info("document updates with property of size " + size + (growing ? " (growing)" : "") + " for " + super.dsname + " was " + cnt + " in " + duration + "ms (" + (cnt / (duration / 1000f)) + "/s)");
     }
 
     private static String generateString(int length, boolean ascii) {
@@ -985,4 +989,53 @@ public class BasicDocumentStoreTest exte
         Map<String, String> desc = ds.getMetadata();
         assertNotNull(desc.get("type"));
     }
+
+    @Test
+    public void removeWithCondition() throws Exception {
+        List<UpdateOp> docs = Lists.newArrayList();
+        docs.add(newDocument("/foo", 100));
+        removeMe.add(Utils.getIdFromPath("/foo"));
+        docs.add(newDocument("/bar", 200));
+        removeMe.add(Utils.getIdFromPath("/bar"));
+        docs.add(newDocument("/baz", 300));
+        removeMe.add(Utils.getIdFromPath("/baz"));
+        ds.create(Collection.NODES, docs);
+
+        for (UpdateOp op : docs) {
+            assertNotNull(ds.find(Collection.NODES, op.getId()));
+        }
+
+        Map<String, Map<Key, Condition>> toRemove = Maps.newHashMap();
+        removeDocument(toRemove, "/foo", 100); // matches
+        removeDocument(toRemove, "/bar", 300); // modified differs
+        removeDocument(toRemove, "/qux", 100); // does not exist
+        removeDocument(toRemove, "/baz", 300); // matches
+
+        int removed = ds.remove(Collection.NODES, toRemove);
+
+        assertEquals(2, removed);
+        assertNotNull(ds.find(Collection.NODES, Utils.getIdFromPath("/bar")));
+        for (NodeDocument doc : Utils.getAllDocuments(ds)) {
+            if (!doc.getPath().equals("/bar")) {
+                fail("document must not exist: " + doc.getId());
+            }
+        }
+    }
+
+    private UpdateOp newDocument(String path, long modified) {
+        String id = Utils.getIdFromPath(path);
+        UpdateOp op = new UpdateOp(id, true);
+        op.set(NodeDocument.MODIFIED_IN_SECS, modified);
+        op.set(Document.ID, id);
+        return op;
+    }
+
+    private void removeDocument(Map<String, Map<Key, Condition>> toRemove,
+                                String path,
+                                long modified) {
+        toRemove.put(Utils.getIdFromPath(path),
+                Collections.singletonMap(
+                        new Key(NodeDocument.MODIFIED_IN_SECS, null),
+                        Condition.newEqualsCondition(modified)));
+    }
 }