Posted to oak-commits@jackrabbit.apache.org by re...@apache.org on 2016/01/05 08:36:07 UTC

svn commit: r1723008 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/document/rdb/ test/java/org/apache/jackrabbit/oak/plugins/document/ test/java/org/apache/jackrabbit/oak/plugins/document/rdb/

Author: reschke
Date: Tue Jan  5 07:36:06 2016
New Revision: 1723008

URL: http://svn.apache.org/viewvc?rev=1723008&view=rev
Log:
OAK-3637: Bulk document updates in RDBDocumentStore

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBJDBCTools.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MultiDocumentStoreTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDataSourceWrapper.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBPreparedStatementWrapper.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java?rev=1723008&r1=1723007&r2=1723008&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java Tue Jan  5 07:36:06 2016
@@ -17,6 +17,8 @@
 package org.apache.jackrabbit.oak.plugins.document.rdb;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Lists.partition;
 import static org.apache.jackrabbit.oak.plugins.document.UpdateUtils.checkConditions;
 import static org.apache.jackrabbit.oak.plugins.document.rdb.RDBJDBCTools.closeResultSet;
 import static org.apache.jackrabbit.oak.plugins.document.rdb.RDBJDBCTools.closeStatement;
@@ -40,6 +42,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -81,6 +84,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import com.google.common.hash.BloomFilter;
 import com.google.common.hash.Funnel;
 import com.google.common.hash.PrimitiveSink;
@@ -290,13 +294,172 @@ public class RDBDocumentStore implements
 
     @Override
     public <T extends Document> List<T> createOrUpdate(Collection<T> collection, List<UpdateOp> updateOps) {
-        List<T> result = new ArrayList<T>(updateOps.size());
-        for (UpdateOp update : updateOps) {
-            result.add(createOrUpdate(collection, update));
+        List<T> result = null;
+        Map<String, UpdateOp> operationsToCover = new LinkedHashMap<String, UpdateOp>();
+
+        for (UpdateOp updateOp : updateOps) {
+            UpdateUtils.assertUnconditional(updateOp);
+            UpdateOp clone = updateOp.copy();
+            addUpdateCounters(clone);
+            operationsToCover.put(clone.getId(), clone);
+        }
+
+        Map<String, T> oldDocs = new HashMap<String, T>();
+        if (collection == Collection.NODES) {
+            oldDocs.putAll((Map<String, T>) readDocumentCached(collection, operationsToCover.keySet()));
+        }
+
+        int i = 0; // iteration count
+
+        // bulk update requires two DB requests, so if we have <= 2 operations
+        // it's better to send them sequentially
+        while (operationsToCover.size() > 2) {
+            // We should try to insert documents only during the first
+            // iteration. In the 2nd and 3rd iterations we only deal with
+            // conflicting documents, so they already exist in the database
+            // and there's no point in inserting them.
+            boolean upsert = i == 0;
+
+            if (i++ == 3) {
+                // operations that conflicted in 3 consecutive bulk
+                // updates should be applied sequentially
+                break;
+            }
+
+            for (List<UpdateOp> partition : partition(newArrayList(operationsToCover.values()), CHUNKSIZE)) {
+                Set<String> successfulUpdates = bulkUpdate(collection, partition, oldDocs, upsert);
+                operationsToCover.keySet().removeAll(successfulUpdates);
+            }
+        }
+
+        // if there are some changes left, we'll apply them one after another
+        for (UpdateOp updateOp : updateOps) {
+            if (operationsToCover.remove(updateOp.getId()) != null) {
+                // work on the original update operation
+                T oldDoc = createOrUpdate(collection, updateOp.copy()); 
+                if (oldDoc != null) {
+                    oldDocs.put(oldDoc.getId(), oldDoc);
+                }
+            }
         }
+
+        result = new ArrayList<T>(updateOps.size());
+        for (UpdateOp op : updateOps) {
+            result.add(oldDocs.get(op.getId()));
+        }
+
         return result;
     }
 
+    private <T extends Document> Map<String, T> readDocumentCached(Collection<T> collection, Set<String> keys) {
+        Map<String, T> documents = new HashMap<String, T>();
+
+        if (collection == Collection.NODES) {
+            for (String key : keys) {
+                NodeDocument cached = nodesCache.getIfPresent(key);
+                if (cached != null && cached != NodeDocument.NULL) {
+                    T doc = castAsT(unwrap(cached));
+                    documents.put(doc.getId(), doc);
+                }
+            }
+        }
+
+        Set<String> documentsToRead = Sets.difference(keys, documents.keySet());
+        Map<String, T> readDocuments = readDocumentsUncached(collection, documentsToRead);
+        documents.putAll(readDocuments);
+
+        if (collection == Collection.NODES) {
+            for (T doc : readDocuments.values()) {
+                nodesCache.putIfAbsent((NodeDocument) doc);
+            }
+        }
+
+        return documents;
+    }
+
+    private <T extends Document> Map<String, T> readDocumentsUncached(Collection<T> collection, Set<String> keys) {
+        Map<String, T> result = new HashMap<String, T>();
+
+        Connection connection = null;
+        RDBTableMetaData tmd = getTable(collection);
+        try {
+            connection = this.ch.getROConnection();
+            List<RDBRow> rows = db.read(connection, tmd, keys);
+
+            int size = rows.size();
+            for (int i = 0; i < size; i++) {
+                RDBRow row = rows.set(i, null); // null the slot so the row can be GC'ed while converting
+                T document = convertFromDBObject(collection, row);
+                result.put(document.getId(), document);
+            }
+            connection.commit();
+        } catch (Exception ex) {
+            throw new DocumentStoreException(ex);
+        } finally {
+            this.ch.closeConnection(connection);
+        }
+        return result;
+    }
+
+    private <T extends Document> Set<String> bulkUpdate(Collection<T> collection, List<UpdateOp> updates, Map<String, T> oldDocs, boolean upsert) {
+        Set<String> missingDocs = new HashSet<String>();
+        for (UpdateOp op : updates) {
+            if (!oldDocs.containsKey(op.getId())) {
+                missingDocs.add(op.getId());
+            }
+        }
+        for (T doc : readDocumentsUncached(collection, missingDocs).values()) {
+            oldDocs.put(doc.getId(), doc);
+            if (collection == Collection.NODES) {
+                nodesCache.putIfAbsent((NodeDocument) doc);
+            }
+        }
+
+        List<T> docsToUpdate = new ArrayList<T>(updates.size());
+        Set<String> keysToUpdate = new HashSet<String>();
+        for (UpdateOp update : updates) {
+            String id = update.getId();
+            T modifiedDoc = collection.newDocument(this);
+            if (oldDocs.containsKey(id)) {
+                oldDocs.get(id).deepCopy(modifiedDoc);
+            }
+            UpdateUtils.applyChanges(modifiedDoc, update);
+            docsToUpdate.add(modifiedDoc);
+            keysToUpdate.add(id);
+        }
+
+        Connection connection = null;
+        RDBTableMetaData tmd = getTable(collection);
+        try {
+            connection = this.ch.getRWConnection();
+            Set<String> successfulUpdates = db.update(connection, tmd, docsToUpdate, upsert);
+            connection.commit();
+
+            Set<String> failedUpdates = Sets.difference(keysToUpdate, successfulUpdates);
+            oldDocs.keySet().removeAll(failedUpdates);
+
+            if (collection == Collection.NODES) {
+                for (T doc : docsToUpdate) {
+                    String id = doc.getId();
+                    if (successfulUpdates.contains(id)) {
+                        if (oldDocs.containsKey(id)) {
+                            nodesCache.replaceCachedDocument((NodeDocument) oldDocs.get(id), (NodeDocument) doc);
+                        } else {
+                            nodesCache.putIfAbsent((NodeDocument) doc);
+                        }
+                    }
+                }
+            }
+
+            return successfulUpdates;
+        } catch (SQLException ex) {
+            this.ch.rollbackConnection(connection);
+            throw new DocumentStoreException(ex);
+        } finally {
+            this.ch.closeConnection(connection);
+        }
+    }
+
     @Override
     public <T extends Document> T findAndUpdate(Collection<T> collection, UpdateOp update) {
         return internalCreateOrUpdate(collection, update, false, true);
@@ -1463,9 +1626,9 @@ public class RDBDocumentStore implements
         RDBTableMetaData tmd = getTable(collection);
         try {
             connection = this.ch.getRWConnection();
-            boolean result = db.insert(connection, tmd, documents);
+            Set<String> insertedKeys = db.insert(connection, tmd, documents);
             connection.commit();
-            return result;
+            return insertedKeys.size() == documents.size();
         } catch (SQLException ex) {
             this.ch.rollbackConnection(connection);
 

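A minimal usage sketch of the new bulk code path (the DocumentStore instance, the ids, and the property name are illustrative placeholders):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.jackrabbit.oak.plugins.document.Collection;
    import org.apache.jackrabbit.oak.plugins.document.Document;
    import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
    import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
    import org.apache.jackrabbit.oak.plugins.document.UpdateOp;

    final class BulkUpdateUsageSketch {
        static void run(DocumentStore store) {
            List<UpdateOp> ops = new ArrayList<UpdateOp>();
            for (int i = 0; i < 100; i++) {
                String id = "0:/node-" + i;
                UpdateOp op = new UpdateOp(id, true); // true: the document may be new
                op.set(Document.ID, id);
                op.set("_prop", "value");
                ops.add(op);
            }
            // A single call now issues chunked bulk updates (and inserts, for new
            // documents) instead of one round trip per operation; the result list
            // is aligned with "ops" and holds the previous state of each document,
            // or null where the document did not exist before.
            List<NodeDocument> previous = store.createOrUpdate(Collection.NODES, ops);
        }
    }
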
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java?rev=1723008&r1=1723007&r2=1723008&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java Tue Jan  5 07:36:06 2016
@@ -29,9 +29,11 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Types;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -52,6 +54,9 @@ import org.apache.jackrabbit.oak.plugins
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+
 /**
  * Implements (most) DB interactions used in {@link RDBDocumentStore}.
  */
@@ -150,7 +155,6 @@ public class RDBDocumentStoreJDBC {
 
     public int delete(Connection connection, RDBTableMetaData tmd, List<String> ids) throws SQLException {
         PreparedStatement stmt;
-        int cnt = ids.size();
         PreparedStatementComponent inClause = RDBJDBCTools.createInStatement("ID", ids, tmd.isIdBinary());
         String sql = "delete from " + tmd.getName() + " where " + inClause.getStatementComponent();
         stmt = connection.prepareStatement(sql);
@@ -158,7 +162,7 @@ public class RDBDocumentStoreJDBC {
         try {
             inClause.setParameters(stmt, 1);
             int result = stmt.executeUpdate();
-            if (result != cnt) {
+            if (result != ids.size()) {
                 LOG.debug("DB delete failed for " + tmd.getName() + "/" + ids);
             }
             return result;
@@ -256,7 +260,7 @@ public class RDBDocumentStoreJDBC {
         }
     }
 
-    public <T extends Document> boolean insert(Connection connection, RDBTableMetaData tmd, List<T> documents) throws SQLException {
+    public <T extends Document> Set<String> insert(Connection connection, RDBTableMetaData tmd, List<T> documents) throws SQLException {
         PreparedStatement stmt = connection.prepareStatement(
                 "insert into " + tmd.getName() + "(ID, MODIFIED, HASBINARY, DELETEDONCE, MODCOUNT, CMODCOUNT, DSIZE, DATA, BDATA) "
                         + "values (?, ?, ?, ?, ?, ?, ?, ?, ?)");
@@ -289,20 +293,141 @@ public class RDBDocumentStoreJDBC {
                 stmt.addBatch();
             }
             int[] results = stmt.executeBatch();
-            boolean success = true;
+
+            Set<String> successfullyInserted = new HashSet<String>();
             for (int i = 0; i < documents.size(); i++) {
                 int result = results[i];
                 if (result != 1 && result != Statement.SUCCESS_NO_INFO) {
                     LOG.error("DB insert failed for {}: {}", tmd.getName(), documents.get(i).getId());
-                    success = false;
+                } else {
+                    successfullyInserted.add(documents.get(i).getId());
                 }
             }
-            return success;
+            return successfullyInserted;
         } finally {
             stmt.close();
         }
     }
 
+    /**
+     * Update a list of documents using JDBC batches. Some of the updates may fail because of concurrent
+     * changes. The method returns the set of ids of the successfully updated documents. It is the caller's
+     * responsibility to compare that set with the list of input documents, find out which documents
+     * conflicted, and take appropriate action.
+     * <p>
+     * If the {@code upsert} parameter is set to {@code true}, the method will also try to insert new
+     * documents, namely those whose modcount equals 1.
+     *
+     * @param connection JDBC connection
+     * @param tmd Table metadata
+     * @param documents List of documents to update
+     * @param upsert Insert new documents
+     * @return set containing ids of successfully updated documents
+     * @throws SQLException
+     */
+    public <T extends Document> Set<String> update(Connection connection, RDBTableMetaData tmd, List<T> documents, boolean upsert)
+            throws SQLException {
+        Set<String> successfulUpdates = new HashSet<String>();
+
+        PreparedStatement stmt = connection.prepareStatement("update " + tmd.getName()
+            + " set MODIFIED = ?, HASBINARY = ?, DELETEDONCE = ?, MODCOUNT = ?, CMODCOUNT = ?, DSIZE = ?, DATA = ?, BDATA = ? where ID = ? and MODCOUNT = ?");
+        try {
+            List<String> updatedKeys = new ArrayList<String>();
+            for (T document : documents) {
+                Long modcount = (Long) document.get(MODCOUNT);
+                if (modcount == 1) {
+                    continue; // This is a new document. We'll deal with the inserts later.
+                }
+
+                String data = this.ser.asString(document);
+                Number hasBinary = (Number) document.get(NodeDocument.HAS_BINARY_FLAG);
+                Boolean deletedOnce = (Boolean) document.get(NodeDocument.DELETED_ONCE);
+                Long cmodcount = (Long) document.get(COLLISIONSMODCOUNT);
+
+                int si = 1;
+                stmt.setObject(si++, document.get(MODIFIED), Types.BIGINT);
+                stmt.setObject(si++, (hasBinary != null && hasBinary.intValue() == NodeDocument.HAS_BINARY_VAL) ? 1 : 0,
+                        Types.SMALLINT);
+                stmt.setObject(si++, (deletedOnce != null && deletedOnce) ? 1 : 0, Types.SMALLINT);
+                stmt.setObject(si++, modcount, Types.BIGINT);
+                stmt.setObject(si++, cmodcount == null ? Long.valueOf(0) : cmodcount, Types.BIGINT);
+                stmt.setObject(si++, data.length(), Types.BIGINT);
+
+                if (data.length() < tmd.getDataLimitInOctets() / CHAR2OCTETRATIO) {
+                    stmt.setString(si++, data);
+                    stmt.setBinaryStream(si++, null, 0);
+                } else {
+                    stmt.setString(si++, "\"blob\"");
+                    byte[] bytes = asBytes(data);
+                    stmt.setBytes(si++, bytes);
+                }
+
+                setIdInStatement(tmd, stmt, si++, document.getId());
+                stmt.setObject(si++, modcount - 1, Types.BIGINT);
+                stmt.addBatch();
+                updatedKeys.add(document.getId());
+            }
+
+            int[] batchResults = stmt.executeBatch();
+
+            for (int i = 0; i < batchResults.length; i++) {
+                int result = batchResults[i];
+                if (result == 1 || result == Statement.SUCCESS_NO_INFO) {
+                    successfulUpdates.add(updatedKeys.get(i));
+                }
+            }
+        } finally {
+            stmt.close();
+        }
+
+        if (upsert) {
+            List<T> remainingDocuments = new ArrayList<T>(documents.size() - successfulUpdates.size());
+            for (T doc : documents) {
+                if (!successfulUpdates.contains(doc.getId())) {
+                    remainingDocuments.add(doc);
+                }
+            }
+
+            if (!remainingDocuments.isEmpty()) {
+                List<String> remainingDocumentIds = Lists.transform(remainingDocuments, idExtractor);
+                PreparedStatementComponent inClause = RDBJDBCTools.createInStatement("ID", remainingDocumentIds, tmd.isIdBinary());
+                StringBuilder sql = new StringBuilder("select ID from ").append(tmd.getName());
+                sql.append(" where ").append(inClause.getStatementComponent());
+
+                Set<String> documentsWithUpdatedModcount = new HashSet<String>();
+
+                PreparedStatement selectStmt = null;
+                ResultSet rs = null;
+                try {
+                    selectStmt = connection.prepareStatement(sql.toString());
+                    selectStmt.setPoolable(false);
+                    inClause.setParameters(selectStmt, 1);
+                    rs = selectStmt.executeQuery();
+                    while (rs.next()) {
+                        documentsWithUpdatedModcount.add(getIdFromRS(tmd, rs, 1));
+                    }
+                } finally {
+                    closeResultSet(rs);
+                    closeStatement(selectStmt);
+                }
+
+                Iterator<T> it = remainingDocuments.iterator();
+                while (it.hasNext()) {
+                    if (documentsWithUpdatedModcount.contains(it.next().getId())) {
+                        it.remove();
+                    }
+                }
+
+                if (!remainingDocuments.isEmpty()) {
+                    for (String id : insert(connection, tmd, remainingDocuments)) {
+                        successfulUpdates.add(id);
+                    }
+                }
+            }
+        }
+        return successfulUpdates;
+    }
+
     private final static Map<String, String> INDEXED_PROP_MAPPING;
     static {
         Map<String, String> tmp = new HashMap<String, String>();
@@ -450,6 +575,58 @@ public class RDBDocumentStoreJDBC {
         return result;
     }
 
+    public List<RDBRow> read(Connection connection, RDBTableMetaData tmd, Collection<String> keys) throws SQLException {
+        if (keys.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        PreparedStatementComponent inClause = RDBJDBCTools.createInStatement("ID", keys, tmd.isIdBinary());
+        StringBuilder query = new StringBuilder();
+        query.append("select ID, MODIFIED, MODCOUNT, CMODCOUNT, HASBINARY, DELETEDONCE, DATA, BDATA from ");
+        query.append(tmd.getName());
+        query.append(" where ").append(inClause.getStatementComponent());
+
+        PreparedStatement stmt = connection.prepareStatement(query.toString());
+        stmt.setPoolable(false);
+        try {
+            inClause.setParameters(stmt, 1);
+            ResultSet rs = stmt.executeQuery();
+
+            List<RDBRow> rows = new ArrayList<RDBRow>();
+            while (rs.next()) {
+                int col = 1;
+                String id = getIdFromRS(tmd, rs, col++);
+                long modified = rs.getLong(col++);
+                long modcount = rs.getLong(col++);
+                long cmodcount = rs.getLong(col++);
+                long hasBinary = rs.getLong(col++);
+                long deletedOnce = rs.getLong(col++);
+                String data = rs.getString(col++);
+                byte[] bdata = rs.getBytes(col++);
+                RDBRow row = new RDBRow(id, hasBinary == 1, deletedOnce == 1, modified, modcount, cmodcount, data, bdata);
+                rows.add(row);
+            }
+
+            return rows;
+        } catch (SQLException ex) {
+            LOG.error("attempting to read " + keys, ex);
+            // DB2 throws an SQLException for invalid keys; handle this more
+            // gracefully
+            if ("22001".equals(ex.getSQLState())) {
+                try {
+                    connection.rollback();
+                } catch (SQLException ex2) {
+                    LOG.debug("failed to rollback", ex2);
+                }
+                return null;
+            } else {
+                throw (ex);
+            }
+        } finally {
+            stmt.close();
+        }
+    }
+
     @CheckForNull
     public RDBRow read(Connection connection, RDBTableMetaData tmd, String id, long lastmodcount) throws SQLException {
         PreparedStatement stmt;
@@ -574,4 +751,11 @@ public class RDBDocumentStoreJDBC {
             stmt.setString(idx, id);
         }
     }
+
+    private static final Function<Document, String> idExtractor = new Function<Document, String>() {
+        @Override
+        public String apply(Document input) {
+            return input.getId();
+        }
+    };
 }

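The conflict detection in update() above is an optimistic lock on MODCOUNT: each batched statement carries the document's new modcount, and the WHERE clause only matches a row whose MODCOUNT still equals the previous value (modcount - 1). A standalone sketch of the same pattern, reduced to two columns (the table and column names follow the schema used above; everything else is illustrative):

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;

    final class OptimisticUpdateSketch {
        // Returns true if this writer won the race for the document.
        static boolean tryUpdate(Connection conn, String id, String data, long newModcount)
                throws SQLException {
            PreparedStatement ps = conn.prepareStatement(
                    "update NODES set DATA = ?, MODCOUNT = ? where ID = ? and MODCOUNT = ?");
            try {
                ps.setString(1, data);
                ps.setLong(2, newModcount);
                ps.setString(3, id);
                ps.setLong(4, newModcount - 1); // the expected previous modcount
                return ps.executeUpdate() == 1; // 0 rows: concurrent modification
            } finally {
                ps.close();
            }
        }
    }
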
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBJDBCTools.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBJDBCTools.java?rev=1723008&r1=1723007&r2=1723008&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBJDBCTools.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBJDBCTools.java Tue Jan  5 07:36:06 2016
@@ -29,6 +29,7 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Types;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -328,7 +329,7 @@ public class RDBJDBCTools {
         builder.append(')');
     }
 
-    public static PreparedStatementComponent createInStatement(final String fieldName, final List<String> values,
+    public static PreparedStatementComponent createInStatement(final String fieldName, final Collection<String> values,
             final boolean binary) {
         return new PreparedStatementComponent() {
 

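createInStatement now takes a Collection instead of a List, so callers such as readDocumentsUncached can pass key sets directly. The placeholder generation it encapsulates amounts to the following (a simplified sketch; the real component also binds the values and supports binary ids via its boolean parameter):

    import java.util.Collection;

    final class InClauseSketch {
        // Builds "FIELD in (?, ?, ..., ?)" with one placeholder per value;
        // the values themselves are bound to the statement separately.
        static String inClause(String fieldName, Collection<String> values) {
            StringBuilder sb = new StringBuilder(fieldName).append(" in (");
            for (int i = 0; i < values.size(); i++) {
                sb.append(i == 0 ? "?" : ", ?");
            }
            return sb.append(')').toString();
        }
    }
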
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateTest.java?rev=1723008&r1=1723007&r2=1723008&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateTest.java Tue Jan  5 07:36:06 2016
@@ -23,12 +23,20 @@ import static org.junit.Assert.assertNul
 import java.util.ArrayList;
 import java.util.List;
 
+import javax.sql.DataSource;
+
+import org.apache.jackrabbit.oak.plugins.document.rdb.RDBDataSourceWrapper;
 import org.junit.Test;
 
 public class BulkCreateOrUpdateTest extends AbstractDocumentStoreTest {
 
     public BulkCreateOrUpdateTest(DocumentStoreFixture dsf) {
         super(dsf);
+        DataSource ds = dsf.getRDBDataSource();
+        if (ds instanceof RDBDataSourceWrapper) {
+            // simulate drivers that do not return precise batch results
+            ((RDBDataSourceWrapper)ds).setBatchResultPrecise(false);
+        }
     }
 
     /**

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MultiDocumentStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MultiDocumentStoreTest.java?rev=1723008&r1=1723007&r2=1723008&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MultiDocumentStoreTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MultiDocumentStoreTest.java Tue Jan  5 07:36:06 2016
@@ -18,9 +18,12 @@ package org.apache.jackrabbit.oak.plugin
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 
 import org.junit.Test;
 
@@ -209,6 +212,68 @@ public class MultiDocumentStoreTest exte
         assertTrue(nd1.getLastCheckTime() > ds1checktime);
     }
 
+    @Test
+    public void testInterleavedBatchUpdate() {
+        int amount = 10;
+        int halfAmount = amount / 2;
+        String baseId = this.getClass().getName() + ".testInterleavedBatchUpdate";
+
+        // remove if present
+        for (int i = 0; i < amount; i++) {
+            String id = baseId + "-" + i;
+            NodeDocument nd = super.ds1.find(Collection.NODES, id);
+            if (nd != null) {
+                super.ds1.remove(Collection.NODES, id);
+            }
+            removeMe.add(id);
+        }
+
+        {
+            // create half of the entries in ds1
+            List<UpdateOp> ops = new ArrayList<UpdateOp>();
+            for (int i = 0; i < halfAmount; i++) {
+                String id = baseId + "-" + i;
+                UpdateOp up = new UpdateOp(id, true);
+                up.set(Document.ID, id);
+                up.set("_createdby", "ds1");
+                ops.add(up);
+            }
+            List<NodeDocument> result = super.ds1.createOrUpdate(Collection.NODES, ops);
+            assertEquals(halfAmount, result.size());
+            for (NodeDocument doc : result) {
+                assertNull(doc);
+            }
+        }
+
+        {
+            // create all of the entries in ds2
+            List<UpdateOp> ops = new ArrayList<UpdateOp>();
+            for (int i = 0; i < amount; i++) {
+                String id = baseId + "-" + i;
+                UpdateOp up = new UpdateOp(id, true);
+                up.set(Document.ID, id);
+                up.set("_createdby", "ds2");
+                ops.add(up);
+            }
+            List<NodeDocument> result = super.ds2.createOrUpdate(Collection.NODES, ops);
+            assertEquals(amount, result.size());
+            for (NodeDocument doc : result) {
+                // documents are either new or have been created by ds1
+                if (doc != null) {
+                    assertEquals("ds1", doc.get("_createdby"));
+                }
+            }
+        }
+
+        // final check: does DS1 see all documents including the changes made by DS2?
+        for (int i = 0; i < amount; i++) {
+            String id = baseId + "-" + i;
+            NodeDocument doc = super.ds1.find(Collection.NODES, id, 0);
+            assertNotNull(doc);
+            assertEquals("ds2", doc.get("_createdby"));
+        }
+    }
+
     private static long letTimeElapse() {
         long ts = System.currentTimeMillis();
         while (System.currentTimeMillis() == ts) {

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDataSourceWrapper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDataSourceWrapper.java?rev=1723008&r1=1723007&r2=1723008&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDataSourceWrapper.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDataSourceWrapper.java Tue Jan  5 07:36:06 2016
@@ -53,6 +53,7 @@ public class RDBDataSourceWrapper implem
     // }
 
     private final DataSource ds;
+    private boolean batchResultPrecise = true;
 
     // Logging
 
@@ -84,6 +85,17 @@ public class RDBDataSourceWrapper implem
         return stopLog(Thread.currentThread());
     }
 
+    /**
+     * Set to {@code false} to simulate drivers/DBs that do not return the number of affected rows in {@link Statement#executeBatch()}.
+     */
+    public void setBatchResultPrecise(boolean precise) {
+        this.batchResultPrecise = precise;
+    }
+
+    public boolean isBatchResultPrecise() {
+        return this.batchResultPrecise;
+    }
+
     // DataSource
 
     public RDBDataSourceWrapper(DataSource ds) {

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBPreparedStatementWrapper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBPreparedStatementWrapper.java?rev=1723008&r1=1723007&r2=1723008&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBPreparedStatementWrapper.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBPreparedStatementWrapper.java Tue Jan  5 07:36:06 2016
@@ -36,6 +36,7 @@ import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
 import java.sql.SQLWarning;
 import java.sql.SQLXML;
+import java.sql.Statement;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.util.ArrayList;
@@ -125,6 +126,14 @@ public class RDBPreparedStatementWrapper
         SQLException x = null;
         try {
             results = statement.executeBatch();
+            // not all JDBC drivers return the number of affected rows
+            if (!datasource.isBatchResultPrecise()) {
+                for (int i = 0; i < results.length; i++) {
+                    if (results[i] >= 0) {
+                        results[i] = Statement.SUCCESS_NO_INFO;
+                    }
+                }
+            }
             return results;
         } catch (SQLException ex) {
             x = ex;
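
The wrapper exercises a JDBC corner case: executeBatch() may report Statement.SUCCESS_NO_INFO (-2) instead of a per-statement update count, and some drivers do so for every statement. Interpreting one entry of the batch result thus has three cases, roughly as follows (a sketch; per the JDBC spec a driver may also throw BatchUpdateException rather than record EXECUTE_FAILED):

    import java.sql.Statement;

    final class BatchResultSketch {
        // Classifies one entry of the int[] returned by executeBatch(),
        // mirroring the success check used in RDBDocumentStoreJDBC.
        static boolean succeeded(int result) {
            if (result == Statement.SUCCESS_NO_INFO) {
                return true;  // no count available; verify success separately if needed
            }
            if (result == Statement.EXECUTE_FAILED) {
                return false; // the statement failed
            }
            return result == 1; // precise count: exactly one row must have matched
        }
    }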