You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2015/05/19 17:30:50 UTC
svn commit: r1680315 - in /jackrabbit/oak/branches/1.2: ./
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/
oak-core/src/main/java/org/apache/jackrabbit/oak/plu...
Author: mreutegg
Date: Tue May 19 15:30:49 2015
New Revision: 1680315
URL: http://svn.apache.org/r1680315
Log:
OAK-2804: Conditional remove on DocumentStore
Merged revisions 1675382 and 1675555 from trunk
Added:
jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/UpdateUtilsTest.java
- copied unchanged from r1675382, jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/UpdateUtilsTest.java
Modified:
jackrabbit/oak/branches/1.2/ (props changed)
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentStore.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateOp.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateUtils.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentSerializer.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java
jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java
Propchange: jackrabbit/oak/branches/1.2/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue May 19 15:30:49 2015
@@ -1,3 +1,3 @@
/jackrabbit/oak/branches/1.0:1665962
-/jackrabbit/oak/trunk:1672350,1672468,1672537,1672603,1672642,1672644,1672834-1672835,1673351,1673410,1673414,1673436,1673644,1673662-1673664,1673669,1673695,1674046,1674065,1674075,1674107,1674228,1674880,1675055,1675332,1675354,1675357,1675593,1676198,1676237,1676407,1676458,1676539,1676670,1676725,1677579,1677581,1677609,1677611,1677939,1677991,1678173,1678323,1678758,1678938,1678954,1679144,1679165,1679191,1679235,1680182,1680222,1680232,1680236
+/jackrabbit/oak/trunk:1672350,1672468,1672537,1672603,1672642,1672644,1672834-1672835,1673351,1673410,1673414,1673436,1673644,1673662-1673664,1673669,1673695,1674046,1674065,1674075,1674107,1674228,1674880,1675055,1675332,1675354,1675357,1675382,1675555,1675593,1676198,1676237,1676407,1676458,1676539,1676670,1676725,1677579,1677581,1677609,1677611,1677939,1677991,1678173,1678323,1678758,1678938,1678954,1679144,1679165,1679191,1679235,1680182,1680222,1680232,1680236
/jackrabbit/trunk:1345480
Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentStore.java?rev=1680315&r1=1680314&r2=1680315&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentStore.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentStore.java Tue May 19 15:30:49 2015
@@ -138,7 +138,7 @@ public interface DocumentStore {
<T extends Document> void remove(Collection<T> collection, String key);
/**
- * Batch remove documents with given key. Keys for documents that do not
+ * Batch remove documents with given keys. Keys for documents that do not
* exist are simply ignored. If this method fails with an exception, then
* only some of the documents identified by {@code keys} may have been
* removed.
@@ -150,6 +150,21 @@ public interface DocumentStore {
<T extends Document> void remove(Collection<T> collection, List<String> keys);
/**
+ * Batch remove documents with given keys and corresponding conditions. Keys
+ * for documents that do not exist are simply ignored. A document is only
+ * removed if the corresponding conditions are met. If this method fails
+ * with an exception, then only some of the documents may have been removed.
+ *
+ * @param <T> the document type
+ * @param collection the collection.
+ * @param toRemove the keys of the documents to remove with the
+ * corresponding conditions.
+ * @return the number of removed documents.
+ */
+ <T extends Document> int remove(Collection<T> collection,
+ Map<String, Map<UpdateOp.Key, UpdateOp.Condition>> toRemove);
+
+ /**
* Try to create a list of documents. This method returns {@code code} iff
* none of the documents existed before and the create was successful. This
* method will return {@code false} if one of the documents already exists
@@ -198,7 +213,7 @@ public interface DocumentStore {
/**
* Performs a conditional update (e.g. using
- * {@link UpdateOp.Operation.Type#CONTAINS_MAP_ENTRY} and only updates the
+ * {@link UpdateOp.Condition.Type#EXISTS} and only updates the
* document if the condition is <code>true</code>. The returned document is
* immutable.
*
Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java?rev=1680315&r1=1680314&r2=1680315&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java Tue May 19 15:30:49 2015
@@ -371,6 +371,17 @@ public final class NodeDocument extends
}
/**
+ * See also {@link #MODIFIED_IN_SECS}.
+ *
+ * @return the time in seconds this document was last modified with five
+ * seconds precision. Returns {@code null} if none is set.
+ */
+ @CheckForNull
+ public Long getModified() {
+ return (Long) get(MODIFIED_IN_SECS);
+ }
+
+ /**
* Returns <tt>true</tt> if this node possibly has children.
* If false then that indicates that there are no child
*
Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateOp.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateOp.java?rev=1680315&r1=1680314&r2=1680315&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateOp.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateOp.java Tue May 19 15:30:49 2015
@@ -16,6 +16,7 @@
*/
package org.apache.jackrabbit.oak.plugins.document;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
@@ -39,6 +40,7 @@ public final class UpdateOp {
private boolean isDelete;
private final Map<Key, Operation> changes;
+ private Map<Key, Condition> conditions;
/**
* Create an update operation for the document with the given id. The commit
@@ -112,6 +114,14 @@ public final class UpdateOp {
return changes;
}
+ public Map<Key, Condition> getConditions() {
+ if (conditions == null) {
+ return Collections.emptyMap();
+ } else {
+ return conditions;
+ }
+ }
+
/**
* Checks if the UpdateOp has any change operation is registered with
* current update operation
@@ -198,8 +208,26 @@ public final class UpdateOp {
if (isNew) {
throw new IllegalStateException("Cannot use containsMapEntry() on new document");
}
- Operation op = new Operation(Operation.Type.CONTAINS_MAP_ENTRY, exists);
- changes.put(new Key(property, checkNotNull(revision)), op);
+ Condition c = exists ? Condition.EXISTS : Condition.MISSING;
+ getOrCreateConditions().put(new Key(property, checkNotNull(revision)), c);
+ }
+
+ /**
+ * Checks if the property or map entry is equal to the given value.
+ *
+ * @param property the name of the property or map.
+ * @param revision the revision within the map or {@code null} if this check
+ * is for a property.
+ * @param value the value to compare to.
+ */
+ void equals(@Nonnull String property,
+ @Nullable Revision revision,
+ @Nonnull Object value) {
+ if (isNew) {
+ throw new IllegalStateException("Cannot perform equals check on new document");
+ }
+ getOrCreateConditions().put(new Key(property, revision),
+ Condition.newEqualsCondition(value));
}
/**
@@ -229,6 +257,13 @@ public final class UpdateOp {
return "key: " + id + " " + (isNew ? "new" : "update") + " " + changes;
}
+ private Map<Key, Condition> getOrCreateConditions() {
+ if (conditions == null) {
+ conditions = Maps.newHashMap();
+ }
+ return conditions;
+ }
+
/**
* A DocumentStore operation for a given key within a document.
*/
@@ -269,12 +304,7 @@ public final class UpdateOp {
* Remove the sub-key / value pair.
* The value in the stored node is a map.
*/
- REMOVE_MAP_ENTRY,
-
- /**
- * Checks if the sub-key is present in a map or not.
- */
- CONTAINS_MAP_ENTRY
+ REMOVE_MAP_ENTRY
}
@@ -308,7 +338,6 @@ public final class UpdateOp {
case SET:
case MAX:
case REMOVE_MAP_ENTRY:
- case CONTAINS_MAP_ENTRY:
// nothing to do
break;
case SET_MAP_ENTRY:
@@ -321,6 +350,66 @@ public final class UpdateOp {
}
/**
+ * A condition to check before an update is applied.
+ */
+ public static final class Condition {
+
+ /**
+ * Check if a sub-key exists in a map.
+ */
+ public static final Condition EXISTS = new Condition(Type.EXISTS, true);
+
+ /**
+ * Check if a sub-key is missing in a map.
+ */
+ public static final Condition MISSING = new Condition(Type.EXISTS, false);
+
+ public enum Type {
+
+ /**
+ * Checks if the sub-key is present in a map or not.
+ */
+ EXISTS,
+
+ /**
+ * Checks if a map entry equals a given value.
+ */
+ EQUALS
+
+ }
+
+ /**
+ * The condition type.
+ */
+ public final Type type;
+
+ /**
+ * The value.
+ */
+ public final Object value;
+
+ private Condition(Type type, Object value) {
+ this.type = type;
+ this.value = value;
+ }
+
+ /**
+ * Creates a new equals condition with the given value.
+ *
+ * @param value the value to compare to.
+ * @return the equals condition.
+ */
+ public static Condition newEqualsCondition(@Nonnull Object value) {
+ return new Condition(Type.EQUALS, checkNotNull(value));
+ }
+
+ @Override
+ public String toString() {
+ return type + " " + value;
+ }
+ }
+
+ /**
* A key for an operation consists of a property name and an optional
* revision. The revision is only set if the value for the operation is
* set for a certain revision.
Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateUtils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateUtils.java?rev=1680315&r1=1680314&r2=1680315&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateUtils.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateUtils.java Tue May 19 15:30:49 2015
@@ -25,6 +25,9 @@ import java.util.Map.Entry;
import javax.annotation.Nonnull;
+import com.google.common.base.Objects;
+
+import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Condition;
import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Operation;
@@ -96,31 +99,29 @@ public class UpdateUtils {
}
break;
}
- case CONTAINS_MAP_ENTRY:
- // no effect
- break;
}
}
}
- public static boolean checkConditions(@Nonnull Document doc, @Nonnull UpdateOp update) {
- for (Map.Entry<Key, Operation> change : update.getChanges().entrySet()) {
- Operation op = change.getValue();
- if (op.type == Operation.Type.CONTAINS_MAP_ENTRY) {
- Key k = change.getKey();
- Revision r = k.getRevision();
+ public static boolean checkConditions(@Nonnull Document doc,
+ @Nonnull Map<Key, Condition> conditions) {
+ for (Map.Entry<Key, Condition> entry : conditions.entrySet()) {
+ Condition c = entry.getValue();
+ Key k = entry.getKey();
+ Object value = doc.get(k.getName());
+ Revision r = k.getRevision();
+ if (c.type == Condition.Type.EXISTS) {
if (r == null) {
- throw new IllegalStateException("CONTAINS_MAP_ENTRY must not contain null revision");
+ throw new IllegalStateException("EXISTS must not contain null revision");
}
- Object value = doc.get(k.getName());
if (value == null) {
- if (Boolean.TRUE.equals(op.value)) {
+ if (Boolean.TRUE.equals(c.value)) {
return false;
}
} else {
if (value instanceof Map) {
Map<?, ?> map = (Map<?, ?>) value;
- if (Boolean.TRUE.equals(op.value)) {
+ if (Boolean.TRUE.equals(c.value)) {
if (!map.containsKey(r)) {
return false;
}
@@ -133,6 +134,19 @@ public class UpdateUtils {
return false;
}
}
+ } else if (c.type == Condition.Type.EQUALS) {
+ if (r != null) {
+ if (value instanceof Map) {
+ value = ((Map) value).get(r);
+ } else {
+ value = null;
+ }
+ }
+ if (!Objects.equal(value, c.value)) {
+ return false;
+ }
+ } else {
+ throw new IllegalArgumentException("Unknown condition: " + c.type);
}
}
return true;
Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java?rev=1680315&r1=1680314&r2=1680315&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java Tue May 19 15:30:49 2015
@@ -39,6 +39,7 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.document.Revision;
import org.apache.jackrabbit.oak.plugins.document.StableRevisionComparator;
import org.apache.jackrabbit.oak.plugins.document.UpdateOp;
+import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Condition;
import org.apache.jackrabbit.oak.plugins.document.UpdateUtils;
import com.google.common.base.Splitter;
@@ -46,6 +47,8 @@ import com.mongodb.ReadPreference;
import com.mongodb.WriteConcern;
import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats;
+import static org.apache.jackrabbit.oak.plugins.document.UpdateUtils.checkConditions;
+
/**
* Emulates a MongoDB store (possibly consisting of multiple shards and
* replicas).
@@ -161,11 +164,11 @@ public class MemoryDocumentStore impleme
}
@Override
- public <T extends Document> void remove(Collection<T> collection, String path) {
+ public <T extends Document> void remove(Collection<T> collection, String key) {
Lock lock = rwLock.writeLock();
lock.lock();
try {
- getMap(collection).remove(path);
+ getMap(collection).remove(key);
} finally {
lock.unlock();
}
@@ -178,6 +181,28 @@ public class MemoryDocumentStore impleme
}
}
+ @Override
+ public <T extends Document> int remove(Collection<T> collection,
+ Map<String, Map<UpdateOp.Key, Condition>> toRemove) {
+ int num = 0;
+ ConcurrentSkipListMap<String, T> map = getMap(collection);
+ for (Map.Entry<String, Map<UpdateOp.Key, Condition>> entry : toRemove.entrySet()) {
+ Lock lock = rwLock.writeLock();
+ lock.lock();
+ try {
+ T doc = map.get(entry.getKey());
+ if (doc != null && checkConditions(doc, entry.getValue())) {
+ if (map.remove(entry.getKey()) != null) {
+ num++;
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ return num;
+ }
+
@CheckForNull
@Override
public <T extends Document> T createOrUpdate(Collection<T> collection, UpdateOp update) {
@@ -230,7 +255,7 @@ public class MemoryDocumentStore impleme
} else {
oldDoc.deepCopy(doc);
}
- if (checkConditions && !UpdateUtils.checkConditions(doc, update)) {
+ if (checkConditions && !checkConditions(doc, update.getConditions())) {
return null;
}
// update the document
Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java?rev=1680315&r1=1680314&r2=1680315&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java Tue May 19 15:30:49 2015
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -42,6 +43,7 @@ import com.google.common.collect.Immutab
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.UncheckedExecutionException;
import com.mongodb.MongoClientURI;
+import com.mongodb.QueryOperators;
import com.mongodb.ReadPreference;
import org.apache.jackrabbit.oak.cache.CacheStats;
@@ -56,6 +58,7 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.document.Revision;
import org.apache.jackrabbit.oak.plugins.document.StableRevisionComparator;
import org.apache.jackrabbit.oak.plugins.document.UpdateOp;
+import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Condition;
import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Operation;
import org.apache.jackrabbit.oak.plugins.document.UpdateUtils;
@@ -101,7 +104,7 @@ public class MongoDocumentStore implemen
private static final DBObject BY_ID_ASC = new BasicDBObject(Document.ID, 1);
- static enum DocumentReadPreference {
+ enum DocumentReadPreference {
PRIMARY,
PREFER_PRIMARY,
PREFER_SECONDARY,
@@ -114,11 +117,6 @@ public class MongoDocumentStore implemen
private final DBCollection clusterNodes;
private final DBCollection settings;
- /**
- * The sum of all milliseconds this class waited for MongoDB.
- */
- private long timeSum;
-
private final Cache<CacheValue, NodeDocument> nodesCache;
private final CacheStats cacheStats;
@@ -593,12 +591,11 @@ public class MongoDocumentStore implemen
DBCollection dbCollection = getDBCollection(collection);
long start = PERFLOG.start();
try {
- WriteResult writeResult = dbCollection.remove(getByKeyQuery(key).get());
- invalidateCache(collection, key);
- if (writeResult.getError() != null) {
- throw new DocumentStoreException("Remove failed: " + writeResult.getError());
- }
+ dbCollection.remove(getByKeyQuery(key).get());
+ } catch (Exception e) {
+ throw DocumentStoreException.convert(e, "Remove failed for " + key);
} finally {
+ invalidateCache(collection, key);
PERFLOG.end(start, 1, "remove key={}", key);
}
}
@@ -607,15 +604,58 @@ public class MongoDocumentStore implemen
public <T extends Document> void remove(Collection<T> collection, List<String> keys) {
log("remove", keys);
DBCollection dbCollection = getDBCollection(collection);
- for(List<String> keyBatch : Lists.partition(keys, IN_CLAUSE_BATCH_SIZE)){
- DBObject query = QueryBuilder.start(Document.ID).in(keyBatch).get();
- WriteResult writeResult = dbCollection.remove(query);
- invalidateCache(collection, keyBatch);
- if (writeResult.getError() != null) {
- throw new DocumentStoreException("Remove failed: " + writeResult.getError());
+ long start = PERFLOG.start();
+ try {
+ for(List<String> keyBatch : Lists.partition(keys, IN_CLAUSE_BATCH_SIZE)){
+ DBObject query = QueryBuilder.start(Document.ID).in(keyBatch).get();
+ try {
+ dbCollection.remove(query);
+ } catch (Exception e) {
+ throw DocumentStoreException.convert(e, "Remove failed for " + keyBatch);
+ } finally {
+ invalidateCache(collection, keyBatch);
+ }
}
+ } finally {
+ PERFLOG.end(start, 1, "remove keys={}", keys);
}
+ }
+ @Override
+ public <T extends Document> int remove(Collection<T> collection,
+ Map<String, Map<Key, Condition>> toRemove) {
+ log("remove", toRemove);
+ int num = 0;
+ DBCollection dbCollection = getDBCollection(collection);
+ long start = PERFLOG.start();
+ try {
+ List<String> batchIds = Lists.newArrayList();
+ List<DBObject> batch = Lists.newArrayList();
+ Iterator<Entry<String, Map<Key, Condition>>> it = toRemove.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, Map<Key, Condition>> entry = it.next();
+ QueryBuilder query = createQueryForUpdate(
+ entry.getKey(), entry.getValue());
+ batchIds.add(entry.getKey());
+ batch.add(query.get());
+ if (!it.hasNext() || batch.size() == IN_CLAUSE_BATCH_SIZE) {
+ DBObject q = new BasicDBObject();
+ q.put(QueryOperators.OR, batch);
+ try {
+ num += dbCollection.remove(q).getN();
+ } catch (Exception e) {
+ throw DocumentStoreException.convert(e, "Remove failed for " + batch);
+ } finally {
+ invalidateCache(collection, Lists.newArrayList(batchIds));
+ }
+ batchIds.clear();
+ batch.clear();
+ }
+ }
+ } finally {
+ PERFLOG.end(start, 1, "remove keys={}", toRemove);
+ }
+ return num;
}
@CheckForNull
@@ -646,7 +686,9 @@ public class MongoDocumentStore implemen
// perform a conditional update with limited result
// if we have a matching modCount
if (modCount != null) {
- QueryBuilder query = createQueryForUpdate(updateOp, checkConditions);
+
+ QueryBuilder query = createQueryForUpdate(updateOp.getId(),
+ updateOp.getConditions());
query.and(Document.MOD_COUNT).is(modCount);
DBObject fields = new BasicDBObject();
// return _id only
@@ -665,10 +707,8 @@ public class MongoDocumentStore implemen
// conditional update failed or not possible
// perform operation and get complete document
- QueryBuilder query = createQueryForUpdate(updateOp, checkConditions);
- DBObject oldNode = dbCollection.findAndModify(query.get(), null,
- null /*sort*/, false /*remove*/, update, false /*returnNew*/,
- upsert);
+ QueryBuilder query = createQueryForUpdate(updateOp.getId(), updateOp.getConditions());
+ DBObject oldNode = dbCollection.findAndModify(query.get(), null, null /*sort*/, false /*remove*/, update, false /*returnNew*/, upsert);
if (checkConditions && oldNode == null) {
return null;
}
@@ -740,9 +780,6 @@ public class MongoDocumentStore implemen
case REMOVE_MAP_ENTRY:
// nothing to do for new entries
break;
- case CONTAINS_MAP_ENTRY:
- // no effect
- break;
}
}
if (!inserts[i].containsField(Document.MOD_COUNT)) {
@@ -943,9 +980,6 @@ public class MongoDocumentStore implemen
@Override
public void dispose() {
- if (LOG.isDebugEnabled()) {
- LOG.debug("MongoDB time: " + timeSum);
- }
nodes.getDB().getMongo().close();
if (nodesCache instanceof Closeable) {
@@ -1112,22 +1146,23 @@ public class MongoDocumentStore implemen
}
@Nonnull
- private static QueryBuilder createQueryForUpdate(UpdateOp updateOp,
- boolean checkConditions) {
- QueryBuilder query = getByKeyQuery(updateOp.getId());
+ private static QueryBuilder createQueryForUpdate(String key,
+ Map<Key, Condition> conditions) {
+ QueryBuilder query = getByKeyQuery(key);
- for (Entry<Key, Operation> entry : updateOp.getChanges().entrySet()) {
+ for (Entry<Key, Condition> entry : conditions.entrySet()) {
Key k = entry.getKey();
- Operation op = entry.getValue();
- switch (op.type) {
- case CONTAINS_MAP_ENTRY: {
- if (checkConditions) {
- query.and(k.toString()).exists(op.value);
- }
+ Condition c = entry.getValue();
+ switch (c.type) {
+ case EXISTS:
+ query.and(k.toString()).exists(c.value);
+ break;
+ case EQUALS:
+ query.and(k.toString()).is(c.value);
break;
- }
}
}
+
return query;
}
Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentSerializer.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentSerializer.java?rev=1680315&r1=1680314&r2=1680315&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentSerializer.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentSerializer.java Tue May 19 15:30:49 2015
@@ -102,10 +102,6 @@ public class RDBDocumentSerializer {
if (columnProperties.contains(key.getName()) && null == key.getRevision())
continue;
- // already checked
- if (op.type == UpdateOp.Operation.Type.CONTAINS_MAP_ENTRY)
- continue;
-
if (needComma) {
sb.append(",");
}
@@ -177,7 +173,7 @@ public class RDBDocumentSerializer {
}
/**
- * Reconstructs a {@link Document) based on the persisted {@link DBRow}.
+ * Reconstructs a {@link Document) based on the persisted {@link RDBRow}.
*/
public <T extends Document> T fromRow(@Nonnull Collection<T> collection, @Nonnull RDBRow row) throws DocumentStoreException {
T doc = collection.newDocument(store);
Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java?rev=1680315&r1=1680314&r2=1680315&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java Tue May 19 15:30:49 2015
@@ -17,6 +17,7 @@
package org.apache.jackrabbit.oak.plugins.document.rdb;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.jackrabbit.oak.plugins.document.UpdateUtils.checkConditions;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -35,6 +36,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -60,6 +62,7 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.document.Revision;
import org.apache.jackrabbit.oak.plugins.document.StableRevisionComparator;
import org.apache.jackrabbit.oak.plugins.document.UpdateOp;
+import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Condition;
import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Operation;
import org.apache.jackrabbit.oak.plugins.document.UpdateUtils;
@@ -73,10 +76,11 @@ import com.google.common.base.Objects;
import com.google.common.cache.Cache;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Striped;
/**
- * Implementation of {@link CachingDocumentStore} for relational databases.
+ * Implementation of {@link DocumentStore} for relational databases.
*
* <h3>Supported Databases</h3>
* <p>
@@ -242,6 +246,16 @@ public class RDBDocumentStore implements
}
@Override
+ public <T extends Document> int remove(Collection<T> collection,
+ Map<String, Map<Key, Condition>> toRemove) {
+ int num = delete(collection, toRemove);
+ for (String id : toRemove.keySet()) {
+ invalidateCache(collection, id, true);
+ }
+ return num;
+ }
+
+ @Override
public <T extends Document> boolean create(Collection<T> collection, List<UpdateOp> updateOps) {
return internalCreate(collection, updateOps);
}
@@ -1005,7 +1019,7 @@ public class RDBDocumentStore implements
throw new DocumentStoreException("Document does not exist: " + update.getId());
}
T doc = collection.newDocument(this);
- if (checkConditions && !UpdateUtils.checkConditions(doc, update)) {
+ if (checkConditions && !checkConditions(doc, update.getConditions())) {
return null;
}
update.increment(MODCOUNT, 1);
@@ -1105,7 +1119,7 @@ public class RDBDocumentStore implements
private <T extends Document> T applyChanges(Collection<T> collection, T oldDoc, UpdateOp update, boolean checkConditions) {
T doc = collection.newDocument(this);
oldDoc.deepCopy(doc);
- if (checkConditions && !UpdateUtils.checkConditions(doc, update)) {
+ if (checkConditions && !checkConditions(doc, update.getConditions())) {
return null;
}
if (hasChangesToCollisions(update)) {
@@ -1263,13 +1277,14 @@ public class RDBDocumentStore implements
}
}
- private <T extends Document> void delete(Collection<T> collection, List<String> ids) {
+ private <T extends Document> int delete(Collection<T> collection, List<String> ids) {
+ int numDeleted = 0;
for (List<String> sublist : Lists.partition(ids, 64)) {
Connection connection = null;
String tableName = getTable(collection);
try {
connection = this.ch.getRWConnection();
- dbDelete(connection, tableName, sublist);
+ numDeleted += dbDelete(connection, tableName, sublist);
connection.commit();
} catch (Exception ex) {
throw new DocumentStoreException(ex);
@@ -1277,6 +1292,33 @@ public class RDBDocumentStore implements
this.ch.closeConnection(connection);
}
}
+ return numDeleted;
+ }
+
+ private <T extends Document> int delete(Collection<T> collection,
+ Map<String, Map<Key, Condition>> toRemove) {
+ int numDeleted = 0;
+ String tableName = getTable(collection);
+ Map<String, Map<Key, Condition>> subMap = Maps.newHashMap();
+ Iterator<Entry<String, Map<Key, Condition>>> it = toRemove.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, Map<Key, Condition>> entry = it.next();
+ subMap.put(entry.getKey(), entry.getValue());
+ if (subMap.size() == 64 || !it.hasNext()) {
+ Connection connection = null;
+ try {
+ connection = this.ch.getRWConnection();
+ numDeleted += dbDelete(connection, tableName, subMap);
+ connection.commit();
+ } catch (Exception ex) {
+ throw DocumentStoreException.convert(ex);
+ } finally {
+ this.ch.closeConnection(connection);
+ }
+ subMap.clear();
+ }
+ }
+ return numDeleted;
}
private <T extends Document> boolean updateDocument(@Nonnull Collection<T> collection, @Nonnull T document,
@@ -1346,12 +1388,7 @@ public class RDBDocumentStore implements
* state
*/
private static boolean requiresPreviousState(UpdateOp update) {
- for (Map.Entry<Key, Operation> change : update.getChanges().entrySet()) {
- Operation op = change.getValue();
- if (op.type == UpdateOp.Operation.Type.CONTAINS_MAP_ENTRY)
- return true;
- }
- return false;
+ return !update.getConditions().isEmpty();
}
private static long getModifiedFromUpdate(UpdateOp update) {
@@ -1735,7 +1772,7 @@ public class RDBDocumentStore implements
}
}
- private void dbDelete(Connection connection, String tableName, List<String> ids) throws SQLException {
+ private int dbDelete(Connection connection, String tableName, List<String> ids) throws SQLException {
PreparedStatement stmt;
int cnt = ids.size();
@@ -1761,6 +1798,52 @@ public class RDBDocumentStore implements
if (result != cnt) {
LOG.debug("DB delete failed for " + tableName + "/" + ids);
}
+ return result;
+ } finally {
+ stmt.close();
+ }
+ }
+
+ private int dbDelete(Connection connection, String tableName,
+ Map<String, Map<Key, Condition>> toDelete)
+ throws SQLException, DocumentStoreException {
+ String or = "";
+ StringBuilder whereClause = new StringBuilder();
+ for (Entry<String, Map<Key, Condition>> entry : toDelete.entrySet()) {
+ whereClause.append(or);
+ or = " or ";
+ whereClause.append("ID=?");
+ for (Entry<Key, Condition> c : entry.getValue().entrySet()) {
+ if (!c.getKey().getName().equals(MODIFIED)) {
+ throw new DocumentStoreException(
+ "Unsupported condition: " + c);
+ }
+ whereClause.append(" and MODIFIED");
+ if (c.getValue().type == Condition.Type.EQUALS
+ && c.getValue().value instanceof Long) {
+ whereClause.append("=?");
+ } else if (c.getValue().type == Condition.Type.EXISTS) {
+ whereClause.append(" is not null");
+ } else {
+ throw new DocumentStoreException(
+ "Unsupported condition: " + c);
+ }
+ }
+ }
+
+ PreparedStatement stmt= connection.prepareStatement(
+ "delete from " + tableName + " where " + whereClause);
+ try {
+ int i = 1;
+ for (Entry<String, Map<Key, Condition>> entry : toDelete.entrySet()) {
+ setIdInStatement(stmt, i++, entry.getKey());
+ for (Entry<Key, Condition> c : entry.getValue().entrySet()) {
+ if (c.getValue().type == Condition.Type.EQUALS) {
+ stmt.setLong(i++, (Long) c.getValue().value);
+ }
+ }
+ }
+ return stmt.executeUpdate();
} finally {
stmt.close();
}
Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java?rev=1680315&r1=1680314&r2=1680315&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java Tue May 19 15:30:49 2015
@@ -144,9 +144,29 @@ public class LoggingDocumentStoreWrapper
@Override
public <T extends Document> void remove(Collection<T> collection, List<String> keys) {
- //TODO Logging
- for(String key : keys){
- remove(collection, key);
+ try {
+ logMethod("remove", collection, keys);
+ store.remove(collection, keys);
+ } catch (Exception e) {
+ logException(e);
+ throw convert(e);
+ }
+ }
+
+ @Override
+ public <T extends Document> int remove(final Collection<T> collection,
+ final Map<String, Map<UpdateOp.Key, UpdateOp.Condition>> toRemove) {
+ try {
+ logMethod("remove", collection, toRemove);
+ return logResult(new Callable<Integer>() {
+ @Override
+ public Integer call() throws Exception {
+ return store.remove(collection, toRemove);
+ }
+ });
+ } catch (Exception e) {
+ logException(e);
+ throw convert(e);
}
}
Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java?rev=1680315&r1=1680314&r2=1680315&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java Tue May 19 15:30:49 2015
@@ -19,6 +19,8 @@ package org.apache.jackrabbit.oak.plugin
import java.util.List;
import java.util.Map;
+import javax.annotation.Nonnull;
+
import org.apache.jackrabbit.oak.cache.CacheStats;
import org.apache.jackrabbit.oak.plugins.document.Collection;
import org.apache.jackrabbit.oak.plugins.document.Document;
@@ -49,12 +51,14 @@ public class SynchronizingDocumentStoreW
}
@Override
+ @Nonnull
public synchronized <T extends Document> List<T> query(final Collection<T> collection, final String fromKey,
final String toKey, final int limit) {
return store.query(collection, fromKey, toKey, limit);
}
@Override
+ @Nonnull
public synchronized <T extends Document> List<T> query(final Collection<T> collection, final String fromKey,
final String toKey, final String indexedProperty, final long startValue, final int limit) {
return store.query(collection, fromKey, toKey, indexedProperty, startValue, limit);
@@ -67,9 +71,13 @@ public class SynchronizingDocumentStoreW
@Override
public synchronized <T extends Document> void remove(Collection<T> collection, List<String> keys) {
- for(String key : keys){
- remove(collection, key);
- }
+ store.remove(collection, keys);
+ }
+
+ @Override
+ public synchronized <T extends Document> int remove(Collection<T> collection,
+ Map<String, Map<UpdateOp.Key, UpdateOp.Condition>> toRemove) {
+ return store.remove(collection, toRemove);
}
@Override
Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java?rev=1680315&r1=1680314&r2=1680315&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java Tue May 19 15:30:49 2015
@@ -180,9 +180,31 @@ public class TimingDocumentStoreWrapper
@Override
public <T extends Document> void remove(Collection<T> collection, List<String> keys) {
- //TODO Timing
- for(String key : keys){
- remove(collection, key);
+ try {
+ long start = now();
+ base.remove(collection, keys);
+ updateAndLogTimes("remove", start, 0, 0);
+ if (logCommonCall()) {
+ logCommonCall(start, "remove " + collection + " " + keys);
+ }
+ } catch (Exception e) {
+ throw convert(e);
+ }
+ }
+
+ @Override
+ public <T extends Document> int remove(Collection<T> collection,
+ Map<String, Map<UpdateOp.Key, UpdateOp.Condition>> toRemove) {
+ try {
+ long start = now();
+ int result = base.remove(collection, toRemove);
+ updateAndLogTimes("remove", start, 0, 0);
+ if (logCommonCall()) {
+ logCommonCall(start, "remove " + collection + " " + toRemove);
+ }
+ return result;
+ } catch (Exception e) {
+ throw convert(e);
}
}
Modified: jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java?rev=1680315&r1=1680314&r2=1680315&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java Tue May 19 15:30:49 2015
@@ -37,6 +37,12 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Condition;
+import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -428,8 +434,7 @@ public class BasicDocumentStoreTest exte
cnt += 1;
}
- LOG.info("document creation with property of size " + size + " and batch size " + amount + " for " + super.dsname + " was "
- + cnt + " in " + duration + "ms (" + (cnt / (duration / 1000f)) + "/s)");
+ LOG.info("document creation with property of size " + size + " and batch size " + amount + " for " + super.dsname + " was " + cnt + " in " + duration + "ms (" + (cnt / (duration / 1000f)) + "/s)");
}
@Test
@@ -644,8 +649,7 @@ public class BasicDocumentStoreTest exte
assertTrue(m.keySet().equals(expectedRevs));
}
- LOG.info("document updates with property of size " + size + (growing ? " (growing)" : "") + " for " + super.dsname
- + " was " + cnt + " in " + duration + "ms (" + (cnt / (duration / 1000f)) + "/s)");
+ LOG.info("document updates with property of size " + size + (growing ? " (growing)" : "") + " for " + super.dsname + " was " + cnt + " in " + duration + "ms (" + (cnt / (duration / 1000f)) + "/s)");
}
private static String generateString(int length, boolean ascii) {
@@ -985,4 +989,53 @@ public class BasicDocumentStoreTest exte
Map<String, String> desc = ds.getMetadata();
assertNotNull(desc.get("type"));
}
+
+ @Test
+ public void removeWithCondition() throws Exception {
+ List<UpdateOp> docs = Lists.newArrayList();
+ docs.add(newDocument("/foo", 100));
+ removeMe.add(Utils.getIdFromPath("/foo"));
+ docs.add(newDocument("/bar", 200));
+ removeMe.add(Utils.getIdFromPath("/bar"));
+ docs.add(newDocument("/baz", 300));
+ removeMe.add(Utils.getIdFromPath("/baz"));
+ ds.create(Collection.NODES, docs);
+
+ for (UpdateOp op : docs) {
+ assertNotNull(ds.find(Collection.NODES, op.getId()));
+ }
+
+ Map<String, Map<Key, Condition>> toRemove = Maps.newHashMap();
+ removeDocument(toRemove, "/foo", 100); // matches
+ removeDocument(toRemove, "/bar", 300); // modified differs
+ removeDocument(toRemove, "/qux", 100); // does not exist
+ removeDocument(toRemove, "/baz", 300); // matches
+
+ int removed = ds.remove(Collection.NODES, toRemove);
+
+ assertEquals(2, removed);
+ assertNotNull(ds.find(Collection.NODES, Utils.getIdFromPath("/bar")));
+ for (NodeDocument doc : Utils.getAllDocuments(ds)) {
+ if (!doc.getPath().equals("/bar")) {
+ fail("document must not exist: " + doc.getId());
+ }
+ }
+ }
+
+ private UpdateOp newDocument(String path, long modified) {
+ String id = Utils.getIdFromPath(path);
+ UpdateOp op = new UpdateOp(id, true);
+ op.set(NodeDocument.MODIFIED_IN_SECS, modified);
+ op.set(Document.ID, id);
+ return op;
+ }
+
+ private void removeDocument(Map<String, Map<Key, Condition>> toRemove,
+ String path,
+ long modified) {
+ toRemove.put(Utils.getIdFromPath(path),
+ Collections.singletonMap(
+ new Key(NodeDocument.MODIFIED_IN_SECS, null),
+ Condition.newEqualsCondition(modified)));
+ }
}