You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by re...@apache.org on 2014/10/14 19:20:46 UTC
svn commit: r1631824 - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java
Author: reschke
Date: Tue Oct 14 17:20:45 2014
New Revision: 1631824
URL: http://svn.apache.org/r1631824
Log:
OAK-1941 - implement batched append update for update operations that do not require checking the current document state; fix MySQL issues with append mode
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java?rev=1631824&r1=1631823&r2=1631824&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java Tue Oct 14 17:20:45 2014
@@ -33,6 +33,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -65,7 +66,6 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Operation;
import org.apache.jackrabbit.oak.plugins.document.UpdateUtils;
import org.apache.jackrabbit.oak.plugins.document.cache.CachingDocumentStore;
-import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
import org.apache.jackrabbit.oak.plugins.document.util.StringValue;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
@@ -348,6 +348,9 @@ public class RDBDocumentStore implements
// number of retries for updates
private static int RETRIES = 10;
+ // for DBs that need CONCAT(a, b) instead of the "||" operator for string concatenation (e.g. MySQL)
+ private boolean needsConcat = false;
+
// set of supported indexed properties
private static Set<String> INDEXEDPROPERTIES = new HashSet<String>(Arrays.asList(new String[] { MODIFIED,
NodeDocument.HAS_BINARY_FLAG }));
@@ -378,6 +381,8 @@ public class RDBDocumentStore implements
stmt.execute("ALTER SESSION SET NLS_SORT='BINARY'");
stmt.close();
con.commit();
+ } else if ("MySQL".equals(dbtype)) {
+ this.needsConcat = true;
}
try {
@@ -655,10 +660,56 @@ public class RDBDocumentStore implements
@CheckForNull
private <T extends Document> void internalUpdate(Collection<T> collection, List<String> ids, UpdateOp update) {
- for (String id : ids) {
- UpdateOp up = update.copy();
- up = up.shallowCopy(id);
- internalCreateOrUpdate(collection, up, false, true);
+ if (isAppendableUpdate(update) && !requiresPreviousState(update)) {
+ long modified = getModifiedFromUpdate(update);
+
+ for (List<String> chunkedIds : Lists.partition(ids, CHUNKSIZE)) {
+ // remember what we already have in the cache
+ Map<String, NodeDocument> cachedDocs = Collections.emptyMap();
+ if (collection == Collection.NODES) {
+ cachedDocs = new HashMap<String, NodeDocument>();
+ for (String key : chunkedIds) {
+ cachedDocs.put(key, nodesCache.getIfPresent(new StringValue(key)));
+ }
+ }
+
+ Connection connection = null;
+ String tableName = getTable(collection);
+ boolean success = false;
+ try {
+ connection = getConnection();
+ success = dbBatchedAppendingUpdate(connection, tableName, chunkedIds, modified, asString(update));
+ connection.commit();
+ } catch (SQLException ex) {
+ success = false;
+ } finally {
+ closeConnection(connection);
+ }
+ if (success) {
+ for (Entry<String, NodeDocument> entry : cachedDocs.entrySet()) {
+ if (entry.getValue() == null) {
+ // make sure concurrently loaded document is invalidated
+ nodesCache.invalidate(new StringValue(entry.getKey()));
+ } else {
+ T oldDoc = (T)(entry.getValue());
+ T newDoc = applyChanges(collection, (T)(entry.getValue()), update, true);
+ applyToCache((NodeDocument) oldDoc, (NodeDocument) newDoc);
+ }
+ }
+ } else {
+ for (String id : chunkedIds) {
+ UpdateOp up = update.copy();
+ up = up.shallowCopy(id);
+ internalCreateOrUpdate(collection, up, false, true);
+ }
+ }
+ }
+ } else {
+ for (String id : ids) {
+ UpdateOp up = update.copy();
+ up = up.shallowCopy(id);
+ internalCreateOrUpdate(collection, up, false, true);
+ }
}
}
@@ -934,6 +985,27 @@ public class RDBDocumentStore implements
return true;
}
+ /**
+  * Tells whether the given update can only be applied with knowledge of the
+  * previous document state.
+  *
+  * @param update the update to inspect
+  * @return {@code true} if any of the changes is a CONTAINS_MAP_ENTRY operation
+  */
+ private static boolean requiresPreviousState(UpdateOp update) {
+     for (Operation operation : update.getChanges().values()) {
+         if (operation.type == UpdateOp.Operation.Type.CONTAINS_MAP_ENTRY) {
+             return true;
+         }
+     }
+     return false;
+ }
+
+ /**
+  * Looks up the value that the given update assigns to the MODIFIED property.
+  *
+  * @param update the update to inspect
+  * @return the value of the first MAX or SET operation on MODIFIED, or 0 if
+  *         the update contains no such operation
+  */
+ private static long getModifiedFromUpdate(UpdateOp update) {
+     for (Map.Entry<Key, Operation> entry : update.getChanges().entrySet()) {
+         UpdateOp.Operation.Type type = entry.getValue().type;
+         boolean assignsValue = type == UpdateOp.Operation.Type.MAX || type == UpdateOp.Operation.Type.SET;
+         if (!assignsValue || !MODIFIED.equals(entry.getKey().getName())) {
+             continue;
+         }
+         return Long.parseLong(entry.getValue().value.toString());
+     }
+     return 0L;
+ }
+
/**
* Serializes the changes in the {@link UpdateOp} into a JSON array; each entry is another
* JSON array holding operation, key, revision, and value.
@@ -1183,7 +1255,9 @@ public class RDBDocumentStore implements
private boolean dbAppendingUpdate(Connection connection, String tableName, String id, Long modified, Boolean hasBinary, Long modcount, Long cmodcount, Long oldmodcount,
String appendData) throws SQLException {
- String t = "update " + tableName + " set MODIFIED = ?, HASBINARY = ?, MODCOUNT = ?, CMODCOUNT = ?, DSIZE = DSIZE + ?, DATA = DATA || ? where ID = ?";
+ String t = "update " + tableName + " set MODIFIED = ?, HASBINARY = ?, MODCOUNT = ?, CMODCOUNT = ?, DSIZE = DSIZE + ?, ";
+ t += (this.needsConcat ? "DATA = CONCAT(DATA, ?) " : "DATA = DATA || ? ");
+ t += "where ID = ?";
if (oldmodcount != null) {
t += " and MODCOUNT = ?";
}
@@ -1211,6 +1285,39 @@ public class RDBDocumentStore implements
}
}
+ /**
+  * Appends the given update data to the DATA column of all rows with the
+  * given ids using a single statement; MODIFIED is set, MODCOUNT is
+  * incremented by one and DSIZE grows by the length of the appended text.
+  * Only rows whose current MODIFIED value is not greater than
+  * {@code modified} are touched. The transaction is neither committed nor
+  * rolled back here.
+  *
+  * @param connection the connection to run the statement on
+  * @param tableName name of the table to update
+  * @param ids ids of the rows to update
+  * @param modified new MODIFIED value, also the upper bound for the current one
+  * @param appendData serialized update; it is appended with a leading ","
+  * @return {@code true} if the number of updated rows equals the number of ids
+  * @throws SQLException if preparing or executing the statement fails
+  */
+ private boolean dbBatchedAppendingUpdate(Connection connection, String tableName, List<String> ids, Long modified, String appendData) throws SQLException {
+     if (ids.isEmpty()) {
+         // "ID in ()" is not valid SQL, and there is nothing to update anyway
+         return true;
+     }
+     StringBuilder t = new StringBuilder();
+     t.append("update " + tableName + " set MODIFIED = ?, MODCOUNT = MODCOUNT + 1, DSIZE = DSIZE + ?, ");
+     // both variants end with a blank, same as in dbAppendingUpdate
+     t.append(this.needsConcat ? "DATA = CONCAT(DATA, ?) " : "DATA = DATA || ? ");
+     t.append("where ID in (");
+     for (int i = 0; i < ids.size(); i++) {
+         if (i != 0) {
+             t.append(',');
+         }
+         t.append('?');
+     }
+     t.append(") and MODIFIED <= ?");
+     PreparedStatement stmt = connection.prepareStatement(t.toString());
+     try {
+         int si = 1;
+         stmt.setObject(si++, modified, Types.BIGINT);
+         // "1 +" accounts for the "," separator put in front of the data
+         stmt.setObject(si++, 1 + appendData.length(), Types.BIGINT);
+         stmt.setString(si++, "," + appendData);
+         for (String id : ids) {
+             stmt.setString(si++, id);
+         }
+         stmt.setObject(si++, modified, Types.BIGINT);
+         int result = stmt.executeUpdate();
+         if (result != ids.size()) {
+             LOG.debug("DB update failed for " + tableName + "/" + ids);
+         }
+         return result == ids.size();
+     }
+     finally {
+         stmt.close();
+     }
+ }
+
private boolean dbInsert(Connection connection, String tableName, String id, Long modified, Boolean hasBinary, Long modcount,
Long cmodcount, String data) throws SQLException {
PreparedStatement stmt = connection.prepareStatement("insert into " + tableName
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java?rev=1631824&r1=1631823&r2=1631824&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java Tue Oct 14 17:20:45 2014
@@ -240,6 +240,7 @@ public class BasicDocumentStoreTest exte
public void testUpdateMultiple() {
String id = this.getClass().getName() + ".testUpdateMultiple";
// create a test node
+ super.ds.remove(Collection.NODES, id);
UpdateOp up = new UpdateOp(id, true);
up.set("_id", id);
boolean success = super.ds.create(Collection.NODES, Collections.singletonList(up));
@@ -405,7 +406,6 @@ public class BasicDocumentStoreTest exte
String bid = this.getClass().getName() + ".testPerfLastRevBatch";
int nodecount = 100;
long duration = 5000;
- long end = System.currentTimeMillis() + duration;
int cnt = 0;
List<String> ids = new ArrayList<String>();
Revision cr = Revision.fromString("r0-0-1");
@@ -424,6 +424,7 @@ public class BasicDocumentStoreTest exte
ids.add(id);
}
+ long end = System.currentTimeMillis() + duration;
while (System.currentTimeMillis() < end) {
UpdateOp up = new UpdateOp(bid, true);
up.setMapEntry("_lastRev", cr, "iteration-" + cnt);