You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by re...@apache.org on 2014/10/14 19:20:46 UTC

svn commit: r1631824 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java

Author: reschke
Date: Tue Oct 14 17:20:45 2014
New Revision: 1631824

URL: http://svn.apache.org/r1631824
Log:
OAK-1941 - implement batched append update for update operations that do not require checking the current document state; fix MySQL issues with append mode

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java?rev=1631824&r1=1631823&r2=1631824&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java Tue Oct 14 17:20:45 2014
@@ -33,6 +33,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -65,7 +66,6 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Operation;
 import org.apache.jackrabbit.oak.plugins.document.UpdateUtils;
 import org.apache.jackrabbit.oak.plugins.document.cache.CachingDocumentStore;
-import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
 import org.apache.jackrabbit.oak.plugins.document.util.StringValue;
 import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
@@ -348,6 +348,9 @@ public class RDBDocumentStore implements
     // number of retries for updates
     private static int RETRIES = 10;
 
+    // for DBs that prefer "concat" over "||"
+    private boolean needsConcat = false;
+
     // set of supported indexed properties
     private static Set<String> INDEXEDPROPERTIES = new HashSet<String>(Arrays.asList(new String[] { MODIFIED,
             NodeDocument.HAS_BINARY_FLAG }));
@@ -378,6 +381,8 @@ public class RDBDocumentStore implements
             stmt.execute("ALTER SESSION SET NLS_SORT='BINARY'");
             stmt.close();
             con.commit();
+        } else if ("MySQL".equals(dbtype)) {
+            this.needsConcat = true;
         }
 
         try {
@@ -655,10 +660,56 @@ public class RDBDocumentStore implements
     @CheckForNull
     private <T extends Document> void internalUpdate(Collection<T> collection, List<String> ids, UpdateOp update) {
 
-        for (String id : ids) {
-            UpdateOp up = update.copy();
-            up = up.shallowCopy(id);
-            internalCreateOrUpdate(collection, up, false, true);
+        if (isAppendableUpdate(update) && !requiresPreviousState(update)) {
+            long modified = getModifiedFromUpdate(update);
+
+            for (List<String> chunkedIds : Lists.partition(ids, CHUNKSIZE)) {
+                // remember what we already have in the cache
+                Map<String, NodeDocument> cachedDocs = Collections.emptyMap();
+                if (collection == Collection.NODES) {
+                    cachedDocs = new HashMap<String, NodeDocument>();
+                    for (String key : chunkedIds) {
+                        cachedDocs.put(key, nodesCache.getIfPresent(new StringValue(key)));
+                    }
+                }
+
+                Connection connection = null;
+                String tableName = getTable(collection);
+                boolean success = false;
+                try {
+                    connection = getConnection();
+                    success = dbBatchedAppendingUpdate(connection, tableName, chunkedIds, modified, asString(update));
+                    connection.commit();
+                } catch (SQLException ex) {
+                    success = false;
+                } finally {
+                    closeConnection(connection);
+                }
+                if (success) {
+                    for (Entry<String, NodeDocument> entry : cachedDocs.entrySet()) {
+                            if (entry.getValue() == null) {
+                                // make sure concurrently loaded document is invalidated
+                                nodesCache.invalidate(new StringValue(entry.getKey()));
+                            } else {
+                                T oldDoc = (T)(entry.getValue());
+                                T newDoc = applyChanges(collection, (T)(entry.getValue()), update, true);
+                                applyToCache((NodeDocument) oldDoc, (NodeDocument) newDoc);
+                            }
+                    }
+                } else {
+                    for (String id : chunkedIds) {
+                        UpdateOp up = update.copy();
+                        up = up.shallowCopy(id);
+                        internalCreateOrUpdate(collection, up, false, true);
+                    }
+                }
+            }
+        } else {
+            for (String id : ids) {
+                UpdateOp up = update.copy();
+                up = up.shallowCopy(id);
+                internalCreateOrUpdate(collection, up, false, true);
+            }
         }
     }
 
@@ -934,6 +985,27 @@ public class RDBDocumentStore implements
         return true;
     }
 
+    /* check whether this update operation requires knowledge about the previous state */
+    private static boolean requiresPreviousState(UpdateOp update) {
+        for (Map.Entry<Key, Operation> change : update.getChanges().entrySet()) {
+            Operation op = change.getValue();
+            if (op.type == UpdateOp.Operation.Type.CONTAINS_MAP_ENTRY) return true;
+        }
+        return false;
+    }
+
+    private static long  getModifiedFromUpdate(UpdateOp update) {
+        for (Map.Entry<Key, Operation> change : update.getChanges().entrySet()) {
+            Operation op = change.getValue();
+            if (op.type == UpdateOp.Operation.Type.MAX || op.type == UpdateOp.Operation.Type.SET) {
+                if (MODIFIED.equals(change.getKey().getName())) {
+                    return Long.parseLong(op.value.toString());
+                }
+            }
+        }
+        return 0L;
+    }
+
     /**
      * Serializes the changes in the {@link UpdateOp} into a JSON array; each entry is another
      * JSON array holding operation, key, revision, and value.
@@ -1183,7 +1255,9 @@ public class RDBDocumentStore implements
 
     private boolean dbAppendingUpdate(Connection connection, String tableName, String id, Long modified, Boolean hasBinary, Long modcount, Long cmodcount, Long oldmodcount,
             String appendData) throws SQLException {
-        String t = "update " + tableName + " set MODIFIED = ?, HASBINARY = ?, MODCOUNT = ?, CMODCOUNT = ?, DSIZE = DSIZE + ?, DATA = DATA || ? where ID = ?";
+        String t = "update " + tableName + " set MODIFIED = ?, HASBINARY = ?, MODCOUNT = ?, CMODCOUNT = ?, DSIZE = DSIZE + ?, ";
+        t += (this.needsConcat ? "DATA = CONCAT(DATA, ?) " : "DATA = DATA || ? ");
+        t += "where ID = ?";
         if (oldmodcount != null) {
             t += " and MODCOUNT = ?";
         }
@@ -1211,6 +1285,39 @@ public class RDBDocumentStore implements
         }
     }
 
+    private boolean dbBatchedAppendingUpdate(Connection connection, String tableName, List<String> ids, Long modified, String appendData) throws SQLException {
+        StringBuilder t = new StringBuilder();
+        t.append("update " + tableName + " set MODIFIED = ?, MODCOUNT = MODCOUNT + 1, DSIZE = DSIZE + ?, ");
+        t.append(this.needsConcat ? "DATA = CONCAT(DATA, ?)" : "DATA = DATA || ? ");
+        t.append("where ID in (");
+        for (int i = 0; i < ids.size(); i++) {
+            if (i != 0) {
+                t.append(',');
+            }
+            t.append('?');
+        }
+        t.append(") and MODIFIED <= ?");
+        PreparedStatement stmt = connection.prepareStatement(t.toString());
+        try {
+            int si = 1;
+            stmt.setObject(si++, modified, Types.BIGINT);
+            stmt.setObject(si++, 1 + appendData.length(), Types.BIGINT);
+            stmt.setString(si++, "," + appendData);
+            for (String id : ids) {
+                stmt.setString(si++, id);
+            }
+            stmt.setObject(si++, modified, Types.BIGINT);
+            int result = stmt.executeUpdate();
+            if (result != ids.size()) {
+                LOG.debug("DB update failed for " + tableName + "/" + ids);
+            }
+            return result == ids.size();
+        } 
+        finally {
+            stmt.close();
+        }
+    }
+
     private boolean dbInsert(Connection connection, String tableName, String id, Long modified, Boolean hasBinary, Long modcount,
             Long cmodcount, String data) throws SQLException {
         PreparedStatement stmt = connection.prepareStatement("insert into " + tableName

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java?rev=1631824&r1=1631823&r2=1631824&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java Tue Oct 14 17:20:45 2014
@@ -240,6 +240,7 @@ public class BasicDocumentStoreTest exte
     public void testUpdateMultiple() {
         String id = this.getClass().getName() + ".testUpdateMultiple";
         // create a test node
+        super.ds.remove(Collection.NODES, id);
         UpdateOp up = new UpdateOp(id, true);
         up.set("_id", id);
         boolean success = super.ds.create(Collection.NODES, Collections.singletonList(up));
@@ -405,7 +406,6 @@ public class BasicDocumentStoreTest exte
         String bid = this.getClass().getName() + ".testPerfLastRevBatch";
         int nodecount = 100;
         long duration = 5000;
-        long end = System.currentTimeMillis() + duration;
         int cnt = 0;
         List<String> ids = new ArrayList<String>();
         Revision cr = Revision.fromString("r0-0-1");
@@ -424,6 +424,7 @@ public class BasicDocumentStoreTest exte
             ids.add(id);
         }
 
+        long end = System.currentTimeMillis() + duration;
         while (System.currentTimeMillis() < end) {
             UpdateOp up = new UpdateOp(bid, true);
             up.setMapEntry("_lastRev", cr, "iteration-" + cnt);