You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by re...@apache.org on 2016/01/29 16:43:49 UTC

svn commit: r1727601 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/document/rdb/ test/java/org/apache/jackrabbit/oak/plugins/document/

Author: reschke
Date: Fri Jan 29 15:43:49 2016
New Revision: 1727601

URL: http://svn.apache.org/viewvc?rev=1727601&view=rev
Log:
OAK-3924: fix database-level row deadlock during bulk updates in RDB

Apply changes at the JDBC level in a predictable (alphabetical, by document ID) order, commit more often, and reject attempts to update the same document multiple times in one operation; re-enable the tests.

(thanks to Tomek Rękawek for almost all of the work)

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateClusterTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java?rev=1727601&r1=1727600&r2=1727601&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStoreJDBC.java Fri Jan 29 15:43:49 2016
@@ -16,12 +16,15 @@
  */
 package org.apache.jackrabbit.oak.plugins.document.rdb;
 
+import static com.google.common.collect.Iterables.transform;
+import static com.google.common.collect.Sets.newHashSet;
 import static org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore.CHAR2OCTETRATIO;
 import static org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore.asBytes;
 import static org.apache.jackrabbit.oak.plugins.document.rdb.RDBJDBCTools.closeResultSet;
 import static org.apache.jackrabbit.oak.plugins.document.rdb.RDBJDBCTools.closeStatement;
 
 import java.io.UnsupportedEncodingException;
+import java.sql.BatchUpdateException;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -31,6 +34,7 @@ import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -266,8 +270,10 @@ public class RDBDocumentStoreJDBC {
                 "insert into " + tmd.getName() + "(ID, MODIFIED, HASBINARY, DELETEDONCE, MODCOUNT, CMODCOUNT, DSIZE, DATA, BDATA) "
                         + "values (?, ?, ?, ?, ?, ?, ?, ?, ?)");
 
+        List<T> sortedDocs = sortDocuments(documents);
+        int[] results;
         try {
-            for (T document : documents) {
+            for (T document : sortedDocs) {
                 String data = this.ser.asString(document);
                 String id = document.getId();
                 Number hasBinary = (Number) document.get(NodeDocument.HAS_BINARY_FLAG);
@@ -293,21 +299,23 @@ public class RDBDocumentStoreJDBC {
                 }
                 stmt.addBatch();
             }
-            int[] results = stmt.executeBatch();
-
-            Set<String> succesfullyInserted = new HashSet<String>();
-            for (int i = 0; i < documents.size(); i++) {
-                int result = results[i];
-                if (result != 1 && result != Statement.SUCCESS_NO_INFO) {
-                    LOG.error("DB insert failed for {}: {}", tmd.getName(), documents.get(i).getId());
-                } else {
-                    succesfullyInserted.add(documents.get(i).getId());
-                }
-            }
-            return succesfullyInserted;
+            results = stmt.executeBatch();
+        } catch (BatchUpdateException ex) {
+            LOG.debug("Some of the batch updates failed", ex);
+            results = ex.getUpdateCounts();
         } finally {
             stmt.close();
         }
+        Set<String> succesfullyInserted = new HashSet<String>();
+        for (int i = 0; i < results.length; i++) {
+            int result = results[i];
+            if (result != 1 && result != Statement.SUCCESS_NO_INFO) {
+                LOG.debug("DB insert failed for {}: {}", tmd.getName(), sortedDocs.get(i).getId());
+            } else {
+                succesfullyInserted.add(sortedDocs.get(i).getId());
+            }
+        }
+        return succesfullyInserted;
     }
 
     /**
@@ -318,6 +326,9 @@ public class RDBDocumentStoreJDBC {
      * <p>
      * If the {@code upsert} parameter is set to true, the method will also try to insert new documents, those
      * which modcount equals to 1.
+     * <p>
+     * The order of applying updates will be different than order of the passed list, so there shouldn't be two
+     * updates related to the same document. An {@link IllegalArgumentException} will be thrown if there are.
      *
      * @param connection JDBC connection
      * @param tmd Table metadata
@@ -328,13 +339,16 @@ public class RDBDocumentStoreJDBC {
      */
     public <T extends Document> Set<String> update(Connection connection, RDBTableMetaData tmd, List<T> documents, boolean upsert)
             throws SQLException {
+        assertNoDuplicatedIds(documents);
+
         Set<String> successfulUpdates = new HashSet<String>();
+        List<String> updatedKeys = new ArrayList<String>();
+        int[] batchResults;
 
         PreparedStatement stmt = connection.prepareStatement("update " + tmd.getName()
             + " set MODIFIED = ?, HASBINARY = ?, DELETEDONCE = ?, MODCOUNT = ?, CMODCOUNT = ?, DSIZE = ?, DATA = ?, BDATA = ? where ID = ? and MODCOUNT = ?");
         try {
-            List<String> updatedKeys = new ArrayList<String>();
-            for (T document : documents) {
+            for (T document : sortDocuments(documents)) {
                 Long modcount = (Long) document.get(MODCOUNT);
                 if (modcount == 1) {
                     continue; // This is a new document. We'll deal with the inserts later.
@@ -368,19 +382,22 @@ public class RDBDocumentStoreJDBC {
                 stmt.addBatch();
                 updatedKeys.add(document.getId());
             }
-
-            int[] batchResults = stmt.executeBatch();
-
-            for (int i = 0; i < batchResults.length; i++) {
-                int result = batchResults[i];
-                if (result == 1 || result == Statement.SUCCESS_NO_INFO) {
-                    successfulUpdates.add(updatedKeys.get(i));
-                }
-            }
+            batchResults = stmt.executeBatch();
+            connection.commit();
+        } catch (BatchUpdateException ex) {
+            LOG.debug("Some of the batch updates failed", ex);
+            batchResults = ex.getUpdateCounts();
         } finally {
             stmt.close();
         }
 
+        for (int i = 0; i < batchResults.length; i++) {
+            int result = batchResults[i];
+            if (result == 1 || result == Statement.SUCCESS_NO_INFO) {
+                successfulUpdates.add(updatedKeys.get(i));
+            }
+        }
+
         if (upsert) {
             List<T> remainingDocuments = new ArrayList<T>(documents.size() - successfulUpdates.size());
             for (T doc : documents) {
@@ -407,6 +424,7 @@ public class RDBDocumentStoreJDBC {
                         while (rs.next()) {
                             documentsWithUpdatedModcount.add(getIdFromRS(tmd, rs, 1));
                         }
+                        connection.commit();
                     } finally {
                         closeResultSet(rs);
                         closeStatement(selectStmt);
@@ -430,6 +448,12 @@ public class RDBDocumentStoreJDBC {
         return successfulUpdates;
     }
 
+    private static <T extends Document> void assertNoDuplicatedIds(List<T> documents) {
+        if (newHashSet(transform(documents, idExtractor)).size() < documents.size()) {
+            throw new IllegalArgumentException("There are duplicated ids in the document list");
+        }
+    }
+
     private final static Map<String, String> INDEXED_PROP_MAPPING;
     static {
         Map<String, String> tmp = new HashMap<String, String>();
@@ -753,6 +777,17 @@ public class RDBDocumentStoreJDBC {
         }
     }
 
+    private static <T extends Document> List<T> sortDocuments(Collection<T> documents) {
+        List<T> result = new ArrayList<T>(documents);
+        Collections.sort(result, new Comparator<T>() {
+            @Override
+            public int compare(T o1, T o2) {
+                return o1.getId().compareTo(o2.getId());
+            }
+        });
+        return result;
+    }
+
     private static final Function<Document, String> idExtractor = new Function<Document, String>() {
         @Override
         public String apply(Document input) {

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateClusterTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateClusterTest.java?rev=1727601&r1=1727600&r2=1727601&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateClusterTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateClusterTest.java Fri Jan 29 15:43:49 2016
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertNot
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
-import static org.junit.Assume.assumeTrue;
 
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -31,7 +30,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore;
 import org.junit.Test;
 
 public class BulkCreateOrUpdateClusterTest extends AbstractMultiDocumentStoreTest {
@@ -129,8 +127,6 @@ public class BulkCreateOrUpdateClusterTe
      */
     @Test
     public void testConcurrentWithConflict() throws InterruptedException {
-        // see OAK-3924
-        assumeTrue(!(ds instanceof RDBDocumentStore));
         int threadCount = 10;
         int amount = 500;
 

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateTest.java?rev=1727601&r1=1727600&r2=1727601&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BulkCreateOrUpdateTest.java Fri Jan 29 15:43:49 2016
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertNot
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.junit.Assume.assumeTrue;
 
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -35,8 +34,6 @@ import java.util.concurrent.ConcurrentHa
 import javax.sql.DataSource;
 
 import org.apache.jackrabbit.oak.plugins.document.rdb.RDBDataSourceWrapper;
-import org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore;
-import org.junit.Ignore;
 import org.junit.Test;
 
 public class BulkCreateOrUpdateTest extends AbstractDocumentStoreTest {
@@ -240,8 +237,6 @@ public class BulkCreateOrUpdateTest exte
      */
     @Test
     public void testConcurrentWithConflict() throws InterruptedException {
-        // see OAK-3924
-        assumeTrue(!(ds instanceof RDBDocumentStore));
         int threadCount = 10;
         int amount = 500;