You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by re...@apache.org on 2016/01/05 08:36:07 UTC
svn commit: r1723008 - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/plugins/document/rdb/
test/java/org/apache/jackrabbit/oak/plugins/document/
test/java/org/apache/jackrabbit/oak/plugins/document/rdb/
Author: reschke
Date: Tue Jan 5 07:36:06 2016
New Revision: 1723008
URL: http://svn.apache.org/viewvc?rev=1723008&view=rev
Log:
OAK-3637: Bulk document updates in RDBDocumentStore
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBJDBCTools.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MultiDocumentStoreTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDataSourceWrapper.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBPreparedStatementWrapper.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java?rev=1723008&r1=1723007&r2=1723008&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java Tue Jan 5 07:36:06 2016
@@ -17,6 +17,8 @@
package org.apache.jackrabbit.oak.plugins.document.rdb;
import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Lists.partition;
import static org.apache.jackrabbit.oak.plugins.document.UpdateUtils.checkConditions;
import static org.apache.jackrabbit.oak.plugins.document.rdb.RDBJDBCTools.closeResultSet;
import static org.apache.jackrabbit.oak.plugins.document.rdb.RDBJDBCTools.closeStatement;
@@ -40,6 +42,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -81,6 +84,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnel;
import com.google.common.hash.PrimitiveSink;
@@ -290,13 +294,172 @@ public class RDBDocumentStore implements
@Override
public <T extends Document> List<T> createOrUpdate(Collection<T> collection, List<UpdateOp> updateOps) {
- List<T> result = new ArrayList<T>(updateOps.size());
- for (UpdateOp update : updateOps) {
- result.add(createOrUpdate(collection, update));
+ List<T> result = null;
+ Map<String, UpdateOp> operationsToCover = new LinkedHashMap<String, UpdateOp>();
+
+ for (UpdateOp updateOp : updateOps) {
+ UpdateUtils.assertUnconditional(updateOp);
+ UpdateOp clone = updateOp.copy();
+ addUpdateCounters(clone);
+ operationsToCover.put(clone.getId(), clone);
+ }
+
+ Map<String, T> oldDocs = new HashMap<String, T>();
+ if (collection == Collection.NODES) {
+ oldDocs.putAll((Map<String, T>) readDocumentCached(collection, operationsToCover.keySet()));
+ }
+
+ int i = 0; // iteration count
+
+ // bulk update requires two DB requests, so if we have <= 2 operations
+ // it's better to send them sequentially
+ while (operationsToCover.size() > 2) {
+ // We should try to insert documents only during the first
+ // iteration. In the 2nd and 3rd iterations we only deal with
+ // conflicting documents, so they already exist in the database
+ // and there's no point in inserting them.
+ boolean upsert = i == 0;
+
+ if (i++ == 3) {
+ // operations that conflicted in 3 consecutive bulk
+ // updates should be applied sequentially
+ break;
+ }
+
+ for (List<UpdateOp> partition : partition(newArrayList(operationsToCover.values()), CHUNKSIZE)) {
+ Set<String> successfulUpdates = bulkUpdate(collection, partition, oldDocs, upsert);
+ operationsToCover.keySet().removeAll(successfulUpdates);
+ }
+ }
+
+ // if there are some changes left, we'll apply them one after another
+ for (UpdateOp updateOp : updateOps) {
+ if (operationsToCover.remove(updateOp.getId()) != null) {
+ // work on the original update operation
+ T oldDoc = createOrUpdate(collection, updateOp.copy());
+ if (oldDoc != null) {
+ oldDocs.put(oldDoc.getId(), oldDoc);
+ }
+ }
}
+
+ result = new ArrayList<T>(updateOps.size());
+ for (UpdateOp op : updateOps) {
+ result.add(oldDocs.get(op.getId()));
+ }
+
return result;
}
+ private <T extends Document> Map<String, T> readDocumentCached(Collection<T> collection, Set<String> keys) {
+ Map<String, T> documents = new HashMap<String, T>();
+
+ if (collection == Collection.NODES) {
+ for (String key : keys) {
+ NodeDocument cached = nodesCache.getIfPresent(key);
+ if (cached != null && cached != NodeDocument.NULL) {
+ T doc = castAsT(unwrap(cached));
+ documents.put(doc.getId(), doc);
+ }
+ }
+ }
+
+ Set<String> documentsToRead = Sets.difference(keys, documents.keySet());
+ Map<String, T> readDocuments = readDocumentsUncached(collection, documentsToRead);
+ documents.putAll(readDocuments);
+
+ if (collection == Collection.NODES) {
+ for (T doc : readDocuments.values()) {
+ nodesCache.putIfAbsent((NodeDocument) doc);
+ }
+ }
+
+ return documents;
+ }
+
+ private <T extends Document> Map<String, T> readDocumentsUncached(Collection<T> collection, Set<String> keys) {
+ Map<String, T> result = new HashMap<String, T>();
+
+ Connection connection = null;
+ RDBTableMetaData tmd = getTable(collection);
+ try {
+ connection = this.ch.getROConnection();
+ List<RDBRow> rows = db.read(connection, tmd, keys);
+
+ int size = rows.size();
+ for (int i = 0; i < size; i++) {
+ RDBRow row = rows.set(i, null);
+ T document = convertFromDBObject(collection, row);
+ result.put(document.getId(), document);
+ }
+ connection.commit();
+ } catch (Exception ex) {
+ throw new DocumentStoreException(ex);
+ } finally {
+ this.ch.closeConnection(connection);
+ }
+ return result;
+ }
+
+ private <T extends Document> Set<String> bulkUpdate(Collection<T> collection, List<UpdateOp> updates, Map<String, T> oldDocs, boolean upsert) {
+ Set<String> missingDocs = new HashSet<String>();
+ for (UpdateOp op : updates) {
+ if (!oldDocs.containsKey(op.getId())) {
+ missingDocs.add(op.getId());
+ }
+ }
+ for (T doc : readDocumentsUncached(collection, missingDocs).values()) {
+ oldDocs.put(doc.getId(), doc);
+ if (collection == Collection.NODES) {
+ nodesCache.putIfAbsent((NodeDocument) doc);
+ }
+ }
+
+ List<T> docsToUpdate = new ArrayList<T>(updates.size());
+ Set<String> keysToUpdate = new HashSet<String>();
+ for (UpdateOp update : updates) {
+ String id = update.getId();
+ T modifiedDoc = collection.newDocument(this);
+ if (oldDocs.containsKey(id)) {
+ oldDocs.get(id).deepCopy(modifiedDoc);
+ }
+ UpdateUtils.applyChanges(modifiedDoc, update);
+ docsToUpdate.add(modifiedDoc);
+ keysToUpdate.add(id);
+ }
+
+ Connection connection = null;
+ RDBTableMetaData tmd = getTable(collection);
+ try {
+ connection = this.ch.getRWConnection();
+ Set<String> successfulUpdates = db.update(connection, tmd, docsToUpdate, upsert);
+ connection.commit();
+
+ Set<String> failedUpdates = Sets.difference(keysToUpdate, successfulUpdates);
+ oldDocs.keySet().removeAll(failedUpdates);
+
+ if (collection == Collection.NODES) {
+ for (T doc : docsToUpdate) {
+ String id = doc.getId();
+ if (successfulUpdates.contains(id)) {
+ if (oldDocs.containsKey(id)) {
+ nodesCache.replaceCachedDocument((NodeDocument) oldDocs.get(id), (NodeDocument) doc);
+ } else {
+ nodesCache.putIfAbsent((NodeDocument) doc);
+ }
+ }
+ }
+ }
+
+ return successfulUpdates;
+ } catch (SQLException ex) {
+ this.ch.rollbackConnection(connection);
+ throw new DocumentStoreException(ex);
+ } finally {
+ this.ch.closeConnection(connection);
+ }
+ }
+
@Override
public <T extends Document> T findAndUpdate(Collection<T> collection, UpdateOp update) {
return internalCreateOrUpdate(collection, update, false, true);
@@ -1463,9 +1626,9 @@ public class RDBDocumentStore implements
RDBTableMetaData tmd = getTable(collection);
try {
connection = this.ch.getRWConnection();
- boolean result = db.insert(connection, tmd, documents);
+ Set<String> insertedKeys = db.insert(connection, tmd, documents);
connection.commit();
- return result;
+ return insertedKeys.size() == documents.size();
} catch (SQLException ex) {
this.ch.rollbackConnection(connection);
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java?rev=1723008&r1=1723007&r2=1723008&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java Tue Jan 5 07:36:06 2016
@@ -29,9 +29,11 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -52,6 +54,9 @@ import org.apache.jackrabbit.oak.plugins
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+
/**
* Implements (most) DB interactions used in {@link RDBDocumentStore}.
*/
@@ -150,7 +155,6 @@ public class RDBDocumentStoreJDBC {
public int delete(Connection connection, RDBTableMetaData tmd, List<String> ids) throws SQLException {
PreparedStatement stmt;
- int cnt = ids.size();
PreparedStatementComponent inClause = RDBJDBCTools.createInStatement("ID", ids, tmd.isIdBinary());
String sql = "delete from " + tmd.getName() + " where " + inClause.getStatementComponent();
stmt = connection.prepareStatement(sql);
@@ -158,7 +162,7 @@ public class RDBDocumentStoreJDBC {
try {
inClause.setParameters(stmt, 1);
int result = stmt.executeUpdate();
- if (result != cnt) {
+ if (result != ids.size()) {
LOG.debug("DB delete failed for " + tmd.getName() + "/" + ids);
}
return result;
@@ -256,7 +260,7 @@ public class RDBDocumentStoreJDBC {
}
}
- public <T extends Document> boolean insert(Connection connection, RDBTableMetaData tmd, List<T> documents) throws SQLException {
+ public <T extends Document> Set<String> insert(Connection connection, RDBTableMetaData tmd, List<T> documents) throws SQLException {
PreparedStatement stmt = connection.prepareStatement(
"insert into " + tmd.getName() + "(ID, MODIFIED, HASBINARY, DELETEDONCE, MODCOUNT, CMODCOUNT, DSIZE, DATA, BDATA) "
+ "values (?, ?, ?, ?, ?, ?, ?, ?, ?)");
@@ -289,20 +293,141 @@ public class RDBDocumentStoreJDBC {
stmt.addBatch();
}
int[] results = stmt.executeBatch();
- boolean success = true;
+
+ Set<String> succesfullyInserted = new HashSet<String>();
for (int i = 0; i < documents.size(); i++) {
int result = results[i];
if (result != 1 && result != Statement.SUCCESS_NO_INFO) {
LOG.error("DB insert failed for {}: {}", tmd.getName(), documents.get(i).getId());
- success = false;
+ } else {
+ succesfullyInserted.add(documents.get(i).getId());
}
}
- return success;
+ return succesfullyInserted;
} finally {
stmt.close();
}
}
+ /**
+ * Update a list of documents using JDBC batches. Some of the updates may fail because of the concurrent
+ * changes. The method returns a set of successfully updated documents. It's the caller responsibility
+ * to compare the set with the list of input documents, find out which documents conflicted and take
+ * appropriate action.
+ * <p>
+ * If the {@code upsert} parameter is set to true, the method will also try to insert new documents, those
+ * which modcount equals to 1.
+ *
+ * @param connection JDBC connection
+ * @param tmd Table metadata
+ * @param documents List of documents to update
+ * @param upsert Insert new documents
+ * @return set containing ids of successfully updated documents
+ * @throws SQLException
+ */
+ public <T extends Document> Set<String> update(Connection connection, RDBTableMetaData tmd, List<T> documents, boolean upsert)
+ throws SQLException {
+ Set<String> successfulUpdates = new HashSet<String>();
+
+ PreparedStatement stmt = connection.prepareStatement("update " + tmd.getName()
+ + " set MODIFIED = ?, HASBINARY = ?, DELETEDONCE = ?, MODCOUNT = ?, CMODCOUNT = ?, DSIZE = ?, DATA = ?, BDATA = ? where ID = ? and MODCOUNT = ?");
+ try {
+ List<String> updatedKeys = new ArrayList<String>();
+ for (T document : documents) {
+ Long modcount = (Long) document.get(MODCOUNT);
+ if (modcount == 1) {
+ continue; // This is a new document. We'll deal with the inserts later.
+ }
+
+ String data = this.ser.asString(document);
+ Number hasBinary = (Number) document.get(NodeDocument.HAS_BINARY_FLAG);
+ Boolean deletedOnce = (Boolean) document.get(NodeDocument.DELETED_ONCE);
+ Long cmodcount = (Long) document.get(COLLISIONSMODCOUNT);
+
+ int si = 1;
+ stmt.setObject(si++, document.get(MODIFIED), Types.BIGINT);
+ stmt.setObject(si++, (hasBinary != null && hasBinary.intValue() == NodeDocument.HAS_BINARY_VAL) ? 1 : 0,
+ Types.SMALLINT);
+ stmt.setObject(si++, (deletedOnce != null && deletedOnce) ? 1 : 0, Types.SMALLINT);
+ stmt.setObject(si++, modcount, Types.BIGINT);
+ stmt.setObject(si++, cmodcount == null ? Long.valueOf(0) : cmodcount, Types.BIGINT);
+ stmt.setObject(si++, data.length(), Types.BIGINT);
+
+ if (data.length() < tmd.getDataLimitInOctets() / CHAR2OCTETRATIO) {
+ stmt.setString(si++, data);
+ stmt.setBinaryStream(si++, null, 0);
+ } else {
+ stmt.setString(si++, "\"blob\"");
+ byte[] bytes = asBytes(data);
+ stmt.setBytes(si++, bytes);
+ }
+
+ setIdInStatement(tmd, stmt, si++, document.getId());
+ stmt.setObject(si++, modcount - 1, Types.BIGINT);
+ stmt.addBatch();
+ updatedKeys.add(document.getId());
+ }
+
+ int[] batchResults = stmt.executeBatch();
+
+ for (int i = 0; i < batchResults.length; i++) {
+ int result = batchResults[i];
+ if (result == 1 || result == Statement.SUCCESS_NO_INFO) {
+ successfulUpdates.add(updatedKeys.get(i));
+ }
+ }
+ } finally {
+ stmt.close();
+ }
+
+ if (upsert) {
+ List<T> remainingDocuments = new ArrayList<T>(documents.size() - successfulUpdates.size());
+ for (T doc : documents) {
+ if (!successfulUpdates.contains(doc.getId())) {
+ remainingDocuments.add(doc);
+ }
+ }
+
+ if (!remainingDocuments.isEmpty()) {
+ List<String> remainingDocumentIds = Lists.transform(remainingDocuments, idExtractor);
+ PreparedStatementComponent inClause = RDBJDBCTools.createInStatement("ID", remainingDocumentIds, tmd.isIdBinary());
+ StringBuilder sql = new StringBuilder("select ID from ").append(tmd.getName());
+ sql.append(" where ").append(inClause.getStatementComponent());
+
+ Set<String> documentsWithUpdatedModcount = new HashSet<String>();
+
+ PreparedStatement selectStmt = null;
+ ResultSet rs = null;
+ try {
+ selectStmt = connection.prepareStatement(sql.toString());
+ selectStmt.setPoolable(false);
+ inClause.setParameters(selectStmt, 1);
+ rs = selectStmt.executeQuery();
+ while (rs.next()) {
+ documentsWithUpdatedModcount.add(getIdFromRS(tmd, rs, 1));
+ }
+ } finally {
+ closeResultSet(rs);
+ closeStatement(selectStmt);
+ }
+
+ Iterator<T> it = remainingDocuments.iterator();
+ while (it.hasNext()) {
+ if (documentsWithUpdatedModcount.contains(it.next().getId())) {
+ it.remove();
+ }
+ }
+
+ if (!remainingDocuments.isEmpty()) {
+ for (String id : insert(connection, tmd, remainingDocuments)) {
+ successfulUpdates.add(id);
+ }
+ }
+ }
+ }
+ return successfulUpdates;
+ }
+
private final static Map<String, String> INDEXED_PROP_MAPPING;
static {
Map<String, String> tmp = new HashMap<String, String>();
@@ -450,6 +575,58 @@ public class RDBDocumentStoreJDBC {
return result;
}
+ public List<RDBRow> read(Connection connection, RDBTableMetaData tmd, Collection<String> keys) throws SQLException {
+ if (keys.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ PreparedStatementComponent inClause = RDBJDBCTools.createInStatement("ID", keys, tmd.isIdBinary());
+ StringBuilder query = new StringBuilder();
+ query.append("select ID, MODIFIED, MODCOUNT, CMODCOUNT, HASBINARY, DELETEDONCE, DATA, BDATA from ");
+ query.append(tmd.getName());
+ query.append(" where ").append(inClause.getStatementComponent());
+
+ PreparedStatement stmt = connection.prepareStatement(query.toString());
+ stmt.setPoolable(false);
+ try {
+ inClause.setParameters(stmt, 1);
+ ResultSet rs = stmt.executeQuery();
+
+ List<RDBRow> rows = new ArrayList<RDBRow>();
+ while (rs.next()) {
+ int col = 1;
+ String id = getIdFromRS(tmd, rs, col++);
+ long modified = rs.getLong(col++);
+ long modcount = rs.getLong(col++);
+ long cmodcount = rs.getLong(col++);
+ long hasBinary = rs.getLong(col++);
+ long deletedOnce = rs.getLong(col++);
+ String data = rs.getString(col++);
+ byte[] bdata = rs.getBytes(col++);
+ RDBRow row = new RDBRow(id, hasBinary == 1, deletedOnce == 1, modified, modcount, cmodcount, data, bdata);
+ rows.add(row);
+ }
+
+ return rows;
+ } catch (SQLException ex) {
+ LOG.error("attempting to read " + keys, ex);
+ // DB2 throws an SQLException for invalid keys; handle this more
+ // gracefully
+ if ("22001".equals(ex.getSQLState())) {
+ try {
+ connection.rollback();
+ } catch (SQLException ex2) {
+ LOG.debug("failed to rollback", ex2);
+ }
+ return null;
+ } else {
+ throw (ex);
+ }
+ } finally {
+ stmt.close();
+ }
+ }
+
@CheckForNull
public RDBRow read(Connection connection, RDBTableMetaData tmd, String id, long lastmodcount) throws SQLException {
PreparedStatement stmt;
@@ -574,4 +751,11 @@ public class RDBDocumentStoreJDBC {
stmt.setString(idx, id);
}
}
+
+ private static final Function<Document, String> idExtractor = new Function<Document, String>() {
+ @Override
+ public String apply(Document input) {
+ return input.getId();
+ }
+ };
}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBJDBCTools.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBJDBCTools.java?rev=1723008&r1=1723007&r2=1723008&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBJDBCTools.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBJDBCTools.java Tue Jan 5 07:36:06 2016
@@ -29,6 +29,7 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -328,7 +329,7 @@ public class RDBJDBCTools {
builder.append(')');
}
- public static PreparedStatementComponent createInStatement(final String fieldName, final List<String> values,
+ public static PreparedStatementComponent createInStatement(final String fieldName, final Collection<String> values,
final boolean binary) {
return new PreparedStatementComponent() {
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateTest.java?rev=1723008&r1=1723007&r2=1723008&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateTest.java Tue Jan 5 07:36:06 2016
@@ -23,12 +23,20 @@ import static org.junit.Assert.assertNul
import java.util.ArrayList;
import java.util.List;
+import javax.sql.DataSource;
+
+import org.apache.jackrabbit.oak.plugins.document.rdb.RDBDataSourceWrapper;
import org.junit.Test;
public class BulkCreateOrUpdateTest extends AbstractDocumentStoreTest {
public BulkCreateOrUpdateTest(DocumentStoreFixture dsf) {
super(dsf);
+ DataSource ds = dsf.getRDBDataSource();
+ if (ds instanceof RDBDataSourceWrapper) {
+ // test drivers that do not return precise batch results
+ ((RDBDataSourceWrapper)ds).setBatchResultPrecise(false);
+ }
}
/**
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MultiDocumentStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MultiDocumentStoreTest.java?rev=1723008&r1=1723007&r2=1723008&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MultiDocumentStoreTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MultiDocumentStoreTest.java Tue Jan 5 07:36:06 2016
@@ -18,9 +18,12 @@ package org.apache.jackrabbit.oak.plugin
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import org.junit.Test;
@@ -209,6 +212,68 @@ public class MultiDocumentStoreTest exte
assertTrue(nd1.getLastCheckTime() > ds1checktime);
}
+ @Test
+ public void testInterleavedBatchUpdate() {
+ int amount = 10;
+ int halfAmount = amount / 2;
+ String baseId = this.getClass().getName() + ".testInterleavedBatchUpdate";
+
+ // remove if present
+ for (int i = 0; i < amount; i++) {
+ String id = baseId + "-" + i;
+ NodeDocument nd = super.ds1.find(Collection.NODES, id);
+ if (nd != null) {
+ super.ds1.remove(Collection.NODES, id);
+ }
+ removeMe.add(id);
+ }
+
+ {
+ // create half of the entries in ds1
+ List<UpdateOp> ops = new ArrayList<UpdateOp>();
+ for (int i = 0; i < halfAmount; i++) {
+ String id = baseId + "-" + i;
+ UpdateOp up = new UpdateOp(id, true);
+ up.set(Document.ID, id);
+ up.set("_createdby", "ds1");
+ ops.add(up);
+ }
+ List<NodeDocument> result = super.ds1.createOrUpdate(Collection.NODES, ops);
+ assertEquals(halfAmount, result.size());
+ for (NodeDocument doc : result) {
+ assertNull(doc);
+ }
+ }
+
+ {
+ // create all of the entries in ds2
+ List<UpdateOp> ops = new ArrayList<UpdateOp>();
+ for (int i = 0; i < amount; i++) {
+ String id = baseId + "-" + i;
+ UpdateOp up = new UpdateOp(id, true);
+ up.set(Document.ID, id);
+ up.set("_createdby", "ds2");
+ ops.add(up);
+ }
+ List<NodeDocument> result = super.ds2.createOrUpdate(Collection.NODES, ops);
+ assertEquals(amount, result.size());
+ for (NodeDocument doc : result) {
+ // documents are either new or have been created by ds1
+ if (doc != null) {
+ assertEquals("ds1", doc.get("_createdby"));
+ }
+ }
+ }
+
+ // final check: does DS1 see all documents including the changes made by DS2?
+ for (int i = 0; i < amount; i++) {
+ String id = baseId + "-" + i;
+ NodeDocument doc = super.ds1.find(Collection.NODES, id, 0);
+ assertNotNull(doc);
+ assertEquals("ds2", doc.get("_createdby"));
+ }
+ }
+
private static long letTimeElapse() {
long ts = System.currentTimeMillis();
while (System.currentTimeMillis() == ts) {
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDataSourceWrapper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDataSourceWrapper.java?rev=1723008&r1=1723007&r2=1723008&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDataSourceWrapper.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDataSourceWrapper.java Tue Jan 5 07:36:06 2016
@@ -53,6 +53,7 @@ public class RDBDataSourceWrapper implem
// }
private final DataSource ds;
+ private boolean batchResultPrecise = true;
// Logging
@@ -84,6 +85,17 @@ public class RDBDataSourceWrapper implem
return stopLog(Thread.currentThread());
}
+ /**
+ * Set to {@code false} to simulate drivers/DBs that do not return the number of affected rows in {@link Statement#executeBatch()}.
+ */
+ public void setBatchResultPrecise(boolean precise) {
+ this.batchResultPrecise = precise;
+ }
+
+ public boolean isBatchResultPrecise() {
+ return this.batchResultPrecise;
+ }
+
// DataSource
public RDBDataSourceWrapper(DataSource ds) {
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBPreparedStatementWrapper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBPreparedStatementWrapper.java?rev=1723008&r1=1723007&r2=1723008&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBPreparedStatementWrapper.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBPreparedStatementWrapper.java Tue Jan 5 07:36:06 2016
@@ -36,6 +36,7 @@ import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLWarning;
import java.sql.SQLXML;
+import java.sql.Statement;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
@@ -125,6 +126,14 @@ public class RDBPreparedStatementWrapper
SQLException x = null;
try {
results = statement.executeBatch();
+ // not all JDBC drivers return the number of affected rows
+ if (!datasource.isBatchResultPrecise()) {
+ for (int i = 0; i < results.length; i++) {
+ if (results[i] >= 0) {
+ results[i] = Statement.SUCCESS_NO_INFO;
+ }
+ }
+ }
return results;
} catch (SQLException ex) {
x = ex;